
Figure out how to deal with MERGE conflicts
Closed, Resolved · Public · 5 Estimated Story Points

Description

We now have 3 Airflow DAGs that do MERGE INTO statements into wmf_dumps.wikitext_raw_rc1. Those DAGs are:

dumps_merge_backfill_to_wikitext_raw
dumps_merge_events_to_wikitext_raw
dumps_merge_visibility_events_to_wikitext_raw

I have noticed, however, that MERGE conflicts are common, and they make the jobs retry quite a lot.

Here is an example stack trace:

Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching ((((((((ref(name="revision_timestamp") >= 1695081600000000 and ref(name="revision_timestamp") < 1695168000000000) or (ref(name="revision_timestamp") >= 1694995200000000 and ref(name="revision_timestamp") < 1695081600000000)) or
...
	at org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:350)
	at org.apache.iceberg.BaseOverwriteFiles.validate(BaseOverwriteFiles.java:142)
	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:215)
	at org.apache.iceberg.BaseOverwriteFiles.apply(BaseOverwriteFiles.java:31)
	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363)
	at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
	at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:192)
	at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:92)
	at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteMergeWrite.commitWithSerializableIsolation(SparkWrite.java:385)
	at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteMergeWrite.commit(SparkWrite.java:361)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
	... 32 more

Iceberg writes use optimistic concurrency. From the Iceberg documentation:

Iceberg supports multiple concurrent writes using optimistic concurrency.
Each writer assumes that no other writers are operating and writes out new table metadata for an operation. Then, the writer attempts to commit by atomically swapping the new table metadata file for the existing metadata file.
If the atomic swap fails because another writer has committed, the failed writer retries by writing a new metadata tree based on the new current table state.

This mechanism works perfectly well for append operations. However, for overwrite operations like a MERGE, the first MERGE to commit wins, and the others must fail if they touch the same files. Because the event stream consumers are typically working on the same recent data, dumps_merge_events_to_wikitext_raw and dumps_merge_visibility_events_to_wikitext_raw step on each other a lot.

In this task, we should:

  • Consider strategies to avoid MERGE conflicts, and thus avoid unnecessarily taxing the cluster with jobs that retry a lot.
    • One easy thing that comes to mind is serializing the MERGEs. We may need, for example, to merge (no pun intended) the dumps_merge_events_to_wikitext_raw and dumps_merge_visibility_events_to_wikitext_raw DAGs into a single DAG so that they don't step on each other, especially since they are both hourly (see the sketch after this list).
    • Backfills are especially problematic to retry given their heavy computational cost. We should explore whether we can automatically pause the other MERGE DAGs while a backfill is running.
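
For illustration, a minimal sketch of the first idea, assuming Airflow 2.x; the DAG id and task ids are hypothetical, and EmptyOperator stands in for the real Spark jobs that issue the MERGE INTO statements:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # stand-in for the real Spark MERGE tasks

with DAG(
    dag_id="dumps_merge_to_wikitext_raw",  # hypothetical combined DAG id
    schedule_interval="@hourly",
    start_date=datetime(2023, 10, 1),
    catchup=False,
) as dag:
    # In the real DAGs these are Spark jobs doing MERGE INTO wmf_dumps.wikitext_raw_rc1.
    merge_events = EmptyOperator(task_id="merge_events_to_wikitext_raw")
    merge_visibility_events = EmptyOperator(task_id="merge_visibility_events_to_wikitext_raw")

    # Chaining the two tasks guarantees the MERGEs never run concurrently within this DAG,
    # so they cannot conflict with each other (they could still conflict with backfills).
    merge_events >> merge_visibility_events
```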

Details

Title: Programatic declaration of Airflow Pools. Single slot pool for MERGEs into wmf_dumps.wikitext_raw
Reference: repos/data-engineering/airflow-dags!508
Author: xcollazo
Source Branch: add-pool-def-to-dumps-dags
Dest Branch: main

Event Timeline

Just read a bit about Airflow task pools; it looks like a feature we could use to serialize MERGEs with very little code: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/pools.html
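
A minimal sketch of what that could look like, assuming Airflow 2.x; the pool name below is made up (not necessarily the one from the merged MR), and EmptyOperator stands in for the real Spark MERGE task:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # stand-in for the real Spark MERGE tasks

# The pool itself can be created ahead of time, e.g. via the Airflow CLI:
#   airflow pools set wikitext_raw_merge 1 "Serializes MERGEs into wmf_dumps.wikitext_raw"

with DAG(
    dag_id="dumps_merge_events_to_wikitext_raw",
    schedule_interval="@hourly",
    start_date=datetime(2023, 10, 1),
    catchup=False,
) as dag:
    # Every MERGE task, in each of the three DAGs, points at the same single-slot pool,
    # so the scheduler only ever lets one of them run at a time.
    merge_events = EmptyOperator(
        task_id="merge_events_to_wikitext_raw",
        pool="wikitext_raw_merge",  # hypothetical pool name
        pool_slots=1,               # each task instance occupies the pool's only slot
    )
```

With a single-slot pool, concurrent MERGE task instances simply queue for the slot instead of failing on an Iceberg commit conflict and retrying.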

xcollazo changed the task status from Open to In Progress. Oct 4 2023, 6:38 PM

Had a conversation with @Milimetric in which we considered whether changing the Iceberg table property write.merge.isolation-level would help here. It would be a more elegant solution, given that the concurrency issues would be solved at the table level rather than at the Airflow level.

Iceberg does not have much documentation about the semantics of write.merge.isolation-level = serializable vs write.merge.isolation-level = snapshot, but there is a good explanation on the mailing list. The TL;DR is that in our case we would still have MERGE failures, since our two conflicting writers can delete existing data files (i.e. they are doing overwrites, as per the description of this ticket).
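
For reference only, changing the property would be a one-liner from PySpark (table name as in the description; the SparkSession is assumed to already be configured with the Iceberg catalog). We are documenting the option we decided against, since it would not prevent our conflicts:

```python
from pyspark.sql import SparkSession

# Assumes a Spark config with the Iceberg catalog that holds wmf_dumps already set up.
spark = SparkSession.builder.getOrCreate()

# Switch the MERGE isolation level from the default ('serializable') to 'snapshot'.
spark.sql("""
    ALTER TABLE wmf_dumps.wikitext_raw_rc1
    SET TBLPROPERTIES ('write.merge.isolation-level' = 'snapshot')
""")
```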

So, short of redesigning the table, to avoid these conflicts we must serialize the MERGEs, which brings us back to the Airflow pool mechanism.

Discussed this ticket with @JAllemandou, and there are some correctness issues that this work does not address. For example, we want to force dumps_merge_visibility_events_to_wikitext_raw to run only after the same hour has been ingested by dumps_merge_events_to_wikitext_raw, because those visibility changes may be mutating new (wiki_db, revision_id) tuples from the same hour.

I will tackle these correctness issues separately though, as part of T340863, as we also discussed that even with correct ordering we want to be able to detect when something does not look right.

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/508

Programatic declaration of Airflow Pools. Single slot pool for MERGEs into wmf_dumps.wikitext_raw

xcollazo set the point value for this task to 5. Oct 13 2023, 4:59 PM
xcollazo set Final Story Points to 8.