We have decided to implement the S3 interface of our Ceph cluster for Airflow system and task logging.
We will need to refer to: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html
BTullis, Aug 19 2024, 3:11 PM
| Status | Assigned | Task |
|---|---|---|
| Resolved | Gehel | T327267 Create a DSE Kubernetes cluster with support for persistent storage from Ceph |
| Resolved | Gehel | T324660 Install Ceph Cluster for Data Platform Engineering |
| Resolved | brouberol | T362788 Migrate Airflow to the dse-k8s cluster |
| Resolved | BTullis | T330152 Deploy ceph radosgw processes to data-engineering cluster |
| Resolved | BTullis | T330153 Configure Anycast load-balancing ceph radosgw services on the data-engineering cluster |
| Resolved | BTullis | T369634 Decide how to do DAG logging on dse-k8s |
| Resolved | brouberol | T372787 Implement S3 based logging for Airflow tasks on dse-k8s |
I've created an S3 user for the airflow-test-k8s app:
root@cephosd1001:~# radosgw-admin user create --uid=airflow-test-k8s --display-name=airflow-test-k8s
{
"user_id": "airflow-test-k8s",
"display_name": "airflow-test-k8s",
"email": "",
"suspended": 0,
"max_buckets": 1000,
"subusers": [],
"keys": [
{
"user": "airflow-test-k8s",
"access_key": "[REDACTED]",
"secret_key": "[REDACTED]"
}
],
"swift_keys": [],
"caps": [],
"op_mask": "read, write, delete",
"default_placement": "",
"default_storage_class": "",
"placement_tags": [],
"bucket_quota": {
"enabled": false,
"check_on_raw": false,
"max_size": -1,
"max_size_kb": 0,
"max_objects": -1
},
"user_quota": {
"enabled": false,
"check_on_raw": false,
"max_size": -1,
"max_size_kb": 0,
"max_objects": -1
},
"temp_url_keys": [],
"type": "rgw",
"mfa_ids": []
}
brouberol opened https://gitlab.wikimedia.org/repos/data-engineering/airflow/-/merge_requests/16
Install the amazon provider to be able to store DAG logs in S3
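The JSON printed by `radosgw-admin user create` earlier in this task carries the S3 credentials under `keys`. A minimal sketch of pulling them out for use in a secret follows, assuming the output has been captured as a string. The helper name `s3_credentials` and the trimmed sample payload are illustrative only and are not part of any tooling used here.

```python
import json


def s3_credentials(user_json: str) -> dict:
    """Return the uid and first S3 key pair from radosgw-admin user JSON."""
    user = json.loads(user_json)
    first_key = user["keys"][0]  # the user created above has a single key pair
    return {
        "uid": user["user_id"],
        "access_key": first_key["access_key"],
        "secret_key": first_key["secret_key"],
    }


# Trimmed copy of the output above; the key values stay redacted.
sample = json.dumps(
    {
        "user_id": "airflow-test-k8s",
        "display_name": "airflow-test-k8s",
        "keys": [
            {
                "user": "airflow-test-k8s",
                "access_key": "[REDACTED]",
                "secret_key": "[REDACTED]",
            }
        ],
    }
)

creds = s3_credentials(sample)
print(creds["uid"])
```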
Change #1071908 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: store the connections.yaml content in a secret
Change #1071909 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: enable s3 logging
brouberol merged https://gitlab.wikimedia.org/repos/data-engineering/airflow/-/merge_requests/16
Install the amazon provider to be able to store DAG logs in S3
Change #1071920 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/puppet@production] global_config: add the s3-eqiad-dpe external service
Change #1071920 merged by Brouberol:
[operations/puppet@production] global_config: add the s3-eqiad-dpe external service
Change #1071908 merged by Brouberol:
[operations/deployment-charts@master] airflow: store the connections.yaml content in a secret
Change #1071909 merged by Brouberol:
[operations/deployment-charts@master] airflow: enable s3 logging
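For reference, Airflow's S3 task handler is driven by three settings in the `[logging]` section: `remote_logging`, `remote_base_log_folder` and `remote_log_conn_id`. The sketch below builds the equivalent environment variables. It is not the content of the chart change itself: the function name and the connection id `s3_dpe` are hypothetical, and the bucket and prefix are taken from the object listing later in this task.

```python
def s3_logging_env(bucket: str, prefix: str, conn_id: str) -> dict:
    """Build AIRFLOW__LOGGING__* variables that enable remote logging to S3."""
    return {
        "AIRFLOW__LOGGING__REMOTE_LOGGING": "True",
        "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER": f"s3://{bucket}/{prefix.strip('/')}",
        # Name of the Airflow connection holding the S3 credentials (assumed).
        "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID": conn_id,
    }


env = s3_logging_env("airflow", "k8s/test-k8s/logs", "s3_dpe")
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

With a non-AWS endpoint such as radosgw, the connection referenced by `remote_log_conn_id` also needs to point at the cluster's S3 endpoint rather than at AWS.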
Change #1072183 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: fix the s3 logging integration
I generated a working .s3cfg configuration file for s3cmd on stat1008:
[default]
access_key = [REDACTED]
access_token =
add_encoding_exts =
add_headers =
bucket_location = dpe
ca_certs_file =
cache_file =
check_ssl_certificate = True
check_ssl_hostname = True
cloudfront_host = cloudfront.amazonaws.com
connection_pooling = True
content_disposition =
content_type =
default_mime_type = binary/octet-stream
delay_updates = False
delete_after = False
delete_after_fetch = False
delete_removed = False
dry_run = False
enable_multipart = True
encoding = UTF-8
encrypt = False
expiry_date =
expiry_days =
expiry_prefix =
follow_symlinks = False
force = False
get_continue = False
gpg_command = None
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase =
guess_mime_type = True
host_base = rgw.eqiad.dpe.anycast.wmnet
host_bucket = n
human_readable_sizes = False
invalidate_default_index_on_cf = False
invalidate_default_index_root_on_cf = True
invalidate_on_cf = False
kms_key =
limit = -1
limitrate = 0
list_md5 = False
log_target_prefix =
long_listing = False
max_delete = -1
mime_type =
multipart_chunk_size_mb = 15
multipart_max_chunks = 10000
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
public_url_use_https = False
put_continue = False
recursive = False
recv_chunk = 65536
reduced_redundancy = False
requester_pays = False
restore_days = 1
restore_priority = Standard
secret_key = [REDACTED]
send_chunk = 65536
server_side_encryption = False
signature_v2 = False
signurl_use_https = False
simpledb_host = sdb.amazonaws.com
skip_existing = False
socket_timeout = 300
stats = False
stop_on_error = False
storage_class =
throttle_max = 100
upload_id =
urlencoding_mode = normal
use_http_expect = False
use_https = True
use_mime_magic = True
verbosity = WARNING
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
website_error =
website_index = index.html
After which, I was able to create the s3://airflow bucket:
brouberol@stat1008:~$ s3cmd --bucket-location=":default-placement" mb s3://airflow
Bucket 's3://airflow/' created
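Clients other than s3cmd need the same endpoint, which can be derived from the `host_base` and `use_https` values in the .s3cfg file. The following is a rough sketch using only the standard library; the helper name `endpoint_from_s3cfg` is made up for illustration, and the sample text contains only the few keys it reads. Interpolation is disabled because the real file contains `%(...)s` placeholders that `configparser` would otherwise try to expand.

```python
import configparser

# Subset of the .s3cfg shown in this task; credentials are deliberately left out.
SAMPLE_S3CFG = """\
[default]
bucket_location = dpe
host_base = rgw.eqiad.dpe.anycast.wmnet
use_https = True
"""


def endpoint_from_s3cfg(text: str) -> str:
    """Return the S3 endpoint URL described by an s3cmd configuration."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    section = parser["default"]
    scheme = "https" if section.getboolean("use_https", fallback=True) else "http"
    return f"{scheme}://{section['host_base']}"


endpoint = endpoint_from_s3cfg(SAMPLE_S3CFG)
print(endpoint)
```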
Change #1072183 merged by Brouberol:
[operations/deployment-charts@master] airflow: fix the s3 logging integration
We have confirmed that our airflow-test-k8s instance is now writing DAG task logs to S3!
brouberol@stat1008:~$ s3cmd ls s3://airflow/k8s/test-k8s/logs/dag_id=addition/
DIR s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/
DIR s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:59:04.808059+00:00/
DIR s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=scheduled__2024-09-11T11:00:00+00:00/
brouberol@stat1008:~$ s3cmd get s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/task_id=add_one_and_two/attempt=1.log attempt1.log
download: 's3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/task_id=add_one_and_two/attempt=1.log' -> 'attempt1.log' [1 of 1]
2848 of 2848 100% in 0s 221.49 KB/s done
brouberol@stat1008:~$ cat attempt1.log
[2024-09-11T11:52:12.848+0000] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2024-09-11T11:52:12.892+0000] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [queued]>
[2024-09-11T11:52:12.902+0000] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [queued]>
[2024-09-11T11:52:12.902+0000] {taskinstance.py:2856} INFO - Starting attempt 1 of 2
[2024-09-11T11:52:12.915+0000] {taskinstance.py:2879} INFO - Executing <Task(PythonOperator): add_one_and_two> on 2024-09-11 11:52:12.128960+00:00
[2024-09-11T11:52:12.921+0000] {standard_task_runner.py:72} INFO - Started process 488 to run task
[2024-09-11T11:52:12.924+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'addition', 'add_one_and_two', 'manual__2024-09-11T11:52:12.128960+00:00', '--job-id', '348', '--raw', '--subdir', 'DAGS_FOLDER/dags/addition_dag.py', '--cfg-path', '/tmp/tmpx557th5x']
[2024-09-11T11:52:12.925+0000] {standard_task_runner.py:105} INFO - Job 348: Subtask add_one_and_two
[2024-09-11T11:52:13.053+0000] {task_command.py:467} INFO - Running <TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [running]> on host 10.67.25.39
[2024-09-11T11:52:13.215+0000] {taskinstance.py:3122} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='addition' AIRFLOW_CTX_TASK_ID='add_one_and_two' AIRFLOW_CTX_EXECUTION_DATE='2024-09-11T11:52:12.128960+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-09-11T11:52:12.128960+00:00'
[2024-09-11T11:52:13.271+0000] {datajob.py:115} WARNING - cluster prod is not a valid environment type so Environment filter won't work.
[2024-09-11T11:52:13.456+0000] {taskinstance.py:731} INFO - ::endgroup::
[2024-09-11T11:52:13.466+0000] {python.py:240} INFO - Done. Returned value was: 3
[2024-09-11T11:52:13.540+0000] {taskinstance.py:340} INFO - ::group::Post task execution logs
[2024-09-11T11:52:13.540+0000] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=addition, task_id=add_one_and_two, run_id=manual__2024-09-11T11:52:12.128960+00:00, execution_date=20240911T115212, start_date=20240911T115212, end_date=20240911T115213
[2024-09-11T11:52:13.594+0000] {datajob.py:115} WARNING - cluster prod is not a valid environment type so Environment filter won't work.
[2024-09-11T11:52:13.738+0000] {local_task_job_runner.py:261} INFO - Task exited with return code 0
[2024-09-11T11:52:13.783+0000] {taskinstance.py:3891} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-09-11T11:52:13.784+0000] {local_task_job_runner.py:240} INFO - ::endgroup::
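The object keys in the listing above follow a `dag_id=…/run_id=…/task_id=…/attempt=N.log` layout under the `k8s/test-k8s/logs` prefix. A small sketch of rebuilding such a key, for example to fetch a specific attempt with s3cmd, could look like this. The function name is hypothetical and the layout is inferred only from the keys observed in this task, not from Airflow's configuration.

```python
def task_log_key(prefix: str, dag_id: str, run_id: str, task_id: str, attempt: int) -> str:
    """Build the object key of a task attempt log, as observed in the bucket."""
    return (
        f"{prefix.rstrip('/')}/dag_id={dag_id}/run_id={run_id}"
        f"/task_id={task_id}/attempt={attempt}.log"
    )


key = task_log_key(
    prefix="k8s/test-k8s/logs",
    dag_id="addition",
    run_id="manual__2024-09-11T11:52:12.128960+00:00",
    task_id="add_one_and_two",
    attempt=1,
)
print(f"s3://airflow/{key}")
```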