
Figure out if an intermediary table while backfilling is beneficial
Closed, Resolved · Public · 5 Estimated Story Points

Description

We currently backfill wmf_dumps.wikitext_raw_rc1 with a job that ingests data on a per-year basis. Details on the rationale are available at T340861: Implement a backfill job for the dumps hourly table.

The number one consumer of time in this approach is the constant re-reading of the wmf.mediawiki_wikitext_history table, which, for enwiki alone, is ~11TB. A MERGE INTO wants to read this table 3 times, so it quickly becomes expensive time-wise.

In this task we should explore whether a temporary table that partitions and transforms wmf.mediawiki_wikitext_history into a more efficient form gains us performance.

Event Timeline

xcollazo changed the task status from Open to In Progress. Edited Sep 15 2023, 4:45 PM

The schema being used is the same as the current merge SELECT, but notice the PARTITIONED BY clause:

spark.sql("""
CREATE TABLE xcollazo_iceberg.temporary_wikitext_raw_source_data_from_2023_07
USING ICEBERG
PARTITIONED BY (s_wiki_db, months(s_revision_timestamp))
TBLPROPERTIES (
    'write.format.default' = 'avro',               -- avoid OOMs due to revision_content_slots
    'write.distribution-mode' = 'hash'             -- avoid millions of small files
)
COMMENT 'Temporary table to transform and partition source data for faster backfills.'
AS

WITH visibility AS (
    SELECT
        rev_id                                        AS v_revision_id,
        wiki_db                                       AS v_wiki_db,
        to_timestamp(rev_timestamp, 'yyyyMMddkkmmss') AS v_revision_timestamp,
        rev_deleted & 1 = 0                           AS v_is_text_visible,
        rev_deleted & 2 = 0                           AS v_is_user_visible,
        rev_deleted & 4 = 0                           AS v_is_comment_visible
    FROM
        wmf_raw.mediawiki_revision
    WHERE
        rev_deleted > 0
        AND snapshot = '2023-07'
)

SELECT /*+ BROADCAST(v) */
    page_id                             AS s_page_id,
    page_namespace                      AS s_page_namespace,
    page_title                          AS s_page_title,
    page_redirect_title                 AS s_page_redirect_title,
    user_id                             AS s_user_id,
    user_text                           AS s_user_text,
    v_is_user_visible                   AS s_user_is_visible,
    revision_id                         AS s_revision_id,
    revision_parent_id                  AS s_revision_parent_id,
    to_timestamp(revision_timestamp)    AS s_revision_timestamp,
    revision_minor_edit                 AS s_revision_is_minor_edit,
    revision_comment                    AS s_revision_comment,
    v_is_comment_visible                AS s_revision_comment_is_visible,
    revision_text_sha1                  AS s_revision_sha1,
    revision_text_bytes                 AS s_revision_size,
    v_is_text_visible                   AS s_revision_content_is_visible,
    MAP(
        'main',
        STRUCT(
          revision_text                 AS content_body,
          revision_content_format      AS content_format,
          revision_content_model       AS content_model,
          revision_text_sha1           AS content_sha1,
          revision_text_bytes          AS content_size
        )
    )                                   AS s_revision_content_slots,
    wiki_db                             AS s_wiki_db
FROM wmf.mediawiki_wikitext_history s
LEFT JOIN visibility v ON s.wiki_db = v_wiki_db AND s.revision_id = v_revision_id
WHERE snapshot = '2023-07'
LIMIT 0
"""
).collect()

A successful INSERT:

# this took 18.9 h
# 1.4h in the shuffle, and the rest (17.5h) on the sort, which retried many times
# presumably the sort would only take 7.5h without the retries.
# all retries were due to FetchFailedExceptions

spark.sql("""
INSERT INTO xcollazo_iceberg.temporary_wikitext_raw_source_data_from_2023_07

WITH visibility AS (
    SELECT
        rev_id                                        AS v_revision_id,
        wiki_db                                       AS v_wiki_db,
        to_timestamp(rev_timestamp, 'yyyyMMddkkmmss') AS v_revision_timestamp,
        rev_deleted & 1 = 0                           AS v_is_text_visible,
        rev_deleted & 2 = 0                           AS v_is_user_visible,
        rev_deleted & 4 = 0                           AS v_is_comment_visible
    FROM
        wmf_raw.mediawiki_revision
    WHERE
        rev_deleted > 0
        AND snapshot = '2023-07'
)

SELECT /*+ BROADCAST(v) */
    page_id                             AS s_page_id,
    page_namespace                      AS s_page_namespace,
    page_title                          AS s_page_title,
    page_redirect_title                 AS s_page_redirect_title,
    user_id                             AS s_user_id,
    user_text                           AS s_user_text,
    v_is_user_visible                   AS s_user_is_visible,
    revision_id                         AS s_revision_id,
    revision_parent_id                  AS s_revision_parent_id,
    to_timestamp(revision_timestamp)    AS s_revision_timestamp,
    revision_minor_edit                 AS s_revision_is_minor_edit,
    revision_comment                    AS s_revision_comment,
    v_is_comment_visible                AS s_revision_comment_is_visible,
    revision_text_sha1                  AS s_revision_sha1,
    revision_text_bytes                 AS s_revision_size,
    v_is_text_visible                   AS s_revision_content_is_visible,
    MAP(
        'main',
        STRUCT(
          revision_text                 AS content_body,
          revision_content_format      AS content_format,
          revision_content_model       AS content_model,
          revision_text_sha1           AS content_sha1,
          revision_text_bytes          AS content_size
        )
    )                                   AS s_revision_content_slots,
    wiki_db                             AS s_wiki_db
FROM wmf.mediawiki_wikitext_history s
LEFT JOIN visibility v ON s.wiki_db = v_wiki_db AND s.revision_id = v_revision_id
WHERE snapshot = '2023-07'
"""
).collect()

Some stats:

SELECT count(1) FROM xcollazo_iceberg.temporary_wikitext_raw_source_data_from_2023_07
+----------+
|  count(1)|
+----------+
|6252573034|
+----------+

SELECT summary FROM xcollazo_iceberg.temporary_wikitext_raw_source_data_from_2023_07.snapshots
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|summary                                                                                                                                                                                                                                                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{spark.app.id -> application_1694521537759_11590, changed-partition-count -> 0, total-records -> 0, total-files-size -> 0, total-data-files -> 0, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}                                                                                                                             |
|{spark.app.id -> application_1694521537759_11590, added-data-files -> 218191, added-records -> 6252573034, added-files-size -> 25552586038578, changed-partition-count -> 176749, total-records -> 6252573034, total-files-size -> 25552586038578, total-data-files -> 218191, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
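A quick back-of-the-envelope check of those summary numbers (my arithmetic, not part of the original run):

# Sanity-checking the snapshot summary above.
total_bytes   = 25_552_586_038_578   # added-files-size
total_files   = 218_191              # added-data-files
total_records = 6_252_573_034        # added-records

print(f"table size    : {total_bytes / 2**40:.1f} TiB")               # ~23.2 TiB of Avro
print(f"avg file size : {total_bytes / total_files / 2**20:.0f} MiB") # ~112 MiB per data file
print(f"avg row size  : {total_bytes / total_records:,.0f} bytes")    # ~4,087 bytes per revision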

Successful MERGE INTO from the intermediary table to wikitext_raw_rc1:

# this run of *all* wikis for the year 2022 took 2.4h
# most of that was the final sort (1.3h)
# no evidence of failures.
# this is good evidence that with the intermediary table, we can run the backfill significantly faster

spark.sql("""
MERGE INTO xcollazo_iceberg.wikitext_raw_rc1 t
USING (
    SELECT * FROM (
        SELECT *
        FROM xcollazo_iceberg.temporary_wikitext_raw_source_data_from_2023_07 s
        -- AND wiki_db IN ('simplewiki')
    )
    -- time bound backfill to avoid having a window where we may not have revision visibility data
    WHERE s_revision_timestamp < to_timestamp('2023-07') + INTERVAL 1 MONTH
    AND ( s_revision_timestamp >= to_timestamp('2022') AND s_revision_timestamp < to_timestamp('2022') + INTERVAL 1 YEAR )
)

ON  s_wiki_db = t.wiki_db
AND s_revision_id = t.revision_id
-- AND t.wiki_db IN ('simplewiki')
AND ( t.revision_timestamp >= to_timestamp('2022') AND t.revision_timestamp < to_timestamp('2022') + INTERVAL 1 YEAR )

WHEN MATCHED AND to_timestamp('2023-07') > t.row_last_update THEN
    UPDATE SET
      t.page_id = s_page_id,
      t.page_namespace = s_page_namespace,
      t.page_title = s_page_title,
      t.page_redirect_title = s_page_redirect_title,
      t.user_id = s_user_id,
      t.user_text = s_user_text,
      t.user_is_visible = COALESCE(s_user_is_visible, TRUE),
      t.revision_id = s_revision_id,
      t.revision_parent_id = s_revision_parent_id,
      t.revision_timestamp = s_revision_timestamp,
      t.revision_is_minor_edit = s_revision_is_minor_edit,
      t.revision_comment = s_revision_comment,
      t.revision_comment_is_visible = COALESCE(s_revision_comment_is_visible, TRUE),
      t.revision_sha1 = s_revision_sha1,                       -- from backfill, revision_sha1 == main slot sha1
      t.revision_size = s_revision_size,                       -- from backfill, revision_size == main slot size
      t.revision_content_slots = s_revision_content_slots,
      t.revision_content_is_visible = COALESCE(s_revision_content_is_visible, TRUE),
      t.wiki_db = s_wiki_db,
      t.row_last_update = to_timestamp('2023-07')
WHEN NOT MATCHED THEN
    INSERT (
        page_id,
        page_namespace,
        page_title,
        page_redirect_title,
        user_id,
        user_text,
        user_is_visible,
        revision_id,
        revision_parent_id,
        revision_timestamp,
        revision_is_minor_edit,
        revision_comment,
        revision_comment_is_visible,
        revision_sha1,
        revision_size,
        revision_content_slots,
        revision_content_is_visible,
        wiki_db,
        row_last_update
    ) VALUES (
      s_page_id,
      s_page_namespace,
      s_page_title,
      s_page_redirect_title,
      s_user_id,
      s_user_text,
      COALESCE(s_user_is_visible, TRUE),
      s_revision_id,
      s_revision_parent_id,
      s_revision_timestamp,
      s_revision_is_minor_edit,
      s_revision_comment,
      COALESCE(s_revision_comment_is_visible, TRUE),
      s_revision_sha1,                                         -- from backfill, revision_sha1 == main slot sha1
      s_revision_size,                                         -- from backfill, revision_size == main slot size
      s_revision_content_slots,
      COALESCE(s_revision_content_is_visible, TRUE),
      s_wiki_db,
      to_timestamp('2023-07')
    )"""
).show(100, truncate=False)

Let's do some math:

The current backfill takes ~7.5h per group per year on recent years. As stated before, most of that time is wasted re-reading wmf.mediawiki_wikitext_history over and over. This comes to ~7.5h * 22 years * 4 groups = 660 hours = 27.5 days if run sequentially. Because we run each group in parallel, it's actually ~6.8 days using 75% of cluster resources.

But if we have the intermediary table (with schema as in T346281#9170438), we get the following: ~19h (create intermediary table, T346281#9170445) + 2.4h * 22 years = ~72 hours = ~3 days! 🎉
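The same math, spelled out (numbers copied from above):

# Current approach: 4 wiki groups x 22 years, ~7.5h per (group, year).
sequential_hours = 7.5 * 22 * 4              # 660 hours
sequential_days  = sequential_hours / 24     # 27.5 days
parallel_days    = sequential_hours / 4 / 24 # ≈ 6.9 days when the 4 groups run in parallel

# With the intermediary table: one ~19h table build plus one ~2.4h MERGE per year.
new_hours = 19 + 2.4 * 22                    # ≈ 71.8 hours
new_days  = new_hours / 24                   # ≈ 3 days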

All of this was done using ~18.4% of the cluster resources (100 executors with 24GB RAM and 2 cores each, and in this case spark.sql.adaptive.coalescePartitions.enabled=true):

# this seems like a winning config!
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "32g",
        #"spark.dynamicAllocation.maxExecutors": 64,
        #"spark.executor.memory": "26g",
        #"spark.executor.memoryOverhead": "6g",
        "spark.executor.memory": "20g",
        "spark.executor.memoryOverhead": "4g",
        "spark.executor.cores": 2,
        "spark.sql.shuffle.partitions": 131072,
        "spark.shuffle.io.retryWait": "15s",
        "spark.shuffle.io.maxRetries": "15",
        "spark.network.timeout": "600s",

        # maxResultSize default is 1g, and it's giving us problems with MERGE INTO tasks
        "spark.driver.maxResultSize": "8g",

        # extra settings as per https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Cluster/Spark#Executors
        "spark.shuffle.file.buffer": "1m",
        "spark.shuffle.service.index.cache.size": "256m",
        "spark.io.compression.lz4.blockSize": "512KB",

        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1", # bumped to 1.3.1 to test hash
        # we need to clear spark.yarn.archive so that we don't pick up old 3.1.2 one
        # for this I commented out spark.yarn.archive from spark-defaults.conf
        # 3.1 Shuffle service is not happy with Spark 3.3, so have to disable it (and dynamic allocation)
        "spark.shuffle.service.enabled": False,
        "spark.dynamicAllocation.enabled": False,
        "spark.executor.instances": 100,

        # extra settings to make adaptive query execution work in our favor
        # "spark.sql.adaptive.coalescePartitions.enabled" : False
    }
)
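For reference, a rough tally of what that session asks YARN for (derived from the settings above; the ~18.4% figure is as observed on the cluster, not recomputed here):

executors          = 100
mem_per_executor   = 20 + 4   # spark.executor.memory + spark.executor.memoryOverhead, in GB
cores_per_executor = 2

total_memory_gb = executors * mem_per_executor    # 2400 GB of executor memory
total_cores     = executors * cores_per_executor  # 200 executor cores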

I know they're just computers and they don't have feelings and stuff, but something about this makes me so happy, just picturing free RAM and CPU resources frolicking in the YARN clouds...

> just picturing free RAM and CPU resources frolicking in the YARN clouds

Had to doctor your prompt to "Computers dancing in the clouds", but something like this?:

craiyon_161633_Computers_dancing_in_the_clouds.png (1×1 px, 676 KB)

https://www.craiyon.com/image/mO7HDHJDSt-7ucQ5qvJucg

Hmmm... no my prompt for that would be something more like "in the theme of Tron crossed with Lawnmower man but replacing any sadness or darkness with joy and dance"

xcollazo set the point value for this task to 5. Sep 19 2023, 1:44 PM

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/497

Draft: Use an intermediate table when backfilling wmf_dumps.wikitext_raw_rc1.

All right! This is now running in production!

Will update in a couple days with our findings.

Findings:

  • Out of the 23 years, 15 backfills were successful, 8 failed after multiple attempts.
  • There are two types of errors: one related to an outage of Archiva (sporadic and transient), the other related to MERGE failures caused by source data quality issues. Will expand on each type of error below.

Archiva outage:

Because we use Spark 3.3.2 for the backfill, we created a custom conda environment. To avoid complicating matters even more, I am using Spark's Ivy mechanism to pull one dependency at runtime: iceberg-spark-runtime-3.3_2.12-1.2.1.jar. This is done using our Archiva instance as a proxy. This suddenly started failing with:

sudo -u analytics yarn logs -appOwner analytics -applicationId application_1694521537759_58660
...
:::: ERRORS
	SERVER ERROR: Server Error url=https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.2.1/iceberg-spark-runtime-3.3_2.12-1.2.1.pom

	SERVER ERROR: Server Error url=https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.2.1/iceberg-spark-runtime-3.3_2.12-1.2.1.jar
...

And indeed, when I tried to pull the artifact manually, I got:

√ ~ % curl https://archiva.wikimedia.org/repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.2.1/iceberg-spark-runtime-3.3_2.12-1.2.1.pom
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"/>
<title>Error 500 Server Error</title>
</head>
<body><h2>HTTP ERROR 500</h2>
<p>Problem accessing /repository/mirrored/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.2.1/iceberg-spark-runtime-3.3_2.12-1.2.1.pom. Reason:
<pre>    Server Error</pre></p><h3>Caused by:</h3><pre>java.lang.RuntimeException: /var/cache/archiva/temp2871579391051196831: No space left on device
	at org.apache.archiva.proxy.DefaultRepositoryProxyConnectors.createWorkingDirectory(DefaultRepositoryProxyConnectors.java:1078)
...

I've reported this to the SRE team on Slack. As of this writing, a temporary fix has been applied.

MERGE failures:

The merge stacks look as follows:

sudo -u analytics yarn logs -appOwner analytics -applicationId application_1694521537759_62173 | grep -v INFO
...
py4j.protocol.Py4JJavaError: An error occurred while calling o79.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2165 in stage 13.0 failed 4 times, most recent failure: Lost task 2165.3 in stage 13.0 (TID 7534) (analytics1073.eqiad.wmnet executor 45): org.apache.spark.SparkException: The ON search condition of the MERGE statement matched a single row from the target table with multiple rows of the source table. This could result in the target row being operated on more than once with an update or delete operation and is not allowed.
	at org.apache.spark.sql.execution.datasources.v2.MergeRowsExec.processRowWithCardinalityCheck$1(MergeRowsExec.scala:163)
...

Our MERGE looks as follows:

MERGE INTO {target_table} t
USING (
    SELECT *
    FROM {intermediate_table}
    -- time bound backfill to avoid having a window where we may not have revision visibility data
    WHERE s_revision_timestamp < to_timestamp('{snapshot}') + INTERVAL 1 MONTH
    AND {generate_timestamp_predicate("s_revision_timestamp", years)}
)

ON  s_wiki_db = t.wiki_db
AND s_revision_id = t.revision_id
AND {generate_timestamp_predicate("t.revision_timestamp", years)}

WHEN MATCHED AND to_timestamp('{snapshot}') > t.row_last_update THEN
...
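For context, a hedged sketch of what the generate_timestamp_predicate helper could look like (the actual helper lives in the backfill job code and may differ):

def generate_timestamp_predicate(column, years):
    """Expand to an OR of one-year ranges over `column`, matching the expanded MERGE
    shown earlier, e.g. for years=[2022]:
    ( col >= to_timestamp('2022') AND col < to_timestamp('2022') + INTERVAL 1 YEAR )"""
    clauses = [
        f"( {column} >= to_timestamp('{year}') "
        f"AND {column} < to_timestamp('{year}') + INTERVAL 1 YEAR )"
        for year in years
    ]
    return "( " + " OR ".join(clauses) + " )"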

Considering the ON clause above, this suggests that we have a source data quality issue with the incoming data from wmf.mediawiki_wikitext_history where (wiki_db, revision_id) tuples are not unique as assumed. I will investigate this now.

I have confirmed that we indeed have some (wiki_db, revision_id) tuples that are duplicated. Here is an example that is easy to compute, since in 2004 there was not much data yet:

select s_wiki_db, s_revision_id, count(1) as count
from tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
where s_revision_timestamp >= to_timestamp('2004')
and  s_revision_timestamp < to_timestamp('2005')
group by s_wiki_db, s_revision_id
having count(1) > 1

+---------+-------------+-----+
|s_wiki_db|s_revision_id|count|
+---------+-------------+-----+
|   enwiki|     13123563|    2|
|   enwiki|      6884215|    2|
|   enwiki|      6883838|    2|
|   enwiki|      6884201|    2|
+---------+-------------+-----+

Here is a drill down of the offending revision_ids:

select s_wiki_db, s_revision_id, s_revision_parent_id, s_page_id, s_user_id, s_revision_timestamp, s_revision_sha1
from tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
where s_revision_timestamp >= to_timestamp('2004')
and  s_revision_timestamp < to_timestamp('2005')
and s_wiki_db = 'enwiki'
and s_revision_id = 13123563

+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|s_wiki_db|s_revision_id|s_revision_parent_id|s_page_id|s_user_id|s_revision_timestamp|s_revision_sha1                |
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|enwiki   |13123563     |6884215             |1106422  |13800    |2004-10-26 23:17:08 |kutq8isyomg408cqdygxosjptgv8r00|
|enwiki   |13123563     |6884215             |57286698 |13800    |2004-10-26 23:17:08 |kutq8isyomg408cqdygxosjptgv8r00|
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+



select s_wiki_db, s_revision_id, s_revision_parent_id, s_page_id, s_user_id, s_revision_timestamp, s_revision_sha1
from tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
where s_revision_timestamp >= to_timestamp('2004')
and  s_revision_timestamp < to_timestamp('2005')
and s_wiki_db = 'enwiki'
and s_revision_id = 6884215

+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|s_wiki_db|s_revision_id|s_revision_parent_id|s_page_id|s_user_id|s_revision_timestamp|s_revision_sha1                |
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|enwiki   |6884215      |6884201             |1106422  |13800    |2004-10-26 23:16:23 |0cqof88wc97z2l7nezaar4aw9ypbwst|
|enwiki   |6884215      |6884201             |57286698 |13800    |2004-10-26 23:16:23 |0cqof88wc97z2l7nezaar4aw9ypbwst|
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+



select s_wiki_db, s_revision_id, s_revision_parent_id, s_page_id, s_user_id, s_revision_timestamp, s_revision_sha1
from tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
where s_revision_timestamp >= to_timestamp('2004')
and  s_revision_timestamp < to_timestamp('2005')
and s_wiki_db = 'enwiki'
and s_revision_id = 6883838

+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|s_wiki_db|s_revision_id|s_revision_parent_id|s_page_id|s_user_id|s_revision_timestamp|s_revision_sha1                |
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|enwiki   |6883838      |-1                  |1106422  |-1       |2004-10-26 22:59:23 |fwwkr1vcaz89uqk6bmxsqg80n536sad|
|enwiki   |6883838      |-1                  |57286698 |-1       |2004-10-26 22:59:23 |fwwkr1vcaz89uqk6bmxsqg80n536sad|
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+



select s_wiki_db, s_revision_id, s_revision_parent_id, s_page_id, s_user_id, s_revision_timestamp, s_revision_sha1
from tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
where s_revision_timestamp >= to_timestamp('2004')
and  s_revision_timestamp < to_timestamp('2005')
and s_wiki_db = 'enwiki'
and s_revision_id = 6884201

+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|s_wiki_db|s_revision_id|s_revision_parent_id|s_page_id|s_user_id|s_revision_timestamp|s_revision_sha1                |
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+
|enwiki   |6884201      |6883838             |1106422  |-1       |2004-10-26 23:00:52 |7yytbakswgc26dgvu1dnc63qkozly6q|
|enwiki   |6884201      |6883838             |57286698 |-1       |2004-10-26 23:00:52 |7yytbakswgc26dgvu1dnc63qkozly6q|
+---------+-------------+--------------------+---------+---------+--------------------+-------------------------------+

After discussions with @JAllemandou, we figured that these kinds of situations may happen due to database mutations that occurred while we were dumping the data (in the current Dumps process). Locking the database while dumping is not an option. @JAllemandou suggests that, for these cases, we make the tuple with the highest page_id win, perhaps with a window function similar to the one we use when merging in data from events.
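A minimal sketch of that write-time window-function idea (Spark SQL over the intermediate table; column names as above, the real query may differ):

# Hedged sketch: keep only the row with the highest page_id per (wiki_db, revision_id).
deduped = spark.sql("""
SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY s_wiki_db, s_revision_id
            ORDER BY s_page_id DESC
        ) AS rn
    FROM tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701
)
WHERE rn = 1
""").drop("rn")

(As it turned out later, doing this on write forces an expensive full sort; see the read-time trick further below.)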

Recent data from the wmf_raw.mediawiki_revision table agrees with choosing the higher page_id as the winner:

presto> select rev_id, wiki_db, rev_page, rev_user, rev_sha1 from wmf_raw.mediawiki_revision where snapshot = '2023-08' and wiki_db = 'enwiki' and rev_id IN (6883838, 6884201, 6884215, 13123563);
  rev_id  | wiki_db | rev_page | rev_user |            rev_sha1             
----------+---------+----------+----------+---------------------------------
 13123563 | enwiki  | 57286698 | NULL     | kutq8isyomg408cqdygxosjptgv8r00 
  6883838 | enwiki  | 57286698 | NULL     | fwwkr1vcaz89uqk6bmxsqg80n536sad 
  6884201 | enwiki  | 57286698 | NULL     | 7yytbakswgc26dgvu1dnc63qkozly6q 
  6884215 | enwiki  | 57286698 | NULL     | 0cqof88wc97z2l7nezaar4aw9ypbwst 
(4 rows)

So, I will attempt this with an eye on the performance of that window function.

> Archiva outage:
> ...
> I've reported this to the SRE team on Slack. As of this writing, a temporary fix has been applied.

For completeness, we now have T347343 to investigate root cause for that outage.

Copy pasting from mediawiki-content-dump/-/merge_requests/14, for completeness:

In T346281#9195431, we figured out that the current dumps process can inadvertently publish duplicate (wiki_db, revision_id) tuples. Presumably this happens since that process cannot lock the database when reading the data. These duplicates are rare: On the 2023-07 XML dump, year 2004 has 4 duplicates, while year 2019 has 48.

These duplicates trip up the backfill though, as the MERGE mechanism expects 1:1 matches at most. In this MR we solve the issue by deduplicating the intermediate table on read. A first attempt at de-duplicating on write with a ROW_NUMBER window function got us an expensive SORT node over all the data. But since the duplicate data is very small, by doing it on read we can do the following trick (sketched after the list below):

  1. Figure out the (wiki_db, revision_id) tuples that are duplicated with a separate query with a GROUP BY and a HAVING clause (Price: shuffle plus a hashed group read. Couple minutes.)
  2. Then collect() those tuples, jointly with their TIMESTAMPs.
  3. Now build a query that finds these duplicated tuples and then do a ROW_NUMBER window function on them, keeping only the ones we want to remove. Because we collected the TIMESTAMPs, and our intermediate table is partitioned by that field, this query reads very little data, and thus keeps the ROW_NUMBER and underlying SORT price very contained, a minute or two.
  4. Now take (3) and Broadcast LEFT ANTI JOIN it to the original data.
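A hedged PySpark sketch of those four steps (table name as used elsewhere in this task; the production code in mediawiki-content-dump may differ in details):

from pyspark.sql import functions as F

table = "tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701"

# (1) + (2): find the duplicated (wiki_db, revision_id) tuples and collect them,
# jointly with their timestamps (tiny result set; the observed duplicates share
# the same revision_timestamp, so it can be part of the GROUP BY).
dupes = spark.sql(f"""
    SELECT s_wiki_db, s_revision_id, s_revision_timestamp
    FROM {table}
    GROUP BY s_wiki_db, s_revision_id, s_revision_timestamp
    HAVING count(1) > 1
""").collect()

# (3): re-read just the duplicated rows (the timestamp equality prunes partitions,
# since the table is partitioned by months(s_revision_timestamp)), then keep only
# the rows we want to drop: everything except the highest page_id per tuple.
# (Assumes there is at least one duplicate.)
predicate = " OR ".join(
    f"(s_wiki_db = '{r.s_wiki_db}' AND s_revision_id = {r.s_revision_id} "
    f"AND s_revision_timestamp = to_timestamp('{r.s_revision_timestamp}'))"
    for r in dupes
)
rows_to_drop = spark.sql(f"""
    SELECT s_wiki_db, s_revision_id, s_page_id FROM (
        SELECT s_wiki_db, s_revision_id, s_page_id,
               ROW_NUMBER() OVER (PARTITION BY s_wiki_db, s_revision_id
                                  ORDER BY s_page_id DESC) AS rn
        FROM {table}
        WHERE {predicate}
    ) WHERE rn > 1
""")

# (4): broadcast LEFT ANTI JOIN the small drop set against the full table read.
deduplicated_source = (
    spark.table(table)
    .join(F.broadcast(rows_to_drop),
          on=["s_wiki_db", "s_revision_id", "s_page_id"],
          how="left_anti")
)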

I tested this mechanism on 2004 (little data) and 2019 (lots of data) and it's working great.

(Kudos to @Milimetric for the idea to avoid the write time sort!)

Query plan with these changes. Looks scary, but it's all good:

+- == Initial Plan ==
   Sort (46)
   +- Exchange (45)
      +- Project (44)
         +- MergeRows (43)
            +- Sort (42)
               +- SortMergeJoin FullOuter (41)
                  :- Sort (26)
                  :  +- Exchange (25)
                  :     +- Project (24)
                  :        +- Project (23)
                  :           +- BatchScan (22)
                  +- Sort (40)
                     +- Exchange (39)
                        +- Project (38)
                           +- BroadcastHashJoin LeftAnti BuildRight (37)
                              :- Project (27)
                              :  +- BatchScan (7)
                              +- BroadcastExchange (36)
                                 +- Project (35)
                                    +- Filter (34)
                                       +- Window (33)
                                          +- Sort (32)
                                             +- Exchange (31)
                                                +- Project (30)
                                                   +- Filter (29)
                                                      +- BatchScan (28)

xcollazo opened https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/505

Draft: Bump mediawiki-content-dump artifact to pickup *second* deduplication fix.

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/505

Bump mediawiki-content-dump artifact to pickup *second* deduplication fix.

> (Kudos to @Milimetric for the idea to avoid the write time sort!)

+1000 :)

Ok folks we have a fully backfilled wmf_dumps.wikitext_raw_rc1 table!

Here is the runtime data, not counting the failures that we fixed along the way:

Task                                        Time
wait_for_data_in_mw_wikitext_history        0:00:03
wait_for_data_in_raw_mediawiki_revision     0:00:04
spark_create_intermediate_table             18:05:18
spark_backfill_merge_into_2001              0:01:23
spark_backfill_merge_into_2002              0:03:25
spark_backfill_merge_into_2003              0:06:47
spark_backfill_merge_into_2004              0:11:37
spark_backfill_merge_into_2005              0:31:24
spark_backfill_merge_into_2006              0:55:03
spark_backfill_merge_into_2007              1:16:07
spark_backfill_merge_into_2008              1:24:49
spark_backfill_merge_into_2009              0:46:01
spark_backfill_merge_into_2010              0:43:58
spark_backfill_merge_into_2011              1:36:13
spark_backfill_merge_into_2012              0:49:18
spark_backfill_merge_into_2013              0:48:45
spark_backfill_merge_into_2014              0:46:30
spark_backfill_merge_into_2015              2:58:49
spark_backfill_merge_into_2016              1:15:22
spark_backfill_merge_into_2017              1:31:38
spark_backfill_merge_into_2018              1:41:19
spark_backfill_merge_into_2019              4:08:39
spark_backfill_merge_into_2020              2:04:40
spark_backfill_merge_into_2021              2:22:15
spark_backfill_merge_into_2022              2:16:11
spark_backfill_merge_into_2023              1:47:22
drop_intermediate_table                     0:00:25

Total time: 48:13:25

There is one more fix to be made: the drop_intermediate_table task failed on the first try. The table metadata in Hive was gone, but the data was still there (even though we use the PURGE clause, which should delete the data). Need to investigate why this happened.
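For reference, the drop step is essentially the following (sketch; in production it runs from the Airflow DAG):

# PURGE asks the catalog to also delete the underlying data files, not just the metadata.
spark.sql("DROP TABLE IF EXISTS tmp.tmp_dumps_merge_backfill_to_wikitext_raw_20230701 PURGE")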

The merge requests above take care of the remaining issues for this effort. Closing.