
Re-order and optimize change events
Closed, Resolved · Public · 8 Estimated Story Points

Description

When processing change events, the update pipeline must make sure that these changes are processed in order by the ingestion pipeline.

Reordering implies some buffering, which might be done either with a Flink window or with a custom process function using timers; there are pros and cons to both:

  • windowing might result in fewer timers but will hold more state
  • a custom process function might have to trigger more timers but might allow optimizing (merging) the events on the fly

Reordering should mainly use the rev_id to sort events, and the MediaWiki timestamp when the rev_id does not change (delete vs. undelete).
To efficiently store the re-ordering state, the events will have to be partitioned using the key [wiki_id, page_id].
The optimization step should merge events related to the same page using a set of rules that will have to be specified and documented (possibly in this ticket).
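
As a rough illustration of the custom process function option, here is a minimal sketch of a timer-based buffering operator keyed by [wiki_id, page_id]. The class name, the String key, and the InputEvent accessors (getRevId(), getEventTime()) are assumptions for illustration, not the project's actual API; the constructor argument corresponds to the tunable buffer delay mentioned in the acceptance criteria below.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: buffer events per (wiki_id, page_id) key and flush them in
// rev_id order once a configurable delay has elapsed since the first buffered event.
public class ReorderingProcessFunction
        extends KeyedProcessFunction<String, InputEvent, InputEvent> {

    private final long bufferDelayMs;
    private transient ListState<InputEvent> buffer;

    public ReorderingProcessFunction(Duration bufferDelay) {
        this.bufferDelayMs = bufferDelay.toMillis();
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", InputEvent.class));
    }

    @Override
    public void processElement(InputEvent event, Context ctx, Collector<InputEvent> out)
            throws Exception {
        boolean firstEvent = !buffer.get().iterator().hasNext();
        buffer.add(event);
        if (firstEvent) {
            // One timer per key, registered when the first event for that key arrives.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + bufferDelayMs);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<InputEvent> out)
            throws Exception {
        List<InputEvent> events = new ArrayList<>();
        buffer.get().forEach(events::add);
        // Sort by rev_id, falling back to the MediaWiki event time when the rev_id is
        // identical (e.g. delete vs. undelete of the same revision).
        events.sort(Comparator.comparingLong(InputEvent::getRevId)
                .thenComparing(InputEvent::getEventTime));
        events.forEach(out::collect);
        buffer.clear();
    }
}
```

The windowing variant would trade this explicit timer and state bookkeeping for Flink's built-in window state.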

Having an idea of how many events we could merge (deduplicate) might be interesting (trade-off between state size, latency and de-duplication efficiency).
This might be doable by doing a quick analysis of the cirrusSearchLinksUpdate job backlog (from kafka jumbo) over time windows of 1, 2, 5 and 10 min and seeing how many events can be de-duplicated.

AC:

  • the preparation job has a buffering operator to properly re-order events
  • the preparation job should merge events related to the same page into a single event using some rules that will have to be documented
  • the buffering operator should have its buffer delay tunable
  • the buffering operator should expose metrics about how many events are de-duplicated

Details

| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| Add deduplication to the merge step | repos/search-platform/cirrus-streaming-updater!6 | ebernhardson | work/ebernhardson/dedup | main |

Event Timeline

Restricted Application added a subscriber: Aklapper.
Gehel triaged this task as High priority.Feb 6 2023, 4:47 PM
Gehel edited projects, added Discovery-Search; removed Discovery-Search (Current work).

This might be doable by doing a quick analysis of the cirrusSearchLinksUpdate job backlog (from kafka jumbo) over time windows of 1, 2, 5 and 10 min and seeing how many events can be de-duplicated.

To get started on this I pulled the current contents of eqiad.mediawiki.job.cirrusSearchLinksUpdate from kafka-main into HDFS and ran some analysis. This analysis amounts to rounding down the timestamp to the nearest bucket and then counting duplicates over the tuple (bucket, wiki, page_title). This should be roughly the same as Flink's tumbling time window. Worth noting this is a best-case scenario, assuming we can always collapse events for a page to a single event.
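
For reference, this is roughly the kind of aggregation involved, sketched with Spark's Java API. The table name and column names are illustrative guesses, not the real schema, and the 300-second divisor corresponds to a 5-minute bucket.

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical sketch of the bucketing analysis; table and column names are guesses.
public class DedupRateEstimate {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("dedup-rate").getOrCreate();
        Dataset<Row> jobs = spark.read().table("discovery.cirrus_links_update_backlog");
        long total = jobs.count();
        // Round the event timestamp down to a 5-minute bucket, then count distinct
        // (bucket, wiki, page_title) tuples; everything beyond that is a "duplicate".
        long retained = jobs
                .withColumn("bucket", floor(unix_timestamp(col("meta_dt")).divide(300)))
                .select("bucket", "wiki", "page_title")
                .distinct()
                .count();
        System.out.printf("pruned=%d (%.1f%% dropped)%n",
                total - retained, 100.0 * (total - retained) / total);
    }
}
```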

Number of Jobs: ~177M

| interval | pruned duplicates | % dropped |
|---|---|---|
| 1 min | 3.1M | 1.7% |
| 3 min | 9.5M | 5.4% |
| 5 min | 13.3M | 7.5% |
| 10 min | 17.5M | 9.9% |
| 15 min | 19.6M | 11.1% |

The same data (with more data points) in graph form:

num_duplicates_by_deduplicate_time_span.png (248×378 px, 10 KB)

This suggests to me that 5 minutes is a reasonably good interval to deduplicate over and captures much of the benefit. 10 minutes is plausible, and 15 minutes doesn't seem to add enough benefit to justify the extra delay.

After looking things over, I think this is a plausible way to reorder and deduplicate the page-change events. @dcausse @pfischer thoughts?

Some considerations:

  • This deduplication is only considering the page-change stream
  • While events vary a bit, at the end of the day an event in the page-change stream can only do one of two things.
    • Update the doc with the cirrusdoc content for that rev_id
    • Delete the doc
  • If two events have the same action they can be considered the same and deduplicated
  • If we have update and delete for the same rev_id the delete should take precedence
  • If we have an update and a delete for the same page_id they should be considered in revision order: an update prior to the delete is dropped, an update after the delete is retained, and the two events are emitted in revision order (a small sketch of these rules follows this list)
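
For concreteness, here is a small sketch of how the same-revision rules above might be applied when merging two buffered events. The isDelete() and getEventTime() accessors are hypothetical, not the project's actual InputEvent API.

```java
import java.time.Instant;

// Hypothetical helper: given two buffered events for the same rev_id, pick the one to keep.
final class SameRevisionMerger {

    static InputEvent merge(InputEvent a, InputEvent b) {
        // A delete takes precedence over an update for the same revision.
        if (a.isDelete() != b.isDelete()) {
            return a.isDelete() ? a : b;
        }
        // Same action: the events are interchangeable, keep the most recent one.
        Instant timeA = a.getEventTime();
        Instant timeB = b.getEventTime();
        return timeA.isAfter(timeB) ? a : b;
    }
}
```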

Rough outline of what a revision-based deduplication might look like (a code sketch follows this outline):

  • Key by (wiki, page_id)
  • Window over some time range
  • Accumulate into Map<Long, InputEvent> with rev_id as the key. This assumes we never need more than one event to represent a revision.
  • Use rules above to decide, given two events for the same rev_id, which event is retained
  • When closing the window:
    • If no deletes exist, emit one event with the most recent rev_id
    • If deletes exist, drop all events with rev_id <= the delete and emit the delete; if follow-up updates exist, emit one event with the most recent rev_id
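
Put together, a minimal sketch of a window function implementing those closing rules might look like the following. It reuses the hypothetical InputEvent accessors and the SameRevisionMerger helper sketched above; the String key type is illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Hypothetical sketch of the "when closing the window" rules described above.
public class DeduplicateWindowFunction
        extends ProcessWindowFunction<InputEvent, InputEvent, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<InputEvent> events,
                        Collector<InputEvent> out) {
        // Keep at most one event per rev_id, resolved with the same-revision rules.
        TreeMap<Long, InputEvent> byRevision = new TreeMap<>();
        for (InputEvent event : events) {
            byRevision.merge(event.getRevId(), event, SameRevisionMerger::merge);
        }

        // Find the delete with the highest rev_id, if any.
        Map.Entry<Long, InputEvent> latestDelete = null;
        for (Map.Entry<Long, InputEvent> entry : byRevision.entrySet()) {
            if (entry.getValue().isDelete()) {
                latestDelete = entry;
            }
        }

        if (latestDelete != null) {
            // Drop all events with rev_id <= the delete and emit the delete itself ...
            out.collect(latestDelete.getValue());
            byRevision = new TreeMap<>(byRevision.tailMap(latestDelete.getKey(), false));
        }
        if (!byRevision.isEmpty()) {
            // ... then emit a single update for the most recent remaining revision.
            out.collect(byRevision.lastEntry().getValue());
        }
    }
}
```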

Thanks for the analysis! I agree that 5mins sounds like a good trade-off.

After looking things over, I think this is a plausible way to reorder and deduplicate the page-change events. @dcausse @pfischer thoughts?

Some considerations:

  • This deduplication is only considering the page-change stream

Pretty sure this is what you meant, but just a note to clarify: in the pipeline, page-change refers to changes happening to the page itself, i.e. the ones we obtain from the page-change stream. Events from cirrusSearchLinksUpdate are not yet processed (T325565) by the pipeline but should be considered too; we don't have a good name for them yet (page-refresh or page-rerenders).

  • While events vary a bit, at the end of the day an event in the page-change stream can only do one of two things.
    • Update the doc with the cirrusdoc content for that rev_id
    • Delete the doc
  • If two events have the same action they can be considered the same and deduplicated
  • If we have update and delete for the same rev_id the delete should take precedence

We might have to check how undeletes are treated in the existing code; delete and undelete events will be on the same revision, leaving us with only the event time to determine proper ordering.

  • If we have an update and a delete for the same page_id they should be considered in revision order: an update prior to the delete is dropped, an update after the delete is retained, and the two events are emitted in revision order

Rough outline of what a revision based deduplication might look like:

  • Key by (wiki, page_id)
  • Window over some time range
  • Accumulate into Map<Long, InputEvent> with rev_id as the key. This assumes we never need more than one event to represent a revision.

Nice! I hope we don't need to know everything and that the ordering/discarding decisions can be done incrementally

  • Use rules above to decide, given two events for the same rev_id, which event is retained
  • When closing the window:
    • If no deletes exist, emit one event with the most recent rev_id
    • If deletes exist, drop all events with rev_id <= the delete and emit the delete; if follow-up updates exist, emit one event with the most recent rev_id

What are the cases where we want the delete event to be processed if a subsequent revision is available? We could perhaps keep only one InputEvent in a reduced state?

While thinking about all this I was pondering how the windowing process works. The basic tumbling windows are locked to the wall clock, so a 5-minute window closes at 3:00, 3:05, 3:10, etc. This means that if the first event comes in at the end of a window it won't be deduped with follow-up events, because they land in the next window.

Another slightly more complicated approach would be to assign windows ourselves: when an event comes in and has a valid window we put it in there; if no window exists we create a new window that closes in 5 minutes. This is functionally similar to Flink's session windows, the primary difference being that we set a hard limit on the length of the window. I ran a quick simulation of this strategy through Spark and saw the following change in deduplication rates:

| interval (min) | tumbling | modified session |
|---|---|---|
| 1 | 1.7% | 2.7% |
| 3 | 5.4% | 7.0% |
| 5 | 7.5% | 8.8% |
| 10 | 9.9% | 11.0% |
| 15 | 11.1% | 12.1% |

The downside to this is likely to be requiring more timers in Flink. Honestly, I don't entirely know how many timers it's going to create or how that affects things. If it means there is a timer once a minute to flush out windows of the same minute it's probably not a concern; if it means every window gets its own timer, that might be something to at least investigate the implications of.
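
If the per-window timers do become a concern, one common mitigation (mentioned here only as an option, not something the pipeline already does) is to coalesce timers by rounding the firing time to a coarse boundary, since Flink keeps at most one timer per key and timestamp. For example, inside processElement of the hypothetical ReorderingProcessFunction sketched earlier:

```java
// Round the intended firing time up to the next whole minute so a key registers at most
// one timer per minute instead of one per event/window (bufferDelayMs is the
// hypothetical configurable delay from the earlier sketch).
long target = ctx.timerService().currentProcessingTime() + bufferDelayMs;
long coalesced = ((target / 60_000) + 1) * 60_000;
ctx.timerService().registerProcessingTimeTimer(coalesced);
```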

Some considerations:

  • This deduplication is only considering the page-change stream

Pretty sure this is what you meant, but just a note to clarify: in the pipeline, page-change refers to changes happening to the page itself, i.e. the ones we obtain from the page-change stream. Events from cirrusSearchLinksUpdate are not yet processed (T325565) by the pipeline but should be considered too; we don't have a good name for them yet (page-refresh or page-rerenders).

I hadn't been considering those. In terms of deduplication I think it will be the same: the result of those events is the same action, and we can deduplicate over the action space. It seems like we can union page-change into page-refresh prior to the dedup step.
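
A tiny sketch of that wiring, assuming the stream names and accessors are illustrative and reusing the hypothetical DeduplicateWindowFunction sketched earlier:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Union the page-change and page-refresh/rerender streams before the keyed dedup window
// so both kinds of events are deduplicated over the same action space.
DataStream<InputEvent> combined = pageChangeEvents.union(pageRefreshEvents);
combined
        .keyBy(event -> event.getWikiId() + ":" + event.getPageId())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .process(new DeduplicateWindowFunction());
```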

  • While events vary a bit, at the end of the day an event in the page-change stream can only do one of two things.
    • Update the doc with the cirrusdoc content for that rev_id
    • Delete the doc
  • If two events have the same action they can be considered the same and deduplicated
  • If we have update and delete for the same rev_id the delete should take precedence

We might have to check how undeletes are treated in the existing code; delete and undelete events will be on the same revision, leaving us with only the event time to determine proper ordering.

Hmm, I hadn't considered those either. I wasn't actually sure when we could get a delete and an update; it seemed like delete was the safest option to take. I'll have to think through the sequencing, and implement test cases to verify, but it seems initially plausible that we can instead always keep the event with the highest timestamp per rev_id? I dunno, I can't yet think of a counter-case but I feel like one probably exists.

  • If we have an update and a delete for the same page_id they should be considered in revision order: an update prior to the delete is dropped, an update after the delete is retained, and the two events are emitted in revision order

Rough outline of what a revision based deduplication might look like:

  • Key by (wiki, page_id)
  • Window over some time range
  • Accumulate into Map<Long, InputEvent> with rev_id as the key. This assumes we never need more than one event to represent a revision.

Nice! I hope we don't need to know everything and that the ordering/discarding decisions can be done incrementally

  • Use rules above to decide, given two events for the same rev_id, which event is retained
  • When closing the window:
    • If no deletes exist, emit one event with the most recent rev_id
    • If deletes exist, drop all events with rev_id <= the delete and emit the delete; if follow-up updates exist, emit one event with the most recent rev_id

What are the cases where we want the delete event to be processed if a subsequent revision is available? We could perhaps keep only one InputEvent in a reduced state?

I was trying to think about it in terms of having the same visible state in Elasticsearch as not deduplicating, although perhaps we can do better than that. If we hadn't deduplicated we would have removed the document and all secondary fields (popularity, tags, etc.) and a follow-up would have injected fresh data into the index. But as you suggest, we could use some context to decide that the delete is unnecessary and can be skipped. The visible output is slightly different, but we would simply have to re-provide that data later anyway.

Another slightly more complicated approach would be to assign windows ourselves.

Turns out we don't have to assign windows ourselves; we can provide WindowStagger.NATURAL:

When the first event is received in the window operator, take the difference between the start of the window and current processing time as the offset.
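
A short sketch of wiring that in, again assuming a 5-minute processing-time tumbling window and reusing the hypothetical keyed stream and DeduplicateWindowFunction from the earlier sketches:

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.time.Time;

// NATURAL staggering offsets the window start by the arrival time of the first element
// seen by the window operator, instead of locking windows to wall-clock boundaries.
combined
        .keyBy(event -> event.getWikiId() + ":" + event.getPageId())
        .window(TumblingProcessingTimeWindows.of(
                Time.minutes(5), Time.milliseconds(0), WindowStagger.NATURAL))
        .process(new DeduplicateWindowFunction());
```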