
[Event Platform] Design and Implement realtime enrichment pipeline for MW page change with content
Open, Needs Triage, Public

Description

User Story
As a platform engineer, I need to design, implement and deploy a Flink service that listens to existing event streams, consolidates them, enriches the events and outputs them to Kafka.
The service must:
  • Call the MW API to get the wikitext for the article
  • Format the input stream data and wikitext into the new topic format
  • Output the formatted data to a new Kafka topic
Expected Spikes:
  • Data modeling exercise for new consolidated stream
  • Others?
Why are we doing this?
  • Simplify event stream consumption. Consumers can listen to a single stream that represents the state of a page rather than a page action (the current design)
  • Add content to streams so consumers can use them without having to enrich the events themselves
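
The three steps the service must perform (call the MW API, format, output to Kafka) can be sketched as one small function. This is a minimal sketch, not the deployed job: it assumes the input event carries `database`, `page_id` and `rev_id` fields, and `enrich_event`, the injected `fetch_wikitext` and `produce` callables, and the output shape are all hypothetical.

```python
from typing import Any, Callable, Dict

Event = Dict[str, Any]


def enrich_event(
    event: Event,
    fetch_wikitext: Callable[[str, int], str],  # hypothetical: (wiki db, rev_id) -> wikitext
    produce: Callable[[str, Event], None],      # hypothetical: (topic, event) -> None
    output_topic: str,
) -> Event:
    """Look up content, reformat the event, and send it to the output topic."""
    # Step 1: fetch the wikitext for the revision named in the input event.
    wikitext = fetch_wikitext(event["database"], event["rev_id"])
    # Step 2: combine input data and content into a (hypothetical) output shape.
    enriched = {
        "database": event["database"],
        "page_id": event["page_id"],
        "rev_id": event["rev_id"],
        "content": {"model": "wikitext", "body": wikitext},
    }
    # Step 3: hand the formatted event to the producer for the new topic.
    produce(output_topic, enriched)
    return enriched
```

Injecting the content lookup and the producer keeps the formatting logic testable without a live MW API or Kafka cluster.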

Related Objects

Status | Assigned
Open | gmodena
Open | Ottomata
Resolved | Ottomata
Resolved | Ottomata
Resolved | Ottomata
Resolved | Ottomata
Resolved | dcausse
Open | None
Resolved | gmodena
Resolved | gmodena
Resolved | Ottomata
Open | Ottomata
Open | None
Open | OwenRB
Open | None

Event Timeline

Let's keep in mind the potential of a refactor of the input streams as well. Since we now know that entity-based change streams are better, it'd be much nicer if MW produced e.g. page-create, delete and edit all in the same stream. Such a refactor is not part of this PoC, but we should write stream processing functions so that they could easily be moved to an entity-based stream input later.
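
One way to keep processing functions independent of today's per-action streams is to normalize every input event into a single page-change record before any other logic runs. A minimal sketch: `PageChange`, `normalize`, `needs_content` and the stream-to-kind mapping are hypothetical illustrations, not the actual event schema.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class PageChange:
    """Hypothetical entity-based record that processing functions consume."""
    page_id: int
    change_kind: str  # e.g. "create", "edit", "delete"
    rev_id: Optional[int]


# Hypothetical mapping from current per-action streams to a change kind.
STREAM_TO_KIND = {
    "mediawiki.page-create": "create",
    "mediawiki.revision-create": "edit",
    "mediawiki.page-delete": "delete",
}


def normalize(stream: str, event: Dict[str, Any]) -> PageChange:
    """Adapt a per-action event so downstream code never sees the source stream."""
    return PageChange(
        page_id=event["page_id"],
        change_kind=STREAM_TO_KIND[stream],
        rev_id=event.get("rev_id"),
    )


def needs_content(change: PageChange) -> bool:
    """Example processing function written only against PageChange."""
    return change.change_kind in ("create", "edit")
```

If MediaWiki later emits an entity-based stream directly, only `normalize` would need to change; functions such as `needs_content` would stay as they are.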

Also q: would it be worth doing this in Flink Python? I'm sure Scala is "better", but we might get better adoption and participation if we can do it in Python. I'd expect this to work fine for simple enough enrichment jobs like this one.

any others? Undelete?

Probably. Let's ask @dcausse, @RBrounley_WMF and @Protsack.stephan

Likely:

  • mediawiki.revision-visibility-change
  • mediawiki.page-undelete
  • mediawiki.page-suppress
  • mediawiki.page-move

I'm not convinced you would need revision-visibility-change: it's emitted when hiding a past revision, whereas removing the current revision is done through page-suppress.

Oh ho, Table API Async? https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/functions/AsyncTableFunction.html

I found this via this blog post (https://github.com/getindata/flink-http-connector), which implements an HTTP REST Table connector, allowing a SQL join of a streaming Table with an HTTP REST API. Pretty cool!(?)

@gmodena Q about async HTTP calls to the MW API. We def need it for the bootstrapping case (although I hope that when bootstrapping we can get the content from a non-MW source: HDFS? A shared data platform Object Store?), but do we need/want it for the regular realtime case?

Since we are aiming for a page entity-based stream, I'd guess we'd key this stream on page ID and aim to have events ordered by revision within each topic partition in Kafka. Can we accomplish this with the async function? I suppose that, since the input streams aren't necessarily ordered by revision ID now, it doesn't matter, but will it later? Will we need to reorder the output if we do async?
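
On the ordering question: Flink's Async I/O API has both an ordered mode (`AsyncDataStream.orderedWait`) and an unordered one (`unorderedWait`). The idea behind ordered mode can be illustrated in plain Python asyncio (this is not Flink code): requests run concurrently, but results come back in input order. `ordered_async_map` and `fake_fetch` are hypothetical names, and `fake_fetch` only stands in for a MW API call.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def ordered_async_map(
    items: List[T],
    fetch: Callable[[T], Awaitable[R]],
    max_in_flight: int = 10,
) -> List[R]:
    """Run fetches concurrently (bounded) but return results in input order."""
    sem = asyncio.Semaphore(max_in_flight)

    async def bounded(item: T) -> R:
        # Limit the number of concurrent requests against the API.
        async with sem:
            return await fetch(item)

    # asyncio.gather returns results in the order of its arguments,
    # regardless of which request completes first.
    return await asyncio.gather(*(bounded(i) for i in items))


async def fake_fetch(rev_id: int) -> str:
    """Hypothetical stand-in for a MW API call; higher rev_ids answer sooner."""
    await asyncio.sleep(0.01 * (5 - rev_id))
    return f"content-{rev_id}"
```

The cost of preserving order is latency: a slow response holds back every result queued behind it, which is the same trade-off Flink's ordered mode makes.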

From my side, I would maybe consider simplifying things for the user (if possible) by making something like:

  • mediawiki.page-update - where we have page creations and updates (new revisions) in the same stream
  • mediawiki.page-delete - for the page deletions
  • mediawiki.revision-visibility-change

It's not much different from the original, but it looks a little simpler in my mind.
If we limit this to some kind of MVP, I think it's a good first step.
The original is awesome as well and it will cover the same use cases. In my mind, making things simpler will encourage more people to use the stream.

+1. I think we might even prefer page deletes rolled into the same stream too, i.e. a single mediawiki.page-change stream, ideally modeled as a kind of 'changelog' stream.
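
In a 'changelog' stream, each event carries the latest state of a page and a delete acts as a tombstone, so a consumer can rebuild current page state simply by folding the stream. A minimal sketch, assuming hypothetical `page_id`, `change_kind`, `rev_id` and `title` fields; `apply_changelog` is an illustrative name.

```python
from typing import Any, Dict, Iterable


def apply_changelog(events: Iterable[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
    """Fold page-change events (hypothetical shape) into the latest state per page_id."""
    state: Dict[int, Dict[str, Any]] = {}
    for ev in events:
        page_id = ev["page_id"]
        if ev["change_kind"] == "delete":
            # A delete is a tombstone: the page no longer has a current state.
            state.pop(page_id, None)
        else:
            # create/edit events carry the page's full current state.
            state[page_id] = {"rev_id": ev["rev_id"], "title": ev["title"]}
    return state
```

Because each event is self-contained, a consumer never has to join create, edit and delete streams to know what a page currently looks like.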

If by input streams you mean altering mediawiki to also emit the wikitext (judging from the description of the task, this is the main alteration of the event) in the same event (it could be an event in a new stream if you care about not breaking backwards compatibility), then 100 times YES!!!!

Making mediawiki emit richer events would obviate the need to spin up a new service, saving the human time otherwise spent running and debugging it.

Yup! WIP now! Our plan is to make mediawiki emit a page_change stream without any content. We will have stream processors that enrich this stream with extra data, such as raw content (wikitext, Wikidata JSON) or parsed HTML, etc.

Cool, that sounds interesting. I suppose one of the stream processors will be mediawiki itself? Please not via some actor external to mediawiki and an API call, but rather via some extension...

I can also envision other processors, e.g. enriching with ORES-like scores, suggestions, tags, kv pairs, etc. Those would necessitate an actor external to mediawiki and would be the more generalized form, whereas the above suggestion of mediawiki adding the wikitext itself would be the architectural optimization that saves us from headaches down the road.

MediaWiki Event Carried State Transfer: Comprehensiveness - Decision Record

I suppose one of the stream processors will be mediawiki itself?

The current plan is to produce a page state change stream (good context in T308017) from MediaWiki via EventBus. A Flink stream processor will consume this stream, ask MW for content, and produce a new enriched stream. We chose not to produce this directly from MediaWiki because we believe there are going to be a lot of enrichment pipelines like this one, and we wanted to build expertise, best practices and tooling to make this easier to do in the future.
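
The "ask MW for content" step can be sketched against the MediaWiki Action API (`action=query&prop=revisions` with `rvslots=main` and `formatversion=2`, which returns slot content under `revisions[].slots.main.content`). This is a sketch, not the deployed Flink job: the function names, API base URL and user agent are placeholders, and handling of deleted or suppressed revisions is omitted.

```python
import json
from typing import Any, Dict
from urllib.parse import urlencode
from urllib.request import Request, urlopen


def revision_content_url(api_base: str, rev_id: int) -> str:
    """Build an Action API query for the main-slot content of one revision."""
    params = {
        "action": "query",
        "prop": "revisions",
        "revids": rev_id,
        "rvprop": "ids|content",
        "rvslots": "main",
        "format": "json",
        "formatversion": 2,
    }
    return f"{api_base}?{urlencode(params)}"


def extract_main_slot(response: Dict[str, Any]) -> str:
    """Pull the main-slot content out of a formatversion=2 response."""
    page = response["query"]["pages"][0]
    return page["revisions"][0]["slots"]["main"]["content"]


def fetch_revision_content(api_base: str, rev_id: int, user_agent: str) -> str:
    """Blocking fetch (placeholder); a real job would call this asynchronously."""
    req = Request(revision_content_url(api_base, rev_id), headers={"User-Agent": user_agent})
    with urlopen(req, timeout=10) as resp:
        return extract_main_slot(json.load(resp))
```

Keeping URL building and response parsing separate from the network call means both can be unit tested offline with a recorded response.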

Change 851673 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/mediawiki-config@master] Declare rc0.mediawiki.page_content_change stream

https://gerrit.wikimedia.org/r/851673

Change 851673 merged by jenkins-bot:

[operations/mediawiki-config@master] Declare rc0.mediawiki.page_content_change stream

https://gerrit.wikimedia.org/r/851673

Mentioned in SAL (#wikimedia-operations) [2022-11-01T19:15:30Z] <otto@deploy1002> Synchronized wmf-config/InitialiseSettings.php: Declare rc0.mediawiki.page_content_change stream - T307959 T308017 (duration: 03m 42s)

Change 852909 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/mediawiki-config@master] Set eventgate service for rc0.mediawiki.page_content_change stream

https://gerrit.wikimedia.org/r/852909

Change 852909 merged by jenkins-bot:

[operations/mediawiki-config@master] Set eventgate service for rc0.mediawiki.page_content_change stream

https://gerrit.wikimedia.org/r/852909

Ottomata renamed this task from [Shared Event Platform] Design and Implement POC Flink Service to Combine Existing Streams, Enrich and Output to New Topic to [Event Platform] Design and Implement realtime enrichment pipeline for MW page change with content. (Mon, Jan 23, 6:18 PM)
Ottomata updated the task description.

I archived the mediawiki-stream-enrichment repo associated with this task. It contained a Scala PoC that helped inform the current (WIP) Python implementation deployed on DSE.