
Investigate why we consume empty partitions from webrequests
Open, Needs Triage, Public

Description

TLDR: Downstream Airflow jobs seem to consume webrequest partitions before those partitions are actually ready for consumption. What can we do about this?

Longer version:

While debugging an Airflow failure on the projectview_hourly DAG, we figured out that the root cause was that we were consuming empty Hive partitions. My strong suspicion is that this happened because the partition on webrequest was created, but the data was not actually available yet. Data eventually landed, but downstream jobs had already triggered and thus acted on the empty partition.

For details of the original investigation, please see https://lists.wikimedia.org/hyperkitty/list/data-engineering-alerts@lists.wikimedia.org/thread/AX27YJC4N4B7A5GFD6DUVR5R7IAEDVVN/.

The long-term issue here is that perhaps the majority of our Airflow jobs depend on webrequest, and if the data there is not good, then all downstream jobs are not good either. Additionally, since we currently do not have a provenance mechanism, the exercise of restating the bad downstream jobs is manual and error prone (we need to figure out manually which Airflow jobs are downstream of the parent).

Event Timeline

Some questions here are:

  1. Is there a way to know all the DAGs that consume 'pageview_actor'?
  2. I speculate this all happened because, for some reason, webrequest data came in late. I can see from webrequest's Airflow DAG documentation that:

We add the partition to the table before verification, and do not drop the partition if there is an error.
Hence, the table might contain partitions that contains duplicates/holes. This is for the ease of the developers when
trying to have a look at the data. The table is not meant for researchers.

But if downstream jobs depend on that, perhaps we should fail the Airflow job if our data quality checks fail? Alternatively, we could have two tables: one for staging and data quality checks, and one for prod. (Incidentally, I know Iceberg has a pretty neat feature for exactly this use case, write-audit-publish branching, sketched below: https://www.dremio.com/blog/streamlining-data-quality-in-apache-iceberg-with-write-audit-publish-branching/ )
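For reference, this is roughly what that write-audit-publish flow could look like from PySpark. It is only a sketch under assumed names (a hypothetical Iceberg-backed webrequest table in a catalog named iceberg_catalog, plus a staged_webrequest_hour source view); it is not our current setup:

# Rough sketch of Iceberg write-audit-publish (WAP) branching in PySpark.
# All names (catalog, table, branch, source view) are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # assumes an Iceberg catalog is configured

catalog = "iceberg_catalog"                  # hypothetical catalog name
table = f"{catalog}.wmf.webrequest"          # hypothetical Iceberg-backed table
branch = "audit_2023_07_31_11"

# One-time setup: enable WAP on the table and create an audit branch.
spark.sql(f"ALTER TABLE {table} SET TBLPROPERTIES ('write.wap.enabled' = 'true')")
spark.sql(f"ALTER TABLE {table} CREATE BRANCH {branch}")

# Stage the hourly write on the audit branch; readers of 'main' see nothing yet.
spark.conf.set("spark.wap.branch", branch)
spark.sql(f"INSERT OVERWRITE {table} SELECT * FROM staged_webrequest_hour")

# Audit: run data quality checks against the branch before publishing.
row_count = spark.sql(
    f"SELECT count(*) AS c FROM {table} VERSION AS OF '{branch}'"
).first()["c"]
assert row_count > 0, "refusing to publish an empty partition"

# Publish: fast-forward 'main' to the audited branch, making the data visible downstream.
spark.sql(f"CALL {catalog}.system.fast_forward('wmf.webrequest', 'main', '{branch}')")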

The modification times agree with the Airflow DAG documentation, in which the partition is created *first* and the data is added afterwards.

Data landed between Jul 31 12:49 and Jul 31 12:51, and the _SUCCESS file was created at 12:52:

xcollazo@stat1007:/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11$ pwd
/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11
xcollazo@stat1007:/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11$ ls -lsha
...
227M -rw-r-----   1 analytics analytics-privatedata-users 227M Jul 31 12:49 part-00251-b1b06fd8-d893-4d8a-b8ba-7ac9bbf69690.c000.snappy.parquet
227M -rw-r-----   1 analytics analytics-privatedata-users 227M Jul 31 12:49 part-00252-b1b06fd8-d893-4d8a-b8ba-7ac9bbf69690.c000.snappy.parquet
227M -rw-r-----   1 analytics analytics-privatedata-users 227M Jul 31 12:49 part-00253-b1b06fd8-d893-4d8a-b8ba-7ac9bbf69690.c000.snappy.parquet
227M -rw-r-----   1 analytics analytics-privatedata-users 227M Jul 31 12:51 part-00254-b1b06fd8-d893-4d8a-b8ba-7ac9bbf69690.c000.snappy.parquet
227M -rw-r-----   1 analytics analytics-privatedata-users 227M Jul 31 12:51 part-00255-b1b06fd8-d893-4d8a-b8ba-7ac9bbf69690.c000.snappy.parquet
...
   0 -rw-r-----   1 analytics analytics-privatedata-users    0 Jul 31 12:52 _SUCCESS

Partition hour=11 got moved (i.e. committed) into the normal hierarchy at 12:52, which also tracks:

xcollazo@stat1007:/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31$ pwd
/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31
xcollazo@stat1007:/mnt/hdfs/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31$ ls -lsha
...
4.0K drwxr-x--- 259 analytics analytics-privatedata-users 4.0K Jul 31 11:30 'hour=10'
4.0K drwxr-x--- 259 analytics analytics-privatedata-users 4.0K Jul 31 12:52 'hour=11'
4.0K drwxr-x--- 259 analytics analytics-privatedata-users 4.0K Jul 31 13:33 'hour=12'
4.0K drwxr-x--- 259 analytics analytics-privatedata-users 4.0K Jul 31 14:33 'hour=13'
...

The Hive metastore knew about the partition well before the data landed (Mon Jul 31 12:32:42 UTC 2023):

spark-sql (default)> DESCRIBE FORMATTED wmf.webrequest PARTITION(webrequest_source='text',year=2023,month=7,day=31,hour=11);
...
# Detailed Partition Information		
Database	wmf	
Table	webrequest	
Partition Values	[year=2023, hour=11, day=31, month=7, webrequest_source=text]	
Location	hdfs://analytics-hadoop/wmf/data/wmf/webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11	
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe	
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat	
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat	
Storage Properties	[serialization.format=1]	
Partition Parameters	{transient_lastDdlTime=1690806762}	
Created Time	Mon Jul 31 12:32:42 UTC 2023	
Last Access	UNKNOWN
...

Now the pageview_actor_hourly DAG waits on the following sensor definition:

hive_wmf_webrequest_text:
  datastore: hive
  table_name: wmf.webrequest
  partitioning: "@hourly"
  pre_partitions: ["webrequest_source=text"]
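
Note that this sensor only checks that the partition exists in the Hive metastore; it does not check whether any data files have landed underneath it. Roughly, the poke boils down to something like this sketch (illustrative only; the real logic lives in Airflow's NamedHivePartitionSensor in the Hive provider):

# Simplified sketch of what the NamedHivePartitionSensor poke effectively does.
# It only asks the Hive metastore whether the partition entry exists, so it can
# return True even while the directory behind the partition is still empty.
from airflow.providers.apache.hive.hooks.hive import HiveMetastoreHook

def partition_exists(partition_name: str, metastore_conn_id: str = "analytics-hive") -> bool:
    # partition_name looks like:
    #   "wmf.webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11"
    schema_table, _, partition = partition_name.partition("/")
    schema, table = schema_table.split(".")
    hook = HiveMetastoreHook(metastore_conn_id=metastore_conn_id)
    return hook.check_for_named_partition(schema, table, partition)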

So there was a 12:52 - 12:32 = ~20 minute window in which this downstream job could have run on an empty partition. Looking at the logs confirms that we did indeed run inside this window, at [2023-07-31, 12:35:38 UTC]:

Airflow logs for the sensor wait_for_wmf_webrequest_partitions for the offending run:
( http://localhost:8600/log?dag_id=pageview_actor_hourly&task_id=wait_for_wmf_webrequest_partitions&execution_date=2023-07-31T11%3A00%3A00%2B00%3A00 )

...
[2023-07-31, 12:35:36 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: pageview_actor_hourly.wait_for_wmf_webrequest_partitions scheduled__2023-07-31T11:00:00+00:00 [queued]>
[2023-07-31, 12:35:36 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: pageview_actor_hourly.wait_for_wmf_webrequest_partitions scheduled__2023-07-31T11:00:00+00:00 [queued]>
[2023-07-31, 12:35:36 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 6
[2023-07-31, 12:35:36 UTC] {taskinstance.py:1327} INFO - Executing <Task(NamedHivePartitionSensor): wait_for_wmf_webrequest_partitions> on 2023-07-31 11:00:00+00:00
[2023-07-31, 12:35:36 UTC] {standard_task_runner.py:57} INFO - Started process 18133 to run task
[2023-07-31, 12:35:36 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'pageview_actor_hourly', 'wait_for_wmf_webrequest_partitions', 'scheduled__2023-07-31T11:00:00+00:00', '--job-id', '1810599', '--raw', '--subdir', 'DAGS_FOLDER/pageview/pageview_actor_hourly_dag.py', '--cfg-path', '/tmp/tmp7wevd9ec']
[2023-07-31, 12:35:36 UTC] {standard_task_runner.py:85} INFO - Job 1810599: Subtask wait_for_wmf_webrequest_partitions
[2023-07-31, 12:35:37 UTC] {task_command.py:410} INFO - Running <TaskInstance: pageview_actor_hourly.wait_for_wmf_webrequest_partitions scheduled__2023-07-31T11:00:00+00:00 [running]> on host an-launcher1002.eqiad.wmnet
[2023-07-31, 12:35:37 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='data-engineering-alerts@lists.wikimedia.org' AIRFLOW_CTX_DAG_OWNER='analytics' AIRFLOW_CTX_DAG_ID='pageview_actor_hourly' AIRFLOW_CTX_TASK_ID='wait_for_wmf_webrequest_partitions' AIRFLOW_CTX_EXECUTION_DATE='2023-07-31T11:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-31T11:00:00+00:00'
[2023-07-31, 12:35:38 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2023-07-31, 12:35:38 UTC] {base.py:73} INFO - Using connection ID 'analytics-hive' for task execution.
[2023-07-31, 12:35:38 UTC] {hive.py:577} INFO - Trying to connect to analytics-hive.eqiad.wmnet:9083
[2023-07-31, 12:35:38 UTC] {hive.py:579} INFO - Connected to analytics-hive.eqiad.wmnet:9083
[2023-07-31, 12:35:38 UTC] {logging_mixin.py:150} WARNING - /usr/lib/airflow/lib/python3.10/site-packages/airflow/providers/apache/hive/hooks/hive.py:507 AirflowProviderDeprecationWarning: The 'authMechanism' option is deprecated. Please use 'auth_mechanism'.
[2023-07-31, 12:35:38 UTC] {named_hive_partition.py:94} INFO - Poking for wmf.webrequest/webrequest_source=text/year=2023/month=7/day=31/hour=11
[2023-07-31, 12:35:38 UTC] {base.py:255} INFO - Success criteria met. Exiting.
[2023-07-31, 12:35:38 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=pageview_actor_hourly, task_id=wait_for_wmf_webrequest_partitions, execution_date=20230731T110000, start_date=20230731T123536, end_date=20230731T123538
[2023-07-31, 12:35:38 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-07-31, 12:35:38 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

This is problematic!

OK, upon further inspection of the refine_webrequest_hourly_text Airflow DAG, it turns out I had misread this comment:

We add the partition to the table before verification, and do not drop the partition if there is an error.
Hence, the table might contain partitions that contains duplicates/holes. This is for the ease of the developers when
trying to have a look at the data. The table is not meant for researchers.

That comment refers to the add_raw_webrequest_partition Airflow task, and it adds a partition to wmf_raw.webrequest, not wmf.webrequest. That is good.

The behavior observed in T343238#9059722 stands, however. Not surprisingly, the only time the refine_webrequest Airflow task has recently failed and retried is for the offending hour=11 that created the temporarily empty partition.

So I will now investigate why the partition commit happened even though the Airflow task failed.

Airflow logs for http://localhost:8600/log?dag_id=refine_webrequest_hourly_text&task_id=refine_webrequest&execution_date=2023-07-31T11%3A00%3A00%2B00%3A00:

[2023-07-31, 12:15:02 UTC] {skein.py:93} INFO - Constructing skein Client with kwargs: {'principal': 'analytics/an-launcher1002.eqiad.wmnet@WIKIMEDIA', 'keytab': '/etc/security/keytabs/analytics/analytics.keytab'}
[2023-07-31, 12:15:11 UTC] {skein.py:238} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher refine_webrequest_hourly_text__refine_webrequest__20230731 application_1688722260742_121399 status: RUNNING - Waiting until finished.
[2023-07-31, 12:20:12 UTC] {skein.py:238} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher refine_webrequest_hourly_text__refine_webrequest__20230731 application_1688722260742_121399 status: RUNNING - Waiting until finished.
[2023-07-31, 12:25:13 UTC] {skein.py:238} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher refine_webrequest_hourly_text__refine_webrequest__20230731 application_1688722260742_121399 status: RUNNING - Waiting until finished.
[2023-07-31, 12:29:57 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-07-31, 12:29:57 UTC] {local_task_job_runner.py:115} ERROR - Received SIGTERM. Terminating subprocesses
[2023-07-31, 12:29:57 UTC] {local_task_job_runner.py:115} ERROR - Received SIGTERM. Terminating subprocesses
[2023-07-31, 12:29:57 UTC] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 2403. PIDs of all processes in the group: [2948, 2403]
[2023-07-31, 12:29:57 UTC] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 2403
[2023-07-31, 12:29:57 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-07-31, 12:29:57 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 239, in submit
    time.sleep(15)
  File "/usr/lib/airflow/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1518, in signal_handler
    self.task.on_kill()
 ...
skein.exceptions.ConnectionError: Unable to connect to driver
[2023-07-31, 12:29:57 UTC] {taskinstance.py:1345} INFO - Marking task as UP_FOR_RETRY. dag_id=refine_webrequest_hourly_text, task_id=refine_webrequest, execution_date=20230731T110000, start_date=20230731T121502, end_date=20230731T122957
[2023-07-31, 12:29:57 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 1810448 for task refine_webrequest (Unable to connect to driver; 2403)

It seems that the SIGTERM was targeting the Airflow process. And indeed, if we correlate with T336286#9055167, when we were upgrading the Airflow instance to 2.6.3, the dates match:

Date SIGTERM was received:

[2023-07-31, 12:29:57 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.

Date the Airflow DB upgrade was triggered:

[2023-07-31T12:29:14.286+0000] {migration.py:213} INFO - Context impl PostgresqlImpl.
[2023-07-31T12:29:14.287+0000] {migration.py:216} INFO - Will assume transactional DDL.
[2023-07-31T12:29:14.333+0000] {db.py:1591} INFO - Creating tables

So I think this is what happened here:

  1. refine_webrequest_hourly_text Airflow DAG triggers, and makes it all the way to the refine_webrequest task in which the INSERT OVERWRITE is done against wmf.webrequest.
  2. The Airflow migration starts, and cascades a SIGTERM to all running operators.
  3. The refine_webrequest task receives the SIGTERM and promptly exits, marking the task as UP_FOR_RETRY, *BUT* the INSERT seemingly commits an empty partition?
  4. The Airflow migration ends.
  5. Because of (3), downstream jobs start doing their thing since they only depend on the Hive sensor telling them about a new partition on wmf.webrequest. They all write bad data.
  6. refine_webrequest, being marked as UP_FOR_RETRY, reruns successfully. Data is now properly available at wmf.webrequest.

The details of step (3) above are still murky: I don't understand why an INSERT OVERWRITE would succeed if the driver was gone, and if the driver was not gone and kept running, I don't understand why the commit would not have included the actual, legit data.

However, an immediate action item is to have a process for upgrades (sketched below) in which we deliberately:
a) Note which DAGs are currently paused.
b) Pause all DAGs.
c) Wait until all current DAG tasks finish/fail.
d) Run the upgrade.
e) Unpause all DAGs, except the ones noted on (a).
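
A rough sketch of what (a), (b), and (e) could look like against the Airflow stable REST API follows; the base URL and credentials are placeholders, and a real cookbook would also need to implement (c), waiting for running task instances to drain:

# Sketch of pausing/unpausing all DAGs around an upgrade via the Airflow REST API.
# Base URL and credentials below are assumptions, not our actual configuration.
import requests

AIRFLOW_API = "http://localhost:8600/api/v1"    # assumed webserver base URL
AUTH = ("admin", "admin")                       # assumed credentials

def list_dags():
    dags, offset = [], 0
    while True:
        resp = requests.get(f"{AIRFLOW_API}/dags",
                            params={"limit": 100, "offset": offset}, auth=AUTH)
        resp.raise_for_status()
        body = resp.json()
        dags.extend(body["dags"])
        if len(dags) >= body["total_entries"]:
            return dags
        offset += 100

def set_paused(dag_id: str, paused: bool) -> None:
    resp = requests.patch(f"{AIRFLOW_API}/dags/{dag_id}",
                          json={"is_paused": paused}, auth=AUTH)
    resp.raise_for_status()

# (a) Remember which DAGs were already paused so we do not unpause them later.
already_paused = {d["dag_id"] for d in list_dags() if d["is_paused"]}

# (b) Pause everything before the upgrade.
for dag in list_dags():
    set_paused(dag["dag_id"], True)

# ... (c) wait for running task instances to finish, then (d) run the upgrade ...

# (e) Unpause everything except the DAGs that were already paused in (a).
for dag in list_dags():
    if dag["dag_id"] not in already_paused:
        set_paused(dag["dag_id"], False)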

CC @BTullis

Yarn logs suggest that the offending Yarn job indeed continued to finish successfully:

xcollazo@an-launcher1002:~/tmp/logs$ sudo -u analytics yarn logs -appOwner analytics -applicationId application_1688722260742_121399 > refine_webrequest_hourly_text.log

Master logs:

=======================================================================================================
LogType:application.master.log
LogLastModifiedTime:Mon Jul 31 12:32:44 +0000 2023
LogLength:1794
LogContents:
23/07/31 12:15:09 INFO skein.ApplicationMaster: Starting Skein version 0.8.2
23/07/31 12:15:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/31 12:15:09 INFO skein.ApplicationMaster: Running as user analytics/an-launcher1002.eqiad.wmnet@WIKIMEDIA
23/07/31 12:15:10 INFO conf.Configuration: resource-types.xml not found
23/07/31 12:15:10 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
23/07/31 12:15:10 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
23/07/31 12:15:10 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
23/07/31 12:15:10 INFO skein.ApplicationMaster: Application specification successfully loaded
23/07/31 12:15:10 INFO skein.ApplicationMaster: gRPC server started at an-worker1111.eqiad.wmnet:45795
23/07/31 12:15:11 INFO skein.ApplicationMaster: WebUI server started at an-worker1111.eqiad.wmnet:42667
23/07/31 12:15:11 INFO skein.ApplicationMaster: Registering application with resource manager
23/07/31 12:15:11 INFO skein.ApplicationMaster: Starting application driver
23/07/31 12:32:43 INFO skein.ApplicationMaster: Shutting down: Application driver completed successfully.
23/07/31 12:32:43 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED                 <<<<<<<<<<<<<<<<<<<<<<
23/07/31 12:32:43 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/07/31 12:32:43 INFO skein.ApplicationMaster: Deleted application directory hdfs://analytics-hadoop/user/analytics/.skein/application_1688722260742_121399
23/07/31 12:32:43 INFO skein.ApplicationMaster: WebUI server shut down
23/07/31 12:32:43 INFO skein.ApplicationMaster: gRPC server shut down

End of LogType:application.master.log

Application driver:

=======================================================================================================
LogType:application.driver.log
LogLastModifiedTime:Mon Jul 31 12:32:44 +0000 2023
LogLength:3368
LogContents:
Running /opt/conda-analytics/bin/spark-submit $@
SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

(process:64213): dconf-CRITICAL **: 12:15:21.953: unable to create directory '/home/.cache/dconf': Permission denied.  dconf will not work properly.
...
(process:64213): dconf-CRITICAL **: 12:15:21.954: unable to create directory '/home/.cache/dconf': Permission denied.  dconf will not work properly.
ADD JAR file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar
23/07/31 12:15:22 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Added [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar] to class path
Added resources: [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar]
Spark master: yarn, Application Id: application_1688722260742_121400
ADD JAR hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-hive-0.2.20-shaded.jar
Added [/tmp/8450030e-261a-40e3-864c-f4a8cda82b9a_resources/refinery-hive-0.2.20-shaded.jar] to class path
Added resources: [hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-hive-0.2.20-shaded.jar]
result
Time taken: 1.332 seconds
...
key	value
spark.sql.shuffle.partitions	256
Time taken: 0.254 seconds, Fetched 1 row(s)
23/07/31 12:15:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Response code
Time taken: 1038.106 seconds.               <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

End of LogType:application.driver.log

So it seems that the SIGTERM only killed the Airflow process, with no mechanism to forward the kill to the Spark Skein job. However, the question remains: why was the data not available when this seemingly successful Yarn job committed?

Gah! Sorry Xabriel.

I agree with your proposal for an upgrade procedure like the one you described. We'll probably need a cookbook for it.