We now have three Airflow DAGs that run MERGE INTO statements against wmf_dumps.wikitext_raw_rc1. Those DAGs are:

- dumps_merge_backfill_to_wikitext_raw
- dumps_merge_events_to_wikitext_raw
- dumps_merge_visibility_events_to_wikitext_raw
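For context, each of these DAGs ultimately issues a statement roughly of the following shape. This is only a sketch: the join keys and the `updates` source view are simplified assumptions, not the production query.

```python
# Sketch of the kind of statement the DAGs run. The column names, match
# predicate, and the `updates` source view are assumptions, not the
# production query.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    MERGE INTO wmf_dumps.wikitext_raw_rc1 AS target
    USING updates AS source
      ON  target.wiki_db     = source.wiki_db
      AND target.revision_id = source.revision_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```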
I have noticed, however, that MERGE conflicts are common, and they make the jobs retry quite a lot.
Here is an example stack trace:
    Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching ((((((((ref(name="revision_timestamp") >= 1695081600000000 and ref(name="revision_timestamp") < 1695168000000000) or (ref(name="revision_timestamp") >= 1694995200000000 and ref(name="revision_timestamp") < 1695081600000000)) or ...
        at org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:350)
        at org.apache.iceberg.BaseOverwriteFiles.validate(BaseOverwriteFiles.java:142)
        at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:215)
        at org.apache.iceberg.BaseOverwriteFiles.apply(BaseOverwriteFiles.java:31)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363)
        at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
        at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:192)
        at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:92)
        at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteMergeWrite.commitWithSerializableIsolation(SparkWrite.java:385)
        at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteMergeWrite.commit(SparkWrite.java:361)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
        ... 32 more
Iceberg handles writes using optimistic concurrency. Quoting the Iceberg documentation:

> Iceberg supports multiple concurrent writes using optimistic concurrency.
>
> Each writer assumes that no other writers are operating and writes out new table metadata for an operation. Then, the writer attempts to commit by atomically swapping the new table metadata file for the existing metadata file.
>
> If the atomic swap fails because another writer has committed, the failed writer retries by writing a new metadata tree based on the new current table state.
This mechanism works well for append operations. For overwrite operations like a MERGE, however, the first MERGE to commit wins, and a concurrent MERGE must fail if it is overwriting the same files. Because both event-stream consumers are typically processing the same recent data, dumps_merge_events_to_wikitext_raw and dumps_merge_visibility_events_to_wikitext_raw step on each other a lot.
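Before restructuring the DAGs, it may be worth evaluating the knobs Iceberg itself exposes. The sketch below uses two documented Iceberg table properties: commit.retry.num-retries, which controls how many times a writer re-applies its changes on a refreshed snapshot before failing, and write.merge.isolation-level, which can be relaxed from the default serializable to snapshot. Whether snapshot isolation is actually safe here depends on whether concurrent MERGEs can rewrite the same rows, so treat this as an option to investigate, not a recommendation; the values shown are illustrative.

```python
# Iceberg table properties that influence commit behavior. The property
# names are documented Iceberg properties; the values are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.sql("""
    ALTER TABLE wmf_dumps.wikitext_raw_rc1 SET TBLPROPERTIES (
        -- how many times a writer retries its commit on a refreshed
        -- snapshot before giving up (default is 4)
        'commit.retry.num-retries' = '10',
        -- 'snapshot' skips the conflicting-files validation that raised
        -- the ValidationException above; only safe if concurrent MERGEs
        -- never rewrite the same rows
        'write.merge.isolation-level' = 'snapshot'
    )
""")
```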
In this task, we should:
- Consider strategies to avoid MERGE conflicts, and thus avoid unnecessarily taxing the cluster with jobs that retry a lot.
- One easy option that comes to mind is serializing the MERGEs. We may need, for example, to merge (no pun intended) the DAGs dumps_merge_events_to_wikitext_raw and dumps_merge_visibility_events_to_wikitext_raw into a single DAG so that they don't step on each other, especially since they're both hourly (one way to serialize is sketched below).
- Backfills are especially expensive to retry given their heavy computational cost. We should explore whether we can automatically pause the other MERGE DAGs while a backfill is running (see the second sketch below).
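On the serialization idea: one way to serialize the MERGEs without fully combining the DAGs would be an Airflow pool with a single slot, shared by every task that writes to the table. The sketch below is hypothetical — the operator, job script, and pool name are assumptions — and the pool itself must be created beforehand (e.g. `airflow pools set wikitext_raw_merge 1 "serialize wikitext_raw MERGEs"`).

```python
# Sketch: serialize all MERGE tasks across DAGs through a one-slot pool.
# The pool name, job script, and operator choice are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dumps_merge_events_to_wikitext_raw",
    start_date=datetime(2023, 9, 1),
    schedule="@hourly",
    catchup=False,
) as dag:
    merge_events = BashOperator(
        task_id="merge_events",
        bash_command="spark-submit merge_events_job.py",  # hypothetical job
        pool="wikitext_raw_merge",  # one slot => MERGEs never run concurrently
    )
```

Putting the same pool= argument on the writing task in each of the three DAGs would guarantee that at most one MERGE runs at a time, regardless of DAG boundaries.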
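On the backfill idea: one option to evaluate is bracketing the backfill DAG with tasks that pause and unpause the hourly MERGE DAGs via Airflow's DagModel. A minimal sketch, assuming these tasks are wired in as the backfill DAG's first and last steps:

```python
# Sketch: pause the hourly MERGE DAGs while the backfill runs, and
# unpause them afterwards. Assumes these tasks are wired into the
# backfill DAG as its first and last steps.
from airflow.decorators import task
from airflow.models import DagModel

HOURLY_MERGE_DAGS = [
    "dumps_merge_events_to_wikitext_raw",
    "dumps_merge_visibility_events_to_wikitext_raw",
]

@task
def pause_hourly_merges():
    for dag_id in HOURLY_MERGE_DAGS:
        DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=True)

@task(trigger_rule="all_done")  # run even if the backfill MERGE failed
def unpause_hourly_merges():
    for dag_id in HOURLY_MERGE_DAGS:
        DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=False)
```

Note that pausing only prevents new DAG runs from being scheduled; a MERGE already in flight would still run to completion (or conflict) first.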