
NEW BUG REPORT: Some DAG run attempts fail because File *_temporary/0 does not exist.
Closed, Resolved · Public · BUG REPORT

Description

Data Engineering Bug Report or Data Problem Form.

Please fill out the following

What kind of problem are you reporting?

  • Access-related problem
  • Service-related problem
  • Data-related problem
For a service-related problem:

What is the nature of the issue?
The compute step of a DAG occasionally fails when multiple runs of the same DAG execute their tasks concurrently (3 by default) during an automatically triggered Airflow backfill.

From the failed attempt:

script: spark3-submit --driver-cores 2 --conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3
    --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf --master yarn --conf
    spark.dynamicAllocation.maxExecutors=16 --conf spark.yarn.executor.memoryOverhead=2048
    --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3
    --executor-cores 4 --executor-memory 8G --driver-memory 4G --keytab analytics-product.keytab
    --principal analytics-product/an-airflow1006.eqiad.wmnet@WIKIMEDIA --name wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627
    --class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver --queue production
    --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics_product/wmf-sparksqlclidriver-1.0.0.jar
    -f https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines/-/raw/40fcc4126c1029a94992cef3c74600ef92741ec8/wikipedia_chatgpt_plugin/searches/generate_wikipedia_chatgpt_plugin_searches_daily.hql
    -d source_table=event.mediawiki_cirrussearch_request -d destination_table=wmf_product.wikipedia_chatgpt_plugin_searches
    -d year=2023 -d month=6 -d day=27

Full Airflow log:

an-airflow1006.eqiad.wmnet
*** Found local files:
***   * /srv/airflow-analytics_product/logs/dag_id=wikipedia_chatgpt_plugin_searches_daily/run_id=scheduled__2023-06-27T00:00:00+00:00/task_id=compute_wikipedia_chatgpt_plugin_searches/attempt=3.log
[2023-09-20, 20:21:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: wikipedia_chatgpt_plugin_searches_daily.compute_wikipedia_chatgpt_plugin_searches scheduled__2023-06-27T00:00:00+00:00 [queued]>
[2023-09-20, 20:21:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: wikipedia_chatgpt_plugin_searches_daily.compute_wikipedia_chatgpt_plugin_searches scheduled__2023-06-27T00:00:00+00:00 [queued]>
[2023-09-20, 20:21:45 UTC] {taskinstance.py:1308} INFO - Starting attempt 3 of 6
[2023-09-20, 20:21:45 UTC] {taskinstance.py:1327} INFO - Executing <Task(SparkSqlOperator): compute_wikipedia_chatgpt_plugin_searches> on 2023-06-27 00:00:00+00:00
[2023-09-20, 20:21:45 UTC] {standard_task_runner.py:57} INFO - Started process 13463 to run task
[2023-09-20, 20:21:45 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'wikipedia_chatgpt_plugin_searches_daily', 'compute_wikipedia_chatgpt_plugin_searches', 'scheduled__2023-06-27T00:00:00+00:00', '--job-id', '750', '--raw', '--subdir', 'DAGS_FOLDER/wikipedia_chatgpt_plugin/wikipedia_chatgpt_plugin_searches_daily_dag.py', '--cfg-path', '/tmp/tmply_gg6gm']
[2023-09-20, 20:21:45 UTC] {standard_task_runner.py:85} INFO - Job 750: Subtask compute_wikipedia_chatgpt_plugin_searches
[2023-09-20, 20:21:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: wikipedia_chatgpt_plugin_searches_daily.compute_wikipedia_chatgpt_plugin_searches scheduled__2023-06-27T00:00:00+00:00 [running]> on host an-airflow1006.eqiad.wmnet
[2023-09-20, 20:21:46 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='product-analytics+alerts@wikimedia.org,data-engineering-alerts@lists.wikimedia.org' AIRFLOW_CTX_DAG_OWNER='analytics-product' AIRFLOW_CTX_DAG_ID='wikipedia_chatgpt_plugin_searches_daily' AIRFLOW_CTX_TASK_ID='compute_wikipedia_chatgpt_plugin_searches' AIRFLOW_CTX_EXECUTION_DATE='2023-06-27T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-06-27T00:00:00+00:00'
[2023-09-20, 20:21:46 UTC] {spark.py:399} INFO - SparkSkeinSubmitHook(wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627) - Converted file for spark _keytab from '/etc/security/keytabs/analytics-product/analytics-product.keytab' to 'analytics-product.keytab' for use with skein.
[2023-09-20, 20:21:46 UTC] {spark_submit.py:341} INFO - Spark-Submit cmd: spark3-submit --master yarn --conf spark.dynamicAllocation.maxExecutors=16 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 --executor-cores 4 --executor-memory 8G --driver-memory 4G --keytab analytics-product.keytab --principal analytics-product/an-airflow1006.eqiad.wmnet@WIKIMEDIA --name wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627 --class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver --queue production --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics_product/wmf-sparksqlclidriver-1.0.0.jar -f https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines/-/raw/40fcc4126c1029a94992cef3c74600ef92741ec8/wikipedia_chatgpt_plugin/searches/generate_wikipedia_chatgpt_plugin_searches_daily.hql -d source_table=event.mediawiki_cirrussearch_request -d destination_table=wmf_product.wikipedia_chatgpt_plugin_searches -d year=2023 -d month=6 -d day=27
[2023-09-20, 20:21:46 UTC] {spark.py:189} INFO - SparkSkeinSubmitHook(wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627) spark-submit final cmd: spark3-submit --driver-cores 2 --conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3 --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf --master yarn --conf spark.dynamicAllocation.maxExecutors=16 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 --executor-cores 4 --executor-memory 8G --driver-memory 4G --keytab analytics-product.keytab --principal analytics-product/an-airflow1006.eqiad.wmnet@WIKIMEDIA --name wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627 --class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver --queue production --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics_product/wmf-sparksqlclidriver-1.0.0.jar -f https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines/-/raw/40fcc4126c1029a94992cef3c74600ef92741ec8/wikipedia_chatgpt_plugin/searches/generate_wikipedia_chatgpt_plugin_searches_daily.hql -d source_table=event.mediawiki_cirrussearch_request -d destination_table=wmf_product.wikipedia_chatgpt_plugin_searches -d year=2023 -d month=6 -d day=27
[2023-09-20, 20:21:46 UTC] {skein.py:205} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627 - Submitting with Skein ApplicationSpec:
acls:
  enable: false
  modify_groups: []
  modify_users: []
  ui_users: []
  view_groups: []
  view_users: []
file_systems: []
master:
  env:
    SPARK_CONF_DIR: /etc/spark3/conf
    SPARK_HOME: /usr/lib/spark3
  files:
    analytics-product.keytab:
      size: 0
      source: file:///etc/security/keytabs/analytics-product/analytics-product.keytab
      timestamp: 0
      type: FILE
      visibility: APPLICATION
  log_level: INFO
  resources:
    fpgas: 0
    gpus: 0
    memory: 3815
    vcores: 2
  script: spark3-submit --driver-cores 2 --conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3
    --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf --master yarn --conf
    spark.dynamicAllocation.maxExecutors=16 --conf spark.yarn.executor.memoryOverhead=2048
    --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3
    --executor-cores 4 --executor-memory 8G --driver-memory 4G --keytab analytics-product.keytab
    --principal analytics-product/an-airflow1006.eqiad.wmnet@WIKIMEDIA --name wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627
    --class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver --queue production
    --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics_product/wmf-sparksqlclidriver-1.0.0.jar
    -f https://gitlab.wikimedia.org/repos/product-analytics/data-pipelines/-/raw/40fcc4126c1029a94992cef3c74600ef92741ec8/wikipedia_chatgpt_plugin/searches/generate_wikipedia_chatgpt_plugin_searches_daily.hql
    -d source_table=event.mediawiki_cirrussearch_request -d destination_table=wmf_product.wikipedia_chatgpt_plugin_searches
    -d year=2023 -d month=6 -d day=27
max_attempts: 1
name: Airflow SparkSkeinSubmitHook skein launcher wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627
node_label: ''
queue: production
services: {}
tags: []
user: ''
[2023-09-20, 20:21:46 UTC] {skein.py:93} INFO - Constructing skein Client with kwargs: {'principal': 'analytics-product/an-airflow1006.eqiad.wmnet@WIKIMEDIA', 'keytab': '/etc/security/keytabs/analytics-product/analytics-product.keytab'}
[2023-09-20, 20:21:56 UTC] {skein.py:238} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627 application_1694521537759_43681 status: RUNNING - Waiting until finished.
[2023-09-20, 20:23:11 UTC] {skein.py:269} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher wikipedia_chatgpt_plugin_searches_daily__compute_wikipedia_chatgpt_plugin_searches__20230627 application_1694521537759_43681 - YARN application log collection is disabled. To view logs for the YARN App Master, run the following command:
	sudo -u analytics-product yarn logs -appOwner analytics-product -applicationId application_1694521537759_43681
If your App Master launched other YARN applications (e.g. a Spark app), you will need to look at these logs and run a simliar command but with the appropriate YARN application_id.
[2023-09-20, 20:23:12 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=wikipedia_chatgpt_plugin_searches_daily, task_id=compute_wikipedia_chatgpt_plugin_searches, execution_date=20230627T000000, start_date=20230920T202145, end_date=20230920T202312
[2023-09-20, 20:23:12 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-09-20, 20:23:12 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

The key line from the log is:

java.io.FileNotFoundException: File hdfs://analytics-hadoop/user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/_temporary/0 does not exist.

The full YARN application log from the failed attempt:

$ sudo -u analytics-product yarn logs -appOwner analytics-product -applicationId application_1694521537759_43662
23/09/21 15:13:03 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
23/09/21 15:13:03 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
Container: container_e91_1694521537759_43662_01_000001 on an-worker1133.eqiad.wmnet_8041_1695240992213
LogAggregationType: AGGREGATED
======================================================================================================
LogType:application.driver.log
LogLastModifiedTime:Wed Sep 20 20:16:32 +0000 2023
LogLength:20136
LogContents:
Running /opt/conda-analytics/bin/spark-submit $@
SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
23/09/20 20:15:29 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/09/20 20:15:30 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/20 20:15:31 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/09/20 20:15:31 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/09/20 20:15:31 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/09/20 20:15:31 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/09/20 20:15:32 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/09/20 20:15:32 WARN Utils: Service 'sparkDriver' could not bind on port 12001. Attempting port 12002.
23/09/20 20:15:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/20 20:15:32 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/09/20 20:15:41 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/09/20 20:15:41 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13001. Attempting port 13002.
23/09/20 20:15:41 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
ADD JAR file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar
23/09/20 20:15:43 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Added [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar] to class path
Added resources: [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar]
Spark master: yarn, Application Id: application_1694521537759_43665
23/09/20 20:16:29 ERROR FileFormatWriter: Aborting job 6d747be9-fd55-456c-95b8-06baf42d0949.
java.io.FileNotFoundException: File hdfs://analytics-hadoop/user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/_temporary/0 does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1058)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1118)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1115)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1125)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
	at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.processFile(WMFSparkSQLCLIDriver.scala:349)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver$.main(WMFSparkSQLCLIDriver.scala:216)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.main(WMFSparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/09/20 20:16:29 ERROR SparkSQLDriver: Failed in [

INSERT OVERWRITE TABLE ${destination_table}
    PARTITION(year=${year}, month=${month}, day=${day})
SELECT /*+ COALESCE(1) */
    `database` AS wiki_db,
    COUNT(1) AS search_count
FROM
    ${source_table}
WHERE
    http.request_headers['user-agent'] = 'wikipedia-chagpt-plugin bot'
    AND year = ${year}
    AND month = ${month}
    AND day = ${day}
GROUP BY `database`
]
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
	at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.processFile(WMFSparkSQLCLIDriver.scala:349)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver$.main(WMFSparkSQLCLIDriver.scala:216)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.main(WMFSparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: File hdfs://analytics-hadoop/user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/_temporary/0 does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1058)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1118)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1115)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1125)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 48 more
org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
	at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.processFile(WMFSparkSQLCLIDriver.scala:349)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver$.main(WMFSparkSQLCLIDriver.scala:216)
	at org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver.main(WMFSparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: File hdfs://analytics-hadoop/user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/_temporary/0 does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1058)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1118)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1115)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1125)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:220)
	... 48 more


End of LogType:application.driver.log
***************************************************************************************

Container: container_e91_1694521537759_43662_01_000001 on an-worker1133.eqiad.wmnet_8041_1695240992213
LogAggregationType: AGGREGATED
======================================================================================================
LogType:application.master.log
LogLastModifiedTime:Wed Sep 20 20:16:32 +0000 2023
LogLength:1800
LogContents:
23/09/20 20:15:26 INFO skein.ApplicationMaster: Starting Skein version 0.8.2
23/09/20 20:15:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/20 20:15:27 INFO skein.ApplicationMaster: Running as user analytics-product
23/09/20 20:15:27 INFO conf.Configuration: resource-types.xml not found
23/09/20 20:15:27 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
23/09/20 20:15:27 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
23/09/20 20:15:27 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
23/09/20 20:15:27 INFO skein.ApplicationMaster: Application specification successfully loaded
23/09/20 20:15:28 INFO skein.ApplicationMaster: gRPC server started at an-worker1133.eqiad.wmnet:32777
23/09/20 20:15:28 INFO skein.ApplicationMaster: WebUI server started at an-worker1133.eqiad.wmnet:33873
23/09/20 20:15:28 INFO skein.ApplicationMaster: Registering application with resource manager
23/09/20 20:15:28 INFO skein.ApplicationMaster: Starting application driver
23/09/20 20:16:30 INFO skein.ApplicationMaster: Shutting down: Application driver failed with exit code 1, see logs for more information.
23/09/20 20:16:30 INFO skein.ApplicationMaster: Unregistering application with status FAILED
23/09/20 20:16:30 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/09/20 20:16:30 INFO skein.ApplicationMaster: Deleted application directory hdfs://analytics-hadoop/user/analytics-product/.skein/application_1694521537759_43662
23/09/20 20:16:30 INFO skein.ApplicationMaster: WebUI server shut down
23/09/20 20:16:30 INFO skein.ApplicationMaster: gRPC server shut down

End of LogType:application.master.log
***************************************************************************************


End of LogType:prelaunch.err
******************************************************************************

Container: container_e91_1694521537759_43662_01_000001 on an-worker1133.eqiad.wmnet_8041_1695240992213
LogAggregationType: AGGREGATED
======================================================================================================
LogType:prelaunch.out
LogLastModifiedTime:Wed Sep 20 20:16:32 +0000 2023
LogLength:70
LogContents:
Setting up env variables
Setting up job resources
Launching container

End of LogType:prelaunch.out
******************************************************************************

Container: container_e91_1694521537759_43662_01_000001 on an-worker1133.eqiad.wmnet_8041_1695240992213
LogAggregationType: AGGREGATED
======================================================================================================
LogType:container-localizer-syslog
LogLastModifiedTime:Wed Sep 20 20:16:32 +0000 2023
LogLength:184
LogContents:
2023-09-20 20:15:25,218 INFO [main] org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer: Disk Validator: yarn.nodemanager.disk-validator is loaded.

End of LogType:container-localizer-syslog
*******************************************************************************************

What are the steps to reproduce the issue? Please include links and screenshots where applicable.

Based on the nature of the issue, any DAG that is run concurrently to backfill data will likely hit this issue.
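
For reference, one way to observe the shared staging directory while concurrent backfill runs are in flight is to list the table-root path from the error above (run as a user with read access to that path, e.g. via sudo as in the yarn logs command above); the contents will vary from moment to moment:

$ hdfs dfs -ls hdfs://analytics-hadoop/user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/_temporary/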

What happens?

Some runs require up to 3 attempts before they finally succeed.

What should happen instead?

They should only require 1 attempt.

For the DE Team to fill out
Which systems does this affect?
  • Hive
  • Druid
  • Superset
  • Turnilo
  • WikiDumps
  • Wikistats
  • Airflow
  • HDFS
  • Gobblin
  • Sqoop
  • Dashiki
  • DataHub
  • Spark
  • Jupyter
  • Modern Event Platform
  • Event Logging
  • Other
Impact Assessment:

Does this problem qualify as an incident?

  • Yes
  • No

Does this violate an SLO?

  • Yes
  • No
Value Calculator                                          Rank
Will this improve the efficiency of a team's workflow?    1-3
Does this have an effect on our Core Metrics?             1-3
Does this align with our strategic goals?                 1-3
Is this a blocker for another team?                       1-3

Event Timeline

@Milimetric and I have a hypothesis that what's happening here is a race condition: the multiple concurrent runs of a DAG all use the same temporary file (for something?), and when one run finishes it deletes the file while a concurrent run of the DAG (backfilling a different date) still expects it to exist.

If that's the case, I wonder what the temporary file is used for and whether backfilled data could be contaminated. Let's say one run is writing data for 2023-07-01 to it while a concurrent run is writing data for 2023-07-02 to it – could the final version of 2023-07-01 data also include 2023-07-02 data somehow?

Super interesting finding!

TL;DR: No cross-job data issues, but potential failures when running parallel Spark jobs that write to the same table.

I have done some investigations and here's what I found:

  • When writing into a SQL table, even when writing to specific partitions, Spark creates a temporary folder named _temporary at the table-root path.
  • If multiple Spark applications insert into the same table at the same time, they share the same _temporary folder (what a mess!).
  • Data written by Spark in the temporary folder is of the form _temporary/ATTEMPT_NUMBER/_temporary/ATTEMPT_ID while a task is still running, and _temporary/ATTEMPT_NUMBER/TASK_ID once the task is done.
  • When the overall job is done, files are moved from the various _temporary/ATTEMPT_NUMBER/TASK_ID folders into their correct place on HDFS. This step does not move files belonging to other jobs' task IDs, as the driver knows which task names to expect.
  • When the Spark session is closed, the _temporary folder is deleted. This is the step that causes the issue: if the folder is still being used by another job, well, it gets deleted out from under that other job :) (See the sketch just below.)
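
Roughly, while jobs are writing, the table-root directory from the error above looks something like this (the 0 is the ATTEMPT_NUMBER; the attempt/task names are illustrative placeholders generated by the committer):

    /user/analytics-product/data/wikipedia_chatgpt_plugin_searches/daily/
        _temporary/0/_temporary/attempt_.../   <- task attempts still running (from any of the concurrent jobs)
        _temporary/0/task_.../                 <- tasks finished, waiting for their job's commit
        year=2023/month=6/day=27/              <- final partition directories, populated at job commit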

Thank you for the great investigation, Joseph! Phew, I'm glad to learn that there's no risk of data contamination.

Oh hey, looks like we're not alone https://community.cloudera.com/t5/Support-Questions/How-to-change-Spark-temporary-directory-when-writing-data/m-p/237389 There appears to be a possible solution (a minor config change) which may fix this.

The config change described in the link is SET mapreduce.fileoutputcommitter.algorithm.version = 2.
This change alters the lifecycle of the temporary files a bit at the end of the job:

  • Data written by Spark in the temporary folder is of the form _temporary/ATTEMPT_NUMBER/_temporary/ATTEMPT_ID while a task is still running.
  • When a task is done, its data is moved straight from the _temporary/ATTEMPT_NUMBER/_temporary/ATTEMPT_ID folder to the destination folders, instead of waiting in another temporary folder for all tasks to be done.

Pros:

  • When concurrent jobs use the same _temporary folder and one deletes it at the end, the other doesn't fail. What makes a job fail in the output-committer=1 case are the already-computed files not yet moved to their final destination (the _temporary/ATTEMPT_NUMBER/TASK_ID files): the Spark tasks for those files have successfully finished, and Spark does not expect to have to rerun them if the files are deleted. With output-committer=2, the _temporary folder only contains files for tasks still in flight, and if those are deleted Spark fails the task and retries it (up to 4 times before failing globally).

Cons:

  • Data shows up in the destination directory before all the data for that directory has been computed. This shouldn't be a problem for us, as what matters to our sensors is the Hive partition in the metastore, which is created only after all tasks have successfully finished (and therefore after all files have been made available in the destination folder).
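
For reference, the setting can be applied per job in either of two standard ways (shown here as a sketch; this is not necessarily how the fix was deployed): as a SET statement at the top of the .hql file that spark3-submit runs (as in the linked thread), or as an extra spark3-submit flag using Spark's standard spark.hadoop.* passthrough into the Hadoop configuration used by the committer:

    -- at the top of the .hql file
    SET mapreduce.fileoutputcommitter.algorithm.version = 2;

    # or on the spark3-submit command line
    --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2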

I've created https://phabricator.wikimedia.org/T351388 to add the config globally to spark.
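
For illustration, a cluster-wide default (the approach that task tracks) would typically be a single line in spark-defaults.conf along these lines; the exact deployment mechanism is up to that task:

    spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version    2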

lbowmaker claimed this task.