Reconcile stream name: mediawiki.dump.revision_history.reconcile.v1
(See T368753 for more details.)
| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| Pick up repos/data-engineering/dumps/mediawiki-content-dump!48. | repos/data-engineering/airflow-dags!930 | xcollazo | fix-emit-dt-bug | main |
| Fix TO_TIMESTAMP() format parameters. | repos/data-engineering/dumps/mediawiki-content-dump!48 | xcollazo | fix-emit-dt-issue | main |
| Fix comment for reconcile_emit_dt column on create-wmf_dumps_wikitext_inconsistent_rows.hql | repos/data-engineering/dumps/mediawiki-content-dump!47 | xcollazo | fix-typo | main |
| Job to submit reconcile events to Kafka via wmf-event-stream | repos/data-engineering/dumps/mediawiki-content-dump!46 | xcollazo | emit-via-wmf-event-stream | main |
| Dumps 2 reconcile: Add task to emit reconcile events to kafka | repos/data-engineering/airflow-dags!914 | xcollazo | add-emit-reconcile-events-to-kafka | main |
| Draft: Job to submit reconcile events to EventGate | repos/data-engineering/dumps/mediawiki-content-dump!32 | xcollazo | call-eventbus | main |
| Status | Assigned | Task |
|---|---|---|
| Resolved | xcollazo | T358877 Dumps 2.0 Phase II: Production intermediate table milestone |
| Resolved | xcollazo | T358373 [Dumps 2] Reconciliation mechanism to detect and fetch missing/mismatched revisions |
| Resolved | xcollazo | T368753 Implement production mechanism that emits (wiki_db, revision_id) pairs for missing or inaccurate rows |
| Resolved | xcollazo | T368755 Python job that reads from wmf_dumps.wikitext_inconsistent_row and produces reconciliation events |
| Resolved | xcollazo | T378122 Table maintenance for wmf_dumps.wikitext_inconsistent_row is failing |
| Duplicate | None | T379968 noc.wikimedia.org is slow and it times out sporadically |
| Resolved | JMeybohm | T380142 Reimaging a kubernetes control-plane invalidates service-account tokens issued by it |
xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/32
Draft: Job to submit reconcile requests to EventBus.
Change #1073855 had a related patch set uploaded (by Xcollazo; author: Xcollazo):
[operations/mediawiki-config@master] Declare stream 'mediawiki.dump.revision_history.reconcile.v1.rc0'
Mentioned in SAL (#wikimedia-operations) [2024-09-18T17:29:16Z] <sukhe> re-enable puppet on A:cp to finish rolling out T368755
Change #1073855 merged by jenkins-bot:
[operations/mediawiki-config@master] Declare streams in support of the reconciliation mechanism for Dumps 2.0.
Change #1075885 had a related patch set uploaded (by Gmodena; author: Gmodena):
[operations/mediawiki-config@master] EventStreamConfig: remove topic prefixes from dumps stream.
Change #1075885 abandoned by Gmodena:
[operations/mediawiki-config@master] EventStreamConfig: remove topic prefixes from dump streams.
Reason:
Removing prefixes would break some implicit requirements of analytics jobs.
Will wait until T374341 (Add support for Spark producers in Event Platform) lands to continue this work.
xcollazo closed https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/32
Draft: Job to submit reconcile events to EventGate
xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/46
Job to submit reconcile events to Kafka via wmf-event-stream
From MR 46:
In this MR we implement a job to submit reconcile events to Kafka via wmf-event-stream.
The job works as follows:
- First, we read wikitext_inconsistent_rows for a particular compute_dt and we categorize them as follows: inconsistent revision, missing page delete, missing page move.
- Based on the categorization, we proceed to collect affected revision_ids or page_ids.
- For inconsistent revisions, we hit the Analytics Replica and fetch the latest metadata available for that revision.
- For missing page moves, we figure the latest revision of that page, and hit the Analytics Replica and fetch the latest revision metadata, which includes the page metadata.
- For missing page deletes we do not need to hit the Replica, we already have the page_id in hand.
- All the above then gets converted into page_content_change events that mimic being page_change_kind = edit | move | delete.
- Finally, these events are emitted via the new wmf-event-stream Spark Data Source sink.
We were able to do pretty much all of it in SQL, which I think makes the code more readable than if it were PySpark, but alas, it is still a complex transformation. A rough sketch of the flow follows.
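For readers who don't want to dig into the MR, here is a very rough PySpark sketch of that flow. It is an illustration only, not the actual MR 46 code: the reason strings, categorization logic, and the wmf-event-stream sink format/option names are assumptions; only the table and stream names come from this task.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reconcile-sketch").getOrCreate()

compute_dt = "2024-11-19 00:00:00"  # hypothetical compute_dt being reconciled

# 1) Read the inconsistent rows for this compute_dt and categorize them.
#    The reason values below are placeholders; see the MR for the real ones.
categorized = spark.sql(f"""
    SELECT
        wiki_db,
        page_id,
        revision_id,
        CASE
            WHEN array_contains(reasons, 'missing_page_delete') THEN 'delete'
            WHEN array_contains(reasons, 'missing_page_move')   THEN 'move'
            ELSE 'edit'
        END AS page_change_kind
    FROM wmf_dumps.wikitext_inconsistent_rows_rc1
    WHERE computation_dt = TIMESTAMP '{compute_dt}'
      AND reconcile_emit_dt IS NULL
""")

# 2) For 'edit' and 'move' rows, the real job enriches these with revision/page
#    metadata from the Analytics Replicas; 'delete' rows already carry the page_id.

# 3) Emit the resulting page_content_change-shaped events through the
#    wmf-event-stream sink. The format and option names here are placeholders,
#    not the data source's documented API.
(categorized.write
    .format("wmf_event_stream")
    .option("stream", "mediawiki.dump.revision_history.reconcile.v1")
    .save())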
While testing MR 46, I suddenly started hitting a new mismatched-schema failure:
error: instance type (number) does not match any allowed primitive type (allowed: ["integer"])
level: "error"
schema: {"loadingURI":"#","pointer":"/properties/revision/properties/rev_id"}
instance: {"pointer":"/revision/rev_id"}
domain: "validation"
keyword: "type"
found: "number"
expected: ["integer"]
error: instance type (number) does not match any allowed primitive type (allowed: ["integer"])
level: "error"
schema: {"loadingURI":"#","pointer":"/properties/revision/properties/rev_parent_id"}
instance: {"pointer":"/revision/rev_parent_id"}
domain: "validation"
keyword: "type"
found: "number"
expected: ["integer"]Which is weird, because I had not hit that before. I chased it down in Spark, and found that it was coming from the analytic replicas, and in Spark the schema coming from our replica queries now looks like so:
root
 |-- page_id: long (nullable = true)
 |-- page_namespace: integer (nullable = true)
 |-- page_title: string (nullable = true)
 |-- user_id: decimal(20,0) (nullable = true)                <----------
 |-- user_text: string (nullable = true)
 |-- user_is_visible: integer (nullable = true)
 |-- revision_id: decimal(20,0) (nullable = true)            <----------
 |-- revision_parent_id: decimal(20,0) (nullable = true)     <----------
 |-- mw_revision_timestamp: binary (nullable = true)
 |-- revision_is_minor_edit: integer (nullable = true)
 |-- revision_comment: string (nullable = true)
 |-- revision_comment_is_visible: integer (nullable = true)
 |-- revision_sha1: string (nullable = true)
 |-- revision_size: long (nullable = true)
 |-- slot_role_name: string (nullable = true)
 |-- slot_content_model: string (nullable = true)
 |-- slot_content_sha1: string (nullable = true)
 |-- slot_content_size: long (nullable = true)
 |-- revision_content_is_visible: integer (nullable = true)
None of the highlighted columns used to be decimal(20,0); they were longs, just like page_id.
It turns out that my test wiki, itwiki, was being migrated as part of T367856: Cleanup revision table schema. The TL;DR is that these ALTERs are being run on all wikis:
ALTER TABLE /*_*/revision
  CHANGE rev_id rev_id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,
  CHANGE rev_comment_id rev_comment_id BIGINT UNSIGNED NOT NULL,
  CHANGE rev_actor rev_actor BIGINT UNSIGNED NOT NULL,
  CHANGE rev_parent_id rev_parent_id BIGINT UNSIGNED DEFAULT NULL;
A BIGINT UNSIGNED translates in Spark to a DECIMAL(20,0): Iceberg has no support for BIGINT UNSIGNED, but it does support DECIMAL(20,0). Therefore, it looks like we will need to change our schema for all of these ids. I won't do that as part of this ticket, though; I will do it separately.
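In the meantime, the practical workaround (which later appears in this task's simplified replica query as CAST(r.rev_id AS SIGNED)) can also be applied on the Spark side by coercing the DECIMAL(20,0) ids back to longs before the events are serialized. A minimal sketch, assuming a DataFrame named replica_df with the schema shown above; this is illustrative, not the code from the MR:

from pyspark.sql import functions as F

# Coerce the DECIMAL(20,0) ids coming from the replica back to 64-bit integers.
# Note: values above 2^63 - 1 would overflow a long, which is why a proper
# schema change is the real fix rather than this cast.
replica_df_fixed = (
    replica_df
    .withColumn("user_id", F.col("user_id").cast("long"))
    .withColumn("revision_id", F.col("revision_id").cast("long"))
    .withColumn("revision_parent_id", F.col("revision_parent_id").cast("long"))
)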
xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/914
Dumps 2 reconcile: Add task to emit reconcile events to kafka
xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/46
Job to submit reconcile events to Kafka via wmf-event-stream
Mentioned in SAL (#wikimedia-operations) [2024-11-18T17:25:49Z] <xcollazo@deploy2002> Started deploy [airflow-dags/analytics@16a5867]: Deploy latest DAGs to analytics Airflow instance. T368755.
Mentioned in SAL (#wikimedia-operations) [2024-11-18T17:27:59Z] <xcollazo@deploy2002> Finished deploy [airflow-dags/analytics@16a5867]: Deploy latest DAGs to analytics Airflow instance. T368755. (duration: 02m 10s)
Mentioned in SAL (#wikimedia-analytics) [2024-11-18T17:28:12Z] <xcollazo> Deployed latest DAGs to analytics Airflow instance. T368755.
OK, after a hiatus due to multiple Airflow issues related to T380222: Analytics airflow instance not showing any DAGs, I am back to finish deploying this.
The work from MR 46 requires the following DDL to be run in prod:
ALTER TABLE wmf_dumps.wikitext_inconsistent_rows_rc1 ADD COLUMN reconcile_emit_dt TIMESTAMP COMMENT 'the time at which this inconsistency was emitted to EventGate for eventual reconcile. If NULL, it has not been submitted yet.'
Ran the following as analytics:
$ spark3-sql
spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_inconsistent_rows_rc1
                   > ADD COLUMN reconcile_emit_dt TIMESTAMP COMMENT 'the time at which this inconsistency was emitted to EventGate for eventual reconcile. If NULL, it has not been submitted yet.';
24/11/19 20:10:56 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 12.36 seconds
Quick check:
spark-sql (default)> select * from wmf_dumps.wikitext_inconsistent_rows_rc1 limit 1;
wiki_db        page_id    revision_id    revision_timestamp     reasons                    computation_dt         computation_window_min_dt    computation_window_max_dt    reconcile_emit_dt
nlwiktionary   1235023    5204816        2024-11-18 06:07:30    ["missing_from_target"]    2024-11-19 00:00:00    2024-11-18 00:00:00          2024-11-19 00:00:00          NULL
Time taken: 23.708 seconds, Fetched 1 row(s)
We good.
xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/47
Fix comment for reconcile_emit_dt column on create-wmf_dumps_wikitext_inconsistent_rows.hql
xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/47
Fix comment for reconcile_emit_dt column on create-wmf_dumps_wikitext_inconsistent_rows.hql
xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/48
Fix TO_TIMESTAMP() format parameters.
xcollazo updated https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/930
Pick up repos/data-engineering/dumps/mediawiki-content-dump!48.
The failures were all due to a schema mismatch:
error: object has missing required properties (["dt"])
level: "error"
schema: {"loadingURI":"#","pointer":""}
instance: {"pointer":""}
domain: "validation"
keyword: "required"
required: ["$schema","changelog_kind","dt","meta","page","page_change_kind","revision","wiki_id"]
missing: ["dt"]The following SQL statement is a simplified replica query on arwiki, one of the wikis that failed:
SELECT
p.page_id AS page_id,
CAST(r.rev_id AS SIGNED) AS revision_id, -- CAST is a workaround until T368755 is solved.
r.rev_timestamp AS mw_revision_timestamp
FROM revision r
INNER JOIN page p ON r.rev_page = p.page_id
WHERE r.rev_id IN (68598618, 68598776, 68598778, 68598881, 68598967, 68598994, 68599001, 68599097, 68599120, 68599123, 68599155, 68599434, 68599654, 68599656, 68600038, 68600178, 68600838, 68600840, 68601110, 68601208, 68601211, 68601402, 68601404, 68601405, 68601637, 68601724, 68601726, 68601727, 68601880, 68601934, 68602756, 68603347, 68603369, 68603442, 68604030, 68604405, 68604408, 68604409, 68605586, 68605616, 68605838, 68605839, 68605892, 68605952, 68606011, 68606070, 68606072, 68606107, 68606134, 68606218, 68606224, 68606225, 68606235, 68606314, 68606317, 68606324, 68606332, 68606428, 68606429, 68606488, 68606490, 68606503, 68606565, 68606567, 68606568, 68606571, 68606572, 68606594, 68606607, 68606648, 68606760, 68606775, 68606801, 68606824, 68606826, 68606827, 68606888, 68606955, 68607044, 68607101, 68607120, 68607127, 68607192, 68607193, 68607233, 68607263, 68607321)

The mw_revision_timestamps all look legit:
+---------+-------------+-----------------------+
| page_id | revision_id | mw_revision_timestamp |
+---------+-------------+-----------------------+
| 8288132 | 68598618    | 20241118003616        |
| 9253391 | 68598776    | 20241118014337        |
...
| 6914682 | 68606826    | 20241118214128        |
| 9085744 | 68607120    | 20241118230839        |
| 4148553 | 68607192    | 20241118231744        |
| 3719588 | 68607193    | 20241118231744        |
| 9761461 | 68607263    | 20241118233734        |
+---------+-------------+-----------------------+
58 rows in set (0.002 sec)
Thus the issue does not seem to arise from bad data coming from MariaDB. Looking further into the pipeline, we have something along the lines of this Spark SQL:
spark.sql("""
SELECT
revision_id,
FIRST(TO_TIMESTAMP(mw_revision_timestamp, 'yyyyMMddkkmmss')) AS revision_timestamp
FROM flat_inconsistent_revision_ids_source
GROUP BY
revision_id
""").show(100)
+-----------+-------------------+
|revision_id| revision_timestamp|
+-----------+-------------------+
| 68600838|2024-11-18 09:38:32|
| 68606567|2024-11-18 20:20:21|
...
| 68606218|2024-11-18 18:54:08|
| 68606070|2024-11-18 18:23:16|
| 68606490|2024-11-18 19:58:11|
| 68598618| null|
| 68601211|2024-11-18 10:03:00|
| 68598994|2024-11-18 03:01:42|
| 68603442|2024-11-18 14:40:52|
...
| 68606107|2024-11-18 18:32:05|
| 68601404|2024-11-18 10:20:15|
| 68607192|2024-11-18 23:17:44|
| 68598881|2024-11-18 02:08:59|
| 68606235|2024-11-18 18:57:32|
+-----------+-------------------+

Bingo. Back to MariaDB to confirm that particular revision:
MariaDB [arwiki]> select * from revision where rev_id = 68598618;
+----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| rev_id   | rev_page | rev_comment_id | rev_actor | rev_timestamp  | rev_minor_edit | rev_deleted | rev_len | rev_parent_id | rev_sha1                        |
+----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| 68598618 | 8288132  | 50695258       | 18171     | 20241118003616 | 1              | 0           | 1678    | 53918094      | onz37c5znafeupcdfyfwt7t54byar98 |
+----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
The timestamp looks legit.
Turns out the issue was user error on the TO_TIMESTAMP() format parameter. We were passing yyyyMMddkkmmss, while the correct format is yyyyMMddHHmmss: HH covers hours 00-23, while kk covers 01-24, so any timestamp in the midnight hour (like 20241118003616 above) fails to parse and comes back null. Whoops!
Fixing this resolves the issue:
spark.sql("""
SELECT
revision_id,
CAST(mw_revision_timestamp AS STRING) as mw_revision_timestamp_as_string,
TO_TIMESTAMP(mw_revision_timestamp, 'yyyyMMddkkmmss') AS revision_timestamp_kk,
TO_TIMESTAMP(mw_revision_timestamp, 'yyyyMMddHHmmss') AS revision_timestamp_HH
FROM flat_inconsistent_revision_ids_source
WHERE revision_id = 68598618
""").show(100)
+-----------+-------------------------------+---------------------+---------------------+
|revision_id|mw_revision_timestamp_as_string|revision_timestamp_kk|revision_timestamp_HH|
+-----------+-------------------------------+---------------------+---------------------+
| 68598618| 20241118003616| null| 2024-11-18 00:36:16|
+-----------+-------------------------------+---------------------+---------------------+

xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/merge_requests/48
Fix TO_TIMESTAMP() format parameters.
xcollazo merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/930
Pick up repos/data-engineering/dumps/mediawiki-content-dump!48.