
Improve handling of delete, restore, and merge from incremental update
Closed, ResolvedPublic

Description

In T368176: [Dumps 2] Spike: Figure root causes of missing rows when doing reconciliation we found that a large chunk of the mismatches resulted from not processing page delete, restore, and merge events. This task is to look into our incremental update Airflow jobs (the ones that update wmf_dumps.wikitext_raw) and determine the best way to include the missed delete, restore, and merge events.

We figured that a good way to move forward while simplifying the reconcile mechanism is to:

  • Continue having an hourly Spark MERGE INTO job that consumes the page_content_change hive table.
  • Have one additional hourly Spark job that scans page_content_change for deletes, accumulates the page_ids, and applies a single DELETE. A similar mechanism should be used for page moves as well, which would apply an UPDATE rather than a DELETE.
  • Add a new hourly Spark MERGE INTO job that consumes the page_content__late_change hive table. (Will be done on separate ticket, T375077)
    • All remaining inconsistencies should be relatively small, and thus would wait on the reconcile mechanism. Considering that page deletes and page moves would be applied on an hourly basis, the reconcile mechanism can potentially be run less frequently.

Event Timeline

Quick spike to size up how many revisions we're dealing with on a daily basis:

with enwiki_revisions as (
 select rev_page as page_id,
        rev_id as revision_id
   from wmf_raw.mediawiki_revision
  where snapshot='2024-06'
    and wiki_db = 'enwiki'

  union all

 select ar_page_id as page_id,
        ar_rev_id as revision_id
   from wmf_raw.mediawiki_archive
  where snapshot='2024-06'
    and wiki_db = 'enwiki'
),

pages_with_delete_logs as (
 select substring(log_timestamp, 0, 8) as day,
        count(*) as revisions_this_page_refers_to
   from wmf_raw.mediawiki_logging
            inner join
        enwiki_revisions            on log_page = page_id
  where snapshot='2024-06'
    and wiki_db = 'enwiki'
    and log_type = 'delete'
  group by 1
)

 select max(revisions_this_page_refers_to) as most_revisions_deleted_in_one_day,
        min(revisions_this_page_refers_to) as least_revisions_deleted_in_one_day,
        approx_percentile(revisions_this_page_refers_to, array(0.5, 0.75, 0.9)) as p50p75p90_revisions_deleted_in_one_day
   from pages_with_delete_logs
most_revisions_deleted_in_one_day: 54352170
least_revisions_deleted_in_one_day: 821
p50p75p90_revisions_deleted_in_one_day: [1435742,2940518,4664766]

Time taken: 1078.045 seconds, Fetched 1 row(s)

So basically, just for enwiki, the median day sees ~1.4 MILLION revisions shuffled around as part of some delete operation; 25% of days exceed ~3 million, the p90 is ~4.7 million, and the worst day tops 54 million. Just for enwiki.

Based on this, I don't think we could send these into a literal WHERE revision_id IN (<<some 6 million ids>>); even if we did it hourly, we'd probably still be talking millions on some spikes.

I'm back on dumps after some time. Things I'm going to look at, in this order:

Quick update on discussions with @Ottomata and @gmodena:

We figured that a good way to move forward while simplifying the reconcile mechanism is to:

  • Continue having an hourly Spark MERGE INTO job that consumes the page_content_change hive table.
  • Have one additional hourly Spark job that scans page_content_change for deletes, accumulates the page_ids, and applies a single DELETE (see the sketch after this list). A similar mechanism should be used for page moves as well, which would apply an UPDATE rather than a DELETE.
  • Add a new hourly Spark MERGE INTO job that consumes the page_content__late_change hive table.
    • All remaining inconsistencies should be relatively small, and thus would wait on the reconcile mechanism. Considering that page deletes and page moves would be applied on an hourly basis, the reconcile mechanism can potentially be run less frequently.
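
To make the intent concrete, here is a minimal PySpark sketch of the hourly delete job (not the actual DAG code; the wiki_id column, the 'delete' change kind, and the hourly partition filter are assumptions, and the target table name follows the rest of this task):

# Hedged sketch: collect the page_ids deleted during one hour of
# page_content_change, then apply a single Iceberg DELETE per wiki.
deletes = spark.sql("""
    SELECT wiki_id,
           collect_set(page.page_id) AS page_ids
      FROM event.mediawiki_page_content_change_v1
     WHERE year = 2024 AND month = 8 AND day = 1 AND hour = 4
       AND page_change_kind = 'delete'
       AND meta.domain != 'canary'
     GROUP BY wiki_id
""").collect()

for row in deletes:
    id_list = ", ".join(str(p) for p in row.page_ids)
    spark.sql(f"""
        DELETE FROM wmf_dumps.wikitext_raw_rc2
        WHERE wiki_db = '{row.wiki_id}' AND page_id IN ({id_list})
    """)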

This is contingent on us confirming that Iceberg/Parquet bloom filters work for our use case. A preliminary check suggests they would, but a deeper look is needed. I will share my preliminary numbers soon.

Notes on using Iceberg/Parquet bloom filters.

TL;DR: For our use case, bloom filters did not show improvements. Original table performance is reasonable.

Longer:

Our problem: We want to apply DELETEs and UPDATEs that look like so:

DELETE
FROM t1
WHERE wiki_db = 'enwiki' AND page_id IN (A, B, C, D)

But, we write the tables partitioned by (wiki_db, months(revision_timestamp)). Thus, for any delete applied to any particular wiki_db, we hypothesize that we have to do a full scan of the target wiki partition.

Attempted solution:
Iceberg now provides read and write support for per-column Parquet bloom filters (see Iceberg Table write properties for configuration details). You can declare them like so:

...
TBLPROPERTIES (
...
    'write.parquet.bloom-filter-enabled.column.page_id' = 'true',
    'write.parquet.bloom-filter-fpp.column.page_id' = '0.01',      -- attempt a false positive probability of 1% for page_id
    'write.parquet.bloom-filter-max-bytes' = '1048576'             -- but, cap bloom filter size to at most 1MB
...
)

We use the following data type for page_ids:

mysql:research@dbstore1009.eqiad.wmnet [wikidatawiki]> desc page;
+--------------------+---------------------+------+-----+---------+----------------+
| Field              | Type                | Null | Key | Default | Extra          |
+--------------------+---------------------+------+-----+---------+----------------+
| page_id            | int(10) unsigned    | NO   | PRI | NULL    | auto_increment |
...

Thus, one might presume that, with a possible range of 0 to 4,294,967,295, the bloom filter would not be effective. However, in practice, our Iceberg table's Parquet files only ever contain ~1,000 to ~16,000 page_id values each, making this a perfect bloom filter candidate that can easily achieve a 1% false positive rate with far less than 1MB of space:

spark.sql("""
SELECT -- file_path,
       -- partition,
       file_size_in_bytes/1024/1024 as file_size_in_megas,
       record_count,
       readable_metrics.page_id.value_count as page_id_value_count,       
       readable_metrics.page_id.lower_bound as page_id_lower_bound,
       readable_metrics.page_id.upper_bound as page_id_upper_bound,

       readable_metrics.revision_timestamp.lower_bound as revision_timestamp_lower_bound,
       readable_metrics.revision_timestamp.upper_bound as revision_timestamp_upper_bound
FROM wmf_dumps.wikitext_raw_rc2.files
WHERE partition.wiki_db = 'enwiki'
  AND partition.revision_timestamp_month = 624
ORDER BY readable_metrics.revision_timestamp.lower_bound ASC
LIMIT 10
""").show(100, truncate=False)
+------------------+------------+-------------------+-------------------+-------------------+------------------------------+------------------------------+
|file_size_in_megas|record_count|page_id_value_count|page_id_lower_bound|page_id_upper_bound|revision_timestamp_lower_bound|revision_timestamp_upper_bound|
+------------------+------------+-------------------+-------------------+-------------------+------------------------------+------------------------------+
|134.4532060623169 |16000       |16000              |593                |77277628           |2022-01-01 00:00:00           |2022-01-01 02:20:58           |
|31.970674514770508|3092        |3092               |2041               |72076618           |2022-01-01 02:20:58           |2022-01-01 02:54:05           |
|32.60470962524414 |3740        |3740               |1728               |75177296           |2022-01-01 02:54:06           |2022-01-01 03:35:41           |
|9.381248474121094 |904         |904                |1010               |69645690           |2022-01-01 03:35:43           |2022-01-01 03:48:24           |
|130.46067142486572|20000       |20000              |680                |77067150           |2022-01-01 03:48:25           |2022-01-01 07:52:44           |
|31.934041023254395|3889        |3889               |680                |71738958           |2022-01-01 04:19:51           |2022-01-01 05:02:14           |
|131.65628337860107|15000       |15000              |783                |76457659           |2022-01-01 05:02:15           |2022-01-08 20:22:32           |
|17.514870643615723|3259        |3259               |1425               |73265122           |2022-01-01 05:08:34           |2022-01-01 05:31:17           |
|21.74545955657959 |2555        |2555               |3392               |69647173           |2022-01-01 07:52:45           |2022-01-01 08:16:03           |
|129.8040657043457 |15000       |15000              |290                |70734681           |2022-01-01 08:16:03           |2022-01-01 11:27:06           |
+------------------+------------+-------------------+-------------------+-------------------+------------------------------+------------------------------+
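
As a sanity check on sizing: the textbook bloom filter formula m = -n * ln(p) / (ln 2)^2 says that even the biggest files above (~20,000 distinct page_ids) need only on the order of 23 KiB for a 1% false positive rate, far below the 1MB cap (Parquet's split block bloom filters size themselves a bit differently, but the order of magnitude holds):

import math

n = 20_000   # distinct page_ids in the biggest data files listed above
p = 0.01     # target false positive probability

bits = -n * math.log(p) / (math.log(2) ** 2)            # classic optimal bloom filter size
print(f"{bits / 8 / 1024:.1f} KiB of bloom filter bits")  # -> ~23.4 KiB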

So we tested it as follows:

  1. Create a test table with the same DDL except for the new bloom filter bits. I called this table xcollazo.wikitext_raw_rc2.
  2. Populate the table with all of enwiki's revisions:
# this took 3.3h
spark.sql("""
INSERT INTO xcollazo.wikitext_raw_rc2
SELECT *
FROM wmf_dumps.wikitext_raw_rc2
WHERE wiki_db = 'enwiki'
""").show(100, truncate=False)
  3. Run a couple of benchmark queries to exercise the bloom filters. The easiest, most repeatable thing was a SELECT with a bunch of page_ids recently seen in the page_change stream. A DELETE or an UPDATE would yield different results, but the bloom filter only helps with deciding which target files to read, based on the page_ids each file contains.

Results:
wmf_dumps.wikitext_raw_rc2:

# With 8 driver cores, 66k files scanned, read 47M rows, 148k splits,  4.4GB
# Elapsed time: 303.5358393192291 seconds  (120 query, 183 planning)
# Elapsed time: 301.08918023109436 seconds (102 query, 199 planning)
# Elapsed time: 286.9110209941864 seconds  (90 query, 196 planning)
# Elapsed time: x seconds (x query, x planning)
import time

for i in range(0,3):
    start_time = time.time()
    spark.sql("""
    SELECT count(1) as count
    FROM wmf_dumps.wikitext_raw_rc2
    WHERE

    (wiki_db = 'enwiki' AND page_id IN (12924534, 29687986, 35328557, 73692977, 74530252, 74530254, 75962364, 75962367, 75971447, 75971928, 75977325, 75985100, 75985872, 76310582, 76310588, 76310589, 77328875, 77478992, 77479936, 77480638, 77480639, 77486461, 77486512, 77488096, 77488322, 77488407, 77488486))

    """).show(20)
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time} seconds")

xcollazo.wikitext_raw_rc2:

# With 8 driver cores, 66k files scanned, read 892k rows, 126k splits, 22.5GB
# Elapsed time: 281.00382900238037 seconds (120 query, 161 planning)
# Elapsed time: 408.46775674819946 seconds (240 query, 168 planning)
# Elapsed time: 259.80541920661926 seconds ( 90 query, 169 planning)
# Elapsed time: x seconds (x query, x planning)
for i in range(0,3):
    start_time = time.time()
    spark.sql("""
    SELECT count(1) as count
    FROM xcollazo.wikitext_raw_rc2
    WHERE

    (wiki_db = 'enwiki' AND page_id IN (12924534, 29687986, 35328557, 73692977, 74530252, 74530254, 75962364, 75962367, 75971447, 75971928, 75977325, 75985100, 75985872, 76310582, 76310588, 76310589, 77328875, 77478992, 77479936, 77480638, 77480639, 77486461, 77486512, 77488096, 77488322, 77488407, 77488486))

    """).show(20)
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time} seconds")

Analysis:
A) In terms of query time, there is no discernible difference between using or not using the bloom filter.
B) Notice that both tables had to open all of the enwiki files, about 66k of them. Note that I said open, not necessarily read: once we get to the footer, we can consult either the bloom filter or, on the original table, presumably the dictionary encoding of page_id.
C) While the original table read more actual rows (47M vs 892k), in terms of data size the bloom table read more (22.5GB vs 4.4GB). Since the bloom table was created via an INSERT that shuffled the files around, I will refrain from drawing conclusions here.
D) Although query time is comparable, this benchmark also surfaces ~160 to ~190 seconds of latency due to query planning in Spark. I have narrowed this down to a bottleneck reading HDFS BlockLocation objects. We will investigate this in a separate task.

Conclusions:
There is currently no benefit in paying the cost of generating bloom filters for page_ids at write time, and the query time on the original table is reasonable. Both versions of the table do a full scan of the enwiki partition, opening all files, but I am pleasantly surprised that (presumably) the dictionary encodings of the Parquet files' row groups are doing an excellent job at avoiding any cost beyond opening the files and reading those statistics. On the original table, reading only ~47M rows out of ~1.2B total rows is quite surprising to me.

Indeed, < 4% read to get everything it needs to plan, impressive.
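
For anyone curious how the readers manage to skip that much data, the per-row-group statistics and dictionary metadata recorded in the Parquet footer can be inspected directly with pyarrow; a rough sketch, with a hypothetical local copy of one data file:

import pyarrow.parquet as pq

# Hypothetical local copy of one data file from the enwiki partition.
pf = pq.ParquetFile("/tmp/wikitext_raw_enwiki_sample.parquet")

# Locate the page_id column in the flattened schema.
names = [pf.metadata.row_group(0).column(i).path_in_schema
         for i in range(pf.metadata.num_columns)]
idx = names.index("page_id")

# Print per-row-group min/max for page_id; these are the stats readers can
# use to skip row groups whose range cannot contain the requested ids.
for rg in range(pf.metadata.num_row_groups):
    col = pf.metadata.row_group(rg).column(idx)
    stats = col.statistics
    if stats is not None and stats.has_min_max:
        print(rg, stats.min, stats.max, stats.num_values, col.total_compressed_size)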

For posterity, an example of a page that got moved twice in the same hour:

presto> SELECT
     ->   dt,
     ->   page.namespace_id,
     ->   page.page_title,
     ->   prior_state.page.namespace_id AS old_namespace_id,
     ->   prior_state.page.page_title AS old_page_title
     -> FROM event.mediawiki_page_content_change_v1
     -> WHERE year=2024
     ->   AND month=8
     ->   AND day=1
     ->   AND hour=4
     ->   AND page_change_kind = 'move'
     ->   AND meta.domain != 'canary'
     ->   AND page.page_id IN (1133753)
     ->   ORDER BY dt DESC;
          dt          | namespace_id |           page_title           | old_namespace_id |         old_page_title         
----------------------+--------------+--------------------------------+------------------+--------------------------------
 2024-08-01T04:31:08Z |            0 | Transcona,_Winnipeg            |              118 | Draft:Move/Transcona,_Winnipeg 
 2024-08-01T04:31:04Z |          118 | Draft:Move/Transcona,_Winnipeg |                0 | Transcona                      
(2 rows)

Query 20240916_203155_00936_tpjp6, FINISHED, 15 nodes
Splits: 275 total, 275 done (100.00%)
[Latency: client-side: 0:02, server-side: 0:02] [52K rows, 9.57MB] [27.8K rows/s, 5.12MB/s]

In such a situation, we want the latest move to win.
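
In a sketch, "latest move wins" amounts to ranking the move events per page by dt and keeping only the newest one before applying them (not the actual implementation; wiki_id is an assumption, the other column names follow the query above):

# Hedged sketch: keep only the most recent move event per page within the hour,
# so the second move above (back to namespace 0) is the one that gets applied.
latest_moves = spark.sql("""
    SELECT wiki_id, page_id, namespace_id, page_title, dt
    FROM (
        SELECT wiki_id,
               page.page_id      AS page_id,
               page.namespace_id AS namespace_id,
               page.page_title   AS page_title,
               dt,
               ROW_NUMBER() OVER (PARTITION BY wiki_id, page.page_id
                                  ORDER BY dt DESC) AS rn
        FROM event.mediawiki_page_content_change_v1
        WHERE year = 2024 AND month = 8 AND day = 1 AND hour = 4
          AND page_change_kind = 'move'
          AND meta.domain != 'canary'
    ) ranked
    WHERE rn = 1
""")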

Copy pasting writeup from MR 35, for completeness:

So far, we have ignored events that relate to page changes, namely page deletes and moves.

To keep the downstream reconciliation process lightweight, we decided to also consume these events on an hourly basis.

In this MR, we implement page delete and move ingestion by:

  • For deletes, we calculate all touched page_ids and inject those into the Iceberg DELETE statement.
  • For moves we do similarly, but we also JOIN against the target table to be able to resolve all touched revision_ids. This is necessary because the moves cannot be applied with an UPDATE statement and require a MERGE INTO instead, and that mechanism requires uniqueness on the join clause (see the sketch after this list).
    • Further, we introduce a new control column, row_move_last_update, to be able to apply moves independently of other updates.
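
As a rough illustration of the move application (not the actual MR code; the moves_per_revision view and the target column names are assumptions, with the view holding exactly one row per (wiki_db, revision_id) after the JOIN described above):

# Hedged sketch of the MERGE INTO used for moves. `moves_per_revision` is assumed
# to be a temp view produced by joining the deduplicated move events against the
# target table, so the ON clause matches at most one source row per target row.
spark.sql("""
    MERGE INTO wmf_dumps.wikitext_raw_rc2 AS target
    USING moves_per_revision AS source
       ON target.wiki_db = source.wiki_db
      AND target.revision_id = source.revision_id
    WHEN MATCHED AND (target.row_move_last_update IS NULL
                      OR source.dt > target.row_move_last_update) THEN
        UPDATE SET
            target.page_namespace_id    = source.namespace_id,  -- illustrative column names
            target.page_title           = source.page_title,
            target.row_move_last_update = source.dt
""")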

These changes introduce two new writes per hour, and I benchmarked them to add ~20 more minutes, most of that cost being file rewrites. But when enabling Iceberg's merge-on-read, the whole pipeline (revision-level events plus deletes and moves) takes ~7 minutes to run for a typical hour of enwiki. Thus, if we incorporate these changes, we should also set the target table to 'write.merge.mode' = 'merge-on-read' and figure out a rewrite job in a separate MR.
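
For reference, the rewrite job alluded to above could be driven by Iceberg's built-in maintenance procedure; a hedged example of what such a job might call (catalog name, filter, and options are illustrative, not the eventual production job):

# Compacts small data files and folds in the accumulated merge-on-read delete files.
spark.sql("""
    CALL spark_catalog.system.rewrite_data_files(
        table   => 'wmf_dumps.wikitext_raw_rc2',
        where   => "wiki_db = 'enwiki'",
        options => map('delete-file-threshold', '1')
    )
""")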

Associated ALTERs needed after merging in MR 35:

ALTER TABLE wmf_dumps.wikitext_raw_rc2
ADD COLUMNS (
    row_move_last_update TIMESTAMP COMMENT 'the timestamp of the last move event or backfill that updated this row'
);

ALTER TABLE wmf_dumps.wikitext_raw_rc2
SET TBLPROPERTIES (
  'write.merge.mode' = 'merge-on-read',
  'write.delete.mode' = 'merge-on-read'
)

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/837

Bump Dumps PySpark conda env to pickup jobs to ingest deletes and moves.

milimetric merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/837

Bump Dumps PySpark conda env to pickup jobs to ingest deletes and moves.

Planning on deploying this to prod on Monday Sept 23 2024.

Mentioned in SAL (#wikimedia-operations) [2024-09-23T14:03:43Z] <xcollazo@deploy1003> Started deploy [airflow-dags/analytics@3e2d3b8]: Deploy latest DAGs to analytics Airflow instance. T369868.

Mentioned in SAL (#wikimedia-operations) [2024-09-23T14:04:32Z] <xcollazo@deploy1003> Finished deploy [airflow-dags/analytics@3e2d3b8]: Deploy latest DAGs to analytics Airflow instance. T369868. (duration: 00m 48s)

Mentioned in SAL (#wikimedia-analytics) [2024-09-23T14:04:54Z] <xcollazo> Deployed latest DAGs to analytics Airflow instance. T369868.

Ran the following as per T369868#10154751:

  1. Pause all DAGs with tag mediawiki_dumps.
  2. Run the following:
ssh an-launcher1002.eqiad.wmnet
sudo -u analytics bash
kerberos-run-command analytics spark3-sql

spark-sql (default)> use wmf_dumps;
Response code
Time taken: 4.969 seconds

spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_raw_rc2
                   > ADD COLUMNS (
                   >     row_move_last_update TIMESTAMP COMMENT 'the timestamp of the last move event or backfill that updated this row'
                   > );
24/09/23 14:08:54 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 1.64 seconds

spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_raw_rc2
                   > SET TBLPROPERTIES (
                   >   'write.merge.mode' = 'merge-on-read',
                   >   'write.delete.mode' = 'merge-on-read'
                   > );
24/09/23 14:09:05 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 0.489 seconds
  3. Unpause DAGs.

Run of DAG dumps_merge_events_to_wikitext_raw was successful. However, the runtime is significantly higher than expected at ~38 minutes.

Will experiment with the configuration as it seems we are spending a lot of time blocked on shuffles.

Doubling spark.sql.shuffle.partitions from 64 to 128 helps somewhat, but not much.

This tuning will require a bit more thought, so I will do it separately.
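
For reference, the knob in question, as one would set it on a session (the production value lives in the DAG's Spark configuration):

# Number of partitions Spark uses for SQL shuffles; the DAG was bumped from 64 to 128.
spark.conf.set("spark.sql.shuffle.partitions", "128")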

Opened T375402 to chase this.