
Tune Reconciliation mechanism to do historic runs (all revisions, all wikis)
Closed, Resolved · Public

Description

On T368754: Production PySpark job that can run consistency checks for wmf_dumps.wikitext_raw, we developed a mechanism to run consistency checks, but we only tested it for daily runs.

In this task, we want to tune this mechanism, and leverage the work done on T372677: Figure a performant way to read all data from revision table via Spark, to be able to run consistency checks against the full revision history of all wikis.

This likely requires:

Details

Related Changes in GitLab:
Hotfix: Change reconcile source event table, fix sensor (repos/data-engineering/airflow-dags!989, xcollazo, hotfix-change-merge-target → main)
Add DAG to do monthly historic reconcile (repos/data-engineering/airflow-dags!981, xcollazo, T377852-optimize-reconcile-for-historic-runs → main)
Optimize code to be able to handle historic runs (repos/data-engineering/mediawiki-content-pipelines!52, xcollazo, do-historic-reconcile → main)

Event Timeline

After those are successful, we also need to tune the work done on T368755: Python job that reads from wmf_dumps.wikitext_inconsistent_row and produces reconciliation events, to be able to emit the events successfully to EventGate.

Actually, if we pick up T374341, we should be able to avoid EventGate altogether, and have one less point of tuning / failure.

xcollazo changed the task status from Open to In Progress.Dec 3 2024, 6:58 PM
xcollazo claimed this task.
xcollazo moved this task from Sprint Backlog to In Process on the Dumps 2.0 (Kanban Board) board.

Ok, first some good news: I am able to successfully do a consistency check on a small wiki like simplewiki, as well as on a big wiki like enwiki. Here are some stats after a historic run.

simplewiki:

spark.sql("""
SELECT count(1) as count
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04'
  AND computation_class = 'historic'
  AND wiki_db = 'simplewiki'
""").show(100)

+------+
| count|
+------+
|109047|
+------+

spark.sql("""
SELECT count(1) as count, reasons
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04'
  AND computation_class = 'historic'
  AND wiki_db = 'simplewiki'
GROUP BY reasons
ORDER BY count DESC
""").show(100, truncate=False)

+-----+------------------------------------------------------------------------------------+
|count|reasons                                                                             |
+-----+------------------------------------------------------------------------------------+
|60355|[missing_from_source, page_was_deleted]                                             |
|36363|[missing_from_source, page_was_deleted, page_was_restored]                          |
|11792|[missing_from_target]                                                               |
|160  |[missing_from_target, page_was_restored]                                            |
|148  |[mismatch_user_visibility, mismatch_comment_visibility]                             |
|144  |[missing_from_source]                                                               |
|44   |[mismatch_page]                                                                     |
|25   |[mismatch_comment_visibility]                                                       |
|9    |[mismatch_user_visibility, mismatch_comment_visibility, page_was_restored]          |
|2    |[mismatch_comment_visibility, page_was_restored]                                    |
|2    |[mismatch_user_visibility, page_was_restored]                                       |
|2    |[mismatch_user_visibility]                                                          |
|1    |[mismatch_user_visibility, mismatch_content_visibility, mismatch_comment_visibility]|
+-----+------------------------------------------------------------------------------------+

So about 100k inconsistencies, most of them page deletes. enwiki presents a similar picture:

spark.sql("""
SELECT count(1) as count
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04'
  AND computation_class = 'historic'
  AND wiki_db = 'enwiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
""").show(100)

+-------+
|  count|
+-------+
|9685871|
+-------+

spark.sql("""
SELECT count(1) as count, reasons
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04'
  AND computation_class = 'historic'
  AND wiki_db = 'enwiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
GROUP BY reasons
ORDER BY count DESC
""").show(100, truncate=False)

+-------+-------------------------------------------------------------------------------------------------------+
|count  |reasons                                                                                                |
+-------+-------------------------------------------------------------------------------------------------------+
|4761109|[missing_from_source, page_was_deleted, page_was_restored]                                             |
|3962527|[missing_from_source, page_was_deleted]                                                                |
|884063 |[missing_from_target]                                                                                  |
|23975  |[missing_from_target, page_was_restored]                                                               |
|16693  |[missing_from_source]                                                                                  |
|15790  |[mismatch_page]                                                                                        |
|11299  |[mismatch_page, page_was_deleted]                                                                      |
|5117   |[mismatch_user_visibility, mismatch_comment_visibility]                                                |
|2928   |[mismatch_page, page_was_deleted, page_was_restored]                                                   |
|858    |[missing_from_source, page_was_restored]                                                               |
|380    |[mismatch_sha1]                                                                                        |
|379    |[mismatch_content_visibility]                                                                          |
|194    |[mismatch_user_visibility]             
...

So about 10M inconsistencies, again most of them page deletes. This is all great news!

TL;DR: I have had issues checking commonswiki, though. The main issue is that we are putting way more strain on the replica than we should. I should be able to work around this by bumping up the num_partitions that we send to our mediawiki-jdbc reader, but still, it would be nice to have a longer-term solution.

Longer:

For commonswiki, one of our biggest wikis, we want to run the following query to do historic reconcile:

SELECT `revision_id`,`page_id`,`revision_timestamp`,`rev_timestamp`,`revision_content_is_visible`,`user_is_visible`,`revision_comment_is_visible`,`revision_size`,`revision_sha1` FROM (
         
        SELECT rev_id                          AS revision_id,
               rev_page                        AS page_id,
               CAST(rev_timestamp as DATETIME) AS revision_timestamp,
               rev_timestamp,
               rev_deleted & 1 = 0             AS revision_content_is_visible,
               rev_deleted & 2 = 0             AS user_is_visible,
               rev_deleted & 4 = 0             AS revision_comment_is_visible,
               rev_len                         AS revision_size,
               rev_sha1                        AS revision_sha1
        
         FROM revision FORCE INDEX (rev_timestamp)
         WHERE rev_timestamp between '20000101000000' and '20250101000000'
        ) t  WHERE `rev_timestamp` >= 20050620150000 AND `rev_timestamp` < 20050830231500
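As an aside, the rev_deleted columns in the projection above decode a bitfield: a set bit suppresses the corresponding field, so visibility is `(rev_deleted & flag) == 0`. A pure-Python illustration, mirroring the bit assignments used in the query:

```python
# Bit assignments mirror the SQL projection above: a set bit hides the field.
CONTENT, USER, COMMENT = 1, 2, 4

def visibility(rev_deleted: int) -> dict:
    return {
        "revision_content_is_visible": (rev_deleted & CONTENT) == 0,
        "user_is_visible": (rev_deleted & USER) == 0,
        "revision_comment_is_visible": (rev_deleted & COMMENT) == 0,
    }

flags0 = visibility(0)  # nothing suppressed: all fields visible
flags6 = visibility(6)  # bits 2 and 4 set: user and comment hidden, content visible
```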

Running an EXPLAIN yields:

+------+-------------+----------+-------+---------------+---------------+---------+------+-----------+-----------------------+
| id   | select_type | table    | type  | possible_keys | key           | key_len | ref  | rows      | Extra                 |
+------+-------------+----------+-------+---------------+---------------+---------+------+-----------+-----------------------+
|    1 | SIMPLE      | revision | range | rev_timestamp | rev_timestamp | 14      | NULL | 443329174 | Using index condition |
+------+-------------+----------+-------+---------------+---------------+---------+------+-----------+-----------------------+
1 row in set (0.001 sec)

It is good that we see this as a range scan on the index, but 443329174 is a lot of rows; in fact, it's half the total rows:

MariaDB [commonswiki]> select cardinality
    ->                from information_schema.statistics
    ->               where TABLE_NAME = 'revision'
    ->                 and TABLE_SCHEMA = 'commonswiki'
    ->                 and INDEX_NAME = 'PRIMARY';
+-------------+
| cardinality |
+-------------+
|   886660051 |
+-------------+
1 row in set (0.001 sec)

That estimate would mean this 2005-06-20 to 2005-08-30 date range contains half the rows, but that is not true:

SELECT count(1) as count
FROM revision
WHERE `rev_timestamp` >= 20050620150000 AND `rev_timestamp` < 20050830231500

+--------+
| count  |
+--------+
| 187397 |
+--------+
1 row in set (5 min 21.383 sec)

So effectively, we are reading half the table, and then filtering it with the second WHERE clause before putting it on the wire.
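Putting the figures above side by side makes the waste concrete (plain arithmetic on the EXPLAIN estimate, the PRIMARY cardinality, and the COUNT result):

```python
# Quantifying the gap between the optimizer's row estimate and reality,
# using the figures from the EXPLAIN and COUNT queries above.
estimated = 443_329_174   # rows the EXPLAIN expects to scan
total = 886_660_051       # PRIMARY index cardinality of revision
actual = 187_397          # rows actually in the date range

scanned_fraction = estimated / total   # ~0.50: half the table is scanned
useful_fraction = actual / estimated   # ~0.0004: nearly all scanned rows discarded
```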

After inspecting the code more closely, I figured out that we are not hitting the index on that second WHERE clause, and that to hit it, we need to send in the predicates as STRINGs:

SELECT `revision_id`,`page_id`,`revision_timestamp`,`rev_timestamp`,`revision_content_is_visible`,`user_is_visible`,`revision_comment_is_visible`,`revision_size`,`revision_sha1` FROM (
...
        ) t  WHERE `rev_timestamp` >= '20050620150000' AND `rev_timestamp` < '20050830231500'       <<<<<<<<<<<<<<< notice how predicates are now literal strings

This yields the expected behavior from EXPLAIN:

EXPLAIN SELECT `revision_id`,`page_id`,`revision_timestamp`,`rev_timestamp`,`revision_content_is_visible`,`user_is_visible`,`revision_comment_is_visible`,`revision_size`,`revision_sha1` FROM (
         
        SELECT rev_id                          AS revision_id,
               rev_page                        AS page_id,
               CAST(rev_timestamp as DATETIME) AS revision_timestamp,
               rev_timestamp,
               rev_deleted & 1 = 0             AS revision_content_is_visible,
               rev_deleted & 2 = 0             AS user_is_visible,
               rev_deleted & 4 = 0             AS revision_comment_is_visible,
               rev_len                         AS revision_size,
               rev_sha1                        AS revision_sha1
        
         FROM revision FORCE INDEX (rev_timestamp)
         WHERE rev_timestamp between '20000101000000' and '20250101000000'
        ) t  WHERE `rev_timestamp` >= '20050620150000' AND `rev_timestamp` < '20050830231500'


+------+-------------+----------+-------+---------------+---------------+---------+------+--------+-----------------------+
| id   | select_type | table    | type  | possible_keys | key           | key_len | ref  | rows   | Extra                 |
+------+-------------+----------+-------+---------------+---------------+---------+------+--------+-----------------------+
|    1 | SIMPLE      | revision | range | rev_timestamp | rev_timestamp | 14      | NULL | 574464 | Using index condition |
+------+-------------+----------+-------+---------------+---------------+---------+------+--------+-----------------------+
1 row in set (0.001 sec)

Fixing this will require changes to mediawiki-jdbc, which was originally built on T372677: Figure a performant way to read all data from revision table via Spark.
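A sketch of what the fix might look like: when generating per-partition predicates over rev_timestamp (a binary string column in MediaWiki), emit the bounds as quoted string literals so MariaDB can range-scan the index instead of comparing every row numerically. The helper name is hypothetical; the actual change lives in mediawiki-jdbc.

```python
# Hypothetical sketch of the mediawiki-jdbc fix: quote the partition bounds
# so the comparison stays string-typed and matches the rev_timestamp index
# (574k estimated rows in the EXPLAIN above, versus 443M unquoted).
def partition_predicate(lower_ts: str, upper_ts: str) -> str:
    # quoted literals -> index range scan instead of a full-index filter
    return f"`rev_timestamp` >= '{lower_ts}' AND `rev_timestamp` < '{upper_ts}'"

pred = partition_predicate("20050620150000", "20050830231500")
```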

I am now able to successfully do a consistency check on commonswiki by forcing num_partitions = 256.
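The workaround in practice: a higher num_partitions makes the reader slice the scan into more, smaller per-task ranges, so each individual replica query touches fewer rows. A rough pure-Python sketch of the slicing (the real logic lives in mediawiki-jdbc; the bounds here are illustrative):

```python
# Rough sketch of how a partitioned reader slices a scan into per-task ranges.
def partition_bounds(lower: int, upper: int, num_partitions: int):
    stride = (upper - lower) // num_partitions
    bounds = []
    for i in range(num_partitions):
        lo = lower + i * stride
        # last partition absorbs the rounding remainder
        hi = upper if i == num_partitions - 1 else lo + stride
        bounds.append((lo, hi))
    return bounds

# 256 partitions over an id space of ~900M rows: ~3.5M ids per task query
ranges = partition_bounds(1, 900_000_000, 256)
```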

The inconsistency counts, however, are way higher than in other wikis: about 1/3 of the rows are inconsistent:

spark.sql("""
SELECT count(1) as count
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'commonswiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
""").show(100)

+---------+
|    count|
+---------+
|375109943|
+---------+

spark.sql("""
SELECT count(1) as count, reasons
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'commonswiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
GROUP BY reasons
ORDER BY count DESC
""").show(100, truncate=False)

+---------+--------------------------------------------------------------------------+
|count    |reasons                                                                   |
+---------+--------------------------------------------------------------------------+
|368735155|[mismatch_size]                                                           |
|3121034  |[missing_from_source, page_was_deleted]                                   |
|1768789  |[missing_from_target]                                                     |
|1247862  |[missing_from_source, page_was_deleted, page_was_restored]                |
|232040   |[mismatch_size, page_was_restored]                                        |
|2696     |[missing_from_target, page_was_restored]                                  |
|748      |[mismatch_page, page_was_deleted, page_was_restored]                      |
|704      |[missing_from_source]                                                     |
|494      |[mismatch_size, mismatch_page, page_was_deleted, page_was_restored]       |
|196      |[mismatch_user_visibility, mismatch_comment_visibility]                   |
|112      |[mismatch_page, page_was_deleted]                                         |
|40       |[mismatch_size, mismatch_page, page_was_deleted]                          |
|37       |[mismatch_user_visibility, mismatch_comment_visibility, mismatch_size]    |
|18       |[mismatch_user_visibility]                                                |
|7        |[mismatch_user_visibility, mismatch_comment_visibility, page_was_restored]|
|6        |[mismatch_comment_visibility]                                             |
|3        |[missing_from_source, page_was_restored]                                  |
|2        |[mismatch_user_visibility, mismatch_size]                                 |
+---------+--------------------------------------------------------------------------+

This time, the category with the vast majority of inconsistencies is mismatch_size. I strongly suspect this is due to MCR, and the fact that the backfilled data would not have the mediainfo slot.

A cursory check confirms this hunch, showing that only older data has this issue:

spark.sql("""
SELECT max(revision_timestamp)
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'commonswiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
  AND array_contains(reasons, 'mismatch_size')
""").show(100, truncate=False)

+-----------------------+
|max(revision_timestamp)|
+-----------------------+
|2024-07-31 23:50:00    |
+-----------------------+

Verifying one such offending revision_id, on the data lake table we see:

spark.sql("""
SELECT revision_size, revision_sha1, map_keys(revision_content_slots)
FROM wmf_dumps.wikitext_raw_rc2
WHERE wiki_db = 'commonswiki'
 AND revision_id = 507816764
""").show(truncate=False)

+-------------+-------------------------------+--------------------------------+
|revision_size|revision_sha1                  |map_keys(revision_content_slots)|
+-------------+-------------------------------+--------------------------------+
|2374         |d2llokg4ojcnnshuilro3broz3kgf7m|[main]                          |
+-------------+-------------------------------+--------------------------------+

And on replica we see:

MariaDB [commonswiki]> select * from revision where rev_id = 507816764;
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| rev_id    | rev_page | rev_comment_id | rev_actor | rev_timestamp  | rev_minor_edit | rev_deleted | rev_len | rev_parent_id | rev_sha1                        |
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| 507816764 | 23765091 |      155847330 |       951 | 20201030185939 |              0 |           0 |    3113 |     474088345 | d2llokg4ojcnnshuilro3broz3kgf7m |
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
1 row in set (0.003 sec)

Further:

MariaDB [commonswiki]> select r.rev_id, sr.role_name
    -> from revision r
    -> inner join slots s on r.rev_id = s.slot_revision_id
    -> inner join slot_roles sr on s.slot_role_id = sr.role_id
    -> where rev_id = 507816764;
+-----------+-----------+
| rev_id    | role_name |
+-----------+-----------+
| 507816764 | main      |
| 507816764 | mediainfo |
+-----------+-----------+
2 rows in set (0.001 sec)

Which confirms the suspicion. We will have to brainstorm what to do here... Perhaps @gmodena is game to push 375_109_943 rows through the Flink app!

I am now able to successfully do a consistency check on wikidatawiki by forcing num_partitions = 256 as well.

This one has an interesting 'data outage event':

spark.sql("""
SELECT count(1) as count
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'wikidatawiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
""").show(100)

+--------+
|   count|
+--------+
|11892410|
+--------+

spark.sql("""
SELECT count(1) as count, reasons
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'wikidatawiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
GROUP BY reasons
ORDER BY count DESC
""").show(100, truncate=False)

+--------+---------------------------------------------------------------------------------------------------+
|count   |reasons                                                                                            |
+--------+---------------------------------------------------------------------------------------------------+
|10498533|[missing_from_target]                                                                              |
|1236195 |[missing_from_source, page_was_deleted]                                                            |
|154207  |[missing_from_source, page_was_deleted, page_was_restored]                                         |
|2448    |[mismatch_user_visibility, mismatch_comment_visibility]                                            |
|957     |[missing_from_target, page_was_restored]                                                           |
|26      |[mismatch_user_visibility]                                                                         |
|16      |[mismatch_comment_visibility]                                                                      |
|11      |[mismatch_user_visibility, mismatch_content_visibility, mismatch_comment_visibility]               |
|11      |[missing_from_source]                                                                              |
|3       |[mismatch_content_visibility, mismatch_comment_visibility]                                         |
|2       |[mismatch_user_visibility, mismatch_content_visibility, mismatch_comment_visibility, mismatch_sha1]|
|1       |[mismatch_user_visibility, mismatch_comment_visibility, page_was_restored]                         |
+--------+---------------------------------------------------------------------------------------------------+

Notice how the most frequent reason is missing_from_target. Looking at the data, we find that all those missing_from_target are clustered in a set of months:

spark.sql("""
SELECT count(1) as count, date_trunc('MM', revision_timestamp) as month
FROM xcollazo.wikitext_inconsistent_rows_rc2
WHERE computation_dt = TIMESTAMP '2024-12-04T00:00:00'
  AND computation_class = 'historic'
  AND wiki_db = 'wikidatawiki'
  AND revision_timestamp <= TIMESTAMP '2024-12-04'
  AND array_contains(reasons, 'missing_from_target')
GROUP by month
ORDER BY month ASC
""").show(100, truncate=False)

+-------+-------------------+
|count  |month              |
+-------+-------------------+
|3      |2013-03-01 00:00:00|
|1      |2013-05-01 00:00:00|
|1      |2013-06-01 00:00:00|
|2      |2013-10-01 00:00:00|
|1      |2013-11-01 00:00:00|
|1      |2013-12-01 00:00:00|
|1      |2014-01-01 00:00:00|
|1      |2014-02-01 00:00:00|
|1      |2014-03-01 00:00:00|
|1      |2014-05-01 00:00:00|
|9      |2014-07-01 00:00:00|
|48     |2014-09-01 00:00:00|
|1      |2014-11-01 00:00:00|
|3      |2014-12-01 00:00:00|
|5      |2015-04-01 00:00:00|
|2      |2015-06-01 00:00:00|
|2      |2015-07-01 00:00:00|
|3      |2015-09-01 00:00:00|
|2      |2015-10-01 00:00:00|
|1      |2015-12-01 00:00:00|
|2      |2016-03-01 00:00:00|
|1      |2016-04-01 00:00:00|
|2030   |2024-02-01 00:00:00|
|789356 |2024-03-01 00:00:00|
|1637606|2024-04-01 00:00:00|
|2533176|2024-05-01 00:00:00|
|1418370|2024-06-01 00:00:00|
|2005605|2024-07-01 00:00:00|
|1310770|2024-08-01 00:00:00|
|793758 |2024-09-01 00:00:00|
|5986   |2024-10-01 00:00:00|
|2410   |2024-11-01 00:00:00|
|331    |2024-12-01 00:00:00|
+-------+-------------------+

Thus, something weird started happening in 2024-02, and we are now at the end of it.

Anyhow, with a total of 11_892_410 inconsistencies, we can totally run this through the Flink app for reconcile.

[...]

Which confirms the suspicion. We will have to brainstorm what to do here... Perhaps @gmodena is game to push 375_109_943 rows through the Flink app!

IIRC we've seen higher volumes when backfilling page_content_change, so I'm game :)
However, I'd warn SRE of a surge in Action API requests from this operation. Could you get some figures on how many of these rows would require fetching content and redirect info (update and insert changelog kinds)?

FWIW: our Flink HTTP client hits internal routes, advertises as a bot, and performs client-side rate limiting.
We might want to increase task parallelism in Flink to speed things up (which would translate into an increase in RPS).
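For intuition, here is a minimal client-side token-bucket limiter of the kind described. This is an illustrative sketch, not the actual Flink client code; the clock is injected so the example is deterministic.

```python
# Minimal token-bucket rate limiter: requests spend tokens, tokens refill at
# a fixed rate, and a burst allowance caps how many can be spent at once.
class TokenBucket:
    def __init__(self, rate_per_s: float, burst: int, clock):
        self.rate = rate_per_s
        self.capacity = burst
        self.tokens = float(burst)
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # refill proportionally to elapsed time, capped at the burst size
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

t = [0.0]  # fake clock, advanced manually
bucket = TokenBucket(rate_per_s=10, burst=2, clock=lambda: t[0])

# a burst of 2 is allowed; the third request in the same instant is rejected
results = [bucket.try_acquire() for _ in range(3)]
t[0] += 0.1  # 100 ms later, one token (10/s * 0.1s) has refilled
late = bucket.try_acquire()
```

Since each parallel task runs its own limiter, raising Flink task parallelism multiplies the aggregate RPS hitting the Action API, which is the concern raised above.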

@xcollazo on second thought, I'd suggest we wait till we wrap up app/stream renaming and enable HA before doing a historic reconciliation run.
This is to avoid having to recompute the whole history in case of failures along the way.

cc / @tchin


Ok. ETA?


mw-content-change-history-enrich is deployed on dse-k8s. The new streams too.
Summary of the chat we had last Wednesday:

  • we don't need to wait on HA. It would be nice to test it, but for what you want to do here, relying on Kafka offsets to handle consumer restarts should suffice.
  • we might need to bump pod resources since there's a chance of OOMs. This is a good excuse to exercise resizing and restarts.

All tested and ready to be deployed tomorrow.

Ran the following:

As hdfs:

sudo -u hdfs bash

hdfs dfs -mkdir /wmf/data/wmf_content
hdfs dfs -chown analytics /wmf/data/wmf_content
hdfs dfs -chgrp analytics-privatedata-users /wmf/data/wmf_content

As analytics:

sudo -u analytics bash

kerberos-run-command analytics spark3-sql


CREATE DATABASE wmf_content;


CREATE TABLE wmf_content.inconsistent_rows_of_mediawiki_content_history_v1 (
    wiki_db                        STRING              COMMENT 'the wiki project',
    page_id                        BIGINT              COMMENT 'foreign key to the page',
    revision_id                    BIGINT              COMMENT 'id of the revision',
    revision_timestamp             TIMESTAMP           COMMENT 'timestamp of the revision',
    reasons                        ARRAY<STRING>       COMMENT 'the set of reasons detected that make us think we need to reconcile this revision.',
    computation_dt                 TIMESTAMP           COMMENT 'the logical time at which this inconsistency was calculated. Useful to see trends over time, and also to be able to delete data efficiently.',
    computation_class              STRING              COMMENT 'One of "last-24h" or "all-of-wiki-time". This segregates between runs that cover one day of inconsistencies as of computation_dt, versus runs that retroactively check all revisions as of computation_dt.',
    reconcile_emit_dt              TIMESTAMP           COMMENT 'the time at which this inconsistency was emitted to the Jumbo instance of Kafka for eventual reconcile. If NULL, it has not been submitted yet.'
)
USING ICEBERG
PARTITIONED BY (wiki_db, computation_class)
TBLPROPERTIES (
    'format-version' = '2',                          -- allow merge-on-read if needed
    'write.format.default' = 'parquet',              -- parquet is currently the only format with min/max stats
    'write.target-file-size-bytes' = '134217728',    -- cap files at 128MB files
    'commit.retry.num-retries' = '10'                -- bump retries from default of 4 due to many concurrent INSERTs
)
COMMENT 'We make checks between wmf_content.mediawiki_content_history_v1 and the Analytics replicas to detect inconsistent rows. If we do detect any, we add them here, to be reconciled, alerted, and analyzed.'
LOCATION '/wmf/data/wmf_content/inconsistent_rows_of_mediawiki_content_history_v1';


ALTER TABLE wmf_content.inconsistent_rows_of_mediawiki_content_history_v1 WRITE ORDERED BY wiki_db, computation_class, computation_dt, revision_timestamp;

Mentioned in SAL (#wikimedia-operations) [2024-12-20T16:47:22Z] <xcollazo@deploy2002> Started deploy [airflow-dags/analytics@8c5744d]: Deploying latest analytics Airflow instance DAGs. T377852.

Mentioned in SAL (#wikimedia-analytics) [2024-12-20T16:48:09Z] <xcollazo> Deploying latest analytics Airflow instance DAGs. T377852.

Mentioned in SAL (#wikimedia-operations) [2024-12-20T16:48:20Z] <xcollazo@deploy2002> Finished deploy [airflow-dags/analytics@8c5744d]: Deploying latest analytics Airflow instance DAGs. T377852. (duration: 00m 58s)

Mentioned in SAL (#wikimedia-operations) [2024-12-20T18:00:39Z] <xcollazo@deploy2002> Started deploy [airflow-dags/analytics@7fecc64]: Pickup hotfix for T377852.

Mentioned in SAL (#wikimedia-operations) [2024-12-20T18:02:42Z] <xcollazo@deploy2002> Finished deploy [airflow-dags/analytics@7fecc64]: Pickup hotfix for T377852. (duration: 02m 03s)

Running the first all-of-wiki-time reconcile, we failed on one wiki, labswiki, due to schema mismatch issues with the page_change schema. I've opened T382645, but this issue is unrelated to the scale of reconcile.

As expected, wikidatawiki and commonswiki are the only wikis that have taken more than ~3 mins to finish the spark_emit_reconcile_events_to_kafka task, due to T377852#10385240.

wikidatawiki took 1.5 hours and is now done. commonswiki has been running for ~7 hours now, and has made this progress:

spark.sql("""
SELECT count(1) as count, wiki_db, reconcile_emit_dt IS NOT NULL as emitted
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND wiki_db = 'commonswiki'
GROUP BY wiki_db, emitted
ORDER BY count DESC
""").show()

+---------+-----------+-------+
|    count|    wiki_db|emitted|
+---------+-----------+-------+
|354805980|commonswiki|  false|
| 18047711|commonswiki|   true|
+---------+-----------+-------+

354805980/18047711 = ~19, so it should continue running for 19*7 hours = 133 hours = ~6 days.

DAG dumps_merge_events_to_wikitext_raw_daily's spark_process_reconciliation_events task failed to process day 2024-12-21 due to the spark.sql.shuffle.partitions=1024, which works ok for day to day reconciliation intake, but is inadequate to reconcile the ~50M rows assigned to this day. I bumped temporarily to spark.sql.shuffle.partitions=102400 to clear that day, and the run is available here.
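For a sense of scale, the bump changes per-task row counts roughly as follows (plain arithmetic on the ~50M figure above; real task sizes also depend on the width of the content payloads being shuffled):

```python
# Back-of-the-envelope: rows per shuffle partition at each setting, using
# the ~50M reconcile rows assigned to 2024-12-21.
rows = 50_000_000

per_partition_at_1024 = rows // 1024      # ~49k rows per task
per_partition_at_102400 = rows // 102400  # ~500 rows per task
```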


commonswiki has now finished, taking 7d18:57:44.


This is actually more complicated. I did bump to spark.sql.shuffle.partitions=102400 and also set executor_memory=32G, but it continues to fail. This only happens on the 2024-12-21 date; all others, which include the vast majority of the commonswiki reconcile events, finished successfully.

My current speculation is that Spark is unable to shuffle the data efficiently when the reconcile process emits many wikis with many events.
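One common mitigation for this kind of skew is to salt the shuffle key, so a single hot wiki_db fans out across many reducers instead of one. A pure-Python illustration of the key transform; in the Spark job this would be a salted repartition column, and NUM_SALTS and the function name are hypothetical.

```python
# Sketch of salting a skewed shuffle key: append a random salt so all rows
# for one hot wiki no longer hash to the same partition.
import random

NUM_SALTS = 64

def salted_key(wiki_db: str, rng) -> tuple:
    return (wiki_db, rng.randrange(NUM_SALTS))

rng = random.Random(42)  # seeded for reproducibility
# rows for the hot wiki now spread over up to NUM_SALTS distinct keys
keys = {salted_key("commonswiki", rng) for _ in range(1000)}
```

The trade-off is that any per-key aggregation then needs a second pass to re-combine the salted partials.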

Here is how things look today in between all-of-wiki-time reconcile runs:

spark.sql("""
WITH top_10_wikis_with_inconsistencies AS (
  SELECT wiki_db, count(1) as count_before
  FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
  WHERE computation_class = 'all-of-wiki-time'
    AND computation_dt = '2024-12-01 00:00:00'
  GROUP BY wiki_db
  ORDER BY count_before DESC
  LIMIT 10
),

data_from_wikis_after_first_reconcile AS (
  SELECT tab.wiki_db, count(1) as count_after, FIRST(count_before) AS count_before
  FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1 tab
  JOIN top_10_wikis_with_inconsistencies top ON ( tab.wiki_db = top.wiki_db)
  WHERE computation_class = 'all-of-wiki-time'
    AND computation_dt = '2025-01-01 00:00:00'
    AND tab.wiki_db IN (SELECT wiki_db FROM top_10_wikis_with_inconsistencies)
  GROUP BY tab.wiki_db
  ORDER BY count_after DESC
)

SELECT wiki_db, count_before, count_after, count_after / count_before * 100 AS percentage_remaining
FROM data_from_wikis_after_first_reconcile
ORDER BY count_before DESC
""").show(20,truncate=False)

+------------+------------+-----------+--------------------+
|wiki_db     |count_before|count_after|percentage_remaining|
+------------+------------+-----------+--------------------+
|commonswiki |372853691   |2938936    |0.7882276804388669  |
|wikidatawiki|11891426    |1396978    |11.747775245794744  |
|enwiki      |9679709     |9333612    |96.42451028228226   |
|dewiki      |1948934     |1593256    |81.75012596629747   |
|frwiki      |1150852     |1090792    |94.78125771167795   |
|ruwiki      |996910      |936531     |93.9433850598349    |
|eswiki      |902614      |862409     |95.54571500109681   |
|zhwiki      |731798      |683148     |93.35199057663453   |
|labswiki    |646057      |579890     |89.75833401696754   |
|itwiki      |552517      |534201     |96.68498887817026   |
+------------+------------+-----------+--------------------+

This data supports my speculation: all of the wikis that still have a high percentage of remaining inconsistencies were emitted but failed to be merged into the target table on 2024-12-21. I therefore need to figure out how to make dumps_merge_events_to_wikitext_raw_monthly spread the events more evenly. In any case, the biggest offenders, commonswiki and wikidatawiki, are almost taken care of.

Ok, I want to rerun an all-of-wiki-time job, but the Airflow DAG would overwrite the current computation_dt = '2025-01-01 00:00:00' run information if I do so.

I want to keep that info for comparison, so I will UPDATE wmf_content.inconsistent_rows_of_mediawiki_content_history_v1 so that the current run's info lives instead under computation_dt = '2024-12-31 00:00:00':

UPDATE wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
SET computation_dt = TIMESTAMP '2024-12-31 00:00:00'
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'

Ran the following:

ssh an-launcher1002.eqiad.wmnet
sudo -u analytics bash

kerberos-run-command analytics spark3-sql --driver-cores 8 --driver-memory 32G --master yarn --conf spark.dynamicAllocation.maxExecutors=128 --conf spark.executor.memoryOverhead=3G --conf spark.sql.shuffle.partitions=2048 --executor-memory 16G --executor-cores 2

spark-sql (default)> UPDATE wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
                   > SET computation_dt = TIMESTAMP '2024-12-31 00:00:00'
                   > WHERE computation_class = 'all-of-wiki-time'
                   >   AND computation_dt = TIMESTAMP '2025-01-01 00:00:00';
Response code
Time taken: 209.16 seconds

Confirming everything looks good:

spark-sql (default)> SELECT count(1) as count,
                   >        computation_class,
                   >        computation_dt
                   > FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
                   > WHERE computation_class = 'all-of-wiki-time'
                   > GROUP BY computation_class, computation_dt
                   > ORDER BY computation_dt;
count	computation_class	computation_dt
409715199	all-of-wiki-time	2024-12-01 00:00:00
411021414	all-of-wiki-time	2024-12-31 00:00:00
Time taken: 13.007 seconds, Fetched 2 row(s)

Started manual all-of-wiki-time run at https://airflow-analytics.wikimedia.org/dags/mw_content_reconcile_mw_content_history_monthly/grid?dag_run_id=manual__2025-01-21T19%3A12%3A53.139399%2B00%3A00.

It indeed picked up computation_dt = TIMESTAMP '2025-01-01T00:00:00'.

Marking all of spark_emit_reconcile_events_to_kafka as failed until I get a chance to manually inspect output of spark_consistency_check.

As expected, we can see that:

  • All commonswiki inconsistencies are gone because they were properly ingested.
  • Other wikis with high inconsistencies are still there because we failed to ingest their reconcile events as per T377852#10426892.

spark.sql("""
SELECT count(1) as count,
       wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
GROUP BY wiki_id
ORDER BY count DESC
""").show(20)
+-------+-------------+
|  count|      wiki_id|
+-------+-------------+
|9070354|       enwiki|
|1583356|       dewiki|
|1431801| wikidatawiki|
|1067187|       frwiki|
| 852422|       ruwiki|
| 845298|       eswiki|
| 678679|       zhwiki|
| 595187|     labswiki|
| 498755|       itwiki|
| 400572|       ptwiki|
| 394258|       arwiki|
| 337737|       trwiki|
| 334756|       azwiki|
| 318882|       plwiki|
| 288733|       jawiki|
| 273310|incubatorwiki|
| 250543|       fawiki|
| 196557|       hewiki|
| 190513|       nlwiki|
| 171788|       ukwiki|
+-------+-------------+
only showing top 20 rows

Looking more closely, we see that the vast majority of these inconsistencies are page_was_deleted events:

spark.sql("""
SELECT count(1) as count_all,
       SUM(IF(ARRAY_CONTAINS(reasons, 'page_was_deleted'), 1, 0)) as count_deletes,
       wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
  AND wiki_id IN ('enwiki', 'dewiki', 'wikidatawiki', 'frwiki', 'ruwiki', 'eswiki', 'zhwiki', 'labswiki', 'itwiki', 'ptwiki')
GROUP BY wiki_id
ORDER BY count_all DESC
""").show(truncate=False)
+---------+-------------+------------+
|count_all|count_deletes|wiki_id     |
+---------+-------------+------------+
|9070354  |8721817      |enwiki      |
|1583356  |1479626      |dewiki      |
|1431801  |1390371      |wikidatawiki|
|1067187  |1006281      |frwiki      |
|852422   |813632       |ruwiki      |
|845298   |796798       |eswiki      |
|678679   |622074       |zhwiki      |
|595187   |15           |labswiki    |
|498755   |475805       |itwiki      |
|400572   |371483       |ptwiki      |
+---------+-------------+------------+

This agrees with run dumps_merge_events_to_wikitext_raw_daily__spark_process_reconciliation_events__20250107, in which we tried to push down a gigantic number of page_ids and failed, because that effectively meant a full table scan.

It is impractical to tune the spark_process_reconciliation_events job for such a large number of deletes, especially since we believe it is unlikely we will get into such a situation again.

To fix this issue, I will instead apply the page_was_deleted events manually, on a per wiki basis, at least for the top 10 offenders above.

enwiki:

ssh an-launcher1002.eqiad.wmnet
screen -S manual-removal-of-historic-page-deletes-v2
sudo -u analytics bash

kerberos-run-command analytics spark3-sql --driver-cores 4 --driver-memory 32G --master yarn --conf spark.dynamicAllocation.maxExecutors=200 --conf spark.executor.memoryOverhead=3G --conf spark.sql.shuffle.partitions=5120 --executor-memory 16G --executor-cores 1 --conf spark.sql.autoBroadcastJoinThreshold=209715200


DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'enwiki'
  AND page_id IN (
    SELECT page_id
    FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
    WHERE computation_class = 'all-of-wiki-time'
      AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
      AND wiki_id IN ('enwiki')
      AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
    GROUP BY page_id
    ORDER BY page_id
  )

This does a full partition scan for enwiki, but at least we are broadcasting the page_ids.

Response code
Time taken: 6493.814 seconds

So enwiki took 1.8h.

As in T377852#10486889, I've run this template:

DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = 'TARGET_WIKI'
  AND page_id IN (
    SELECT page_id
    FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
    WHERE computation_class = 'all-of-wiki-time'
      AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
      AND wiki_id IN ('TARGET_WIKI')
      AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
    GROUP BY page_id
    ORDER BY page_id
  )

For the following wikis:

dewiki
wikidatawiki
frwiki
ruwiki
eswiki
zhwiki
itwiki
ptwiki
trwiki
azwiki
plwiki
jawiki
incubatorwiki
bnwikisource
zhwiki
enwiktionary
trwiki

And as per T377852#10484943, this should have taken care of the biggest offenders.
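
A small helper like the following can render that template once per wiki. This is a hypothetical reconstruction, not the exact script used; note it also deduplicates the list, since zhwiki and trwiki appear twice above:

```python
# Hypothetical helper to render the per-wiki DELETE template.
DELETE_TEMPLATE = """DELETE
FROM wmf_content.mediawiki_content_history_v1
WHERE wiki_id = '{wiki}'
  AND page_id IN (
    SELECT page_id
    FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
    WHERE computation_class = 'all-of-wiki-time'
      AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
      AND wiki_id IN ('{wiki}')
      AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
    GROUP BY page_id
    ORDER BY page_id
  );"""

wikis = ["dewiki", "wikidatawiki", "frwiki", "ruwiki", "eswiki", "zhwiki",
         "itwiki", "ptwiki", "trwiki", "azwiki", "plwiki", "jawiki",
         "incubatorwiki", "bnwikisource", "zhwiki", "enwiktionary", "trwiki"]

# Deduplicate while preserving order; running the DELETE twice for the
# same wiki is harmless but wastes a full partition scan.
wikis = list(dict.fromkeys(wikis))

for wiki in wikis:
    print(DELETE_TEMPLATE.format(wiki=wiki))
```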

I am now going to rerun mw_content_reconcile_mw_content_history_monthly's spark_consistency_check, and if it looks much better, which I expect it will, then I will let it do the subsequent spark_emit_reconcile_events_to_kafka for the rest of the events to be taken care of automatically.

Things indeed look much better now:

# run after manual DELETEs
spark.sql("""
SELECT count(1) as count,
       wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
GROUP BY wiki_id
ORDER BY count DESC
""").show(20)

+------+--------+
| count| wiki_id|
+------+--------+
|595187|labswiki|
|364328|  enwiki|
|250802|  fawiki|
|196592|  hewiki|
|190791|  nlwiki|
|171888|  ukwiki|
|165096|  kowiki|
|159126|  idwiki|
|157980|metawiki|
|150459|  svwiki|
|137586|  viwiki|
|107752|  srwiki|
|105893|  dewiki|
|104994|  bswiki|
| 87264|  huwiki|
| 82026|  mlwiki|
| 75184|  thwiki|
| 69395|  tawiki|
| 67674|  nowiki|
| 66918|  bnwiki|
+------+--------+
only showing top 20 rows

Although the #1 reason for inconsistencies continues to be deleted pages:

# run after manual DELETEs
spark.sql("""
SELECT count(1) as count,
       reasons
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
GROUP BY reasons
ORDER BY count DESC
""").show(20, truncate=False)

+-------+------------------------------------------------------------------------------------+
|count  |reasons                                                                             |
+-------+------------------------------------------------------------------------------------+
|2279964|[missing_from_source, page_was_deleted]                                             |
|1654665|[missing_from_source, page_was_deleted, page_was_restored]                          |
|1541277|[missing_from_target]                                                               |
|299163 |[missing_from_source]                                                               |
|32285  |[mismatch_page]                                                                     |
|24430  |[missing_from_target, page_was_restored]                                            |
|8495   |[mismatch_user_visibility, mismatch_comment_visibility]                             |
|6844   |[mismatch_content_visibility]                                                       |
|5871   |[mismatch_page, page_was_deleted]                                                   |
|4294   |[missing_from_source, page_was_restored]                                            |
|832    |[mismatch_page, page_was_deleted, page_was_restored]                                |
|815    |[mismatch_comment_visibility]                                                       |
|437    |[mismatch_page, page_was_restored]                                                  |
|426    |[mismatch_user_visibility]                                                          |
|279    |[mismatch_user_visibility, mismatch_content_visibility]                             |
|234    |[mismatch_content_visibility, page_was_restored]                                    |
|230    |[mismatch_user_visibility, mismatch_content_visibility, mismatch_comment_visibility]|
|163    |[mismatch_size, mismatch_page, page_was_deleted, page_was_restored]                 |
|122    |[mismatch_user_visibility, mismatch_comment_visibility, page_was_restored]          |
|27     |[mismatch_size, mismatch_page, page_was_deleted]                                    |
+-------+------------------------------------------------------------------------------------+
only showing top 20 rows

That is still too many deleted pages for comfort, so I am running a MERGE INTO to finish applying these deletes:

wiki_ids_rows = spark.sql("""
SELECT DISTINCT wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
  AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
ORDER BY wiki_id
""").collect()

wiki_ids = [r['wiki_id'] for r in wiki_ids_rows]

", ".join([f"'{wiki}'" for wiki in wiki_ids])

I made sure that final string did not include enwiki or commonswiki or wikidatawiki, otherwise the query planning would take forever.
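
That exclusion can be made explicit in the list-building step itself. A sketch (hypothetical wiki names in the sample input; the EXCLUDE set matches the wikis already handled by the per-wiki DELETEs):

```python
# Build the quoted IN-list, excluding wikis whose page_id sets are so
# large that including them makes query planning take forever.
wiki_ids = ["acewiki", "commonswiki", "dewiki", "enwiki",
            "wikidatawiki", "zuwiki"]
EXCLUDE = {"enwiki", "commonswiki", "wikidatawiki"}

in_list = ", ".join(f"'{w}'" for w in wiki_ids if w not in EXCLUDE)
print(in_list)  # "'acewiki', 'dewiki', 'zuwiki'"
```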

With this, I constructed the following MERGE INTO which is now running:

MERGE INTO wmf_content.mediawiki_content_history_v1 t
USING (
    SELECT DISTINCT wiki_id, page_id
    FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
    WHERE computation_class = 'all-of-wiki-time'
      AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
      AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
    ORDER BY wiki_id, page_id
) s
ON t.wiki_id = s.wiki_id AND t.page_id = s.page_id
   AND t.wiki_id IN ('acewiki', 'aewikimedia', 'afwiki', ... 'zuwiki', 'zuwiktionary')  -- truncated for brevity here
WHEN MATCHED THEN DELETE

The MERGE INTO struggled, but finished successfully:

Response code
Time taken: 6964.789 seconds
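
If a single MERGE over hundreds of wikis struggles again, one alternative (a sketch of the idea, not what was run here) is to split the wiki list into batches and issue one smaller MERGE per batch, trading a single heavy job for several lighter ones:

```python
def batched(seq, size):
    """Yield consecutive chunks of at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

MERGE_TEMPLATE = """MERGE INTO wmf_content.mediawiki_content_history_v1 t
USING (
    SELECT DISTINCT wiki_id, page_id
    FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
    WHERE computation_class = 'all-of-wiki-time'
      AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
      AND ARRAY_CONTAINS(reasons, 'page_was_deleted')
      AND wiki_id IN ({wikis})
) s
ON t.wiki_id = s.wiki_id AND t.page_id = s.page_id
   AND t.wiki_id IN ({wikis})
WHEN MATCHED THEN DELETE;"""

# Hypothetical wiki list; batch size would be tuned empirically.
wiki_ids = ["afwiki", "arwiki", "bnwiki", "dewiki", "eswiki",
            "frwiki", "itwiki"]
statements = [MERGE_TEMPLATE.format(wikis=", ".join(f"'{w}'" for w in batch))
              for batch in batched(wiki_ids, 3)]
print(len(statements))  # 3
```

Filtering the source side by the same wiki_id batch keeps both sides of the join small, so each MERGE plans and shuffles far less data.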

Ok, rerunning mw_content_reconcile_mw_content_history_monthly's spark_consistency_check one more time.

Ok we are looking good now:

spark.sql("""
SELECT count(1) as count_all,
       SUM(IF(ARRAY_CONTAINS(reasons, 'page_was_deleted'), 1, 0)) as count_deletes,
       wiki_id
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
GROUP BY wiki_id
ORDER BY count_all DESC
""").show(20, truncate=False)
+---------+-------------+-------------+
|count_all|count_deletes|wiki_id      |
+---------+-------------+-------------+
|595172   |0            |labswiki     |
|367738   |0            |enwiki       |
|107544   |0            |dewiki       |
|76582    |0            |mlwiki       |
|61557    |0            |frwiki       |
|58923    |0            |zhwiki       |
|58311    |0            |eswiki       |
|40935    |0            |wikidatawiki |
|39360    |0            |ruwiki       |
|30175    |0            |ptwiki       |
|27923    |0            |jawiki       |
|23604    |0            |itwiki       |
|22987    |0            |idwikivoyage |
|22502    |776          |commonswiki  |
|20973    |0            |nlwiki       |
|20419    |0            |kowiki       |
|19731    |0            |arwiki       |
|16969    |0            |incubatorwiki|
|16139    |0            |trwiki       |
|15511    |0            |enwiktionary |
+---------+-------------+-------------+
only showing top 20 rows


spark.sql("""
SELECT count(1) as count,
       reasons
FROM wmf_content.inconsistent_rows_of_mediawiki_content_history_v1
WHERE computation_class = 'all-of-wiki-time'
  AND computation_dt = TIMESTAMP '2025-01-01 00:00:00'
GROUP BY reasons
ORDER BY count DESC
""").show(20, truncate=False)
+-------+------------------------------------------------------------------------------------+
|count  |reasons                                                                             |
+-------+------------------------------------------------------------------------------------+
|1544510|[missing_from_target]                                                               |
|310599 |[missing_from_source]                                                               |
|34436  |[mismatch_page]                                                                     |
|28817  |[missing_from_target, page_was_restored]                                            |
|8495   |[mismatch_user_visibility, mismatch_comment_visibility]                             |
|6938   |[mismatch_content_visibility]                                                       |
|4454   |[missing_from_source, page_was_restored]                                            |
|815    |[mismatch_comment_visibility]                                                       |
|437    |[mismatch_page, page_was_restored]                                                  |
|426    |[mismatch_user_visibility]                                                          |
|298    |[missing_from_source, page_was_deleted, page_was_restored]                          |
|290    |[mismatch_user_visibility, mismatch_content_visibility]                             |
|234    |[mismatch_content_visibility, page_was_restored]                                    |
|230    |[mismatch_user_visibility, mismatch_content_visibility, mismatch_comment_visibility]|
|179    |[mismatch_page, page_was_deleted, page_was_restored]                                |
|163    |[mismatch_size, mismatch_page, page_was_deleted, page_was_restored]                 |
|122    |[mismatch_user_visibility, mismatch_comment_visibility, page_was_restored]          |
|108    |[mismatch_page, page_was_deleted]                                                   |
|27     |[mismatch_size, mismatch_page, page_was_deleted]                                    |
|19     |[mismatch_content_visibility, mismatch_comment_visibility]                          |
+-------+------------------------------------------------------------------------------------+
only showing top 20 rows

Now running spark_emit_reconcile_events_to_kafka.

spark_emit_reconcile_events_to_kafka finished successfully, verifying T382645 along the way.

We now have to wait until mw_content_merge_events_to_mw_content_history_daily's spark_process_reconciliation_events kicks in to see if the merge of the reconciliation events is successful.

Merge complete! 🎉

For completeness, I am rerunning spark_consistency_check to check whether all inconsistencies were fixed. If issues remain, we will fix them elsewhere, as this ticket is about tuning and not correctness, but I wanted to run the check to see where we stand.

Not all inconsistencies were fixed, but most were. I will chase the root cause of the remaining inconsistencies elsewhere.

xcollazo moved this task from In Process to Done on the Dumps 2.0 (Kanban Board) board.