
Design and implement tables to store parsed content from mediawiki.page_content_change
Closed, Resolved · Public · 5 Estimated Story Points

Description

User Story
As a data engineer, I need to design and implement tables to store the parsed content from the mediawiki.page_content_change stream
Done is:
  • Database schema is created and reviewed - Review done in one of our code mob sessions
  • Schema is deployed

Details

Other Assignee
JEbe-WMF
Related merge request:
  • Title: Refactor `wikitext_raw` table to support backfilling
  • Reference: repos/data-engineering/dumps/mediawiki-content-dump!4
  • Author: xcollazo
  • Source Branch: optimize-writes
  • Dest Branch: main

Event Timeline

Milimetric updated Other Assignee, added: JEbe-WMF.
xcollazo changed the task status from Open to In Progress. Aug 17 2023, 6:05 PM

Since this task is highly coupled to T340861, we've been doing it jointly.

The current schema iteration has been reviewed in one of our code mob sessions.

The current schema, named wmf_dumps.wikitext_raw_rc1, is copy-pasted here for convenience:

CREATE TABLE xcollazo_iceberg.wikitext_raw_rc1 (
    page_id                     BIGINT    COMMENT 'id of the page',
    page_namespace              INT       COMMENT 'namespace of the page',
    page_title                  STRING    COMMENT 'title of the page ',
    page_redirect_title         STRING    COMMENT 'title of the redirected-to page',
    user_id                     BIGINT    COMMENT 'id of the user that made the revision (or -1 if anonymous)',
    user_text                   STRING    COMMENT 'text of the user that made the revision (either username or IP)',
    user_is_visible             BOOLEAN   COMMENT 'whether the user details have been suppressed or not',
    revision_id                 BIGINT    COMMENT 'id of the revision',
    revision_parent_id          BIGINT    COMMENT 'id of the parent revision',
    revision_timestamp          TIMESTAMP COMMENT 'timestamp of the revision',
    revision_is_minor_edit      BOOLEAN   COMMENT 'whether this revision is a minor edit or not',
    revision_comment            STRING    COMMENT 'comment made with revision',
    revision_comment_is_visible BOOLEAN   COMMENT 'whether the comment has been suppressed or not',
    revision_sha1               STRING    COMMENT 'Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1',
    revision_size               BIGINT    COMMENT 'the sum of the content_size of all content slots',
    revision_content_slots      MAP<
                                    STRING,
                                    STRUCT<content_body:   STRING,
                                           content_format: STRING,
                                           content_model:  STRING,
                                           content_sha1:   STRING,
                                           content_size:   BIGINT
                                    >
                                >         COMMENT 'a MAP containing all the content slots associated to this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki.',
    revision_content_is_visible BOOLEAN   COMMENT 'whether the content in the slots has been suppressed or not',
    wiki_db                     STRING    COMMENT 'the wiki project',
    row_last_update             TIMESTAMP COMMENT 'the timestamp of the last event or backfill that updated this row'
)
USING ICEBERG
PARTITIONED BY (wiki_db, days(revision_timestamp)) -- wiki_db partitioning helps the backfill process,
                                                   -- while days(revision_timestamp) helps the hourly stream intake
TBLPROPERTIES (
    'write.format.default' = 'avro',               -- avoid OOMs due to revision_content_slots
    'write.metadata.previous-versions-max' = '10',
    'write.metadata.delete-after-commit.enabled' = 'true'
)
COMMENT 'Base to create MediaWiki full revision dumps from.'

Here is the rationale for the schema changes from wikitext_raw_rc0 to wikitext_raw_rc1:

  • Change the partitioning strategy from hours(revision_timestamp) to (wiki_db, days(revision_timestamp)).
    • The rationale for days(revision_timestamp) is that this strategy generates far fewer ORed predicates to push down when doing the MERGE INTO. It will also help contain the number of files in HDFS once we run CALL spark_catalog.system.rewrite_data_files() on the table.
    • The rationale for adding a wiki_db partition is to aid the backfilling process. That process touches all days(revision_timestamp) partitions, so we need a separate mechanism that pushes down wiki_db to keep the backfill manageable. This way we can ingest in wiki_db groupings.
    • Since partitioning keys are orthogonal in Iceberg, this strategy gives us, so far, a good ingestion compromise.
  • Switch from Parquet to Avro. After discussions with the team, we figured this is safer given that revision_content_slots contains full revision content.
  • Flatten out the schema of the target table. We now include only the fields we believe are necessary to generate a dump, and nothing else.
  • We introduce a helper TIMESTAMP column called row_last_update. The idea is that it serves as a watermark that we bump every time we touch a particular row.
    • For streaming ingests, we will update it with meta.dt (the time the event was received by the system).
    • For backfills, we will update it with the backfilling table's 'freshness date', which for wmf.mediawiki_wikitext_history happens to be the snapshot (i.e. the Dumps 1.0 release date).
    • Notice how, whether for a stream ingest or a backfill, if we already have more recent data (i.e. a higher watermark), we ignore the update. See the MERGE sketch after this list.
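
To make the watermark semantics concrete, here is a minimal sketch of what such a guarded write could look like for one wiki_db grouping. Only wmf_dumps.wikitext_raw_rc1 and its columns come from the schema above; backfill_source is an illustrative stand-in for a view prepared from wmf.mediawiki_wikitext_history.

-- Hypothetical backfill MERGE for a single wiki, guarded by the watermark.
MERGE INTO wmf_dumps.wikitext_raw_rc1 AS t
USING (
    SELECT *
    FROM backfill_source                  -- illustrative source view
    WHERE wiki_db = 'simplewiki'          -- pushes down the wiki_db partition
) AS s
ON  t.wiki_db     = s.wiki_db
AND t.revision_id = s.revision_id
WHEN MATCHED AND s.row_last_update > t.row_last_update THEN
    UPDATE SET *                          -- newer watermark wins
WHEN NOT MATCHED THEN
    INSERT *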

Following T335860#9006727 to deploy wmf_dumps.wikitext_raw_rc1:

ssh an-coord1001.eqiad.wmnet
sudo -u analytics bash

kerberos-run-command analytics spark3-sql

spark-sql (default)> CREATE TABLE wmf_dumps.wikitext_raw_rc1 (
                   >     page_id                     BIGINT    COMMENT 'id of the page',
                   >     page_namespace              INT       COMMENT 'namespace of the page',
                   >     page_title                  STRING    COMMENT 'title of the page ',
                   >     page_redirect_title         STRING    COMMENT 'title of the redirected-to page', 
                   >     user_id                     BIGINT    COMMENT 'id of the user that made the revision (or -1 if anonymous)',
                   >     user_text                   STRING    COMMENT 'text of the user that made the revision (either username or IP)',
                   >     user_is_visible             BOOLEAN   COMMENT 'whether the user details have been suppressed or not',
                   >     revision_id                 BIGINT    COMMENT 'id of the revision',
                   >     revision_parent_id          BIGINT    COMMENT 'id of the parent revision',
                   >     revision_timestamp          TIMESTAMP COMMENT 'timestamp of the revision',
                   >     revision_is_minor_edit      BOOLEAN   COMMENT 'whether this revision is a minor edit or not',
                   >     revision_comment            STRING    COMMENT 'comment made with revision',
                   >     revision_comment_is_visible BOOLEAN   COMMENT 'whether the comment has been suppressed or not',
                   >     revision_sha1               STRING    COMMENT 'Nested SHA1 hash of hashes of all content slots. See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_sha1',
                   >     revision_size               BIGINT    COMMENT 'the sum of the content_size of all content slots',
                   >     revision_content_slots      MAP<
                   >                                     STRING,
                   >                                     STRUCT<content_body:   STRING,
                   >                                            content_format: STRING,
                   >                                            content_model:  STRING,
                   >                                            content_sha1:   STRING,
                   >                                            content_size:   BIGINT
                   >                                     >
                   >                                 >         COMMENT 'a MAP containing all the content slots associated to this revision. Typically just the "main" slot, but also "mediainfo" for commonswiki.',
                   >     revision_content_is_visible BOOLEAN   COMMENT 'whether the content in the slots has been suppressed or not',
                   >     wiki_db                     STRING    COMMENT 'the wiki project',
                   >     row_last_update             TIMESTAMP COMMENT 'the timestamp of the last event or backfill that updated this row'
                   > )
                   > USING ICEBERG
                   > PARTITIONED BY (wiki_db, days(revision_timestamp)) -- wiki_db partitioning helps the backfill process,
                   >                                                    -- while days(revision_timestamp) helps the hourly stream intake
                   > TBLPROPERTIES (
                   >     'write.format.default' = 'avro',               -- avoid OOMs due to revision_content_slots
                   >     'write.metadata.previous-versions-max' = '10',
                   >     'write.metadata.delete-after-commit.enabled' = 'true'
                   > )
                   > COMMENT 'Base to create MediaWiki full revision dumps from.';
Response code
Time taken: 23.802 seconds
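
As a quick sanity check (not part of the original transcript), the deployed schema and table properties can be inspected from the same spark-sql session, for example:

-- Confirm columns, partitioning, and table properties of the new table
DESCRIBE EXTENDED wmf_dumps.wikitext_raw_rc1;
SHOW TBLPROPERTIES wmf_dumps.wikitext_raw_rc1;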

Now let's populate it with testing data for simplewiki:

kerberos-run-command analytics spark3-sql \
  --master yarn \
  --executor-memory 16G \
  --executor-cores 4 \
  --driver-memory 16G \
  --conf spark.dynamicAllocation.maxExecutors=64

INSERT INTO wmf_dumps.wikitext_raw_rc1 SELECT * FROM xcollazo_iceberg.wikitext_raw_rc1 WHERE wiki_db='simplewiki' ORDER BY wiki_db, revision_timestamp;
...
Response code
Time taken: 220.514 seconds
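
To verify the test load (not shown in the original transcript), one could check the row count for simplewiki and, since this is an Iceberg table, inspect per-partition record and file counts via the partitions metadata table:

-- Row count for the wiki we just loaded
SELECT COUNT(*) FROM wmf_dumps.wikitext_raw_rc1 WHERE wiki_db = 'simplewiki';

-- Iceberg metadata table with per-partition record_count and file_count
SELECT * FROM wmf_dumps.wikitext_raw_rc1.partitions;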