
Mechanism for error logging when doing MERGE INTO
Closed, ResolvedPublic8 Estimated Story Points

Description

On T335860, we implemented a PySpark job that runs a MERGE INTO to transform event data into a table that will eventually contain all of the MediaWiki revision history.

Reviews of the MERGE INTO pointed at situations that should not happen, assuming that we have a properly backfilled table:

WHEN MATCHED AND s.changelog_kind IN ('insert', 'update') AND s_dt >= t.dt THEN
  UPDATE SET
...
WHEN MATCHED AND s.changelog_kind = 'delete' AND s_dt >= t.dt THEN DELETE
WHEN NOT MATCHED AND s.changelog_kind IN ('insert', 'update') THEN
  INSERT (
   ...

For example, in the snippet above, we should never hit WHEN MATCHED AND s.changelog_kind = 'insert', and we should also never hit WHEN NOT MATCHED AND s.changelog_kind = 'update'.

Also:

Discussed this ticket with @JAllemandou, and there are some correctness issues that this work does not address. For example, we want to force dumps_merge_visibility_events_to_wikitext_raw to run after the same hour has been ingested for dumps_merge_events_to_wikitext_raw, because those visibility changes may be mutating new (wiki_db, revision_id) tuples from the same hour.

I will tackle these correctness issues separately though, as part of T340863, since we also discussed that, even with correct ordering, we want to be able to detect when something does not look right.

In this task we should:

  • These situations do happen right now, but that is because we are not fully backfilled yet, so we should wait until T340861 is complete first.
  • We should then run a couple of queries to see whether the above situations still happen after backfill (see the sketch after this list).
  • Build a mechanism to log these situations. It can be as simple as an extra column in the target table where we log the error, or something fancier.
  • For partial correctness, we should merge the two current events Airflow jobs into one and force ordering.
  • Add one more column to separate visibility updates from regular updates, so that visibility changes are idempotent.
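
A sketch of the kind of sanity-check query meant in the second bullet, assuming the hour-partitioned page_content_change stream and the current target table (exact table names and filters depend on the job):

# Hypothetical check: for one ingested hour, count changelog_kinds by whether
# the (wiki_db, revision_id) pair already exists in the target table.
# 'insert' with already_in_target = true, or 'update' with already_in_target
# = false, are the situations that should not happen once properly backfilled.
spark.sql("""
SELECT s.changelog_kind,
       t.revision_id IS NOT NULL AS already_in_target,
       count(1)                  AS events
FROM event.mediawiki_page_content_change_v1 s
LEFT JOIN wmf_dumps.wikitext_raw_rc1 t
  ON t.wiki_db = s.wiki_id
 AND t.revision_id = s.revision.rev_id
WHERE s.year = 2023 AND s.month = 10 AND s.day = 16 AND s.hour = 0
GROUP BY s.changelog_kind, (t.revision_id IS NOT NULL)
ORDER BY 1, 2
""").show(truncate=False)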

Details

Title | Reference | Author | Source Branch | Dest Branch
Fine tune Dumps 2.0 backfill and event ingestion. | repos/data-engineering/airflow-dags!558 | xcollazo | fine-tune-backfill | main
Switch to parquet. Use WRITE ORDERED BY. | repos/data-engineering/dumps/mediawiki-content-dump!21 | xcollazo | move-to-parquet | main
Switch to Dumps wikitext_raw_rc2 schema. | repos/data-engineering/airflow-dags!524 | xcollazo | wikitext_raw_rc2 | main
Modify schema to wikitext_raw_rc2. | repos/data-engineering/dumps/mediawiki-content-dump!18 | xcollazo | wikitext_raw_rc2 | main

Event Timeline

WDoranWMF set the point value for this task to 8. (Oct 17 2023, 11:11 AM)

Here are the schema changes that I think we should do as part of this task:

Introduce a row_visibility_last_update column.

This control column will help with the following situations:

  • Make visibility event ingestion idempotent - Right now, we consume visibility changes without bumping row_last_update. So if we were to repeat ingestion of a particular hour, we could inadvertently erase a later event.
  • Keep visibility stream ingestion apart from page_content ingestion - We do not want to change row_last_update on a visibility change, as a visibility event timestamp could trump a less recent but still-needed content event (see the sketch after this list).
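
As a sketch of how the visibility MERGE could use this column (the source view and its column names are hypothetical; the real job's statement will differ):

# Hypothetical sketch: guard visibility updates with row_visibility_last_update
# instead of row_last_update, so repeating an hour cannot erase a later event
# and content-event ordering is left untouched.
spark.sql("""
MERGE INTO wmf_dumps.wikitext_raw_rc2 t
USING visibility_events_this_hour s  -- hypothetical pre-projected source view
   ON t.wiki_db = s.wiki_db AND t.revision_id = s.revision_id
WHEN MATCHED AND s.meta_dt >= coalesce(t.row_visibility_last_update, t.row_last_update) THEN
  UPDATE SET
    t.user_is_visible             = s.user_is_visible,
    t.revision_comment_is_visible = s.revision_comment_is_visible,
    t.revision_content_is_visible = s.revision_content_is_visible,
    t.row_visibility_last_update  = s.meta_dt
""")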

Here is an example in which the meta_dt from a visibility change flipped order relative to its counterpart page_content change within an hour window:

SELECT c.wiki_id,
       c.revision.rev_id,
       c.changelog_kind,
       v.prior_state.visibility as prior_visibility,
       v.visibility,
       c.meta.dt as content_meta_dt,
       v.meta.dt as visibility_meta_dt,
       to_timestamp(c.meta.dt) < to_timestamp(v.meta.dt) as v_after_c
FROM event.mediawiki_page_content_change_v1 c
INNER JOIN event.mediawiki_revision_visibility_change v ON c.wiki_id = v.database AND c.revision.rev_id = v.rev_id
WHERE c.year = 2023
  AND c.month = 10
  AND c.day = 16
  AND c.hour = 0
  
  AND v.year = 2023
  AND v.month = 10
  AND v.day = 16
  AND v.hour = 0

+-------+----------+--------------+------------------+--------------------+---------------------------+--------------------+---------+
|wiki_id|rev_id    |changelog_kind|prior_visibility  |visibility          |content_meta_dt            |visibility_meta_dt  |v_after_c|
+-------+----------+--------------+------------------+--------------------+---------------------------+--------------------+---------+
|enwiki |1180332207|update        |{true, true, true}|{false, false, true}|2023-10-16T00:00:07.573767Z|2023-10-16T00:01:34Z|true     |
|bewiki |4605967   |update        |{true, true, true}|{false, true, false}|2023-10-16T00:16:43.691925Z|2023-10-16T00:16:42Z|false    |
|ruwiki |133622800 |update        |{true, true, true}|{false, true, true} |2023-10-16T00:05:29.567523Z|2023-10-16T00:05:26Z|false    |
|ruwiki |133622800 |update        |{true, true, true}|{false, true, true} |2023-10-16T00:04:56.575687Z|2023-10-16T00:05:26Z|true     |
|bewiki |4605967   |update        |{true, true, true}|{false, true, false}|2023-10-16T00:06:50.596076Z|2023-10-16T00:16:42Z|true     |
+-------+----------+--------------+------------------+--------------------+---------------------------+--------------------+---------+

We could solve this particular error without a new column when it happens within a single hour, but if such a situation arises between hours (and thus between MERGE INTOs), then we need the separate column to be able to ingest the events deterministically.

Introduce an errors column.

Schema:

errors    ARRAY<
             STRUCT<error_timestamp: TIMESTAMP,
                    error_producer: STRING,
                    error_message: STRING,
                    error_severity: STRING
             >
          >        COMMENT 'an ARRAY, typically empty, that will contain any detected ingestion errors. Consumers may want to skip rows with errors depending on their severity.'

This column will allow us to log errors jointly with the row data. The MERGE INTOs will be the mechanism producing these entries, so an example entry would be:

...
WHEN MATCHED AND s.changelog_kind = 'insert' THEN
  UPDATE SET
    t.page_id = s_page_id,
    ...
    t.errors = concat(t.errors, array(named_struct(
                                         'error_timestamp', NOW(),
                                         'error_producer', 'events_merge_into.py',
                                         'error_message', '(wiki_db, revision_id) existed, but got a changelog_kind = insert',
                                         'error_severity', 'WARN'
                                         )
                                 )
                     ),
    ...
    t.row_last_update = s_meta_dt

A dumps downstream process may then do the following filtering:

SELECT *
FROM wmf_dumps.wikitext_raw_rc2
WHERE ....
AND cardinality(errors) = 0

Someone interested in studying the errors would then do:

SELECT *
FROM wmf_dumps.wikitext_raw_rc2
WHERE ....
AND cardinality(errors) > 0

On backfill, we should reset errors to an empty array, as the intent is to catch errors coming from the streams.
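
For instance, the backfill write could emit a typed empty array for the column; a minimal sketch, assuming the struct fields proposed above:

# Hypothetical: a typed empty-array expression the backfill could select for
# the errors column, so backfilled rows start with no errors.
empty_errors_sql = """
CAST(array() AS ARRAY<STRUCT<error_timestamp: TIMESTAMP,
                             error_producer:  STRING,
                             error_message:   STRING,
                             error_severity:  STRING>>)
"""
# quick sanity check of the expression's resulting type
spark.sql(f"SELECT {empty_errors_sql} AS errors").printSchema()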

Change partitioning to PARTITIONED BY (wiki_db, months(revision_timestamp)).

This is not a schema change, as Iceberg partitioning does not affect the schema, but we want to try this out given that (wiki_db, days(revision_timestamp)) has been taxing on the HDFS file count, with ~3.5M folders and ~5M files so far:

xcollazo@stat1007:~$ hdfs dfs -count /user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
     3569878      5056902     44264942677765 /user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data

This particular change could significantly regress our hourly ingestions, so we will try it; if it turns out badly, we can try Iceberg merge-on-read. Therefore:

Explicitly make the table an Iceberg v2 table.

This will give us the flexibility to do 'merge-on-read' deletes if needed, especially to cope with the changes from the partitioning.
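
If we do end up needing it, the switch is just table properties on the v2 table; a sketch (whether we would flip all write modes or only deletes would be decided at that point):

# Hypothetical: flip the Iceberg v2 write modes to merge-on-read should the new
# partitioning make copy-on-write MERGEs/DELETEs too expensive.
spark.sql("""
ALTER TABLE wmf_dumps.wikitext_raw_rc2 SET TBLPROPERTIES (
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode'  = 'merge-on-read'
)
""")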

  1. Introduce a row_visibility_last_update column.

I like it! It's way more explicit and will take almost no space because it will mostly be null

  2. Introduce an errors column.

Ideally this could be a separate table, but for performance reasons keeping it in the same table makes a lot of sense. Looking for errors would be a full table scan, so maybe after we get a sense of what kinds of errors there are, we may be able to rethink this approach. This is a good way to get us moving, though.

  3. Change partitioning to PARTITIONED BY (wiki_db, months(revision_timestamp)).

Scary :) I have a feeling that merge-on-read + compaction jobs would be really nice for this use case. The only thing is that if this becomes a more broadly used table, it might be worth revisiting.

  4. Explicitly make the table an Iceberg v2 table.

For sure, +1

Introduce a row_visibility_last_update column.

I like it! It's way more explicit and will take almost no space because it will mostly be null

Having NULLs here will complicate the MERGE INTO logic even more than it already is (see draft MR https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/18). So I was thinking of always defining it. The vast majority of the weight of this table comes from revision_content_slots, as seen in this example over all of eswiki, where a new TIMESTAMP (== an Avro long) would weigh a total of ~1GB vs the current content size of ~2.9TB:

SELECT count(1)*8 as timestamp_total_bytes, sum(revision_content_slots['main'].content_size) as content_total_bytes FROM wmf_dumps.wikitext_raw_rc1 WHERE wiki_db = 'eswiki';
timestamp_total_bytes	content_total_bytes
1141134984	3197064784033

Introduce an errors column.

Ideally this could be a separate table, but for performance reasons keeping it in the same table makes a lot of sense. Looking for errors would be a full table scan, so maybe after we get a sense of what kinds of errors there are, we may be able to rethink this approach. This is a good way to get us moving, though.

I think it would be hard for errors to live in a separate table, considering that we need to be inside the MERGE INTO logic to be able to detect them, and MERGE INTOs do not support writing to multiple tables. After the MERGE INTO, we lose the context and can't recreate the errors anymore. If we were building a changelog, we could separate it out easily.

Ran the following to create the new table (similar to T335860#9006727):

ssh an-coord1001.eqiad.wmnet
sudo -u analytics bash

kerberos-run-command analytics spark3-sql

CREATE TABLE wmf_dumps.wikitext_raw_rc2 (
    page_id                     BIGINT    COMMENT 'id of the page',
    page_namespace              INT       COMMENT 'namespace of the page',
    page_title                  STRING    COMMENT 'title of the page ',
    page_redirect_title         STRING    COMMENT 'title of the redirected-to page',
    user_id                     BIGINT    COMMENT 'id of the user that made the revision; null if anonymous, zero if old system user, and -1 when deleted or malformed XML was imported',
    user_text                   STRING    COMMENT 'text of the user that made the revision (either username or IP)',
    user_is_visible             BOOLEAN   COMMENT 'whether the user details have been suppressed or not',
    revision_id                 BIGINT    COMMENT 'id of the revision',
    revision_parent_id          BIGINT    COMMENT 'id of the parent revision',
    revision_timestamp          TIMESTAMP COMMENT 'timestamp of the revision',
    revision_is_minor_edit      BOOLEAN   COMMENT 'whether this revision is a minor edit or not',
    revision_comment            STRING    COMMENT 'comment made with revision',
    revision_comment_is_visible BOOLEAN   COMMENT 'whether the comment has been suppressed or not',
    revision_sha1               STRING    COMMENT 'Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1',
    revision_size               BIGINT    COMMENT 'the sum of the content_size of all content slots',
    revision_content_slots      MAP<
                                    STRING,
                                    STRUCT<content_body:   STRING,
                                           content_format: STRING,
                                           content_model:  STRING,
                                           content_sha1:   STRING,
                                           content_size:   BIGINT
                                    >
                                >         COMMENT 'a MAP containing all the content slots associated to this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki.',
    revision_content_is_visible BOOLEAN   COMMENT 'whether the content in the slots has been suppressed or not',
    wiki_db                     STRING    COMMENT 'the wiki project',
    errors                      ARRAY<
                                    STRUCT<error_upstream_id:           STRING,     -- the id of the event that caused the error, typically meta.id.
                                           error_upstream_timestamp:    TIMESTAMP,  -- the timestamp of the event that caused the error, typically meta.dt.
                                           error_producer:              STRING,
                                           error_message:               STRING,
                                           error_severity:              STRING
                                    >
                                >         COMMENT 'an ARRAY, typically empty, that will contain any detected ingestion errors. Consumers may want to skip rows with errors depending on their severity.',
    row_last_update             TIMESTAMP COMMENT 'the timestamp of the last content event or backfill that updated this row',
    row_visibility_last_update  TIMESTAMP COMMENT 'the timestamp of the last visibility event or backfill that updated this row'
)
USING ICEBERG
PARTITIONED BY (wiki_db, months(revision_timestamp)) -- wiki_db partitioning helps the backfill process,
                                                     -- while months(revision_timestamp) helps the stream intake
TBLPROPERTIES (
    'format-version' = '2',                                -- allow merge-on-read if needed
    'write.format.default' = 'avro',                       -- avoid OOMs due to revision_content_slots
    'write.metadata.previous-versions-max' = '10',
    'write.metadata.delete-after-commit.enabled' = 'true'
)
COMMENT 'Base to create MediaWiki full revision dumps from.'
LOCATION '/wmf/data/wmf_dumps/wikitext_raw_rc2';

Response code
Time taken: 3.388 seconds

Deployed to production.

Will report whether everything works as expected later.

TL;DR:

Backfill took about ~111 hours with the new partitioning of (wiki_db, months(revision_timestamp)), compared to about ~48 hours with (wiki_db, days(revision_timestamp)). Most of the extra time comes from failures and from straggler executors with lots of data to process, given that individual partitions are now ~30x bigger than before. We have some leads for improving performance for the next run.

Longer:
All right, the backfill is now done. Here are the numbers, alongside the numbers from the last backfill (T346281#9206815) for comparison:

Task | Daily partitioning | Monthly partitioning | Diff factor | Notes
wait_for_data_in_mw_wikitext_history | 0:00:03 | 0:00:02 | 0.67 |
wait_for_data_in_raw_mediawiki_revision | 0:00:04 | 0:00:02 | 0.50 |
spark_create_intermediate_table | 18:05:18 | 16:44:18 | 0.93 | No change in schema for the intermediate table, so no change in runtime.
spark_backfill_merge_into_2001 | 0:01:23 | 0:02:43 | 1.96 |
spark_backfill_merge_into_2002 | 0:03:25 | 0:02:41 | 0.79 |
spark_backfill_merge_into_2003 | 0:06:47 | 0:03:53 | 0.57 |
spark_backfill_merge_into_2004 | 0:11:37 | 0:14:31 | 1.25 |
spark_backfill_merge_into_2005 | 0:31:24 | 0:55:30 | 1.77 |
spark_backfill_merge_into_2006 | 0:55:03 | 1:47:41 | 1.96 |
spark_backfill_merge_into_2007 | 1:16:07 | 2:27:03 | 1.93 |
spark_backfill_merge_into_2008 | 1:24:49 | 2:41:17 | 1.90 |
spark_backfill_merge_into_2009 | 0:46:01 | 2:35:28 | 3.38 |
spark_backfill_merge_into_2010 | 0:43:58 | 2:28:16 | 3.37 |
spark_backfill_merge_into_2011 | 1:36:13 | 2:12:17 | 1.37 |
spark_backfill_merge_into_2012 | 0:49:18 | 2:06:32 | 2.57 |
spark_backfill_merge_into_2013 | 0:48:45 | 2:07:24 | 2.61 |
spark_backfill_merge_into_2014 | 0:46:30 | 1:56:27 | 2.50 |
spark_backfill_merge_into_2015 | 2:58:49 | 2:17:13 | 0.77 |
spark_backfill_merge_into_2016 | 1:15:22 | 2:33:23 | 2.04 |
spark_backfill_merge_into_2017 | 1:31:38 | 3:15:49 | 2.14 |
spark_backfill_merge_into_2018 | 1:41:19 | 10:32:33 | 6.24 | I observed ~11 stragglers here that spent most of the time.
spark_backfill_merge_into_2019 | 4:08:39 | 10:32:19 | 2.54 |
spark_backfill_merge_into_2020 | 2:04:40 | 4:34:49 | 2.20 |
spark_backfill_merge_into_2021 | 2:22:15 | 4:55:26 | 2.08 |
spark_backfill_merge_into_2022 | 2:16:11 | 10:07:22 | 4.46 |
spark_backfill_merge_into_2023 | 1:47:22 | 12:21:53 | 6.91 | ~3 stragglers here as well, but also a failure that restarted the write from scratch.
drop_intermediate_table | 0:00:25 | 0:00:25 | 1.00 |
remove_intermediate_files | — | 0:00:30 | N/A |
Total | 48:13:25 | 99:37:47 | 2.07 | So it took ~2x the time.

Here is an inode comparison (dir count, file count, total size):

Monthly partitioning:

$ hdfs dfs -count /wmf/data/wmf_dumps/wikitext_raw_rc2/data
      179324       220730     26078477110787 /wmf/data/wmf_dumps/wikitext_raw_rc2/data

Daily partitioning (the counts are slightly inflated since the backfill happened a while ago and we have been ingesting hourly since, but generally accurate):

$ hdfs dfs -count /user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data
     3574753      5127198     48903124525321 /user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data

So the dir count is an order of magnitude lower, and the file count is as well. And finally, for rough total size, the new table is roughly half the size, going from ~44TB to ~23TB!

A bit more on stragglers:

Seems like there are two things at play here:

  • The first, and most costly, is that we had two partitions hit FetchFailedExceptions deep into their writes. This can be observed on /wmf/data/wmf_dumps/wikitext_raw_rc2/data/wiki_db=enwiki/revision_timestamp_month=2023-07, where attempt 1 wrote its first file on Oct 26 13:47 and its last file on Oct 26 15:46. Then attempt 2 started, and since this is a whole month being written by one executor, it had to start all over again, running from Oct 26 18:50 to Oct 26 20:46. This effectively made that partition take ~7 hours to write from the shuffle.
  • The second is that, because whole months are now serialized by a single executor, I suspect that our 2-core executor configuration is CPU bound. We should experiment with chunkier executors while keeping cluster usage around ~20%: perhaps 4-6 cores and 50 executors instead of 2 cores and 100 executors (see the sketch below).
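
As a sketch, the kind of executor sizing meant here (numbers are illustrative; the actual job conf would be tuned on the next run):

# Hypothetical conf tweak for the next backfill run: fewer, chunkier executors
# while keeping overall cluster usage around ~20%.
chunkier_executors = {
    "spark.executor.cores": "6",                   # currently 2
    "spark.executor.memory": "24g",                # scale memory with the extra cores
    "spark.dynamicAllocation.maxExecutors": "50",  # currently 100
}

# e.g. to test interactively, mirroring the wmfdata session setup used elsewhere in this task:
import wmfdata
spark = wmfdata.spark.create_custom_session(master='yarn', spark_config=chunkier_executors)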

So this is definitely a compromise between runtime (2 days vs 4) and general HDFS pressure. Since most of the wasted time is on stragglers and failures, next time let's try the conf change suggested above and see where we land. For now, I believe these changes are a net positive, given that the long-term HDFS pressure is much lower.

( CC @JAllemandou. More fun for us for next run! 😄 )

Ok, hourly events ingestion has been running for at least a week now, and there is bad news:

  • spark_content_merge_into, which used to take ~15 minutes to complete (blue in the graph), now takes ~45 minutes (light orange in the graph)
  • spark_visibility_merge_into, which used to take ~3 minutes, now takes ~20 minutes (dark orange in the graph).

And that is with double the executors! I even tried enabling merge-on-read, but there was little to no benefit, since the vast majority of the time is spent reading data rather than writing it.

Screenshot 2023-11-06 at 4.30.21 PM.png (1×3 px, 666 KB)

Because ~45m + ~20m = ~65m per hour of data, the job has struggled to backfill and catch up.

There are a couple things we could still try:

  • Most of the MERGE INTO time is spent reading data to figure out which data files will be affected by the MERGE. We currently push down all the hours that we will touch, but because partitioning is by month, we typically read ~3-6TB of data. We could fine-tune this to include not only the hours affected but also the wiki_dbs affected (see the sketch after this list). I suspect this would only be a marginal win though, since the big wikis are typically touched every hour anyway, and so they would always be included.
  • Another avenue would be to convert from AVRO to PARQUET. The rationale is that per-data-file statistics are kept with PARQUET but not with AVRO (a current Iceberg limitation). We write all data files sorted by (wiki_db, revision_timestamp), meaning the per-file statistics would be very useful for file pruning at query-planning time. This would allow the MERGE INTO to pinpoint exactly which data files are affected in a particular month, rather than reading the whole month of data (which is what is happening right now). This could introduce its own issues, with both writers and readers having a tough time due to OOMs, but it is worth a try given that I speculate this format change would let us continue to use monthly partitioning.
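
For the first idea, the shape of the extra pushdown would be roughly the following (illustrative only; the real job builds its MERGE ON clause programmatically):

# Hypothetical: besides the hour pushdown, also restrict the MERGE's target-side
# scan to the wikis actually present in the batch being ingested.
touched_wikis = [
    r.wiki_id for r in spark.sql("""
        SELECT DISTINCT wiki_id
        FROM event.mediawiki_page_content_change_v1
        WHERE year = 2023 AND month = 10 AND day = 16 AND hour = 0
    """).collect()
]
in_list = ", ".join(f"'{w}'" for w in touched_wikis)
on_clause = (
    "t.wiki_db = s.wiki_id AND t.revision_id = s.revision.rev_id "
    f"AND t.wiki_db IN ({in_list})"
)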

Awesome work @xcollazo

I'm glad about the backfill managing to finish :)
About the change of configuration, I don't think growing executors will help, as the split is made by task, and therefore by executor core. A solution could be to grow the number of splits used to shuffle.

The problem with regular updates from events is definitely not great :(
I can definitely see how using Parquet instead of Avro would help - I'm nonetheless afraid of the table being complicated to query if done this way. Having no better solution to offer, let's try this :)

I ran a full backfill to convert all remaining AVRO files to PARQUET. Here is a run comparison:

Task | Monthly partitioning (Avro) | Monthly partitioning (Parquet) | Diff factor | Notes
wait_for_data_in_mw_wikitext_history | 0:00:02 | 0:00:02 | 1 |
wait_for_data_in_raw_mediawiki_revision | 0:00:02 | 0:00:03 | 1.5 |
spark_create_intermediate_table | 16:44:18 | 20:22:35 | 1.217348734 |
spark_backfill_merge_into_2001 | 0:02:43 | 0:02:54 | 1.067484663 |
spark_backfill_merge_into_2002 | 0:02:41 | 0:03:37 | 1.347826087 |
spark_backfill_merge_into_2003 | 0:03:53 | 0:06:01 | 1.549356223 |
spark_backfill_merge_into_2004 | 0:14:31 | 0:17:28 | 1.203214696 |
spark_backfill_merge_into_2005 | 0:55:30 | 1:09:24 | 1.25045045 |
spark_backfill_merge_into_2006 | 1:47:41 | 3:32:56 | 1.977402879 |
spark_backfill_merge_into_2007 | 2:27:03 | 2:53:20 | 1.178737391 |
spark_backfill_merge_into_2008 | 2:41:17 | 3:30:15 | 1.30360649 |
spark_backfill_merge_into_2009 | 2:35:28 | 3:25:10 | 1.319682676 |
spark_backfill_merge_into_2010 | 2:28:16 | 3:10:45 | 1.286533273 |
spark_backfill_merge_into_2011 | 2:12:17 | 3:08:16 | 1.423207761 |
spark_backfill_merge_into_2012 | 2:06:32 | 4:34:38 | 2.170442571 |
spark_backfill_merge_into_2013 | 2:07:24 | 2:54:20 | 1.368393511 |
spark_backfill_merge_into_2014 | 1:56:27 | 3:01:10 | 1.555746386 |
spark_backfill_merge_into_2015 | 2:17:13 | 7:17:46 | 3.190331592 |
spark_backfill_merge_into_2016 | 2:33:23 | 5:33:57 | 2.177224818 |
spark_backfill_merge_into_2017 | 3:15:49 | 3:53:23 | 1.191846115 |
spark_backfill_merge_into_2018 | 10:32:33 | 8:12:54 | 0.7792269386 |
spark_backfill_merge_into_2019 | 10:32:19 | 7:59:28 | 0.7582698542 |
spark_backfill_merge_into_2020 | 4:34:49 | 6:50:20 | 1.493116623 |
spark_backfill_merge_into_2021 | 4:55:26 | 4:39:46 | 0.9469705517 |
spark_backfill_merge_into_2022 | 10:07:22 | 8:14:15 | 0.8137588497 |
spark_backfill_merge_into_2023 | 12:21:53 | 6:33:39 | 0.5306090356 |
drop_intermediate_table | 0:00:25 | 0:00:24 | 0.96 |
remove_intermediate_files | 0:00:30 | 0:00:31 | 1.033333333 |
Total | 99:37:47 | 111:29:17 | 1.119024053 | Generating Parquet is ~11% more expensive than Avro.

We will now ingest streaming data for a couple of days to see if we get the improvement we speculated we could get on T340863#9310578.

I'm also eager to check if we run into parquet-decompression issues as I think could happen. Thanks a lot for running those experiments @xcollazo :)

We did not get the improvements we speculated about on T340863#9310578, not even with merge-on-read.

This fact was very puzzling to me, until I delved deeper into the individual files. The following query inspects the min and max values for each file of the 2023-10 enwiki partition of the current snapshot:

# Are parquet files as we expect them to be on 2023-10 ?
spark.sql(
"""
SELECT
file_format,
partition.revision_timestamp_month,
readable_metrics['revision_timestamp']['lower_bound'] as lower_bound,
readable_metrics['revision_timestamp']['upper_bound'] as upper_bound
FROM wmf_dumps.wikitext_raw_rc2.data_files
WHERE partition.wiki_db = 'enwiki'
  AND partition.revision_timestamp_month = 645
ORDER BY lower_bound ASC
""").show(100, truncate=False)


+-----------+------------------------+-------------------+-------------------+
|file_format|revision_timestamp_month|lower_bound        |upper_bound        |
+-----------+------------------------+-------------------+-------------------+
|PARQUET    |645                     |2023-10-01 00:00:00|2023-10-31 23:59:37|
|PARQUET    |645                     |2023-10-01 00:00:01|2023-10-31 23:59:16|
|PARQUET    |645                     |2023-10-01 00:00:01|2023-10-31 23:59:31|
|PARQUET    |645                     |2023-10-01 00:00:01|2023-10-31 23:59:49|
|PARQUET    |645                     |2023-10-01 00:00:01|2023-10-31 23:59:57|
|PARQUET    |645                     |2023-10-01 00:00:01|2023-10-31 23:59:57|
|PARQUET    |645                     |2023-10-01 00:00:02|2023-10-31 23:59:54|
|PARQUET    |645                     |2023-10-01 00:00:02|2023-10-31 23:59:54|
|PARQUET    |645                     |2023-10-01 00:00:03|2023-10-31 23:59:52|
|PARQUET    |645                     |2023-10-01 00:00:03|2023-10-31 23:59:50|
|PARQUET    |645                     |2023-10-01 00:00:04|2023-10-31 23:59:07|
|PARQUET    |645                     |2023-10-01 00:00:05|2023-10-31 23:59:24|
|PARQUET    |645                     |2023-10-01 00:00:06|2023-10-31 23:59:45|
|PARQUET    |645                     |2023-10-01 00:00:06|2023-10-31 23:59:33|
|PARQUET    |645                     |2023-10-01 00:00:06|2023-10-31 23:58:35|
|PARQUET    |645                     |2023-10-01 00:00:07|2023-10-31 23:59:42|
|PARQUET    |645                     |2023-10-01 00:00:07|2023-10-31 23:59:11|
...

Notice how each file includes data all the way from the beginning to the end of the month. This means that, effectively, Iceberg cannot discard any of these files when doing one of our MERGEs! So we still have to read the whole month!

What I was expecting to see was a total ordering inside every partition, so that we would have to read at most a couple of the files of any particular month.

I think I misread https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes, and instead of hash mode, we want range mode.

Ran:

ALTER TABLE wmf_dumps.wikitext_raw_rc2 SET TBLPROPERTIES (
    'write.distribution-mode'='range'
)

spark-sql (default)> DELETE FROM wmf_dumps.wikitext_raw_rc2;
Response code
Time taken: 8.904 seconds

Current props:

Table Properties	[current-snapshot-id=7905477679598267939,format=iceberg/parquet,write.distribution-mode=range,write.format.default=parquet,write.merge.mode=copy-on-write,write.metadata.delete-after-commit.enabled=true,write.metadata.previous-versions-max=10]

Rerunning backfill one more time! This is the lucky one!! 😹

I'm also eager to check if we run into parquet-decompression issues as I think could happen. Thanks a lot for running those experiments @xcollazo :)

Right, it is on the queue!

Ok I think I figured it out. Rerunning again with:

spark-sql (default)> DELETE FROM wmf_dumps.wikitext_raw_rc2;
Response code
Time taken: 9.864 seconds

spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_raw_rc2 SET TBLPROPERTIES (
                   >     'write.target-file-size-bytes' = '134217728'
                   > );
23/12/11 20:45:27 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 1.177 seconds

spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_raw_rc2 WRITE ORDERED BY wiki_db, revision_timestamp;
23/12/11 20:46:26 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 0.737 seconds

Which yields the following TBLPROPERTIES:

Table Properties	[current-snapshot-id=7198391926576962044,format=iceberg/parquet,sort-order=wiki_db ASC NULLS FIRST, revision_timestamp ASC NULLS FIRST,write.distribution-mode=range,write.format.default=parquet,write.merge.distribution-mode=range,write.merge.mode=copy-on-write,write.metadata.delete-after-commit.enabled=true,write.metadata.previous-versions-max=10,write.target-file-size-bytes=134217728]

I'll explain these settings in a second, but just documenting here for completeness.

What I was expecting to see was a total ordering inside every partition, so that we would have to read at most a couple of the files of any particular month.

I think I misread https://iceberg.apache.org/docs/latest/spark-writes/#writing-distribution-modes, and instead of hash mode, we want range mode.

It is true that range mode is necessary for our intentions, but it is not sufficient. Another thing I had missed was that the shuffle was being run as follows on a MERGE:

(3) Exchange
Input [21]: [page_id#105L, page_namespace#106, page_title#107, page_redirect_title#108, user_id#109L, user_text#110, user_is_visible#111, revision_id#112L, revision_parent_id#113L, revision_timestamp#114, revision_is_minor_edit#115, revision_comment#116, revision_comment_is_visible#117, revision_sha1#118, revision_size#119L, revision_content_slots#120, revision_content_is_visible#121, wiki_db#122, errors#123, row_last_update#124, row_visibility_last_update#125]
Arguments: rangepartitioning(wiki_db#122 ASC NULLS FIRST, icebergmonthtransform(revision_timestamp#114) ASC NULLS FIRST, 200), REPARTITION_BY_NUM, [plan_id=65]

Notice that the rangepartitioning is indeed kicking in, but it is sorting on wiki_db ASC NULLS FIRST, icebergmonthtransform(revision_timestamp) ASC NULLS FIRST! This is shocking, but now that I think about it, it makes sense: Iceberg can only command Spark to sort on what it knows, and what it knows is that the table is partitioned on exactly that sort order. So we need another mechanism to tell Iceberg that we would rather sort not at month resolution, but at full TIMESTAMP resolution.

Thus, we need an ALTER TABLE ... WRITE ORDERED BY. This mechanism is supported by Iceberg 1.2.0 and later, and it requires Spark 3.1 or later, so we are good on both counts. Unfortunately, as of today, we cannot fold the TBLPROPERTIES that this mechanism generates into our CREATE TABLE. Nonetheless, we can run:

ALTER TABLE wmf_dumps.wikitext_raw_rc2 WRITE ORDERED BY wiki_db, revision_timestamp;

And now, when running a MERGE, we get the right sorting when shuffling:

(3) Exchange
Input [21]: [page_id#1144L, page_namespace#1145, page_title#1146, page_redirect_title#1147, user_id#1148L, user_text#1149, user_is_visible#1150, revision_id#1151L, revision_parent_id#1152L, revision_timestamp#1153, revision_is_minor_edit#1154, revision_comment#1155, revision_comment_is_visible#1156, revision_sha1#1157, revision_size#1158L, revision_content_slots#1159, revision_content_is_visible#1160, wiki_db#1161, errors#1162, row_last_update#1163, row_visibility_last_update#1164]
Arguments: rangepartitioning(wiki_db#1161 ASC NULLS FIRST, revision_timestamp#1153 ASC NULLS FIRST, 200), REPARTITION_BY_NUM, [plan_id=514]

And also the expected file boundaries:

# how does data look?
spark.sql(
"""
SELECT
file_format,
partition.revision_timestamp_month,
readable_metrics['revision_timestamp']['lower_bound'] as lower_bound,
readable_metrics['revision_timestamp']['upper_bound'] as upper_bound
FROM xcollazo_iceberg.wikitext_raw_rc2_sorted.data_files
WHERE partition.wiki_db = 'enwiki'
  AND partition.revision_timestamp_month = 512
ORDER BY lower_bound ASC
""").show(500, truncate=False)

+-----------+------------------------+-------------------+-------------------+
|file_format|revision_timestamp_month|lower_bound        |upper_bound        |
+-----------+------------------------+-------------------+-------------------+
|PARQUET    |512                     |2012-09-01 00:00:01|2012-09-01 02:59:04|
|PARQUET    |512                     |2012-09-01 02:59:05|2012-09-01 03:54:51|
|PARQUET    |512                     |2012-09-01 03:54:52|2012-09-01 08:16:20|
|PARQUET    |512                     |2012-09-01 08:16:20|2012-09-01 08:38:24|
|PARQUET    |512                     |2012-09-01 08:38:25|2012-09-01 12:46:49|
|PARQUET    |512                     |2012-09-01 12:46:49|2012-09-01 12:51:56|
|PARQUET    |512                     |2012-09-01 12:51:57|2012-09-01 15:57:00|
|PARQUET    |512                     |2012-09-01 15:57:02|2012-09-01 16:31:00|
|PARQUET    |512                     |2012-09-01 16:31:01|2012-09-01 19:09:12|
|PARQUET    |512                     |2012-09-01 19:09:13|2012-09-01 19:49:26|
|PARQUET    |512                     |2012-09-01 19:49:27|2012-09-01 22:15:28|
|PARQUET    |512                     |2012-09-01 22:15:28|2012-09-01 23:38:38|
|PARQUET    |512                     |2012-09-01 23:38:39|2012-09-02 03:01:36|
...

The WRITE ORDERED BY comes with two side effects:

First, rangepartitioning() is highly sensitive to spark.sql.shuffle.partitions, and since we had it set to 131,072, it created many small files: 131,072 * 23 years = ~3M file splits. This is less taxing than the ~5M we had with day partitioning (see T340863#9259279), but it is not ideal at all. It is easily tunable though. I tested 32,768 on year=2023, and it generated significantly fewer files with a reasonable average file size:

# average file size per year
spark.sql(
"""
SELECT
avg(ceil(file_size_in_bytes/1024/1024)) as file_size_in_megas,
count(1) as count,
year(readable_metrics['revision_timestamp']['lower_bound']) as year
FROM wmf_dumps.wikitext_raw_rc2.data_files
WHERE partition.wiki_db = 'enwiki'
GROUP BY year
ORDER BY year ASC
""").show(500, truncate=False)

+------------------+-----+----+
|file_size_in_megas|count|year|
+------------------+-----+----+
|1.0               |54490|2001|
|1.0002437166793603|98475|2002|
|1.0               |64442|2003|
|1.0029809937911784|52667|2004|
|2.348815797206496 |51047|2005|
|6.299488914483156 |55568|2006|
|8.777622351184288 |52901|2007|
|9.898963090976423 |48606|2008|
|10.824583353005051|42362|2009|
|11.284475703324809|39100|2010|
|11.68925300927259 |34726|2011|
|12.751881866811877|31219|2012|
|17.596499373229953|21539|2013|
|17.657454858949183|20879|2014|
|19.38580321382039 |20723|2015|
|21.85241357669578 |19121|2016|
|25.849336362549323|16726|2017|
|28.03143227478937 |15430|2018|
|32.842958459979734|13818|2019|
|39.90892011723153 |13307|2020|
|34.52726004818055 |15774|2021|
|34.276945630696666|15689|2022|
|92.35712816292944 |4861 |2023|   <<<< Only year run with 32,768 spark partitions
+------------------+-----+----+

In fact, if applied to all years, we would be at 32,768 * 23 = ~750,000 file splits, which I find reasonable if we just choose one spark.sql.shuffle.partitions value for all MERGEs. We could make this fancier and choose the right spark.sql.shuffle.partitions per year, but I will note this down as a future optimization because we really need to close this task. For now, I will go with 32,768 for all years unless there is strong opposition.

Second, I thought rangepartitioning() would make the backfill slower due to its implementation details, but in fact it makes it significantly faster: we now have many tasks (spark.sql.shuffle.partitions / 12 months) for executors to pick up for every month, whereas before one executor would write all of the data for a particular month serially, making the job wait a long time on the couple of partitions that take longest.

In terms of hourly event ingestions, we get some beautiful numbers with range mode:

  • spark_content_merge_into takes ~6 minutes (compared to ~15 minutes when we were doing daily partitions, and to ~70 minutes on recent runs without range mode)
  • spark_visibility_merge_into takes ~3 minutes (compared to ~3 minutes when we were doing daily partitions, and to ~10 minutes on recent runs without range mode)

Screenshot 2023-12-12 at 3.17.08 PM.png (1×3 px, 411 KB)

For now, I will go with 32,768 for all years unless there is strong opposition.

Never mind: this gets us back to the issue from T348772. We will have to find a lower number here.
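
Since rangepartitioning() splits by spark.sql.shuffle.partitions, this is just a runtime Spark SQL conf that the backfill and ingestion jobs can set before their MERGEs; a sketch:

# Hypothetical: lower the shuffle-partition count used by rangepartitioning()
# right before running the MERGE INTO statements.
spark.conf.set("spark.sql.shuffle.partitions", 5120)
# equivalent in spark-sql: SET spark.sql.shuffle.partitions = 5120;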

With spark.sql.shuffle.partitions=5120, we get:

# average file size per year for all wikis
spark.sql(
"""
SELECT
ceil(avg(file_size_in_bytes/1024/1024)) as avg_file_size_in_megas,
count(1) as count,
year(readable_metrics['revision_timestamp']['lower_bound']) as year
FROM wmf_dumps.wikitext_raw_rc2.data_files
GROUP BY year
ORDER BY year ASC
""").show(500, truncate=False)

+----------------------+-----+----+
|avg_file_size_in_megas|count|year|
+----------------------+-----+----+
|1                     |4949 |2001|
|1                     |5604 |2002|
|1                     |6317 |2003|
|5                     |9366 |2004|
|17                    |11697|2005|
|37                    |15684|2006|
|47                    |17576|2007|
|50                    |18283|2008|
|50                    |19199|2009|
|51                    |19549|2010|
|50                    |19576|2011|
|51                    |19690|2012|
|51                    |19768|2013|
|48                    |19609|2014|
|52                    |20836|2015|
|58                    |20917|2016|
|62                    |22014|2017|
|70                    |24853|2018|
|74                    |26436|2019|
|73                    |27177|2020|
|73                    |26477|2021|
|78                    |29566|2022|
|75                    |24634|2023|
+----------------------+-----+----+
# total files for the current snapshot
spark.sql(
"""
SELECT
count(1) as count
FROM wmf_dumps.wikitext_raw_rc2.data_files 
""").show(500, truncate=False)

+------+
|count |
+------+
|429777|
+------+

After playing with executor sizing for a bit, we found a configuration that can compute on revision_content_slots['main'].content_body without losing any executors due to OOM:

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_3',
        "spark.shuffle.service.port": '7338',
        "spark.yarn.archive": "hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip",
        "spark.dynamicAllocation.maxExecutors": 128,
        "spark.driver.memory": "16g",
        "spark.executor.memory": "16g",
        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
    }
)

Example SQL exercising every single revision across all wikis and all time:

# let's go all in: force read all content from all wikis from all time!
# .. and do it on 1 core, 16G executors!
spark.sql(
"""
SELECT count(1)
FROM (
    SELECT length(revision_content_slots['main'].content_body) as actual_length,
           revision_content_slots['main'].content_size as advertised_length
    FROM wmf_dumps.wikitext_raw_rc2
)
WHERE actual_length != advertised_length
""").show(100, truncate=False)

+----------+
|count(1)  |
+----------+
|5530001645|
+----------+

The result set is not important (at least for the purposes of this ticket, although we should investigate!); what is important is that we are able to read the wikitext of all 6.4B rows without issue, giving us confidence that the current partitioning strategy, using PARQUET with 'write.target-file-size-bytes' = '134217728' (128MB), is a useful configuration not only for the Dumps process but also for other potential users of this table.

I did see YARN containers fail with 8GB and 4GB of executor memory. So we should document these minimum executor requirements as part of T347611.

CC @JAllemandou