
Data Quality Issue: Wikitext History Job fail / rerun in Airflow
Open, High, Public · 8 Estimated Story Points

Description

Implementation Details:

As Skein ignores the setting of retries=0, the plan is as follows:

Short Term:

Long Term:

  • Migrate Airflow to k8s so we can use Docker and move away from Skein

The Wikitext History job failed 4 times and succeeded on the 5th attempt, causing data duplication in the 2023-05 snapshot:

$ hdfs dfs -du -h /wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-0* | grep '=enwiki$'

11.2 T  /wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-04/wiki_db=enwiki
44.7 T  /wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-05/wiki_db=enwiki
11.3 T  /wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-06/wiki_db=enwiki

We should do one or more of the following:

  • audit Airflow tasks for what can safely be rerun, and set retries to 0 for everything else
  • add Data Quality checks to everything (a sketch of one such check follows this list)
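
As an illustration of the kind of data quality check that could have caught this, here is a minimal sketch (hypothetical code, not an existing check; paths are taken from the du output above and the 2x threshold is arbitrary) that compares the on-disk size of a new snapshot against the previous one:

import subprocess

def hdfs_du_bytes(path: str) -> int:
    """Return the size in bytes of an HDFS path, via `hdfs dfs -du -s`."""
    out = subprocess.run(
        ["hdfs", "dfs", "-du", "-s", path],
        check=True, capture_output=True, text=True,
    ).stdout
    # First column of the output is the logical size in bytes.
    return int(out.split()[0])

def check_snapshot_size(base: str, prev_snapshot: str, new_snapshot: str,
                        max_growth: float = 2.0) -> None:
    """Fail loudly if the new snapshot is much larger than the previous one."""
    prev = hdfs_du_bytes(f"{base}/snapshot={prev_snapshot}/wiki_db=enwiki")
    new = hdfs_du_bytes(f"{base}/snapshot={new_snapshot}/wiki_db=enwiki")
    if new > prev * max_growth:
        raise ValueError(
            f"snapshot={new_snapshot} is {new / prev:.1f}x the size of "
            f"snapshot={prev_snapshot}; possible duplicate data"
        )

# Would have flagged the 44.7 T vs 11.2 T jump shown above:
check_snapshot_size("/wmf/data/wmf/mediawiki/wikitext/history", "2023-04", "2023-05")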

Details

Title: Don't retry convert_history_xml_to_parquet.
Reference: repos/data-engineering/airflow-dags!595
Author: xcollazo
Source Branch: dont-retry-mediawiki-wikitext-history
Dest Branch: main

Event Timeline

Spark job script, for reference:

script: spark3-submit --driver-cores 2 --conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3
  --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf --master yarn --conf
  spark.dynamicAllocation.maxExecutors=32 --conf spark.hadoop.io.compression.codecs=org.wikimedia.analytics.refinery.spark.bzip2.CorrectedBZip2Codec
  --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3
  --executor-cores 4 --executor-memory 32G --driver-memory 8G --keytab analytics.keytab
  --principal analytics/an-launcher1002.eqiad.wmnet@WIKIMEDIA --name mediawiki_wikitext_history__convert_history_xml_to_parquet__20230501
  --class org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiXMLDumpsConverter
  --queue production --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.11-shaded.jar
  --xml_dumps_base_path hdfs://analytics-hadoop/wmf/data/raw/mediawiki/dumps/pages_meta_history/20230601
  --output_base_path hdfs://analytics-hadoop/wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-05
  --max_parallel_jobs 128 --output_format avro

Attempt #1:

Skein job ran from:

[2023-06-17, 13:11:57 UTC] {skein.py:237} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher mediawiki_wikitext_history__convert_history_xml_to_parquet__20230501 application_1686833367123_9750 status: RUNNING - Waiting until finished.
...
[2023-06-20, 19:53:02 UTC] {skein.py:237} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher mediawiki_wikitext_history__convert_history_xml_to_parquet__20230501 application_1686833367123_9750 status: RUNNING - Waiting until finished.

Then failed with:

[2023-06-20, 19:53:10 UTC] {local_task_job.py:112} ERROR - Received SIGTERM. Terminating subprocesses
[2023-06-20, 19:53:10 UTC] {taskinstance.py:1479} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-06-20, 19:53:10 UTC] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 2872. PIDs of all processes in the group: [3103, 2872]
[2023-06-20, 19:53:10 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 2872
[2023-06-20, 19:53:10 UTC] {taskinstance.py:1479} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-06-20, 19:53:10 UTC] {local_task_job.py:112} ERROR - Received SIGTERM. Terminating subprocesses
[2023-06-20, 19:53:10 UTC] {process_utils.py:129} INFO - Sending Signals.SIGTERM to group 2872. PIDs of all processes in the group: [3103, 2872]
[2023-06-20, 19:53:10 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 2872
[2023-06-20, 19:53:11 UTC] {taskinstance.py:1479} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-06-20, 19:53:11 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
...
  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 280, in _call
    raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to driver

So this one likely wrote much of the output given it ran for 3+ days.

Attempt #2:

[2023-06-20, 19:58:14 UTC]
...
[2023-06-20, 21:45:07 UTC]

With a similar stack trace:

  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 280, in _call
    raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to driver
[2023-06-20, 21:45:07 UTC] {local_task_job.py:112} ERROR - Received SIGTERM. Terminating subprocesses

So a quick failure (~2 hours).

Attempt #3:

[2023-06-20, 21:45:15 UTC]
...
[2023-06-21, 09:45:17 UTC]

So ~12 hours of runtime. Same stack.

Attempt #4:

[2023-06-21, 09:50:25 UTC]
...
[2023-06-21, 10:14:37 UTC]

Died quickly, in ~25 minutes. Same stack, but this time I noticed the usual telltale sign of a Yarn container OOM:

[2023-06-21, 10:14:45 UTC] {local_task_job.py:208} INFO - Task exited with return code 143

Attempt #5:

Successful, with:

[2023-06-21, 10:14:53 UTC]
...
[2023-06-24, 19:14:52 UTC]

So 3+ days of runtime, which tracks.

Yarn had purged attempt #1 from its logs:

xcollazo@an-launcher1002:~$ sudo -u analytics yarn logs -appOwner analytics -applicationId application_1686833367123_9750
Unable to get ApplicationState. Attempting to fetch logs directly from the filesystem.
File /var/log/hadoop-yarn/apps/analytics/logs/application_1686833367123_9750 does not exist.

Can not find any log file matching the pattern: [ALL] for the application: application_1686833367123_9750
Can not find the logs for the application: application_1686833367123_9750 with the appOwner: analytics

Same with attempts #2, #3, #4... and #5! This is concerning because, as we can see, sometimes we need to look back at the logs.

Additionally, the Airflow logs for these 3-day runs are ~5 MB of mostly heartbeats.

So, as a side task to make debugging easier, I think we should:

  1. Make Yarn logs have a longer retention period. Perhaps 3 months?
  2. Make Airflow Spark Skein jobs heartbeat less frequently.

Will open a separate task to pursue these.

Unfortunately, this means that zeroing in on a root cause is impossible. I speculate that the job was OOMing, but it is hard to tell whether it was the executors or the driver.

Forwarding learnings from T343238#9060658 to this ticket: SIGTERMs on the Airflow instance only kill the Airflow process, with no (current) mechanism to forward the kill to the Spark/Skein job. So the tasks get marked as UP_FOR_RESCHEDULE while the underlying Spark jobs continue to execute and likely commit.

Perhaps we should catch the SIGTERM and make a best-effort attempt to forward it to Skein/Spark?
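
As a rough sketch of what that could look like (none of this is the actual SparkSkeinSubmitHook code; the function name and wiring are made up, and a real implementation would likely reuse the hook's existing skein client):

import os
import signal
import skein

def install_sigterm_forwarder(app_id: str) -> None:
    """Best effort: when Airflow SIGTERMs the task, also kill the YARN application."""
    def handler(signum, frame):
        try:
            # Ask YARN (via skein) to kill the application, so the Spark job
            # does not keep running and committing after the Airflow task
            # process is gone.
            skein.Client().kill_application(app_id)
        finally:
            # Then fall back to the default SIGTERM behaviour so the Airflow
            # process still terminates as before.
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            os.kill(os.getpid(), signal.SIGTERM)
    signal.signal(signal.SIGTERM, handler)

# Hypothetical usage inside the submit hook, once the application id is known:
# install_sigterm_forwarder("application_1686833367123_9750")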

The code that does the conversion from dumps to the table, MediawikiXMLDumpsConverter, does the right thing:

// Make a dataframe using defined schema and write it
spark.
    createDataFrame(wikitextRows, wikiTextStructure).
    repartition(config.numberOutputPartitions).
    write.
    mode(SaveMode.Overwrite).   // <<< overwrites the output path
    format(config.outputFormat).
    save(config.outputPath)

Thus, the Spark save() would first remove all existing data and then add its own.

So my speculation about the behavior we found is as follows:

  1. All job attempts above, individually, did the right thing: before writing started, they deleted all data files from the output folder.
  2. However, let's take a look at the attempt start dates:
[2023-06-17, 13:11:57 UTC] {taskinstance.py:1280} INFO - Starting attempt 1 of 6
[2023-06-20, 19:58:13 UTC] {taskinstance.py:1280} INFO - Starting attempt 2 of 6
[2023-06-20, 21:45:15 UTC] {taskinstance.py:1280} INFO - Starting attempt 3 of 6
[2023-06-21, 09:50:24 UTC] {taskinstance.py:1280} INFO - Starting attempt 4 of 6
[2023-06-21T10:14:53.437+0000] {taskinstance.py:1280} INFO - Starting attempt 5 of 6
  3. Considering that this job takes 3+ days to finish, notice how these attempts would have run concurrently and overlapped.
  4. Thus, even though a later attempt would have "deleted all previous files", files were still being written to the folder by a previous, still-running attempt.

This is all speculation, but I think it tracks.

Some options to avoid this behavior:
a) Move this code to do INSERT OVERWRITE on a table, rather than writing to a file path.
b) Move this code to do a DELETE and INSERT on an Iceberg table.

For some reason I strongly prefer (b) :D
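
To make option (b) concrete, a rough PySpark-flavoured sketch (the real converter is Scala; the function, table name, and partition count here are hypothetical): delete the snapshot's rows and append the new ones within the same Iceberg table, so visibility is governed by table metadata rather than by whatever files happen to sit in a directory:

from pyspark.sql import DataFrame, SparkSession

def write_snapshot(spark: SparkSession, converted: DataFrame, snapshot: str,
                   table: str = "wmf.mediawiki_wikitext_history",
                   num_partitions: int = 512) -> None:
    """Replace one snapshot's rows in an Iceberg table instead of overwriting a path."""
    # Remove rows from any previous (possibly partial) run of this snapshot.
    spark.sql(f"DELETE FROM {table} WHERE snapshot = '{snapshot}'")
    # Append the freshly converted rows. Iceberg only publishes data files that
    # a successful commit references, so files written by a zombie attempt
    # never become visible unless that attempt actually manages to commit.
    converted.repartition(num_partitions).writeTo(table).append()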

@VirginiaPoundstone this issue came up again (thanks very much to @xcollazo, who remembered this task). I support option (b) in Xabriel's plan above, and I think this should be triaged as high importance, since it is a production issue. This table is used by lots of people, and it seems to me it will keep failing. If the folks looking into it don't remember this task, a lot of time will be wasted.

(+1 to prioritize this work.)

mforns subscribed.

Since we'd rather have a failed job than bad data, as a stop-gap measure we have merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/595, a one-liner that prevents this job from retrying in the event of failure.
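
Conceptually, the change boils down to setting retries=0 on the task; a hedged sketch of what that looks like (using a stand-in operator, not the DAG's real Spark/Skein submit operator):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Illustrative only: the real DAG lives in WMF's airflow-dags repo, but the
# stop-gap is the single `retries=0` argument on the task.
with DAG(
    dag_id="mediawiki_wikitext_history_sketch",
    start_date=datetime(2023, 5, 1),
    schedule=None,
) as dag:
    convert_history_xml_to_parquet = BashOperator(
        task_id="convert_history_xml_to_parquet",
        bash_command="spark3-submit ...",  # full command shown earlier in this task
        retries=0,  # fail outright rather than rerun and risk overlapping writes
    )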

lbowmaker edited projects, added Data-Engineering (Sprint 8); removed Data-Engineering.
lbowmaker set the point value for this task to 8.
lbowmaker claimed this task.
lbowmaker added subscribers: JAllemandou, lbowmaker.

Resolving this ticket. Retries have been set to 0 for the job; data quality work is being tracked in https://phabricator.wikimedia.org/T354692.

After discussing with the DE team, it was decided that this is OK for now, and that at some point we should rewrite this job to use some of the newer tools we are developing around the config store and dynamic DAGs.

cc - @JAllemandou @Ahoelzl

nshahquinn-wmf subscribed.

The most recent run of this job (which finished today) still had a retry. @JAllemandou and @xcollazo have been discussing this.

@xcollazo wrote:

Anyhow, I think a simple fix leveraging the python retry library will take care of this. Unfortunately the fix is in the skein library, so to do it, we will have to build skein ourselves. Here is how the fix would look: https://gitlab.wikimedia.org/xcollazo/skein/-/commit/1ffeb3d7366aee7c80f248461edecd7ca01203c2

Should we expect duplicate data in mediawiki_wikitext_history or has that been cleaned up?

The most recent run of this job (which finished today) still had a retry.
...
Should we expect duplicate data in mediawiki_wikitext_history or has that been cleaned up?

I took a look at some wikis (enwiki, commonswiki, eswiki) and they look good, with only one job UUID across all the files. Not sure if this is because @JAllemandou already cleaned it up?
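
For the record, that spot check can be scripted. A hedged sketch (paths are illustrative; it relies on Spark's standard part-NNNNN-<uuid>-... output file naming, where all files of one write job share the same UUID):

import re
import subprocess

def distinct_writer_uuids(path: str) -> set:
    """Collect the writer UUIDs embedded in Spark part-file names under `path`.

    More than one UUID in a single partition directory suggests files left
    behind by a different (e.g. overlapping or failed) write attempt.
    """
    listing = subprocess.run(
        ["hdfs", "dfs", "-ls", "-C", path],
        check=True, capture_output=True, text=True,
    ).stdout
    pattern = re.compile(r"part-\d+-([0-9a-f-]{36})-")
    return {m.group(1) for m in map(pattern.search, listing.splitlines()) if m}

uuids = distinct_writer_uuids(
    "/wmf/data/wmf/mediawiki/wikitext/history/snapshot=2023-05/wiki_db=enwiki"
)
print(f"{len(uuids)} distinct writer UUID(s): {sorted(uuids)}")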

Nothing done on my end - possibly one of the 2 jobs failed for real?

@lbowmaker please have a look at this and see if you can fit it in.

@Snwachukwu and I looked into this a bit and we figured that an alternative fix to avoid the cost of changing the skein library is to catch the exception higher in the stack trace.

We figured that catching it here could work as well.

Actually, here is best, considering our loop calls report_string() from over here.
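
To sketch the shape of that fix (names here are stand-ins, not the real hook's API): wrap the status poll in a small retry so that a single transient ConnectionError from the skein driver doesn't fail a multi-day task:

import logging
import time

import skein

def poll_with_retries(get_report_string, attempts: int = 5, delay_s: float = 30.0) -> str:
    """Call a status-polling function, retrying on transient skein ConnectionError.

    `get_report_string` stands in for the report_string() call in the polling
    loop; we only give up after `attempts` consecutive failures.
    """
    for attempt in range(1, attempts + 1):
        try:
            return get_report_string()
        except skein.exceptions.ConnectionError:
            if attempt == attempts:
                raise
            logging.warning(
                "Could not reach the Skein driver (attempt %d/%d); retrying in %ss",
                attempt, attempts, delay_s,
            )
            time.sleep(delay_s)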