Implement S3 based logging for Airflow tasks on dse-k8s
Closed, Resolved, Public

Description

We have decided to use the S3 interface of our Ceph cluster for Airflow system and task logging.

We will need to refer to: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/logging/s3-task-handler.html
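For orientation, the handler described on that page is switched on through a few options in Airflow's `[logging]` section. Below is a minimal sketch of those options expressed as environment variables. The bucket path `s3://example-bucket/logs` and the connection ID `s3_logs` are hypothetical placeholders, not the values used on dse-k8s.

```python
def remote_logging_env(base_log_folder: str, conn_id: str) -> dict[str, str]:
    """Build the environment variables that enable S3 remote task logging.

    Airflow reads variables named AIRFLOW__{SECTION}__{KEY} as overrides
    for the matching option in airflow.cfg.
    """
    if not base_log_folder.startswith("s3://"):
        raise ValueError("remote_base_log_folder must start with s3://")
    return {
        "AIRFLOW__LOGGING__REMOTE_LOGGING": "True",
        "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER": base_log_folder,
        "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID": conn_id,
    }


# Hypothetical values, for illustration only.
env = remote_logging_env("s3://example-bucket/logs", "s3_logs")
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

For a non-AWS endpoint such as a Ceph RADOS Gateway, the connection named by `remote_log_conn_id` would presumably also need to carry the gateway's endpoint URL, which the Amazon provider reads from the connection's extra fields.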

Event Timeline

BTullis triaged this task as Medium priority.
BTullis added a project: Data-Platform-SRE.
BTullis renamed this task from Implement S3 based logging for Airflow on dse-k8s to Implement S3 based logging for Airflow tasks on dse-k8s. Sep 6 2024, 9:13 AM

I've created an S3 user for the airflow-test-k8s app:

root@cephosd1001:~# radosgw-admin user create --uid=airflow-test-k8s --display-name=airflow-test-k8s
{
    "user_id": "airflow-test-k8s",
    "display_name": "airflow-test-k8s",
    "email": "",
    "suspended": 0,
    "max_buckets": 1000,
    "subusers": [],
    "keys": [
        {
            "user": "airflow-test-k8s",
            "access_key": "[REDACTED]",
            "secret_key": "[REDACTED]"
        }
    ],
    "swift_keys": [],
    "caps": [],
    "op_mask": "read, write, delete",
    "default_placement": "",
    "default_storage_class": "",
    "placement_tags": [],
    "bucket_quota": {
        "enabled": false,
        "check_on_raw": false,
        "max_size": -1,
        "max_size_kb": 0,
        "max_objects": -1
    },
    "user_quota": {
        "enabled": false,
        "check_on_raw": false,
        "max_size": -1,
        "max_size_kb": 0,
        "max_objects": -1
    },
    "temp_url_keys": [],
    "type": "rgw",
    "mfa_ids": []
}
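If the credentials need to be picked out of that JSON programmatically (for example, to place them in a secret), a short sketch along these lines could work. The helper name `extract_s3_keys` and the sample key values are hypothetical. Only the `keys`, `access_key` and `secret_key` field names come from the output above.

```python
import json


def extract_s3_keys(user_json: str) -> list[tuple[str, str]]:
    """Return (access_key, secret_key) pairs from `radosgw-admin user` JSON."""
    user = json.loads(user_json)
    # A user may have no keys at all, so fall back to an empty list.
    return [(key["access_key"], key["secret_key"]) for key in user.get("keys", [])]


# Trimmed sample with placeholder credentials (not real keys).
sample = """
{
    "user_id": "airflow-test-k8s",
    "keys": [
        {
            "user": "airflow-test-k8s",
            "access_key": "EXAMPLEACCESS",
            "secret_key": "EXAMPLESECRET"
        }
    ]
}
"""

pairs = extract_s3_keys(sample)
print(len(pairs), "key pair(s) found")
```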

Change #1071908 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow: store the connections.yaml content in a secret

https://gerrit.wikimedia.org/r/1071908

Change #1071909 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow: enable s3 logging

https://gerrit.wikimedia.org/r/1071909

Change #1071920 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/puppet@production] global_config: add the s3-eqiad-dpe external service

https://gerrit.wikimedia.org/r/1071920

Change #1071920 merged by Brouberol:

[operations/puppet@production] global_config: add the s3-eqiad-dpe external service

https://gerrit.wikimedia.org/r/1071920

Change #1071908 merged by Brouberol:

[operations/deployment-charts@master] airflow: store the connections.yaml content in a secret

https://gerrit.wikimedia.org/r/1071908

Change #1071909 merged by Brouberol:

[operations/deployment-charts@master] airflow: enable s3 logging

https://gerrit.wikimedia.org/r/1071909

Change #1072183 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow: fix the s3 logging integration

https://gerrit.wikimedia.org/r/1072183

I generated a working .s3cfg config file for s3cmd on stat1008:

[default]
access_key = [REDACTED]
access_token =
add_encoding_exts =
add_headers =
bucket_location = dpe
ca_certs_file =
cache_file =
check_ssl_certificate = True
check_ssl_hostname = True
cloudfront_host = cloudfront.amazonaws.com
connection_pooling = True
content_disposition =
content_type =
default_mime_type = binary/octet-stream
delay_updates = False
delete_after = False
delete_after_fetch = False
delete_removed = False
dry_run = False
enable_multipart = True
encoding = UTF-8
encrypt = False
expiry_date =
expiry_days =
expiry_prefix =
follow_symlinks = False
force = False
get_continue = False
gpg_command = None
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase =
guess_mime_type = True
host_base = rgw.eqiad.dpe.anycast.wmnet
host_bucket = n
human_readable_sizes = False
invalidate_default_index_on_cf = False
invalidate_default_index_root_on_cf = True
invalidate_on_cf = False
kms_key =
limit = -1
limitrate = 0
list_md5 = False
log_target_prefix =
long_listing = False
max_delete = -1
mime_type =
multipart_chunk_size_mb = 15
multipart_max_chunks = 10000
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
public_url_use_https = False
put_continue = False
recursive = False
recv_chunk = 65536
reduced_redundancy = False
requester_pays = False
restore_days = 1
restore_priority = Standard
secret_key = [REDACTED]
send_chunk = 65536
server_side_encryption = False
signature_v2 = False
signurl_use_https = False
simpledb_host = sdb.amazonaws.com
skip_existing = False
socket_timeout = 300
stats = False
stop_on_error = False
storage_class =
throttle_max = 100
upload_id =
urlencoding_mode = normal
use_http_expect = False
use_https = True
use_mime_magic = True
verbosity = WARNING
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
website_error =
website_index = index.html

After which, I was able to create the s3://airflow bucket:

brouberol@stat1008:~$ s3cmd --bucket-location=":default-placement" mb s3://airflow
Bucket 's3://airflow/' created
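The endpoint that other S3 clients would need can be derived from the `host_base` and `use_https` entries of the .s3cfg shown above. This sketch reads them with Python's `configparser`. The helper name `s3_endpoint_from_s3cfg` is hypothetical, and the sample is a trimmed copy of the config rather than the full file.

```python
import configparser


def s3_endpoint_from_s3cfg(text: str, section: str = "default") -> str:
    """Derive the S3 endpoint URL from the contents of an s3cmd config file."""
    # Interpolation is disabled because s3cmd values such as
    # "%(bucket)s" are s3cmd placeholders, not configparser references.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    cfg = parser[section]
    scheme = "https" if cfg.getboolean("use_https", fallback=False) else "http"
    return f"{scheme}://{cfg['host_base']}"


# Trimmed sample taken from the config above, credentials omitted.
sample = """\
[default]
access_token =
host_base = rgw.eqiad.dpe.anycast.wmnet
use_https = True
website_endpoint = http://%(bucket)s.s3-website-%(location)s.amazonaws.com/
"""

endpoint = s3_endpoint_from_s3cfg(sample)
print(endpoint)
```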

Change #1072183 merged by Brouberol:

[operations/deployment-charts@master] airflow: fix the s3 logging integration

https://gerrit.wikimedia.org/r/1072183

We have confirmed that our airflow-test-k8s instance is now writing task logs to S3, organised by DAG id!

brouberol@stat1008:~$  s3cmd ls s3://airflow/k8s/test-k8s/logs/dag_id=addition/
                          DIR  s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/
                          DIR  s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:59:04.808059+00:00/
                          DIR  s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=scheduled__2024-09-11T11:00:00+00:00/
brouberol@stat1008:~$ s3cmd get s3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/task_id=add_one_and_two/attempt=1.log attempt1.log
download: 's3://airflow/k8s/test-k8s/logs/dag_id=addition/run_id=manual__2024-09-11T11:52:12.128960+00:00/task_id=add_one_and_two/attempt=1.log' -> 'attempt1.log'  [1 of 1]
 2848 of 2848   100% in    0s   221.49 KB/s  done
brouberol@stat1008:~$ cat attempt1.log
[2024-09-11T11:52:12.848+0000] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2024-09-11T11:52:12.892+0000] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [queued]>
[2024-09-11T11:52:12.902+0000] {taskinstance.py:2603} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [queued]>
[2024-09-11T11:52:12.902+0000] {taskinstance.py:2856} INFO - Starting attempt 1 of 2
[2024-09-11T11:52:12.915+0000] {taskinstance.py:2879} INFO - Executing <Task(PythonOperator): add_one_and_two> on 2024-09-11 11:52:12.128960+00:00
[2024-09-11T11:52:12.921+0000] {standard_task_runner.py:72} INFO - Started process 488 to run task
[2024-09-11T11:52:12.924+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'addition', 'add_one_and_two', 'manual__2024-09-11T11:52:12.128960+00:00', '--job-id', '348', '--raw', '--subdir', 'DAGS_FOLDER/dags/addition_dag.py', '--cfg-path', '/tmp/tmpx557th5x']
[2024-09-11T11:52:12.925+0000] {standard_task_runner.py:105} INFO - Job 348: Subtask add_one_and_two
[2024-09-11T11:52:13.053+0000] {task_command.py:467} INFO - Running <TaskInstance: addition.add_one_and_two manual__2024-09-11T11:52:12.128960+00:00 [running]> on host 10.67.25.39
[2024-09-11T11:52:13.215+0000] {taskinstance.py:3122} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='addition' AIRFLOW_CTX_TASK_ID='add_one_and_two' AIRFLOW_CTX_EXECUTION_DATE='2024-09-11T11:52:12.128960+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-09-11T11:52:12.128960+00:00'
[2024-09-11T11:52:13.271+0000] {datajob.py:115} WARNING - cluster prod is not a valid environment type so Environment filter won't work.
[2024-09-11T11:52:13.456+0000] {taskinstance.py:731} INFO - ::endgroup::
[2024-09-11T11:52:13.466+0000] {python.py:240} INFO - Done. Returned value was: 3
[2024-09-11T11:52:13.540+0000] {taskinstance.py:340} INFO - ::group::Post task execution logs
[2024-09-11T11:52:13.540+0000] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=addition, task_id=add_one_and_two, run_id=manual__2024-09-11T11:52:12.128960+00:00, execution_date=20240911T115212, start_date=20240911T115212, end_date=20240911T115213
[2024-09-11T11:52:13.594+0000] {datajob.py:115} WARNING - cluster prod is not a valid environment type so Environment filter won't work.
[2024-09-11T11:52:13.738+0000] {local_task_job_runner.py:261} INFO - Task exited with return code 0
[2024-09-11T11:52:13.783+0000] {taskinstance.py:3891} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-09-11T11:52:13.784+0000] {local_task_job_runner.py:240} INFO - ::endgroup::
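The object keys in the listing above follow a `dag_id=…/run_id=…/task_id=…/attempt=N.log` layout under a per-instance prefix. Here is a minimal sketch that splits such a URL into its parts, which may be handy when scripting against the bucket. The helper name `parse_log_url` is hypothetical, and it assumes the layout stays as shown in this session.

```python
def parse_log_url(url: str) -> dict[str, str]:
    """Split a task-log S3 URL into bucket, prefix and key=value fields."""
    if not url.startswith("s3://"):
        raise ValueError("expected an s3:// URL")
    bucket, _, key = url[len("s3://"):].partition("/")
    fields = {"bucket": bucket}
    prefix_parts = []
    for part in key.strip("/").split("/"):
        name, sep, value = part.partition("=")
        if sep and name in ("dag_id", "run_id", "task_id", "attempt"):
            # The attempt component carries the ".log" file extension.
            fields[name] = value.removesuffix(".log") if name == "attempt" else value
        else:
            prefix_parts.append(part)
    fields["prefix"] = "/".join(prefix_parts)
    return fields


info = parse_log_url(
    "s3://airflow/k8s/test-k8s/logs/dag_id=addition/"
    "run_id=manual__2024-09-11T11:52:12.128960+00:00/"
    "task_id=add_one_and_two/attempt=1.log"
)
print(info["dag_id"], info["task_id"], info["attempt"])
```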