
NEW BUG REPORT Mediawiki_history contains duplicate rows for some revisions
Closed, ResolvedPublic3 Estimated Story PointsBUG REPORT

Description

Data Platform Engineering Bug Report or Data Problem Form.

What kind of problem are you reporting?

  • Access related problem
  • Service related problem
  • Data related problem
For a data related problem:
  • Is this a data quality issue?
    • Yes
  • What datasets and/or dashboards are affected?
    • mediawiki_history and all derived datasets
  • What are the observed vs expected results? Please include information such as location of data, any initial assessments, sql statements, screenshots.
    • A snapshot of mediawiki_history should contain at most one row for each revision. In other words, there should be no duplicate values of (wiki_db, revision_id) among revision-create events. This holds in the 2024-05 snapshot, but the 2024-06 snapshot contains 2.1 M rows that duplicate another revision-create row. I have set the priority to High, but since these duplicates are only 0.03% of all revision-create events in the snapshot, I would not object to Medium either.

SQL queries

Duplicates in the 2024-06 snapshot

WITH rev_id_frequencies AS (
    SELECT
        COUNT(*) AS frequency
    FROM wmf.mediawiki_history
    WHERE
        event_entity = 'revision'
        AND event_type = 'create'
        AND snapshot = '2024-06'
    GROUP BY
        wiki_db,
        revision_id
)
SELECT
    COUNT(*) AS revisions_with_duplicates_count,
    SUM(frequency - 1) AS duplicate_count
FROM rev_id_frequencies
WHERE frequency > 1
revisions_with_duplicates_count    duplicate_count
1080128                            2146208
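
The counting logic in the query above can be sketched in plain Python over (wiki_db, revision_id) pairs. The sample rows below are hypothetical, not taken from the real table; they only illustrate how `frequency > 1` rows and `SUM(frequency - 1)` relate:

```python
from collections import Counter

# Hypothetical sample of (wiki_db, revision_id) pairs from revision-create rows.
rows = [
    ("enwiki", 101), ("enwiki", 101),                # one extra copy
    ("enwiki", 102),                                 # no duplicates
] + [("tetwiki", 67708)] * 4                         # four copies total

frequency = Counter(rows)  # plays the role of the GROUP BY ... COUNT(*)

# COUNT(*) over groups WHERE frequency > 1
revisions_with_duplicates_count = sum(1 for f in frequency.values() if f > 1)
# SUM(frequency - 1): the number of surplus rows
duplicate_count = sum(f - 1 for f in frequency.values() if f > 1)

print(revisions_with_duplicates_count, duplicate_count)  # 2 4
```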

Duplicates in the 2024-05 snapshot

WITH rev_id_frequencies AS (
    SELECT
        COUNT(*) AS frequency
    FROM wmf.mediawiki_history
    WHERE
        event_entity = 'revision'
        AND event_type = 'create'
        AND snapshot = '2024-05'
    GROUP BY
        wiki_db,
        revision_id
)
SELECT
    COUNT(*) AS revisions_with_duplicates_count,
    SUM(frequency - 1) AS duplicate_count
FROM rev_id_frequencies
WHERE frequency > 1
revisions_with_duplicates_count    duplicate_count
0                                  NULL

Event Timeline

nshahquinn-wmf triaged this task as High priority.
nshahquinn-wmf renamed this task from NEW BUG REPORT Mediawiki_history contains duplicate rows for many revisions to NEW BUG REPORT Mediawiki_history contains duplicate rows for some revisions.Jul 11 2024, 5:18 PM

This may help in diagnosing the problem: looking at the snapshot, the number of duplicates is not uniform across event_timestamp. There are almost none until 2014, and then the number generally increases until the most recent month.

(The graph isn't exactly the number of duplicates; it's the difference in monthly edit count between two versions of the derived editor_month dataset. But ~95% of that difference comes from the duplicate issue, so the trend you see should reflect the pattern of the duplicates.)

rough monthly duplicate count.png (432×579 px, 26 KB)

Checking:

  • wmf.mediawiki_history: duplicate revision/create records indeed exist; some revisions have 4 copies and some 2, but all spot-checked duplicates come in even multiples
  • wmf_raw.mediawiki_revision: does not show the same duplication
  • analytics mysql replicas: the pages those revisions belong to were moved and had some delete/restore and delete/revision actions in the logging table
  • cloud replicas: agrees with analytics replicas
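
The even-multiples observation above is worth checking mechanically, since it hints that every duplicated revision was written some whole number of extra times (as it would be if the whole job ran more than once). A minimal sketch of that sanity check, using hypothetical multiplicities for a few of the spot-checked revision ids:

```python
from collections import Counter

# Hypothetical multiplicities for a few spot-checked enwiki revision ids
# (2 and 4 copies were the patterns observed; the mapping here is illustrative).
copies = Counter({1083767163: 2, 1074554092: 4, 1145226024: 2})

# If duplicates come from the job as a whole running extra times, every
# duplicated revision should appear an even number of times.
assert all(n % 2 == 0 for n in copies.values())
print("all multiplicities even")
```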

Suggestion for next steps:

Take the set of revisions that we spot-checked and found have duplicates:

(1083767163,1074554092,1145226024,926122109,1063031349,856024925,1061304076,1064886114,1198774778,890765446,1155799897,1144969417,1229151474,1064338058,1113491817,622355109,1192329952,1209336449,1151014494,1221728174,1140546791,1101399093,1054004789,1100858232,1165874742,1223798377,357949483,1054233653,1082080211,804912156,1209018663,1107974569,1066873373,1181047901,1157047169,1197701975,1019030669,1209079679,682869270,1138924079,1154166201,911668841,1186503964,856888153,893223585,890800613,1211646168,1169006233,1224091675,1057593293,1103538142,1161767123,402112345,1040608459,1124943725,732089052,1143207633,765326684,635662987,1059643146,1086175963,1087182426,870114262,731056183,976512607,779855061,1121960938,949796739,1047788230,882686398,1201383829,1124050223,845175656,1195934141,1096247250,917577582,975005793,1114910486,991204207,1086916116,925355014,806324800,1134201129,951706099,1182419689,1065635192,1102340334,1090962614,962808778,1178816273,771479470,708769801,1226462002,1187273682,950120523,1059434643,1146117094,750356607,1192535777,1191486043)

And manually run the mediawiki_history algorithm with a filtered-down version of the revision table including just these revisions (in enwiki). You'd do this by changing this query:

https://gerrit.wikimedia.org/r/plugins/gitiles/analytics/refinery/source/+/refs/heads/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/mediawikihistory/sql/RevisionViewRegistrar.scala#125
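
To filter the revision view down to just those ids, one could splice an IN (...) predicate into the query linked above. A hypothetical helper for building that clause (the `rev_id` column name and the helper itself are assumptions for illustration, not part of the actual job):

```python
# Hypothetical helper: build an IN (...) filter for the spot-checked revision
# ids, to be spliced into the revision-view query for a manual enwiki rerun.
rev_ids = [1083767163, 1074554092, 1145226024]  # truncated; full list above


def in_clause(ids):
    """Render a SQL IN predicate over integer revision ids."""
    return "rev_id IN ({})".format(", ".join(str(i) for i in ids))


print(in_clause(rev_ids))
# rev_id IN (1083767163, 1074554092, 1145226024)
```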

I tried rerunning the job for one of the small wikis with duplicate revision records (tetwiki), using this command:

spark3-submit --executor-cores 2 --executor-memory 16G --driver-memory 16G \
  --name test_mediawiki_history_denormalize__denormalize_history__20240601 \
  --class org.wikimedia.analytics.refinery.job.mediawikihistory.MediawikiHistoryRunner \
  --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.24-shaded.jar \
  --snapshot 2024-06 --mediawiki-base-path hdfs://analytics-hadoop/wmf/data/raw/mediawiki \
  --mediawiki-private-base-path hdfs://analytics-hadoop/wmf/data/raw/mediawiki_private --wikis tetwiki \
  --output-base-path hdfs://analytics-hadoop/user/ebysans/data/mediawiki/ --temporary-path \
  hdfs://analytics-hadoop/user/ebysans/tmp/mediawiki/history/checkpoints --base-num-partitions \
  64 --no-stats

In the current 2024-06 snapshot, tetwiki has a revision_id 67708 with a frequency of 4. After running the Runner job for just tetwiki, the test snapshot has only one occurrence of revision_id 67708. See the result below:

scala> val wk_df = spark.read.parquet("hdfs://analytics-hadoop/user/ebysans/data/mediawiki/history/snapshot=2024-06")
wk_df: org.apache.spark.sql.DataFrame = [wiki_db: string, event_entity: string ... 69 more fields]

scala> wk_df.where("revision_id=67708").count()
res5: Long = 1

So this looks like a transient error, @Milimetric.

So I dug further by looking at the Airflow job to see whether it had run twice for any reason, and I think I found the culprit: the job did indeed run twice.
The first run reported an error we've seen before in T342911; the Skein job ran and failed with this traceback:

[2024-07-03, 21:45:43 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 239, in submit
    time.sleep(15)
  File "/usr/lib/airflow/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1633, in signal_handler
    self.task.on_kill()
  File "/usr/lib/airflow/lib/python3.10/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 165, in on_kill
    self._hook.on_kill()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/spark.py", line 441, in on_kill
    return self._skein_hook.on_kill()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 317, in on_kill
    return self.stop("KILLED", "Killed by Airflow executor")
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 303, in stop
    if self.final_status() == "UNDEFINED":
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 131, in final_status
    report = self.report()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 124, in report
    return self._client.application_report(self._application_id)
  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 795, in application_report
    resp = self._call('getStatus', proto.Application(id=app_id))
  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 274, in _call
    return getattr(self._stub, method)(req, timeout=timeout)
  File "/usr/lib/airflow/lib/python3.10/site-packages/grpc/_channel.py", line 1158, in __call__
    ) = self._blocking(
  File "/usr/lib/airflow/lib/python3.10/site-packages/grpc/_channel.py", line 1142, in _blocking
    event = call.next_event()
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 366, in grpc._cython.cygrpc.SegregatedCall.next_event
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 187, in grpc._cython.cygrpc._next_call_event
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 181, in grpc._cython.cygrpc._next_call_event
  File "src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi", line 78, in grpc._cython.cygrpc._latent_event
  File "src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi", line 61, in grpc._cython.cygrpc._internal_latent_event
  File "src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi", line 42, in grpc._cython.cygrpc._next
  File "/usr/lib/airflow/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 1633, in signal_handler
    self.task.on_kill()
  File "/usr/lib/airflow/lib/python3.10/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 165, in on_kill
    self._hook.on_kill()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/spark.py", line 441, in on_kill
    return self._skein_hook.on_kill()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 317, in on_kill
    return self.stop("KILLED", "Killed by Airflow executor")
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 303, in stop
    if self.final_status() == "UNDEFINED":
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 131, in final_status
    report = self.report()
  File "/srv/deployment/airflow-dags/analytics/wmf_airflow_common/hooks/skein.py", line 124, in report
    return self._client.application_report(self._application_id)
  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 795, in application_report
    resp = self._call('getStatus', proto.Application(id=app_id))
  File "/usr/lib/airflow/lib/python3.10/site-packages/skein/core.py", line 280, in _call
    raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to driver

The 2nd attempt started at:

[2024-07-03, 19:11:43 UTC] {skein.py:238} INFO - SkeinHook Airflow SparkSkeinSubmitHook skein launcher mediawiki_history_denormalize__denormalize_history__20240601 application_1719935448343_17046 status: RUNNING - Waiting until finished.

So, most likely: even though Airflow terminated the task due to the connection error, the Spark job from that attempt was still running and wasn't killed before attempt 2 started. Hence the duplicates.
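
That failure mode can be illustrated abstractly: if a killed-but-still-running attempt and its retry both write their full output for the same snapshot, every row lands exactly twice. A minimal sketch (not the real job; `run_job` is a stand-in for one complete attempt):

```python
from collections import Counter


def run_job(revisions):
    """Stand-in for one attempt of the denormalize job over some input."""
    return [("revision", "create", rev_id) for rev_id in revisions]


output = []
output += run_job([67708, 67709])  # attempt 1: Airflow lost the driver, but Spark kept writing
output += run_job([67708, 67709])  # attempt 2: rerun over the same input

# Every row is duplicated, and all multiplicities are even.
assert all(n == 2 for n in Counter(output).values())
print("each row appears twice")
```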

So I reran the mediawiki_history_denormalize Airflow DAG to regenerate the 2024-06 snapshot, and also reran mediawiki_history_check_denormalize. I did a check using the same query @nshahquinn-wmf ran; we no longer have any duplicates.

spark-sql (default)> WITH rev_id_frequencies AS (
                   >     SELECT
                   >         COUNT(*) AS frequency
                   >     FROM wmf.mediawiki_history
                   >     WHERE
                   >         event_entity = 'revision'
                   >         AND event_type = 'create'
                   >         AND snapshot = '2024-06'
                   >     GROUP BY
                   >         wiki_db,
                   >         revision_id
                   > )
                   > SELECT
                   >     COUNT(*) AS revisions_with_duplicates_count,
                   >     SUM(frequency - 1) AS duplicate_count
                   > FROM rev_id_frequencies
                   > WHERE frequency > 1
                   > ;
revisions_with_duplicates_count duplicate_count
0       NULL

Next steps would be to rerun any affected downstream jobs.

I reran the following downstream Airflow DAGs:

  • Mediawiki_history_reduced_dag
  • Edit_hourly_dag
  • geoeditors/editors_daily_monthly_dag
  • unique_editors_by_country_monthly
  • Geoeditors_edits_monthly_dag
  • Geoeditors_monthly
  • Geoeditors_public_monthly
  • Druid_load_geoeditors_monthly
  • Cassandra_load_editors_by_country_monthly
  • Druid_load_edit_hourly
Ahoelzl set the point value for this task to 3.Sep 6 2024, 8:53 PM