
Tune Dumps 2.0 hourly ingestion jobs
Closed, ResolvedPublic

Description

In T369868, we introduced extra jobs that effectively do 3 writes per hour, and the runtime is now approaching ~38 mins per consumed hour with the resources and flags specified here, copied for convenience:

props = DagProperties(
    # DAG settings
    start_date=datetime(2023, 8, 23, 0),
    sla=timedelta(hours=6),
    conda_env=artifact("mediawiki-content-dump-0.2.0.dev0-ingest-deletes-and-moves.conda.tgz"),
    # target table
    hive_wikitext_raw_table="wmf_dumps.wikitext_raw_rc2",
    # source tables
    hive_mediawiki_page_content_change_table="event.mediawiki_page_content_change_v1",
    hive_revision_visibility_change="event.mediawiki_revision_visibility_change",
    # Spark job tuning
    driver_memory="16G",
    driver_cores="4",
    executor_memory="16G",
    executor_cores="2",
    max_executors="64",
    spark_driver_maxResultSize="8G",
    # keep shuffler partitions low so that the final file fanout is low as well
    spark_sql_shuffle_partitions="64",
    # avoid java.lang.StackOverflowError when generating MERGE predicate pushdowns
    spark_extraJavaOptions="-Xss4m",
    # disable fetching HDFS BlockLocations to avoid very long query planning times.
    spark_sql_iceberg_locality_enabled="false",
)

In this task we should take a good look at the query plans and figure what is spending the most time and try and tune it out.

spark_content_merge_into Task Instance Duration.png (1×1 px, 165 KB)

Details

Related Changes in GitLab:
  • Draft: Wild experiment to see whether pushdown of page_ids instead of revision_ids for revision level changes helps | repos/data-engineering/mediawiki-content-pipelines!40 | xcollazo | try-page-ids-instead-of-revision-ids -> main
  • Hotfix: Pickup fix for tuning ingestion. | repos/data-engineering/airflow-dags!885 | xcollazo | fix-silly-return -> main
  • Set wikitext_raw's rewrite_data_files() max file group size. | repos/data-engineering/airflow-dags!884 | xcollazo | tune-rewrite -> main
  • Pick up ingestion tuning changes for Dumps 2.0 | repos/data-engineering/airflow-dags!876 | xcollazo | tune-dumps-ingestion -> main
  • Use new partitioning and bloom filter strategy for ingestion. | repos/data-engineering/mediawiki-content-pipelines!36 | xcollazo | tune-ingestion -> main
  • Add support for Iceberg's rewrite_data_files(). | repos/data-engineering/airflow-dags!849 | xcollazo | add-iceberg-maintenance -> main

Event Timeline

xcollazo changed the task status from Open to In Progress.Sep 26 2024, 8:42 PM

Will share findings next week, but right away we definitely need T373694: [Iceberg Migration] Extend Iceberg table maintenance mechanism to support data rewrite.

So will work on that jointly with this task.

Added to analytics-weekly-train at https://etherpad.wikimedia.org/p/analytics-weekly-train

Wanted to deploy right away, but there is a blocking Airflow upgrade going on. Will deploy tomorrow.

Mentioned in SAL (#wikimedia-operations) [2024-10-03T14:52:32Z] <xcollazo@deploy2002> Started deploy [airflow-dags/analytics@b715af7]: Deploy latest DAGs to the analytics Airflow instance. T373694. T375402

Mentioned in SAL (#wikimedia-operations) [2024-10-03T14:56:02Z] <xcollazo@deploy2002> Finished deploy [airflow-dags/analytics@b715af7]: Deploy latest DAGs to the analytics Airflow instance. T373694. T375402 (duration: 03m 33s)

Mentioned in SAL (#wikimedia-analytics) [2024-10-03T14:56:25Z] <xcollazo> Deployed latest DAGs to the analytics Airflow instance. T373694. T375402.

I tried doubling Spark resources, and even tripling them: from 64 executors to 128 to 256. I did give executors less memory (from 16GB to 12GB), but I made sure that was plenty, and that it was not causing any thrashing or spillage. Unfortunately, this does not change the runtime by much.

The work done on T373694 does keep the runtime stable at around 60 minutes. Without it, it quickly degrades to ~90+ minutes.

These 3 ingest runs are available for the next ~90 days:

https://yarn.wikimedia.org/spark-history/history/application_1727783536357_67288/jobs/
https://yarn.wikimedia.org/spark-history/history/application_1727783536357_68618/jobs/
https://yarn.wikimedia.org/spark-history/history/application_1727783536357_70373/jobs/

All 3 show the same pattern, so let's drill into the first one:

  • Main MERGE INTO: 6.7m
  • page deletes DELETE: 27m.
  • page moves MERGE INTO: 41m.

This is interesting since the vast majority of events are not page deletes nor moves:

SELECT COUNT(1) AS count,
       month,
       IF(page_change_kind IN ('delete', 'move', 'undelete'), 'page_level_change', 'revision_level_change') as change_category
FROM  event.mediawiki_page_change_v1
WHERE year = 2024
  AND month = 9
GROUP BY month, change_category
ORDER BY month ASC, change_category

count   month   change_category
474023  9       page_level_change
42273810        9       revision_level_change
Time taken: 155.274 seconds, Fetched 2 row(s)

Yet, the time we spend on these deletes and moves is 91% of the runtime (!).
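The 91% figure follows directly from the per-job runtimes listed above; a quick back-of-the-envelope check:

```python
# Runtime shares of the three hourly writes (minutes, from the Spark UI above)
main_merge = 6.7     # main MERGE INTO
page_deletes = 27.0  # page deletes DELETE
page_moves = 41.0    # page moves MERGE INTO

total = main_merge + page_deletes + page_moves
# fraction of the runtime spent on page-level changes (~0.91)
share = (page_deletes + page_moves) / total
```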

Drilling deeper, we can clearly see that the approach of finding the page_ids takes most of that 91%. For example, the first Spark job that maps to the DELETE takes all of the 27m and opens 133,530 file splits. As per T369868#10142190's analysis, and as can be seen in the job details, we open the files, mostly figure out that the file does not contain the page_id, and move on. But we do this 133,530 times. The median task runtime is 2s, so (133,530 splits * 2 sec) / 60 sec/min / 128 workers gives us ~35 mins, which correlates well with the observed runtime.
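That estimate, as a quick sketch:

```python
# Back-of-the-envelope check of the DELETE's runtime from the split count
splits = 133_530          # file splits opened by the first Spark job
median_task_seconds = 2   # median task runtime from the Spark UI
workers = 128             # concurrent executor task slots

# ~34.8 minutes, which lines up with the observed ~35 min runtime
estimated_minutes = splits * median_task_seconds / 60 / workers
```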

This approach is simply not scalable with the current table partitioning. If we need to do an hourly DELETE FROM t1 WHERE wiki_db = 'enwiki' AND page_id IN (1,2,3), then we need the data partitioned by page_id, somehow. Now, we need to be careful that tuning for the page delete DELETE and the page move MERGE INTO doesn't make the main MERGE INTO dominate instead. But still, I think it is worth trying to revise the PARTITION BY clause.

I will experiment with this now.

Copy pasting from MR 36:

In this MR we:

  • Refactor event ingestion code to allow independently running process_revisions(), process_page_deletes() and process_page_moves(). This will allow us to tune each Spark job independently, while still being able to run them all in one Spark job for when we ingest smaller event tables, such as the reconciliation table.
  • Start utilizing a new partitioning, sorting, and bloom filter strategy to ingest faster.
  • Before: PARTITIONED BY (wiki_db, months(revision_timestamp)). After: PARTITIONED BY (wiki_db). Rationale: the revision_timestamp based partitioning was not helping anymore now that the vast majority of the time was spent on our page deletes DELETE and page moves MERGE INTO, which were doing a full table scan to see which revisions matched a particular page_id. We are better served by sorting by page_id instead.
  • Before: WRITE ORDERED BY wiki_db, revision_timestamp. After: WRITE ORDERED BY wiki_db, page_id, revision_timestamp. Rationale: this sorting strategy aligns our page deletes DELETE and page moves MERGE INTO with the data, and they now typically touch very few files, finishing in ~1 min each.
  • Before: no bloom filter. After: max 5MB bloom filter on revision_id. Rationale: with the two changes above, the revision level MERGE INTO becomes the part that consumes the most time, as it now requires full table scans. And while a bloom filter was not effective before when applied to page_ids, it is very effective against revision_ids with the new partitioning strategy. This is because we now have far fewer files per wiki_db, so the time spent going over all files does not dwarf the gains from the bloom filter. The revision level MERGE INTO now finishes in ~10 mins.

Associated ALTERs needed after merging in MR 36:

ALTER TABLE wmf_dumps.wikitext_raw_rc2 DROP PARTITION FIELD months(revision_timestamp);

ALTER TABLE wmf_dumps.wikitext_raw_rc2 SET TBLPROPERTIES (
    'write.parquet.bloom-filter-enabled.column.revision_id' = 'true',  -- bloom filter on revision_id helps the revision-level events MERGE INTO
    'write.parquet.bloom-filter-fpp.column.revision_id' = '0.01',      -- attempt a false positive probability of 1% for revision_id
    'write.parquet.bloom-filter-max-bytes' = '5242880'                 -- but, cap bloom filter size to at most 5MB
);

ALTER TABLE wmf_dumps.wikitext_raw_rc2 WRITE ORDERED BY wiki_db, page_id, revision_timestamp;

We also need to trigger a manual run of rewrite_data_files() so that data is repartitioned and sorted accordingly.

The thread with @Milimetric on how the bloom filter false positive rate and max size are calculated is of interest for future Iceberg use cases, so I am linking and copy pasting it here:

Link to original conversation: https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/36#note_111342.

@Milimetric:
any calculation to include in a comment for context? If it was just a guess that might be useful to mention in case we want to play with this in the future.

@xcollazo:
Thank you for prompting this. I just realized that my original calculations were good, but the current calculations are not. We either need to lower the max size of files, or up the max bloom filter size.

Original calculation:

# is revision_id a good candidate for bloom filters? It appears to be, with a .75 percentile having ~100k revisions per file.
# Not a perfect candidate, but in the range of reasonability.
spark.sql("""
SELECT partition.wiki_db,
       PERCENTILE(file_size_in_bytes/1024/1024, array(0.25, 0.5, 0.75, 1.0)) as file_size_in_megas_percentiles,
       PERCENTILE(readable_metrics.revision_id.value_count, array(0.25, 0.5, 0.75, 1.0)) as revision_id_value_count_percentiles
FROM xcollazo.wikitext_raw_rc2_wiki_partitioned.files
WHERE partition.wiki_db IN ('enwiki', 'commonswiki', 'wikidatawiki')
GROUP BY partition.wiki_db
""").show(100, truncate=False)

+------------+--------------------------------------------------------------------------------+----------------------------------------+
|wiki_db     |file_size_in_megas_percentiles                                                  |revision_id_value_count_percentiles     |
+------------+--------------------------------------------------------------------------------+----------------------------------------+
|wikidatawiki|[2.279878616333008, 3.0794925689697266, 4.172296524047852, 501.81318950653076]  |[13809.0, 16609.0, 19649.0, 51525.0]    |
|enwiki      |[27.696892738342285, 32.89190340042114, 41.7926812171936, 523.2010679244995]    |[105623.0, 112104.0, 118246.0, 147706.0]|
|commonswiki |[11.567601919174194, 12.777786254882812, 14.784200429916382, 141.41772174835205]|[83393.25, 87293.5, 91278.5, 112773.0]  |
+------------+--------------------------------------------------------------------------------+----------------------------------------+

So we have 1,048,576 bits available (1MB) and ~100,000 unique revision_ids per file at the .75 percentile. Thus 1,048,576 bits / 100,000 inserts = 10.49 bits of space per insert, which as per parquet's bloom filter documentation would yield us ~1% probability of false positives. Very reasonable.
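The bits-per-insert figure can be cross-checked with the standard Bloom filter approximation (this formula is an assumption on my part; the thread only cites Parquet's documentation table):

```python
import math

def bloom_fpp(m_bits, n_inserts):
    """Approximate false positive probability of a Bloom filter with m_bits
    of space and n_inserts distinct values, assuming the optimal number of
    hash functions k = (m/n) * ln 2."""
    k = (m_bits / n_inserts) * math.log(2)  # optimal hash function count
    return (1 - math.exp(-k * n_inserts / m_bits)) ** k

# 1,048,576 bits and ~100,000 distinct revision_ids per file (.75 percentile)
fpp = bloom_fpp(1_048_576, 100_000)  # ~0.006, i.e. in the ballpark of ~1%
```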

BUT! I made that calculation in the table *before* running rewrite_data_files(), and after running that, we have completely different numbers:

spark.sql("""
SELECT partition.wiki_db,
       PERCENTILE(file_size_in_bytes/1024/1024, array(0.25, 0.5, 0.75, 1.0)) as file_size_in_megas_percentiles,
       PERCENTILE(readable_metrics.revision_id.value_count, array(0.25, 0.5, 0.75, 1.0)) as revision_id_value_count_percentiles
FROM xcollazo.wikitext_raw_rc2_wiki_partitioned_plus_bf_on_rev_id.files
WHERE partition.wiki_db IN ('enwiki', 'commonswiki', 'wikidatawiki')
GROUP BY partition.wiki_db
""").show(100, truncate=False)

+------------+------------------------------------------------------------------------------+----------------------------------------+
|wiki_db     |file_size_in_megas_percentiles                                                |revision_id_value_count_percentiles     |
+------------+------------------------------------------------------------------------------+----------------------------------------+
|enwiki      |[67.68609046936035, 80.04043483734131, 98.86448001861572, 428.7305974960327]  |[232422.0, 257406.0, 277813.0, 342590.0]|
|commonswiki |[63.450965881347656, 69.40349674224854, 80.18363380432129, 179.99110221862793]|[449945.0, 465988.0, 481059.0, 543406.0]|
|wikidatawiki|[72.74677753448486, 100.00476408004761, 129.25442790985107, 527.4241724014282]|[146500.0, 441658.0, 484284.0, 591779.0]|
+------------+------------------------------------------------------------------------------+----------------------------------------+

That's ~414,000 unique revision_ids per file at .75 percentile. Thus 1,048,576 bits / 414,000 inserts = 2.53 bits of space per insert, which puts us off the chart, at a >> 10% false positive probability.

So to be back at ~1%, we seem to need X / 414,000 = 10.5, solving for X, X = 4,347,000 (~4.2 MBs). Hmm...
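As a cross-check on that ratio method, the classic Bloom filter sizing formula (again an assumption of mine, not used in the thread) lands in the same range:

```python
import math

def required_bits(n_inserts, target_fpp):
    """Classic Bloom filter sizing: m = -n * ln(p) / (ln 2)^2 bits of space
    for n distinct inserts at target false positive probability p."""
    return -n_inserts * math.log(target_fpp) / (math.log(2) ** 2)

# ~3.97M bits for 414,000 inserts at 1% fpp, close to the ~4.3M ratio estimate
bits = required_bits(414_000, 0.01)
```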

Mentioned in SAL (#wikimedia-operations) [2024-10-21T17:48:58Z] <xcollazo@deploy2002> Started deploy [airflow-dags/analytics@671896c]: Deploy T375402.

Mentioned in SAL (#wikimedia-operations) [2024-10-21T17:50:02Z] <xcollazo@deploy2002> Finished deploy [airflow-dags/analytics@671896c]: Deploy T375402. (duration: 01m 04s)

Mentioned in SAL (#wikimedia-analytics) [2024-10-21T17:51:00Z] <xcollazo> Deployed latest DAGs to Airflow analytics instance to pickup T375402.

Ran all of the ALTER commands listed above (dropping the months(revision_timestamp) partition field, setting the bloom filter table properties, and setting the new WRITE ORDERED BY).

Current state of table:

spark-sql (default)> describe formatted wmf_dumps.wikitext_raw_rc2;
...
# Partitioning		
Part 0	wiki_db	
		
# Metadata Columns		
_spec_id	int	
_partition	struct<wiki_db:string,revision_timestamp_month:int>	
_file	string	
_pos	bigint	
		
# Detailed Table Information		
Name	spark_catalog.wmf_dumps.wikitext_raw_rc2	
Comment	Base to create MediaWiki full revision dumps from.	
Location	/wmf/data/wmf_dumps/wikitext_raw_rc2	
Provider	iceberg	
Owner	analytics	
Table Properties	[current-snapshot-id=7916199703503932046,format=iceberg/parquet,sort-order=wiki_db ASC NULLS FIRST, page_id ASC NULLS FIRST, revision_timestamp ASC NULLS FIRST,write.delete.mode=merge-on-read,write.distribution-mode=range,write.format.default=parquet,write.merge.distribution-mode=range,write.merge.mode=merge-on-read,write.metadata.delete-after-commit.enabled=true,write.metadata.previous-versions-max=10,write.parquet.bloom-filter-enabled.column.revision_id=true,write.parquet.bloom-filter-fpp.column.revision_id=0.01,write.parquet.bloom-filter-max-bytes=5242880,write.target-file-size-bytes=134217728]	
Time taken: 1.013 seconds, Fetched 39 row(s)

Will now attempt the rewrite_data_files().

Running the following manually for more than 12 hours now:

spark.sql("""
CALL spark_catalog.system.rewrite_data_files(
    table => 'wmf_dumps.wikitext_raw_rc2',
    strategy => 'sort',
    sort_order => 'wiki_db ASC NULLS FIRST, page_id ASC NULLS FIRST, revision_timestamp ASC NULLS FIRST',
    options => map('max-file-group-size-bytes', '53687091200',
                   'max-concurrent-file-group-rewrites','10',
                   'partial-progress.enabled', 'true',
                   'rewrite-all', 'true')
)
""").show(1000, truncate=False)

The rewrite-all is required so that we move all files to the new partitioning scheme. This process is causing high HDFS RPC use, but it should be finished by EOD.

xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/40

Draft: Wild experiment to see whether pushdown of page_ids instead of revision_ids for revision level changes helps

xcollazo closed https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/40

Draft: Wild experiment to see whether pushdown of page_ids instead of revision_ids for revision level changes helps

Sad news:

Even though, as per T375402#10239416, the performance of the hourly ingest seemed great in my testing, after merging all changes into the production table we were not able to reproduce the performance benefits.

The revision level MERGE INTO continues to take way more time than the allotted max of 1 hour.

At this time, I am throwing in the towel. There are more things to look into, like figuring out why I was not able to reproduce the gains, but there is a lot of other work to be done for Dumps 2.0 that needs attention. Thus, in the interest of time, I think it is best to put this work to rest and bite the bullet: we will have to consume at a daily cadence rather than hourly.

xcollazo changed the task status from In Progress to Stalled.Oct 23 2024, 7:51 PM

@xcollazo maybe stupid idea:

Would it be possible or helpful to partition by a hashed/sharded page_id? E.g.

-- $shard_size = 1000
PARTITIONED_BY(wiki_db, hash(page_id) % $shard_size)

@xcollazo maybe stupid idea:

Would it be possible or helpful to partition by a hashed/sharded page_id? E.g.

-- $shard_size = 1000
PARTITIONED_BY(wiki_db, hash(page_id) % $shard_size)

I think this would be similar to:

PARTITIONED_BY(wiki_db, truncate(1000, page_id))

This would be useful for the page level delete DELETE and the page level move MERGE INTO, just as the WRITE ORDERED BY wiki_db, page_id, revision_timestamp is useful today. We can push down the page_ids for those two mutations because they typically comprise <= 1k events.
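For intuition on the equivalence claim, a sketch of Iceberg's integer truncate transform (per the Iceberg partition transform spec, truncate(W, v) = v - (v % W) with a positive remainder; Python's % already behaves that way):

```python
def iceberg_truncate(width, value):
    """Iceberg's truncate transform for integers: value - (value % width).
    Groups page_ids into contiguous buckets of `width` ids, so nearby ids
    land in the same partition (unlike a hash, which spreads them)."""
    return value - (value % width)  # Python's % yields a positive remainder

# page_ids 0..999 land in bucket 0, 1000..1999 in bucket 1000, and so on
bucket = iceberg_truncate(1000, 123_456)  # -> 123000
```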

But the question remains: how do we help the revision level MERGE INTO? The suggested partitioning schema doesn't, because we have about ~150k events at the hour level and ~3.6M at the day level. If we push down the page_ids (or revision_ids), we still effectively do a full table scan.

But let's continue discussing! Perhaps we do find a sweet spot.

how do we help the revision level MERGE INTO? The suggested partitioning schema doesn't, because we have, at an hour level about ~150k events, and at a day level it is ~3.6M

What's the average number of hourly/daily distinct page_ids that get edits? Maybe it doesn't matter, because the worst case is what matters?

If we push down the page_ids (or revision_ids), we still effectively do a full table scan

Not sure I understand this. If the table is sharded by hash on page_id, it seems like it'd be possible for the query to restrict the merge to only the shard-partitions that have pages that have been edited?

Hm, I guess the point is that the page_ids would be spread evenly enough across the shards that the result would be that most of the time most shards are involved? Is that right?

how do we help the revision level MERGE INTO? The suggested partitioning schema doesn't, because we have, at an hour level about ~150k events, and at a day level it is ~3.6M

What's the average number of hourly/daily distinct page_ids that get edits? Maybe it doesn't matter, because the worst case is what matters?

The average I think matters most. To answer your question:

Average events per day for 2024:

SELECT avg(count) as avg_per_day
FROM (
  SELECT month, day, count(1) as count
  FROM event.mediawiki_page_content_change_v1
  WHERE page_change_kind IN ('create', 'edit')
    AND year = 2024
    AND month <= 10
  GROUP BY month, day
)

+-----------------+
|avg_per_day      |
+-----------------+
|1585890.788888889|
+-----------------+

Which correlates almost 1:1 with the average distinct revision_ids per day for 2024:

SELECT avg(count) as avg_per_day
FROM (
  SELECT month, day, count(1) as count
  FROM (
    SELECT month, day, revision.rev_id
    FROM event.mediawiki_page_content_change_v1
    WHERE page_change_kind IN ('create', 'edit')
      AND year = 2024
      AND month <= 10
    GROUP BY month, day, revision.rev_id
  )
  GROUP BY month, day
)

+-----------------+
|avg_per_day      |
+-----------------+
|1585519.411111111|
+-----------------+

Then the average distinct page_ids per day for 2024:

SELECT avg(count) as avg_per_day
FROM (
  SELECT month, day, count(1) as count
  FROM (
    SELECT month, day, page.page_id
    FROM event.mediawiki_page_content_change_v1
    WHERE page_change_kind IN ('create', 'edit')
      AND year = 2024
      AND month <= 10
    GROUP BY month, day, page.page_id
  )
  GROUP BY month, day
)

+------------------+
|avg_per_day       |
+------------------+
|1091206.6333333333|
+------------------+

As you can see, there is no way we could push these distinct values down.

If we push down the page_ids (or revision_ids), we still effectively do a full table scan

Not sure I understand this. If the table is sharded by hash on page_id, it seems like it'd be possible for the query to restrict the merge to only the shard-partitions that have pages that have been edited?

Hm, I guess the point is that the page_ids would be spread evenly enough across the shards that the result would be that most of the time most shards are involved? Is that right?

Yes, and also that we cannot possibly pushdown >= ~1M distinct page_ids or revision_ids.

But! While playing with the data, I recently discovered a useful property, explained in the next comment.

From MR 42:

On !41 we switched to daily processing, but botched the predicate pushdown: we were only pushing down the wiki_db, which is too broad and would have us shuffle over 40TB of data (an example can be seen here for the next 90 days)!

In this MR we fix this, as we have made the following observation:

Now that the revision level MERGE INTO only processes page_change_kind IN ('create', 'edit'), the vast majority of events are in the 'future' compared to what we have already processed. Thus, we can push down, per wiki, the earliest event revision_timestamp we see in the new data. That way we can eliminate a lot of files, given that they are WRITE ORDERED BY wiki_db, page_id, revision_timestamp.

So we pushdown the result of:

SELECT
  wiki_id,
  MIN(to_timestamp(revision.rev_dt)) as earliest_revision_dt
FROM event.mediawiki_page_content_change_v1
WHERE year=2024
  AND month=10
  AND day=4
  AND page_change_kind IN ('create', 'edit')
GROUP BY wiki_id
ORDER BY wiki_id, earliest_revision_dt
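The shape of that pushdown, sketched in plain Python (helper name and event tuples are hypothetical; the real job derives this via the SQL above):

```python
from datetime import datetime

def earliest_per_wiki(events):
    """Given (wiki_id, revision_dt) pairs from the new batch, compute the
    per-wiki minimum timestamp to push down as an extra MERGE predicate."""
    earliest = {}
    for wiki_id, rev_dt in events:
        if wiki_id not in earliest or rev_dt < earliest[wiki_id]:
            earliest[wiki_id] = rev_dt
    return earliest

events = [
    ("enwiki", datetime(2024, 10, 4, 3, 0)),
    ("enwiki", datetime(2024, 10, 4, 1, 0)),
    ("dewiki", datetime(2024, 10, 4, 2, 30)),
]
# {"enwiki": 2024-10-04 01:00, "dewiki": 2024-10-04 02:30}
pushdown = earliest_per_wiki(events)
```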

Additionally, for processing visibility events, we were reading too much data with the old predicate pushdowns based on revision_timestamp. Considering we see only ~1500 visibility events per day, we can certainly push them down. Thus we now push down the result of:

SELECT
  database,
  page_id,
  sort_array(collect_set(rev_id)) as rev_ids
FROM event.mediawiki_revision_visibility_change
WHERE year=2024
  AND month=10
  AND day=4
GROUP BY database, page_id
ORDER BY database, page_id
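The grouping that query performs, as a plain-Python sketch (helper name and event tuples are hypothetical, mirroring collect_set + sort_array above):

```python
from collections import defaultdict

def visibility_pushdown(events):
    """Group visibility-change events into sorted, de-duplicated rev_id
    lists keyed by (database, page_id), ready to push down."""
    grouped = defaultdict(set)
    for database, page_id, rev_id in events:
        grouped[(database, page_id)].add(rev_id)
    return {key: sorted(revs) for key, revs in grouped.items()}

events = [
    ("enwiki", 42, 1003),
    ("enwiki", 42, 1001),
    ("enwiki", 42, 1001),  # duplicates collapse, like collect_set
]
pushdown = visibility_pushdown(events)  # {("enwiki", 42): [1001, 1003]}
```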

Bug: T377999

Now the revision level MERGE INTO takes ~5 mins, and so do the revision level visibility changes.

The pipeline as a whole, though, still takes ~2.5 hours, due to other changes done and explained in T377999 and T376713. But since it is daily now, I am pretty happy with it.