
Another instance of duplicate rows on wmf_content.mediawiki_content_history_v1
Closed, Resolved · Public

Description

@BTullis reports:

[There is an] Ops Week issue around mw_content_merge_events_to_mw_content_history_daily.spark_process_events
The error messages from the yarn logs don't mean much to me and they're also gigantic.
I'm using this to filter out the INFO level messages: yarn logs -applicationId application_1757517622464_163684|grep -v INFO|less
There are several errors shown, but possibly the most meaningful is:

Caused by: org.apache.spark.SparkException: The ON search condition of the MERGE statement matched a single row from the target table with multiple rows of the source table. This could result in the target row being operated on more than once with an update or delete operation and is not allowed.

Similar to:
T397525: Duplicate rows on wmf_content.mediawiki_content_history_v1 after big reconcile
and
T388715: Investigate and fix duplicate data on wmf_content.mediawiki_content_history_v1 for muswiki
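
For context, the cardinality rule behind that SparkException can be illustrated with a minimal pure-Python sketch (hypothetical data; the real check happens inside Spark's MERGE execution):

```python
from collections import Counter

def check_merge_cardinality(target_keys, source_keys):
    """Raise if any target key is matched by more than one source row,
    mirroring the rule behind the SparkException above."""
    source_counts = Counter(source_keys)
    for key in target_keys:
        if source_counts[key] > 1:
            raise ValueError(
                f"target row {key} matched by {source_counts[key]} source rows"
            )

# A duplicated (wiki_id, revision_id) key in the source is enough to trigger it:
target = [("enwiki", 1), ("enwiki", 2)]
source = [("enwiki", 1), ("enwiki", 1)]  # two source rows for one target key
try:
    check_merge_cardinality(target, source)
except ValueError as e:
    print(e)  # target row ('enwiki', 1) matched by 2 source rows
```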

Event Timeline

xcollazo changed the task status from Open to In Progress.Sep 18 2025, 2:06 PM
xcollazo triaged this task as High priority.
xcollazo updated the task description.

Upon close inspection, this is indeed a duplicate-rows situation similar to T397525.

spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         wiki_id,
         revision_id
  FROM wmf_content.mediawiki_content_history_v1
  GROUP BY wiki_id, revision_id
  HAVING count > 1
)
""").show(300, truncate=False)
+-----+
|count|
+-----+
|1832 |
+-----+
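
The duplicate-count query above is a two-level aggregation; a pure-Python equivalent (with hypothetical rows standing in for the table) shows the shape of the computation:

```python
from collections import Counter

# Hypothetical (wiki_id, revision_id) pairs standing in for the table.
rows = [
    ("enwiki", 100), ("enwiki", 100),   # duplicate
    ("enwiki", 101),
    ("zhwiki", 200), ("zhwiki", 200),   # duplicate
]

# Equivalent of GROUP BY wiki_id, revision_id HAVING count > 1,
# then counting how many keys are duplicated.
dup_keys = [k for k, c in Counter(rows).items() if c > 1]
print(len(dup_keys))  # 2
```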

Distribution:

spark.sql("""
SELECT count(1) as count,
       wiki_id
FROM (
    SELECT
         count(1) as count,
         wiki_id,
         revision_id
    FROM wmf_content.mediawiki_content_history_v1
    GROUP BY wiki_id, revision_id
    HAVING count > 1
)
GROUP BY wiki_id
ORDER BY count DESC
""").show(3000, truncate=False)

+-----+-------------+
|count|wiki_id      |
+-----+-------------+
|1181 |enwiki       |
|161  |zhwiki       |
|116  |hiwiki       |
|56   |hewiki       |
|31   |commonswiki  |
|25   |eswiki       |
|21   |idwiki       |
|19   |elwiki       |
|18   |etwiki       |
|17   |fawiki       |
|16   |viwiki       |
|13   |srwiki       |
|12   |azwiki       |
|11   |enwiktionary |
|10   |trwiki       |
|10   |bnwiki       |
|10   |arwiki       |
|9    |metawiki     |
|7    |cawiki       |
|7    |itwiki       |
|6    |trwikibooks  |
|6    |mediawikiwiki|
|5    |kowiki       |
|5    |dewiki       |
|5    |frwiki       |
|5    |pawiki       |
|4    |brwiki       |
|4    |ukwiki       |
|4    |ptwiki       |
|3    |ruwiki       |
|3    |plwiki       |
|2    |rowiki       |
|2    |nlwiki       |
|2    |cswiki       |
|2    |kowikisource |
|2    |itwikibooks  |
|2    |uzwiki       |
|2    |labswiki     |
|2    |pswiki       |
|1    |skwiki       |
|1    |specieswiki  |
|1    |simplewiki   |
|1    |arwikisource |
|1    |ruwikisource |
|1    |ruwiktionary |
|1    |siwiktionary |
|1    |iswiki       |
|1    |itwikiquote  |
|1    |kuwiki       |
|1    |kuwiktionary |
|1    |svwiki       |
|1    |cebwiki      |
|1    |nlwikisource |
|1    |bnwikiquote  |
|1    |arzwiki      |
+-----+-------------+
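
The per-wiki distribution query is a second aggregation over the duplicated keys; a pure-Python sketch of the same logic (hypothetical rows):

```python
from collections import Counter

# Hypothetical (wiki_id, revision_id) pairs with some duplicates.
rows = [
    ("enwiki", 1), ("enwiki", 1),
    ("enwiki", 2), ("enwiki", 2),
    ("zhwiki", 9), ("zhwiki", 9),
]

# Inner query: keys appearing more than once.
dup_keys = [k for k, c in Counter(rows).items() if c > 1]

# Outer query: duplicated keys per wiki, ordered by count descending.
per_wiki = Counter(w for w, _ in dup_keys).most_common()
print(per_wiki)  # [('enwiki', 2), ('zhwiki', 1)]
```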

Similar to T397525#10935336, we attempted the query below to unblock this pipeline, but it failed. In T397525#10935336 we had not detected issues on the three biggest wikis, so the data involved was much smaller. This time we have hits on them:

hostname -f
an-launcher1002.eqiad.wmnet

sudo -u analytics bash

screen -S xcollazo_mw_content_history_fix_T404975

spark3-sql \
--master yarn \
--conf spark.driver.maxResultSize=8G \
--conf spark.dynamicAllocation.maxExecutors=256 \
--conf spark.sql.shuffle.partitions=2048 \
--conf spark.sql.iceberg.locality.enabled=true \
--conf spark.reducer.maxReqsInFlight=1 \
--conf spark.shuffle.io.retryWait=180s \
--conf spark.shuffle.io.maxRetries=10  \
--executor-cores 2 \
--executor-memory 16G \
--driver-cores 4 \
--driver-memory 16G \
--name xcollazo_mw_content_history_fix_T404975




MERGE INTO wmf_content.mediawiki_content_history_v1 t
USING(
    SELECT /*+ BROADCASTJOIN(br) */
           s.wiki_id,
           s.revision_id,
           row_move_update_dt,
           row_number() over ( PARTITION BY s.wiki_id, s.revision_id ORDER BY s.row_move_update_dt ASC NULLS FIRST) AS row_num
    FROM wmf_content.mediawiki_content_history_v1 s
    INNER JOIN (
      SELECT wiki_id,
             revision_id
      FROM wmf_content.mediawiki_content_history_v1
      GROUP BY wiki_id, revision_id
      HAVING count(1) > 1
    ) br ON (
           s.wiki_id = br.wiki_id
           AND s.revision_id = br.revision_id
         )
) s

ON  t.wiki_id = s.wiki_id
AND t.revision_id = s.revision_id
AND t.row_move_update_dt = s.row_move_update_dt
AND s.row_num = 1

I am now going to run the `s` source subquery manually, capture the ~1,800 rows that need deletion, and submit a DELETE instead of a MERGE INTO.

Here is the script we put together to fix this:

offending_rows = spark.sql("""
    SELECT * FROM (
        SELECT /*+ BROADCASTJOIN(br) */
               s.wiki_id,
               s.revision_id,
               s.page_id,
               s.row_content_update_dt,
               s.row_visibility_update_dt,
               s.row_move_update_dt,
               row_number() over ( PARTITION BY s.wiki_id, s.revision_id ORDER BY s.row_move_update_dt ASC NULLS FIRST) AS row_num
        FROM wmf_content.mediawiki_content_history_v1 s
        INNER JOIN (
          SELECT wiki_id,
                 revision_id
          FROM wmf_content.mediawiki_content_history_v1
          GROUP BY wiki_id, revision_id
          HAVING count(1) > 1
        ) br ON (
               s.wiki_id = br.wiki_id
               AND s.revision_id = br.revision_id
             )
    )
    WHERE row_num = 1
""").collect()


len(offending_rows)
1832

def dt_equals_or_null(dt):
    return "= '" + str(dt) + "'" if dt else "IS NULL"


ored_rows = '\nOR\n'.join(
    [f"(wiki_id = '{row['wiki_id']}' "
     f"AND revision_id = {row['revision_id']} "
     f"AND page_id = {row['page_id']} "
     f"AND row_content_update_dt {dt_equals_or_null(row['row_content_update_dt'])} "
     f"AND row_visibility_update_dt {dt_equals_or_null(row['row_visibility_update_dt'])} "
     f"AND row_move_update_dt {dt_equals_or_null(row['row_move_update_dt'])} )"
     for row in offending_rows
    ]
)

print(ored_rows[:500])
(wiki_id = 'arwiki' AND revision_id = 10845367 AND page_id = 356359 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
OR
(wiki_id = 'arwiki' AND revision_id = 17267976 AND page_id = 2396830 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
OR
(wiki_id = 'arwiki' AND revision_id = 45359749 AND page_id = 7367444 AND row_content_


print(f"""
DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE (
{ored_rows}
)
""")

This last print() generated quite a long DELETE statement:

DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE (
(wiki_id = 'arwiki' AND revision_id = 10845367 AND page_id = 356359 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
OR
(wiki_id = 'arwiki' AND revision_id = 17267976 AND page_id = 2396830 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
)
...
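
A note on the `dt_equals_or_null()` helper used above: SQL equality against NULL never matches, so the generated predicate has to switch to `IS NULL` for missing timestamps. A standalone check in plain Python (no Spark needed):

```python
import datetime

def dt_equals_or_null(dt):
    # Build a SQL predicate fragment: "= '<ts>'" for a timestamp,
    # "IS NULL" when the column value is NULL (SQL `= NULL` never matches).
    return "= '" + str(dt) + "'" if dt else "IS NULL"

print(dt_equals_or_null(datetime.datetime(2024, 7, 1)))  # = '2024-07-01 00:00:00'
print(dt_equals_or_null(None))                           # IS NULL
```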

We successfully ran this:

(Query in YARN: https://yarn.wikimedia.org/proxy/application_1757517622464_210186/SQL/execution/?id=0)

hostname -f
an-launcher1002.eqiad.wmnet

sudo -u analytics bash

screen -S xcollazo_mw_content_history_fix_T404975

spark3-sql \
--master yarn \
--conf spark.driver.maxResultSize=8G \
--conf spark.dynamicAllocation.maxExecutors=200 \
--conf spark.sql.shuffle.partitions=2048 \
--conf spark.sql.iceberg.locality.enabled=true \
--conf spark.reducer.maxReqsInFlight=1 \
--conf spark.shuffle.io.retryWait=180s \
--conf spark.shuffle.io.maxRetries=10  \
--executor-cores 2 \
--executor-memory 16G \
--driver-cores 4 \
--driver-memory 16G \
--name xcollazo_mw_content_history_fix_T404975




DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE (
(wiki_id = 'arwiki' AND revision_id = 10845367 AND page_id = 356359 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
OR
(wiki_id = 'arwiki' AND revision_id = 17267976 AND page_id = 2396830 AND row_content_update_dt = '2024-07-01 00:00:00' AND row_visibility_update_dt = '2024-07-01 00:00:00' AND row_move_update_dt IS NULL )
...
Response code
Time taken: 3476.811 seconds

Verification:

# after fix
spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         wiki_id,
         revision_id
  FROM wmf_content.mediawiki_content_history_v1
  GROUP BY wiki_id, revision_id
  HAVING count > 1
)
""").show(300, truncate=False)
+-----+
|count|
+-----+
|0    |
+-----+

But of course, some of these duplicates made their way downstream to wmf_content.mediawiki_content_current_v1:

spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         wiki_id,
         revision_id
  FROM wmf_content.mediawiki_content_current_v1
  GROUP BY wiki_id, revision_id
  HAVING count > 1
)
""").show(300, truncate=False)

+-----+
|count|
+-----+
|28   |
+-----+

Thus we need a similar fix here.

xcollazo renamed this task from MW Content ingest fails with MERGE INTO error to Another instance of duplicate rows on wmf_content.mediawiki_content_history_v1.Sep 19 2025, 4:56 PM

Sample of duplicates from wmf_content.mediawiki_content_current_v1:

spark.sql("""
    SELECT * FROM (
        SELECT /*+ BROADCASTJOIN(br) */
               s.wiki_id,
               s.revision_id,
               s.page_id,
               s.row_update_dt,
               row_number() over ( PARTITION BY s.wiki_id, s.revision_id ORDER BY s.row_update_dt ASC NULLS FIRST) AS row_num
        FROM wmf_content.mediawiki_content_current_v1 s
        INNER JOIN (
          SELECT wiki_id,
                 revision_id
          FROM wmf_content.mediawiki_content_current_v1
          GROUP BY wiki_id, revision_id
          HAVING count(1) > 1
        ) br ON (
               s.wiki_id = br.wiki_id
               AND s.revision_id = br.revision_id
             )
    )
""").show()

+-----------+-----------+---------+--------------------+-------+
|    wiki_id|revision_id|  page_id|       row_update_dt|row_num|
+-----------+-----------+---------+--------------------+-------+
|     arwiki|   68276419|  9735403|2024-10-26 09:50:...|      1|
|     arwiki|   68276419|  9235719|2025-06-18 17:49:...|      2|
|commonswiki|  700872508| 34068293|2025-01-14 03:14:...|      1|
|commonswiki|  700872508|167201187|2025-06-18 18:22:...|      2|
|commonswiki| 1043301443|167502231|2025-06-12 22:47:...|      1|
|commonswiki| 1043301443|167502338|2025-06-16 15:44:...|      2|
|commonswiki| 1073549672|  7708380|2025-08-17 09:44:...|      1|
|commonswiki| 1073549672|172918917|2025-08-18 05:55:...|      2|
|     cswiki|   25090454|   927774|2025-07-29 21:14:...|      1|
|     cswiki|   25090454|  2009446|2025-07-30 13:00:...|      2|
|     dewiki|  255857462| 13481514|2025-05-09 10:16:...|      1|
|     dewiki|  255857462| 13509388|2025-06-18 18:34:...|      2|
|     dewiki|  256431468| 13481459|2025-05-28 13:29:...|      1|
|     dewiki|  256431468| 13509385|2025-06-18 18:34:...|      2|
|     enwiki| 1158447956| 72932120| 2024-07-01 00:00:00|      1|
|     enwiki| 1158447956| 79696269|2025-05-01 22:13:...|      2|
|     enwiki| 1229399139| 73485852| 2024-07-01 00:00:00|      1|
|     enwiki| 1229399139| 80227817|2025-06-18 19:11:...|      2|
|     enwiki| 1254014956| 77440006|2024-10-29 01:10:...|      1|
|     enwiki| 1254014956| 80013387|2025-06-01 19:06:...|      2|
+-----------+-----------+---------+--------------------+-------+
only showing top 20 rows

We will thus remove duplicates via the `s.row_update_dt ASC NULLS FIRST` heuristic: `row_num = 1` selects the oldest row of each group (with NULLs sorting first), and that is the row we delete, keeping the most recent one.
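
The heuristic can be sketched in plain Python (hypothetical rows): within each (wiki_id, revision_id) group, order by timestamp ascending with NULLs first, and pick the first row per group as the one to delete:

```python
import datetime
from itertools import groupby

# Hypothetical duplicated rows: (wiki_id, revision_id, row_update_dt).
rows = [
    ("arwiki", 68276419, datetime.datetime(2025, 6, 18)),
    ("arwiki", 68276419, datetime.datetime(2024, 10, 26)),
    ("cswiki", 25090454, None),  # NULLS FIRST sorts this before any timestamp
    ("cswiki", 25090454, datetime.datetime(2025, 7, 30)),
]

# ORDER BY row_update_dt ASC NULLS FIRST within each (wiki_id, revision_id).
rows.sort(key=lambda r: (r[0], r[1], r[2] is not None, r[2] or datetime.datetime.min))

# row_number() = 1 per group selects the oldest row, which gets deleted.
to_delete = [next(g) for _, g in groupby(rows, key=lambda r: (r[0], r[1]))]
for r in to_delete:
    print(r)
```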

This is how we defined the DELETE statement for wmf_content.mediawiki_content_current_v1:

offending_current_rows = spark.sql("""
    SELECT * FROM (
        SELECT /*+ BROADCASTJOIN(br) */
               s.wiki_id,
               s.revision_id,
               s.page_id,
               s.row_update_dt,
               row_number() over ( PARTITION BY s.wiki_id, s.revision_id ORDER BY s.row_update_dt ASC NULLS FIRST) AS row_num
        FROM wmf_content.mediawiki_content_current_v1 s
        INNER JOIN (
          SELECT wiki_id,
                 revision_id
          FROM wmf_content.mediawiki_content_current_v1
          GROUP BY wiki_id, revision_id
          HAVING count(1) > 1
        ) br ON (
               s.wiki_id = br.wiki_id
               AND s.revision_id = br.revision_id
             )
    )
    WHERE row_num = 1
""").collect()

len(offending_current_rows)
28

offending_current_rows[0]
Row(wiki_id='arwiki', revision_id=68276419, page_id=9735403, row_update_dt=datetime.datetime(2024, 10, 26, 9, 50, 5, 399231), row_num=1)

def dt_equals_or_null(dt):
    return "= '" + str(dt) + "'" if dt else "IS NULL"


ored_current_rows = '\nOR\n'.join(
    [f"(wiki_id = '{row['wiki_id']}' "
     f"AND revision_id = {row['revision_id']} "
     f"AND page_id = {row['page_id']} "
     f"AND row_update_dt {dt_equals_or_null(row['row_update_dt'])} )"
     for row in offending_current_rows
    ]
)

print(ored_current_rows[:500])
(wiki_id = 'arwiki' AND revision_id = 68276419 AND page_id = 9735403 AND row_update_dt = '2024-10-26 09:50:05.399231' )
OR
(wiki_id = 'commonswiki' AND revision_id = 700872508 AND page_id = 34068293 AND row_update_dt = '2025-01-14 03:14:46.883942' )
OR
(wiki_id = 'commonswiki' AND revision_id = 1043301443 AND page_id = 167502231 AND row_update_dt = '2025-06-12 22:47:31.159820' )
OR
(wiki_id = 'commonswiki' AND revision_id = 1073549672 AND page_id = 7708380 AND row_update_dt = '2025-08-17 09:44:4

print(f"""
DELETE
FROM wmf_content.mediawiki_content_current_v1
WHERE (
{ored_current_rows}
)
""")

DELETE
FROM wmf_content.mediawiki_content_current_v1
WHERE (
(wiki_id = 'arwiki' AND revision_id = 68276419 AND page_id = 9735403 AND row_update_dt = '2024-10-26 09:50:05.399231' )
OR
(wiki_id = 'commonswiki' AND revision_id = 700872508 AND page_id = 34068293 AND row_update_dt = '2025-01-14 03:14:46.883942' )
OR
...

What was run in prod:

hostname -f
an-launcher1002.eqiad.wmnet

sudo -u analytics bash

screen -S xcollazo_mw_content_history_fix_T404975

spark3-sql \
--master yarn \
--conf spark.driver.maxResultSize=8G \
--conf spark.dynamicAllocation.maxExecutors=256 \
--conf spark.sql.shuffle.partitions=2048 \
--conf spark.sql.iceberg.locality.enabled=true \
--conf spark.reducer.maxReqsInFlight=1 \
--conf spark.shuffle.io.retryWait=180s \
--conf spark.shuffle.io.maxRetries=10  \
--executor-cores 2 \
--executor-memory 16G \
--driver-cores 4 \
--driver-memory 16G \
--name xcollazo_mw_content_history_fix_T404975



DESCRIBE EXTENDED wmf_content.mediawiki_content_current_v1;
...
Table Properties        [current-snapshot-id=2035337351179519862,format=iceberg/parquet,write.delete.mode=merge-on-read,write.format.default=parquet,write.merge.mode=merge-on-read,write.metadata.delete-after-commit.enabled=true,write.target-file-size-bytes=134217728,write.update.mode=merge-on-read]
Time taken: 2.703 seconds, Fetched 36 row(s)


-- we will run the fix on Iceberg 1.2.1, and that version can't do merge-on-read
ALTER TABLE wmf_content.mediawiki_content_current_v1 SET TBLPROPERTIES (
    'write.delete.mode'='copy-on-write'
);

DELETE
FROM wmf_content.mediawiki_content_current_v1
WHERE (
(wiki_id = 'arwiki' AND revision_id = 68276419 AND page_id = 9735403 AND row_update_dt = '2024-10-26 09:50:05.399231' )
OR
(wiki_id = 'commonswiki' AND revision_id = 700872508 AND page_id = 34068293 AND row_update_dt = '2025-01-14 03:14:46.883942' )
OR
(wiki_id = 'commonswiki' AND revision_id = 1043301443 AND page_id = 167502231 AND row_update_dt = '2025-06-12 22:47:31.159820' )
OR
...
OR
(wiki_id = 'zhwiki' AND revision_id = 88026643 AND page_id = 3522359 AND row_update_dt = '2025-07-01 19:01:29.972005' )
)
Response code
Time taken: 259.575 seconds


-- we may want to keep this set to copy-on-write in case we get into this situation again...
-- still, for now, reverting to the previous state
ALTER TABLE wmf_content.mediawiki_content_current_v1 SET TBLPROPERTIES (
    'write.delete.mode'='merge-on-read'
);

Verification:

# after fix
spark.sql("""
SELECT count(1) as count FROM (
  SELECT count(1) as count,
         wiki_id,
         revision_id
  FROM wmf_content.mediawiki_content_current_v1
  GROUP BY wiki_id, revision_id
  HAVING count > 1
)
""").show(300, truncate=False)

+-----+
|count|
+-----+
|0    |
+-----+

Pausing all MW Content pipelines.

Re-enabling them.

Ingest for mw_content_merge_events_to_mw_content_history_daily__spark_process_events__20250916 now running: https://yarn.wikimedia.org/proxy/application_1757517622464_213385/

Follow-up work is being done in T410431. Closing this one.