
[WDQS Streaming Updater] Add error handling for Streaming Updater
Closed, Resolved · Public · 8 Estimated Story Points

Description

As a developer, I want the streaming updater to retry on errors that can be retried instead of failing, so that the pipeline can continue whenever possible.

AC:

  • Streaming updater pipeline is not failing because of recoverable errors
  • Calls to wikibase are retried (see the current implementation of WikibaseRepository - it already has a retry mechanism); a minimal retry sketch follows this list
  • If the error that fails the pipeline is unrecoverable, Flink shouldn't retry ad infinitum
  • Data errors (e.g. revisions after a page delete, a constant 404 on a new revision) should be logged to a file on HDFS in structured form.
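A minimal sketch of the bounded retry behaviour the first three points describe (illustrative only; withRetries and fetchRevision are hypothetical names, not the actual WikibaseRepository API, which already ships its own retry mechanism):

import scala.util.{Failure, Success, Try}

// Retry a call a bounded number of times; after the last attempt the error is
// rethrown so an unrecoverable failure is not retried ad infinitum.
def withRetries[T](attempts: Int, delayMs: Long)(call: => T): T =
  Try(call) match {
    case Success(value) => value
    case Failure(_) if attempts > 1 =>
      Thread.sleep(delayMs)                      // naive constant backoff
      withRetries(attempts - 1, delayMs)(call)   // retry the recoverable error
    case Failure(e) => throw e                   // give up after the last attempt
  }

// Usage (hypothetical call): withRetries(attempts = 4, delayMs = 1000)(fetchRevision(item, revId))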

Event Timeline

Zbyszko created this task. Mar 25 2020, 9:10 AM
Restricted Application added a project: Wikidata. Mar 25 2020, 9:10 AM
Restricted Application added a subscriber: Aklapper.
Zbyszko updated the task description. Mar 26 2020, 3:52 PM
Zbyszko renamed this task from "Add error handling for Streaming Updater" to "[WDQS Streaming Updater] Add error handling for Streaming Updater". Apr 1 2020, 11:04 AM
dcausse added a subscriber: dcausse. Jul 7 2020, 2:58 PM

This task should also try to uncover improper error handling in the shared code used by the streaming updater.
One example is WikibaseRepository: when it hits a 404 while fetching RDF data, it simply ignores it and returns an empty list of statements, which is far from ideal. For the streaming updater, getting a 404 might mean one of the following (a sketch of stricter handling is given after this list):

  • the streaming updater is faster than MySQL replication and hits a MySQL replica that does not have this revision yet (unlikely, the minimum latency for now is 1 min)
  • the streaming updater is too late and tries to fetch a revision for an item that was deleted, which is very likely, especially during back-fills
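A minimal sketch of the stricter handling referred to above (illustrative Scala; the real WikibaseRepository is Java and its exact API may differ - FetchResult, Statements and EntityNotFound are hypothetical names): the 404 is surfaced as a typed result instead of being turned into an empty statement list, so the caller can decide whether to retry, record a failed operation, or treat it as a deleted entity.

// Surface the HTTP status instead of swallowing it.
sealed trait FetchResult
case class Statements(data: Seq[String]) extends FetchResult                    // RDF statements parsed normally
case class EntityNotFound(entityId: String, revision: Long) extends FetchResult // deleted item or replica lag

def handleResponse(entityId: String, revision: Long,
                   status: Int, body: => Seq[String]): FetchResult = status match {
  case 200   => Statements(body)
  case 404   => EntityNotFound(entityId, revision) // do not return an empty list here
  case other => throw new IllegalStateException(s"Unexpected HTTP status $other for $entityId@$revision")
}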
Zbyszko updated the task description. Aug 10 2020, 3:59 PM

We need to decide our approach on possible data corruption issues as well. One that comes to mind is a revision-create with a revid higher than a previous page delete.

Zbyszko updated the task description. Aug 17 2020, 4:25 PM
TJones updated the task description. Aug 17 2020, 5:27 PM
Zbyszko updated the task description. Aug 17 2020, 5:27 PM
CBogen set the point value for this task to 8. Aug 17 2020, 5:32 PM
Zbyszko updated the task description. Aug 24 2020, 3:57 PM
dcausse updated the task description. Aug 26 2020, 9:38 AM

Removed "alerting" as I think this is too soon to tackle and should be considered when we have a sane deployment system, currently the pipeline is run manually on yarn.

dcausse added a comment (Edited). Aug 26 2020, 9:43 AM

> We need to decide our approach on possible data corruption issues as well. One that comes to mind is a revision-create with a revid higher than a previous page delete.

All of these scenarios could happen if events are received out of order and are not necessarily "data corruption": a revision-create with a revid higher than a previous page delete may happen if a page-undelete event for this revid is late.

>> We need to decide our approach on possible data corruption issues as well. One that comes to mind is a revision-create with a revid higher than a previous page delete.

> All of these scenarios could happen if events are received out of order and are not necessarily "data corruption": a revision-create with a revid higher than a previous page delete may happen if a page-undelete event for this revid is late.

Right, I wasn't aware when writing that comment how exactly undeletes are handled - probably in most cases these will be out-of-order events. I'm wondering how we should treat those situations - if we don't receive some intermediate revision, we simply diff against the newer revision received and ignore the one that is out of order. Should we treat page-undelete events the same way?

In any case, that's probably out of scope for this ticket.

>>> We need to decide our approach on possible data corruption issues as well. One that comes to mind is a revision-create with a revid higher than a previous page delete.

>> All of these scenarios could happen if events are received out of order and are not necessarily "data corruption": a revision-create with a revid higher than a previous page delete may happen if a page-undelete event for this revid is late.

> Right, I wasn't aware when writing that comment how exactly undeletes are handled - probably in most cases these will be out-of-order events. I'm wondering how we should treat those situations - if we don't receive some intermediate revision, we simply diff against the newer revision received and ignore the one that is out of order. Should we treat page-undelete events the same way?

I don't know... I hope that in most cases late events will properly be detected by the window function and thus logged accordingly to a side output. For other situations - a serious bug in MW, changeprop failing to propagate a particular topic, machines with completely unsynced clocks, or any other serious issue affecting the validity of the streams - we should IMHO aim to detect these as much as we can and log anything we find suspicious to the spurious-events side output. I would not try (yet) to recover such inconsistencies but rather learn what the most common problems are and tune the stream accordingly. Barring any lost events, I think we should be able to track all these problems, either because of lateness or because they are inconsistent with the current state.
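A minimal sketch of what routing suspicious events to a side output can look like (illustrative only; InputEvent, SpuriousEvent, spuriousTag and isSuspicious are made-up names, not the actual classes used by the streaming updater):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Made-up event types for the sake of the example.
case class InputEvent(item: String, revision: Long)
case class SpuriousEvent(event: InputEvent, inconsistency: String)

// Side output collecting events we consider suspicious instead of failing the job.
val spuriousTag = new OutputTag[SpuriousEvent]("spurious-events")

def route(events: DataStream[InputEvent],
          isSuspicious: InputEvent => Boolean): DataStream[InputEvent] = {
  val main = events.process(new ProcessFunction[InputEvent, InputEvent] {
    override def processElement(e: InputEvent,
                                ctx: ProcessFunction[InputEvent, InputEvent]#Context,
                                out: Collector[InputEvent]): Unit =
      if (isSuspicious(e)) ctx.output(spuriousTag, SpuriousEvent(e, "newer_revision_seen"))
      else out.collect(e)
  })
  // main.getSideOutput(spuriousTag) can then be written to HDFS as structured data.
  main
}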

Change 622617 had a related patch set uploaded (by DCausse; owner: DCausse):
[wikidata/query/rdf@master] Do not swallow errors from WikibaseRepository

https://gerrit.wikimedia.org/r/622617

Change 622617 merged by jenkins-bot:
[wikidata/query/rdf@master] Do not swallow errors from WikibaseRepository

https://gerrit.wikimedia.org/r/622617

Change 623000 had a related patch set uploaded (by DCausse; owner: DCausse):
[wikidata/query/rdf@master] Report side outputs as structured data set

https://gerrit.wikimedia.org/r/623000

dcausse added a comment (Edited). Sep 1 2020, 8:23 AM

FailedOps are now queryable from Spark by reading a parquet file. After running one hour of backfill, all errors are ENTITY_NOT_FOUND, which are probably deleted entities.

scala> val df_failed = spark.read.parquet("/wmf/discovery/streaming_updater/failed_events/2020-09-01--07")
scala> df_failed.withColumn("event_time_ts", to_timestamp(col("operation.event_time"))).select("*").groupBy(year(col("event_time_ts")), month(col("event_time_ts")), dayofweek(col("event_time_ts")), col("exception_type"), col("fetch_error_type")).count().show(10, false);
+-------------------+--------------------+------------------------+-----------------------------------------------------------------+----------------+-----+
|year(event_time_ts)|month(event_time_ts)|dayofweek(event_time_ts)|exception_type                                                   |fetch_error_type|count|
+-------------------+--------------------+------------------------+-----------------------------------------------------------------+----------------+-----+
|2020               |8                   |2                       |org.wikidata.query.rdf.tool.wikibase.WikibaseEntityFetchException|ENTITY_NOT_FOUND|403  |
|2020               |8                   |3                       |org.wikidata.query.rdf.tool.wikibase.WikibaseEntityFetchException|ENTITY_NOT_FOUND|1472 |
|2020               |8                   |1                       |org.wikidata.query.rdf.tool.wikibase.WikibaseEntityFetchException|ENTITY_NOT_FOUND|1209 |
+-------------------+--------------------+------------------------+-----------------------------------------------------------------+----------------+-----+

Spurious events are also queryable; they are numerous after one hour of backfill, but this is expected: during backfill we may re-read a bunch of events because we set the Kafka offsets of the input topic to the earliest timestamp found in the dumps (offsets set to 2020-08-23T04:07:58 while the all-ttl dump ended at 2020-08-27T12:44:40).

scala> val df_spurious = spark.read.parquet("/wmf/discovery/streaming_updater/spurious_events/2020-09-01--07")
scala> df_spurious.show(2, false)
+---------+--------------------+------------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------+-------+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|item     |event_time          |ingestion_time          |revision  |original_event_metadata                                                                                                                        |op_type|from_revision|inconsistency      |input_event                                                                                                                                                                                                                              |
+---------+--------------------+------------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------+-------+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Q94136325|2020-08-23T17:02:05Z|2020-09-01T07:09:02.257Z|1263886031|[c8787a61-05c8-45c3-999d-324910765a3f, 2020-08-23T17:02:05Z, mediawiki.revision-create, fb6e813f-f72d-4385-a297-0eb40c3c34d0, www.wikidata.org]|ignored|null         |newer_revision_seen|[Q94136325, 2020-08-23T17:02:05Z, 2020-09-01T07:09:02.257Z, 1263886031, revision-create, [c8787a61-05c8-45c3-999d-324910765a3f, 2020-08-23T17:02:05Z, mediawiki.revision-create, fb6e813f-f72d-4385-a297-0eb40c3c34d0, www.wikidata.org]]|
|Q4062427 |2020-08-23T17:02:10Z|2020-09-01T07:09:02.259Z|1263886064|[94d881a0-eaea-4da2-a7cf-9cf36c9bf348, 2020-08-23T17:02:10Z, mediawiki.revision-create, f3666818-26f3-45b0-b63b-126dc043aab7, www.wikidata.org]|ignored|null         |newer_revision_seen|[Q4062427, 2020-08-23T17:02:10Z, 2020-09-01T07:09:02.259Z, 1263886064, revision-create, [94d881a0-eaea-4da2-a7cf-9cf36c9bf348, 2020-08-23T17:02:10Z, mediawiki.revision-create, f3666818-26f3-45b0-b63b-126dc043aab7, www.wikidata.org]] |
+---------+--------------------+------------------------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------+-------+-------------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows


scala> df_spurious.count()
res39: Long = 1938335
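For reference, the offsets-by-timestamp backfill start described above maps to the Kafka consumer API roughly as follows (a sketch only; broker list, group id, topic name and deserialization schema are placeholders, not the real job configuration):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "PLACEHOLDER_BROKERS")
props.setProperty("group.id", "PLACEHOLDER_GROUP")

val consumer = new FlinkKafkaConsumer[String](
  "mediawiki.revision-create", new SimpleStringSchema(), props)
// Start from the earliest timestamp found in the dumps; everything between this
// timestamp and the end of the dump (2020-08-27T12:44:40) is re-read and shows up
// as "newer_revision_seen" spurious events.
consumer.setStartFromTimestamp(1598155678000L) // 2020-08-23T04:07:58Z in epoch millis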

No late events have been seen yet, but since we already record the original event in failed_ops I see no reason for it to fail.

  • Streaming updater pipeline is not failing because of recoverable errors: the only recoverable errors I can think of for now are transient failures of the various connection points (HDFS, Kafka, checkpoint timeouts); these seem rare and could be handled using Flink restart strategies (see the configuration sketch after this list). We might perhaps want to make this configurable in the future.
  • Calls to wikibase are retried (see current implementation of WikibaseRepository - it already has a retry mechanism): wikibase fetches are retried 4 times and then produce a FailedOp; they do not fail the pipeline.
  • If the error that fails the pipeline is unrecoverable, Flink shouldn't retry ad infinitum: related to the restart strategies; currently we do not allow a single unexpected failure.
  • Data errors (e.g. revisions after a page delete, a constant 404 on a new revision) should be logged to a file on HDFS in structured form: done.
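The configuration sketch mentioned in the first point above - a bounded Flink restart strategy that absorbs transient failures without retrying ad infinitum (the numbers are illustrative, not values we would necessarily deploy):

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Restart the job for transient failures (kafka/hdfs hiccups, checkpoint timeouts)
// but give up if failures keep accumulating, so an unrecoverable error does not
// make Flink retry forever.
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                               // max failures within the interval below
  Time.of(5, TimeUnit.MINUTES),    // failure measurement interval
  Time.of(10, TimeUnit.SECONDS)))  // delay between restart attempts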

Exception received while attempting a checkpoint (streaming-updater flink pipeline):

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2747 for operator RouteFailedOpsToSideOutput -> RDFPatchChunkOperation -> MeasureEventProcessingLatencyOperation -> Sink: output (1/1). Failure reason: Checkpoint was declined.
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:850)
        ... 13 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

The pipeline was able to resume properly using the last successful checkpoint; this kind of error should self-correct once we enable the right restart strategy.
We could investigate increasing such timeouts and possibly see if we're affected by https://issues.apache.org/jira/browse/KAFKA-8803, but I wonder if this is worth the effort.
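If we do decide to raise those timeouts, the relevant Kafka producer settings would look roughly like this (illustrative values only; whether they help depends on whether KAFKA-8803 is actually involved):

import java.util.Properties

val producerProps = new Properties()
// initTransactions() blocks for up to max.block.ms (default 60000 ms, matching the
// "Timeout expired after 60000milliseconds" in the stack trace above).
producerProps.setProperty("max.block.ms", "120000")
// Keep transactions alive longer than the checkpoint interval so the broker does
// not abort them between two checkpoints.
producerProps.setProperty("transaction.timeout.ms", "900000")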

Change 623000 merged by jenkins-bot:
[wikidata/query/rdf@master] Report side outputs as structured data set

https://gerrit.wikimedia.org/r/623000

Gehel closed this task as Resolved. Sep 14 2020, 1:06 PM