
Table maintenance for wmf_dumps.wikitext_inconsistent_row is failing
Closed, Resolved · Public

Description

Per Airflow, the remove_orphan_files() maintenance run has failed multiple times.

Related:

The table's file count has grown very large:

spark.sql("""
SELECT COUNT(1) as count
FROM wmf_dumps.wikitext_inconsistent_rows_rc1.files
""").show()

+-------+
|  count|
+-------+
|1496453|
+-------+

This is unexpected. Will investigate.

Event Timeline

Looks like an OOM:

sudo -u analytics yarn logs -appOwner analytics -applicationId application_1727783536357_360700

Spark master: yarn, Application Id: application_1727783536357_360701
24/10/21 17:14:28 WARN TaskSetManager: Stage 0 contains a task of very large size (17332 KiB). The maximum recommended task size is 1000 KiB.
24/10/21 17:15:28 ERROR SparkSQLDriver: Failed in [CALL spark_catalog.system.remove_orphan_files( table => 'wmf_dumps.wikitext_inconsistent_rows_rc1', older_than => TIMESTAMP '2024-09-29T00:00:00', max_concurrent_deletes => 10 )]
org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1085)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1083)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1083)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2463)
...

In T373694, we added the ability to set spark_kwargs and spark_conf settings, but only for rewrite_data_files() maintenance tasks. That was an oversight. Let's fix it so that we can give this remove_orphan_files() task more memory.
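As a sketch of what the fix looks like (the dictionary keys below are illustrative assumptions, not the actual airflow-dags API), the idea is that remove_orphan_files() should accept the same per-task overrides that rewrite_data_files() already does:

```python
# Hypothetical shape of the per-task maintenance config; the real
# airflow-dags parameter names may differ.
maintenance_tasks = {
    "rewrite_data_files": {
        # Already supported since T373694:
        "spark_conf": {"spark.dynamicAllocation.maxExecutors": "32"},
        "spark_kwargs": {"driver_memory": "20g", "executor_memory": "20g"},
    },
    "remove_orphan_files": {
        # The oversight: these overrides were not plumbed through here,
        # so the task ran with default memory and OOMed.
        "spark_conf": {"spark.dynamicAllocation.maxExecutors": "32"},
        "spark_kwargs": {"driver_memory": "20g", "executor_memory": "20g"},
    },
}

# Both task types should now resolve to the same override mechanism.
for task, overrides in maintenance_tasks.items():
    assert "spark_conf" in overrides and "spark_kwargs" in overrides
```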

Regarding:

spark.sql("""
SELECT COUNT(1) as count
FROM wmf_dumps.wikitext_inconsistent_rows_rc1.files
""").show()

+-------+
|  count|
+-------+
|1496453|
+-------+

We now have:

+-----+
|count|
+-----+
|  904|
+-----+

After running:

kerberos-run-command analytics spark3-submit  \
--driver-cores 4  \
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3  \
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf  \
--master yarn \
--conf spark.sql.autoBroadcastJoinThreshold=-1  \
--conf spark.dynamicAllocation.maxExecutors=32 \
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2  \
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3  \
--executor-cores 2  \
--executor-memory 20g  \
--driver-memory 20g  \
--name table_maintenance_iceberg_weekly__wmf_dumps__wikitext_inconsistent_rows_rc1.rewrite_data_files__20240901_xcollazo \
--class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver  \
--queue production \
--deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/wmf-sparksqlclidriver-1.0.0.jar \
-e 'CALL spark_catalog.system.rewrite_data_files( table => '"'"'wmf_dumps.wikitext_inconsistent_rows_rc1'"'"', strategy => '"'"'sort'"'"', sort_order => '"'"'wiki_db ASC NULLS FIRST, computation_dt ASC NULLS FIRST, revision_timestamp ASC NULLS FIRST'"'"' )'


rewritten_data_files_count  added_data_files_count  rewritten_bytes_count
1497512                     901                     5651215718
Time taken: 3648.22 seconds, Fetched 1 row(s)

In hindsight, this accumulation of small files makes sense: we have ~900 wikis, and we were running with the default spark.sql.shuffle.partitions = 200, so we could easily accumulate close to 180k files per day.
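A quick back-of-the-envelope check of that estimate: if each of the 200 shuffle tasks can emit a file for each wiki_db partition it touches, a single daily write tops out around:

```python
wikis = 900               # approximate number of wiki_db values
shuffle_partitions = 200  # Spark default for spark.sql.shuffle.partitions

# Worst case: every shuffle task writes one file per wiki_db partition.
max_files_per_day = wikis * shuffle_partitions
print(max_files_per_day)  # 180000
```

which is consistent with the ~1.5M files observed after a couple of weeks of runs.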

To fix this, we will set spark.sql.shuffle.partitions = 64 on the dumps_reconcile_wikitext_raw_daily dag, and run a weekly rewrite_data_files() to compact the files continuously.
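A sketch of the two parts of the fix (the variable names below are illustrative; the actual wiring lives in the airflow-dags MR). The compaction call is the same Iceberg rewrite_data_files procedure used in the one-off run above:

```python
# 1) Fewer shuffle partitions on the daily reconcile job, so each
#    write produces far fewer files (64 instead of the default 200).
daily_spark_conf = {"spark.sql.shuffle.partitions": "64"}

# 2) Weekly sort-based compaction via Iceberg's rewrite_data_files
#    procedure, matching the table's natural read order.
compaction_sql = """
CALL spark_catalog.system.rewrite_data_files(
  table => 'wmf_dumps.wikitext_inconsistent_rows_rc1',
  strategy => 'sort',
  sort_order => 'wiki_db ASC NULLS FIRST, computation_dt ASC NULLS FIRST, revision_timestamp ASC NULLS FIRST'
)
"""
```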

xcollazo opened https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/902

Draft: Add weekly rewrite_data_files() maintenance for wikitext_inconsistent_rows.

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/902

Add weekly rewrite_data_files() maintenance for wikitext_inconsistent_rows.