
Implement job to transform mediawiki.page_content_change
Closed, Resolved · Public · 5 Estimated Story Points

Description

User Story
As a data engineer, I need to build an Airflow job to transform event data* into Iceberg tables

* Event Data here means events in the topics mediawiki.page_content_change and mediawiki.revision_visibility_change (will do revision_visibility_change separately)

Done is:
  • Job is running on hourly schedule on Airflow
  • mediawiki.page_content_change might not be in a state where all of the data is available. (Should we limit the scope of this to one smaller wiki to make testing easier for now? We are currently consuming all wikis.)

Details

Other Assignee
JEbe-WMF

Related Objects

Event Timeline

JArguello-WMF renamed this task from Implement job to transform mediawiki.page-content-change to [needs to be split] Implement job to transform mediawiki.page-content-change.May 15 2023, 4:29 PM
JArguello-WMF set the point value for this task to 9.
lbowmaker renamed this task from [needs to be split] Implement job to transform mediawiki.page-content-change to [needs to be split] Implement job to transform mediawiki.page_content_change.May 16 2023, 12:14 AM
lbowmaker renamed this task from [needs to be split] Implement job to transform mediawiki.page_content_change to Implement job to transform mediawiki.page_content_change.
lbowmaker updated the task description. (Show Details)
lbowmaker changed the point value for this task from 9 to 5.
xcollazo changed the task status from Open to In Progress.Jun 5 2023, 8:29 PM
xcollazo claimed this task.
xcollazo moved this task from Next Up to In Progress on the Data Pipelines (Sprint 14) board.

Currently, only event.rc1_mediawiki_page_content_change is available as a Hive table (rc1 = Release Candidate 1). mediawiki.revision_visibility_change is available.

Some notes on this work so far:

  1. We now have an Airflow DAG that is good enough for reviews at https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/430.
  2. I did a bunch of test runs and the job currently takes 3-5 minutes to complete per ingested hour.
  3. As of now, we have been able to encapsulate all the required transformations from the source event.rc1_mediawiki_page_content_change table into a MERGE INTO. This SQL statement currently lives at https://gitlab.wikimedia.org/xcollazo/mediawiki-content-dump/-/blob/main/mediawiki_build_dumps_from_events_merge_into.hql.
  4. (3) above has many open questions:
    1. What should the target table database and name be? Perhaps wmf_dumps.mediawiki_content?
    2. As it currently stands, the MERGE INTO performance will deteriorate over time because we are not doing predicate pushdown on the ON clause. The issue, though, is that the source data is partitioned by meta.dt (the timestamp when the event was received), while the target table is partitioned by dt (the timestamp when the event was produced). I figured that dt made the most sense for a table modeling a dump, since I would expect SELECTs to be based on dt as well. If we were confident about the drift between meta.dt and dt, perhaps we could push down at least the year and month, but current data on event.rc1_mediawiki_page_content_change suggests drift could be years even.
    3. When a (wiki_id, revision.rev_id) pair has an event with the same dt (yes, this happens), the current MERGE INTO logic deduplicates the event by choosing the 'winning' one based on changelog_kind, with delete being highest priority. Perhaps we also need deduplication logic for page_change_kind? (A sketch of this deduplication, together with the bounded-drift pushdown idea from the previous point, follows this list.)
    4. Speaking of page_change_kind, I noticed that one possible kind is page_change_kind = visibility_change. I confirmed that there is such data on event.rc1_mediawiki_page_content_change. So perhaps we do not need to consume mediawiki.revision_visibility_change at all since it is folded in already?
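
For concreteness, here is a minimal sketch of the dedup-then-merge shape described above. It assumes the proposed wmf_dumps.mediawiki_content target name, illustrative year/month/day/hour literals for the hourly source partition, a delete > update > insert priority, and a hypothetical 2-day drift bound in the commented-out pushdown. The real statement lives in the linked HQL.

-- Sketch only, not the actual HQL: deduplicate events sharing (wiki_id, revision.rev_id, dt)
-- by changelog_kind priority (delete wins), then merge into the proposed target table.
MERGE INTO wmf_dumps.mediawiki_content t
USING (
  SELECT wiki_id, revision, changelog_kind, page_change_kind, dt, meta
  FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY wiki_id, revision.rev_id, dt
        ORDER BY CASE changelog_kind WHEN 'delete' THEN 0 WHEN 'update' THEN 1 ELSE 2 END
      ) AS rn
    FROM event.rc1_mediawiki_page_content_change
    WHERE year = 2023 AND month = 6 AND day = 1 AND hour = 0  -- illustrative hourly partition
  ) ranked
  WHERE rn = 1
) s
ON  t.wiki_id = s.wiki_id
AND t.revision.rev_id = s.revision.rev_id
-- pushdown on the target would only be safe with a bounded drift, e.g. something like:
-- AND t.dt BETWEEN s.dt - INTERVAL 2 DAYS AND s.dt + INTERVAL 2 DAYS
WHEN MATCHED AND s.changelog_kind = 'delete' THEN DELETE
WHEN MATCHED THEN UPDATE SET t.revision = s.revision, t.dt = s.dt, t.meta = s.meta
WHEN NOT MATCHED THEN INSERT (wiki_id, revision, changelog_kind, page_change_kind, dt, meta)
  VALUES (s.wiki_id, s.revision, s.changelog_kind, s.page_change_kind, s.dt, s.meta);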

CC @JEbe-WMF, @Milimetric , @Ottomata

current data on event.rc1_mediawiki_page_content_change suggests drift could be years even.

Really? I have seen some silly events, but we might be able to discard them? T282887: Avoid accepting Kafka messages with whacky timestamps

When a (wiki_id, revision.rev_id) pair has an event with the same dt

Would be curious to know the cases for these.

perhaps we do not need to consume mediawiki.revision_visibility_change at all since it is folded in already?

page_change and page_content_change only capture visibility changes to the current revision, and the current rev's content cannot be suppressed (a new rev must be created first). mediawiki.revision_visibility_change will have visibility changes for all revisions.

current data on event.rc1_mediawiki_page_content_change suggests drift could be years even.

Really? I have seen some silly events, but we might be able to discard them? T282887: Avoid accepting Kafka messages with whacky timestamps

SELECT
    abs(to_unix_timestamp(to_timestamp(dt)) - to_unix_timestamp(to_timestamp(meta.dt)))/60/60/24/365 as drift_in_years,
    dt,
    meta.dt as meta_dt
FROM rc1_mediawiki_page_content_change
ORDER BY drift_in_years DESC LIMIT 10;

drift_in_years          dt                      meta_dt
19.72640277777778	2003-09-07T09:45:35Z	2023-05-25T13:02:53.801266Z
19.396066749112126	2004-01-14T07:09:11Z	2023-06-02T20:41:52.689117Z
18.998406360984273	2004-04-29T11:38:29Z	2023-04-24T21:40:52.27638Z
18.885413749365803	2004-08-05T13:55:41Z	2023-06-20T18:09:09.097318Z
18.882671867072553	2004-08-05T13:55:41Z	2023-06-19T18:08:01.391324Z
18.879937975646882	2004-08-05T13:55:41Z	2023-06-18T18:11:05.404038Z
18.87719641045155	2004-08-05T13:55:41Z	2023-06-17T18:10:07.403142Z
18.87445662100457	2004-08-05T13:55:41Z	2023-06-16T18:10:05.418078Z
18.871715372907154	2004-08-05T13:55:41Z	2023-06-15T18:09:17.409511Z
18.868978405631662	2004-08-05T13:55:41Z	2023-06-14T18:10:44.416111Z
Time taken: 36.701 seconds, Fetched 10 row(s)
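
If we do decide such whacky events can be discarded (per the task linked above), a guard along these lines could be added when reading the source; the 48-hour bound is purely an illustrative assumption:

-- Hypothetical guard, not part of the current job: drop events whose dt and meta.dt
-- disagree by more than an (assumed) 48-hour bound.
SELECT *
FROM event.rc1_mediawiki_page_content_change
WHERE abs(to_unix_timestamp(to_timestamp(dt)) - to_unix_timestamp(to_timestamp(meta.dt))) <= 48 * 60 * 60;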

When a (wiki_id, revision.rev_id) pair has an event with the same dt

Would be curious to know the cases for these.

These are pervasive, and they are particularly interesting to me because an Iceberg MERGE INTO will fail if they are found (as is, only one update is allowed per ON clause match):
(excuse the convoluted SELECT statement below, but I had to salt it, otherwise it takes forever):

SELECT
count(1) as count
FROM
(
  SELECT 
    *, 
    FLOOR(RAND() * 8) AS salt 
    FROM rc1_mediawiki_page_content_change
) AS a
INNER JOIN
(
  SELECT 
    *, 
    EXPLODE(ARRAY(0,1,2,3,4,5,6,7)) AS salt 
    FROM rc1_mediawiki_page_content_change
) AS b
ON
a.wiki_id = b.wiki_id AND
a.revision.rev_id = b.revision.rev_id AND
a.meta.id != b.meta.id AND
a.dt = b.dt AND
a.salt = b.salt

count
218672499498
Time taken: 1236.364 seconds, Fetched 1 row(s)

Some example hits:

SELECT
a.meta.id as a_meta_id,
b.meta.id as b_meta_id
FROM
(
  SELECT 
    *, 
    FLOOR(RAND() * 8) AS salt 
    FROM rc1_mediawiki_page_content_change
) AS a
INNER JOIN
(
  SELECT 
    *, 
    EXPLODE(ARRAY(0,1,2,3,4,5,6,7)) AS salt 
    FROM rc1_mediawiki_page_content_change
) AS b
ON
a.wiki_id = b.wiki_id AND
a.revision.rev_id = b.revision.rev_id AND
a.meta.id != b.meta.id AND
a.dt = b.dt AND
a.salt = b.salt
LIMIT 100;

fd386604-d9b4-477e-b3aa-287b9c204da6	08bd12dc-ca62-45f6-ba6a-d8944f4ad434
fd386604-d9b4-477e-b3aa-287b9c204da6	08bd12dc-ca62-45f6-ba6a-d8944f4ad434
3feec88c-1089-4a81-85c5-7b831b3334f2	f4fd4f51-198a-459c-8641-f6f067675eda
5784ee08-6c99-4fb3-a5c8-83caf99b0bf0	e6f2c18c-5777-493d-9331-a4aecc4e5205
55d55de1-bd65-4b6c-8a3d-ff1755d42132	428ba882-7843-4db0-8624-2c6752bc1bdb
55d55de1-bd65-4b6c-8a3d-ff1755d42132	428ba882-7843-4db0-8624-2c6752bc1bdb
55d55de1-bd65-4b6c-8a3d-ff1755d42132	428ba882-7843-4db0-8624-2c6752bc1bdb
55d55de1-bd65-4b6c-8a3d-ff1755d42132	428ba882-7843-4db0-8624-2c6752bc1bdb
977ee408-d428-4cf6-9107-d37ac74de9b0	2fd90807-1b53-4c8c-8680-0dae3b216557
795b5b28-0a44-4e30-b0be-2ece1c39b99f	ea8a90fb-6ed0-4fc3-905f-453bdcd38432
9540f855-cbcd-4c29-9243-d6163c396f5c	4da403a4-8391-4118-8f82-00e48a419d3a
c9a9d101-3f9e-49a3-87c1-0280998b6917	a4118e75-84b7-40c7-900b-4235e7fa1d11
...

Zooming in on one particular example:

SELECT
  changelog_kind, dt, meta.dt as meta_dt, meta.id as meta_id, meta.request_id as meta_request_id, page_change_kind, wiki_id, datacenter, year, month, day, hour
FROM
  rc1_mediawiki_page_content_change
WHERE
  meta.id IN ('fd386604-d9b4-477e-b3aa-287b9c204da6', '08bd12dc-ca62-45f6-ba6a-d8944f4ad434')
  
changelog_kind	dt	meta_dt	meta_id	meta_request_id	page_change_kind	wiki_id	datacenter	year	month	day	hour
update	2023-04-27T15:28:43Z	2023-04-27T15:28:50.524204Z	fd386604-d9b4-477e-b3aa-287b9c204da6	d3fd1331-5ca6-4703-93cf-cf97c412b93f	edit	abwiki	eqiad	2023	4	27	15
update	2023-04-27T15:28:43Z	2023-05-03T22:07:15.937117Z	08bd12dc-ca62-45f6-ba6a-d8944f4ad434	b69457a0-d51e-47fa-9546-ac3464c127b4	edit	abwiki	eqiad	2023	5	3	22
update	2023-04-27T15:28:43Z	2023-05-03T22:07:19.614943Z	fd386604-d9b4-477e-b3aa-287b9c204da6	d3fd1331-5ca6-4703-93cf-cf97c412b93f	edit	abwiki	eqiad	2023	5	3	22
update	2023-04-27T15:28:43Z	2023-04-27T15:28:46.44396Z	08bd12dc-ca62-45f6-ba6a-d8944f4ad434	b69457a0-d51e-47fa-9546-ac3464c127b4	edit	abwiki	eqiad	2023	4	27	15
Time taken: 94.721 seconds, Fetched 4 row(s)

So this one looks like it was consumed twice by the event pipeline on 2023-04-27 and, for some reason, reconsumed twice on 2023-05-03. Note that they are all datacenter=eqiad, so this is likely unrelated to a switchover.

Another observation is that dt's resolution is to the second. It would be nice if the resolution was bumped to nanos, like in meta.dt. Less chance of clashes.

(I just discussed a separate example with @Milimetric in which we speculated the event was triggered multiple times by MediaWiki.)

perhaps we do not need to consume mediawiki.revision_visibility_change at all since it is folded in already?

page_change and page_content_change only capture visibility changes to the current revision, and the current rev's content cannot be suppressed (a new rev must be created first). mediawiki.revision_visibility_change will have visibility changes for all revisions.

Got it!

I just had a chat with @Milimetric on the current MERGE INTO. Jotting down my to do below so that I don't forget:

  • WHEN MATCHED AND s.changelog_kind = 'insert' is problematic. Need to check whether we indeed have matches for that. If so, perhaps we need to tee to an error table, or perhaps have an error column? (A skeleton of the branch structure follows this list.)
  • Figure out partial suppression when s.changelog_kind = 'delete', as it is not always a full delete. (See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_deleted )
  • Assuming we are backfilled, we should not see WHEN NOT MATCHED AND s.changelog_kind = 'update', right?
  • Check if I can compute a list of literals from a SELECT in the ON clause to go around the full target table scan issue.
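
To make these concrete, here is a skeleton of the WHEN-branch structure under discussion (a sketch only, not the statement in the MR; table and source names are placeholders and column lists are abbreviated):

MERGE INTO wmf_dumps.mediawiki_content t
USING deduplicated_page_content_change s   -- hypothetical deduplicated source view
ON t.wiki_id = s.wiki_id AND t.revision.rev_id = s.revision.rev_id
-- a matched 'insert' is unexpected (duplicate or backfill gap); MERGE cannot tee rows to
-- a side table, so this likely needs a separate error-logging query or an error column
WHEN MATCHED AND s.changelog_kind = 'insert' THEN UPDATE SET t.dt = s.dt
-- 'delete' may be a partial suppression (rev_deleted is a bitfield: 1 = text, 2 = comment,
-- 4 = user, 8 = restricted), so a blanket DELETE may not always be the right action
WHEN MATCHED AND s.changelog_kind = 'delete' THEN DELETE
WHEN MATCHED AND s.changelog_kind = 'update' THEN UPDATE SET t.dt = s.dt
-- once the table is backfilled, unmatched 'update's should not occur
WHEN NOT MATCHED THEN INSERT (wiki_id, revision, changelog_kind, dt)
  VALUES (s.wiki_id, s.revision, s.changelog_kind, s.dt);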

in which we speculated the event was triggered multiple times by MediaWiki

This is possible, but I think unlikely given how far apart they are. We only really started running rc1.mediawiki.page_content_change consistently recently (still no real committed SLOs atm); back in late April / early May I think we were still randomly testing things. I betcha this was me or @gmodena reprocessing history to do some stress testing / memory usage debugging.


Re old dts:

For the dt: 2004-08-05T13:55:41Z ones, something must be causing EventBus to re-emit this revision as an edit event:
https://pl.wikipedia.org/w/index.php?title=Wikipedysta:Hensle&oldid=984818

I looked at one or two of your other examples, and they seem wrong too.

For these specific examples, I think we can reject them.

However, it does seem like we might want to support partitioning by dt anyway, especially in the backfill case? Whenever we get around to backfilling this dataset, events for existing pages will be created, and the latest event for most of them will be an edit with dt set to whatever the time of the most recent edit is.
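
If we do go that route, Iceberg's partition evolution also lets us add a dt-based partition field to an existing table; a one-liner sketch (illustrative table name, requires the Iceberg Spark SQL extensions):

ALTER TABLE wmf_dumps.mediawiki_content ADD PARTITION FIELD days(dt);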

Another observation is that dt's resolution is to the second. It would be nice if the resolution was bumped to nanos, like in meta.dt. Less chance of clashes.

Oh! Agree. This is MediaWiki / EventBus's fault! Made a task: T340067: EventBus should set dt fields with greater precision than second

Figure out partial suppression when s.changelog_kind = 'delete', as it is not always a full delete. (See https://www.mediawiki.org/wiki/Manual:Revision_table#rev_deleted )

Eh? In page change streams, a delete is a page delete. We don't handle revision deletions in page change streams, as the current revision cannot be deleted.

Something must be causing EventBus to re-emit this revision as an edit event:

I'd be curious to know if these unexpected old events are also in the mediawiki_revision_create table. The fact that at least the ones for '2004-08-05T13:55:41Z' seem to be emitted at about the same time once a day smells pretty fishy.
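
Something along these lines could check that (column names are assumed from the revision-create schema, and the year filter is just to bound the scan):

-- Does the suspicious 2004 plwiki revision also show up in the revision-create stream?
SELECT meta.dt AS meta_dt, `database`, rev_id, rev_timestamp
FROM event.mediawiki_revision_create
WHERE `database` = 'plwiki'
  AND rev_id = 984818
  AND year = 2023;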

I now have two MRs up for review:

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/430 implements the Airflow job.

https://gitlab.wikimedia.org/xcollazo/mediawiki-content-dump/-/merge_requests/1 implements the pyspark job that manages the MERGE INTO statement.

Copying over some TODOs:

  • We don't do much flattening of the event data at all. Once we have a downstream consumer, we can start shaping the target table better.
  • We need a backfill mechanism.
  • We need to build a mechanism so that we log errors, e.g. if we ever get MATCHes that have changelog_kind = 'insert' (but we need to backfill first).
  • We need to move the pyspark project to another namespace that is not my personal one!

  • We don't do much flattening of the event data at all. Once we have a downstream consumer, we can start shaping the target table better.

Opened T340856.

  • We need a backfill mechanism.

@Ottomata points to:

I've also opened a Spike to attempt to do this with Spark and MERGE INTO: T340861. Perhaps we can discuss the pros and cons of Flink vs Spark over there?

  • We need to build a mechanism so that we log errors, e.g. if we ever get MATCHes that have changelog_kind = 'insert' (but we need to backfill first).

Opened T340863.

  • We need to move the pyspark project to another namespace that is not my personal one!

I'll do this before closing this current ticket.

Change 935442 had a related patch set uploaded (by Jennifer Ebe; author: Jennifer Ebe):

[analytics/refinery@master] T335860 Implement job to transform mediawiki revision_visibility_change Hql

https://gerrit.wikimedia.org/r/935442

  • We need to move the pyspark project to another namespace that is not my personal one!

I'll do this before closing this current ticket.

Done. The pyspark project now lives at https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump.

I've also made the target table available at wmf_dumps.wikitext_raw_rc0. As mentioned upthread, we expect the schema to change considerably, but we are making this version available for folks to play with.
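
For anyone who wants to poke at it, a trivial example query (the wiki_id filter is just an illustration):

SELECT wiki_id, revision.rev_id, dt, page_change_kind
FROM wmf_dumps.wikitext_raw_rc0
WHERE wiki_id = 'simplewiki'
LIMIT 10;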

I realized that I had not properly created wmf_dumps.wikitext_raw_rc0, as the LOCATION as well as the database and table ownership were wrong. So I deleted it and recreated it as per T335305#8890255:

Executed the following to create a new HDFS folder for the wmf_dumps database:

ssh an-coord1001.eqiad.wmnet
sudo -u hdfs bash

kerberos-run-command hdfs hdfs dfs -mkdir /wmf/data/wmf_dumps
kerberos-run-command hdfs hdfs dfs -chown analytics:analytics-privatedata-users /wmf/data/wmf_dumps
kerberos-run-command hdfs hdfs dfs -chmod 755 /wmf/data/wmf_dumps


Confirming everything looks good:
hdfs@an-coord1001:/home/xcollazo$ kerberos-run-command hdfs hdfs dfs -ls /wmf/data | grep dumps
drwxr-xr-x   - analytics          analytics-privatedata-users          0 2023-07-11 19:14 /wmf/data/wmf_dumps

Fix permissions of existing Hive database:

kerberos-run-command hdfs hdfs dfs -chown analytics /user/hive/warehouse/wmf_dumps.db

hdfs@an-coord1001:/home/xcollazo$ kerberos-run-command hdfs hdfs dfs -ls /wmf/data | grep dumps
drwxr-xr-x   - analytics          analytics-privatedata-users          0 2023-07-11 19:21 /wmf/data/wmf_dumps

Now let's create the table:

ssh an-coord1001.eqiad.wmnet
sudo -u analytics bash

kerberos-run-command analytics spark3-sql

spark-sql (default)> CREATE TABLE wmf_dumps.wikitext_raw_rc0
                   > USING ICEBERG
                   > PARTITIONED BY (hours(rev_dt))
                   > TBLPROPERTIES (
                   >     'write.format.default' = 'avro' -- avoid OOMs due to wikitext content
                   > )
                   > COMMENT 'Base to create MediaWiki full revision dumps from.'
                   > LOCATION '/wmf/data/wmf_dumps/wikitext_raw_rc0'
                   > AS SELECT
                   >     comment,
                   >     created_redirect_page,
                   >     to_timestamp(dt) as dt,
                   >     meta,
                   >     page,
                   >     page_change_kind,
                   >     performer,
                   >     prior_state,
                   >     revision,
                   >     wiki_id,
                   >     is_wmf_domain,
                   >     normalized_host,
                   >     to_timestamp(revision.rev_dt) as rev_dt
                   >   FROM event.rc1_mediawiki_page_content_change
                   >   LIMIT 0;
Response code
Time taken: 5.202 seconds

Finally, let's populate it with testing data:

kerberos-run-command analytics spark3-sql   --master yarn   --executor-memory 16G   --executor-cores 4   --driver-memory 16G   --conf spark.dynamicAllocation.maxExecutors=64

INSERT INTO wmf_dumps.wikitext_raw_rc0 SELECT * FROM xcollazo_iceberg.dumps_test_5 ORDER BY rev_dt;

Mentioned in SAL (#wikimedia-analytics) [2023-07-13T20:38:20Z] <xcollazo> deployed Airflow DAGs for analytics instance to pickup T335860

Mentioned in SAL (#wikimedia-operations) [2023-07-14T19:19:17Z] <xcollazo@deploy1002> Started deploy [airflow-dags/analytics@37d3ad6]: Run page_content_change_to_wikitext_raw DAG serially. T335860

Mentioned in SAL (#wikimedia-operations) [2023-07-14T19:19:32Z] <xcollazo@deploy1002> Finished deploy [airflow-dags/analytics@37d3ad6]: Run page_content_change_to_wikitext_raw DAG serially. T335860 (duration: 00m 14s)

Change 935442 abandoned by Jennifer Ebe:

[analytics/refinery@master] T335860 Implement job to transform mediawiki revision_visibility_change Hql

Reason:

moved to dumps gitlab repo

https://gerrit.wikimedia.org/r/935442

The first version of this job is now in production. Its output can be found in the Iceberg table wmf_dumps.wikitext_raw_rc0.

There are many tasks that were spawned off of this first attempt, discussed at T335860#8981528.

I left this job running untouched for a week and it failed many times. Some failures are related to needing more memory; others I have not had a chance to debug. But since we are going to significantly change the schema and partitioning of this table, I will not try to optimize this version. Immediate follow-up work is being done at T340861.