
Flink job to enrich reconciliation events
Closed, ResolvedPublic

Description

On T368782: MediaWiki Reconciliation API, we will be emitting a new kind of 'reconciliation' event. Ideally, the schema of this event is exactly the same as the page change event schema.

In this task, we should create a Flink job that will:

  • Consume this new event stream
  • Create an enriched version of it that includes the content slots, ideally matching mediawiki_page_content_change_v1 exactly (could not find the schema?).
  • A separate Gobblin process should make this stream available as a Hive table under the event Hive database.
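A minimal sketch of the enrichment step described above. This is illustrative only: `fetch_content_slots`, `content_body`, and `content_format` are assumed names, not the actual mediawiki-event-enrichment API or the mediawiki_page_content_change_v1 schema, and `get_revision` stands in for the HTTP call to the MediaWiki Action API (internal routes in production).

```python
import copy


def fetch_content_slots(database, revision_id, get_revision):
    # `get_revision` is a stand-in for the MediaWiki Action API call that
    # returns a revision's content slots for the given wiki database.
    revision = get_revision(database, revision_id)
    return {
        name: {
            "content_body": slot["content"],
            "content_format": slot["contentformat"],
        }
        for name, slot in revision["slots"].items()
    }


def enrich_event(event, get_revision):
    # Return a copy of a reconciliation event with content slots attached,
    # mirroring the shape we expect from mediawiki_page_content_change_v1.
    # The input event is left untouched.
    enriched = copy.deepcopy(event)
    rev = enriched["revision"]
    rev["content_slots"] = fetch_content_slots(
        enriched["database"], rev["rev_id"], get_revision
    )
    return enriched
```

In the Flink job, `enrich_event` would be applied per record between the Kafka source and sink; the pure-function shape keeps it unit-testable with a stubbed Action API client.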

Requirements

This Flink enrichment job should target the DSE cluster. This new k8s service will:

  • not be publicly accessible via a <appname>.wikimedia.org subdomain.
  • not require users to log in.
  • consume/produce from/to kafka jumbo.
  • _may_ need to consume from kafka main (@gmodena to clarify this requirement).
  • need to reach MediaWiki Action endpoints (via internal routes).

Resources and expected load

Some initial estimates; they might need refinement as we go.

  • Flink topology: one job manager and two task managers, managed by the Flink k8s operator. Each will be allocated a dedicated k8s pod.
  • 1G of memory allocated to the job manager, and 1.5G allocated to each task manager, should be safe defaults.
  • Expected load: significantly lower than mw-page-content-change-enrich. This app will consume events from a topic produced by a daily (hopefully hourly) batch process. The worst-case scenario estimated so far is 100k requests/hour (roughly 28 requests/second).
  • Flink HA: TBC. In its first iteration, we probably won't need HA for this job. We might still want an object store to snapshot Kafka offsets. We could experiment with snapshotting to Ceph if available.

Actions

The Dumps team will

  • Provide Data SREs a namespace name.
  • Set up a new job in the mediawiki-event-enrichment repo, for integration with the Deployment pipeline.
  • Add helmfile/values to deployment-charts, based atop the flink-app Helm chart.
  • Add new input/output streams to EventStreamConfig (the Gobblin consumer will be enabled by default).

As discussed in Slack, the following steps will require Data SRE action.

  • create the namespace
  • create the read/deploy credentials

Details

Other Assignee
xcollazo
Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
content_history: import from absolute path. | repos/data-engineering/mediawiki-event-enrichment!85 | gmodena | fix-module-import | main
blubber: append app base dir to PYTHONPATH. | repos/data-engineering/mediawiki-event-enrichment!84 | gmodena | fix-docker-pythonpath | main
mediawiki-event-enrichment: remove dead code. | repos/data-engineering/mediawiki-event-enrichment!83 | gmodena | cleanup | main
Add mediawiki_content_history reconciliation app. | repos/data-engineering/mediawiki-event-enrichment!82 | gmodena | dumps-reconciliation | main

Event Timeline

@gmodena and @Ottomata the description above is just me thinking out loud. Kindly please modify as you see fit.

Presumably we should be able to reuse all the work done already for mediawiki_page_content_change_v1?

We _might_ be able to do this in the existing mw-page-content-change-enrich job, but it won't be as straightforward as making a new enrichment job. Having these in the same job would be a nice general pattern to support, though. We should at least look into it to see how difficult it would be.

@gmodena and @Ottomata the description above is just me thinking out loud. Kindly please modify as you see fit.

We started some discussion about this in https://phabricator.wikimedia.org/T358373#9883792. Let's move it here to keep things in scope.
After some thought, I am less keen on reusing the mw-page-content-change-enrich namespace than I was at first.

There are two aspects to code organization:

  1. The place where the business logic lives. Back in the day, we designed mediawiki-event-enrichment as a monorepo for several enrichment codebases. The way CI and builds are configured, we can add a new enrichment job for reconciliation that reuses shared logic, and publish a docker image that bundles this new reconciliation app. The docker image would be the same as mw-page-content-change-enrich, but with a different entry point (if needed). The jobs might end up being the same docker image, but with different parametrization (input/output streams).
  2. k8s deployments. We follow a convention where each application is assigned a dedicated namespace. Re-using the existing mw-page-content-change-enrich job would mean re-using its namespace for Dumps reconciliation. I'm not against that if extending the scope of mw-page-content-change-enrich makes sense, but I am not sure yet that it does. The two seem similar in spirit (enrichment logic), but might have different enough RACIs. The monorepo was built to support exactly this kind of scenario. From an operational point of view, I'd rather keep reconciliation and content (wikitext) enrichment as separate concerns (and separate deployments/deployer pools). This would mean reusing (or extending) the mw-page-content-change-enrich code, but standing up a dedicated instance in k8s (follow-up from https://phabricator.wikimedia.org/T368745#9939817).
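The "same docker image, different parametrization" idea in point 1 could look roughly like the sketch below. All module, job, and output stream names here are hypothetical (only 'mediawiki.page_change.reconciled.v1' is mentioned elsewhere in this task); this is not the actual monorepo layout.

```python
# Shared enrichment logic lives once in the monorepo; each job is just a
# thin entry point resolving its own input/output stream parametrization.
JOBS = {
    "page-content-change-enrich": {
        "input_stream": "mediawiki.page_change.v1",
        "output_stream": "mediawiki.page_content_change.v1",
    },
    "dump-rev-content-reconcile-enrich": {
        "input_stream": "mediawiki.page_change.reconciled.v1",
        "output_stream": "mediawiki.page_content_change.reconciled.v1",
    },
}


def build_job_config(job_name, overrides=None):
    # Resolve a job's stream parametrization, allowing per-deployment
    # overrides (e.g. kafka-test brokers in staging) from helm values.
    config = dict(JOBS[job_name])
    config.update(overrides or {})
    return config
```

Both jobs would then ship in one image, with the entry point (or a CLI flag) selecting the job name; the k8s deployments stay separate, matching the one-namespace-per-application convention.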

As a side note, if we were to refactor mw-page-content-change-enrich, I would not feel comfortable deploying changes straight to wikikube. With mw-page-content-change-enrich we did a pretty good job of meeting SLOs, and I'd like to keep it that way :)

Consume this new event stream

We are still spiking how the stream will be produced (EventGate vs. Spark).

But can we assume the stream will be produced directly into jumbo, and won't have multi dc / replication requirements?

But can we assume the stream will be produced directly into jumbo, and won't have multi dc / replication requirements?

I think we decided we want to do reconciliation of page_change and page_content_change in general, so that it can be used for Search and others. The consumed 'mediawiki.page_change.reconciled.v1' (or whatever) stream will be produced to kafka-main. mw-page-content-change-enrich runs in wikikube and produces to kafka-jumbo. I'd be inclined to do the same for mw-page-content-change-reconciled-enrich too (naming TBD, of course).

But can we assume the stream will be produced directly into jumbo, and won't have multi dc / replication requirements?

I think we decided we want to do reconciliation of page_change and page_content_change in general, so that it can be used for Search and others.

We discussed this scenario, but there is no decision yet AFAIK.

From the last sync: @Milimetric is exploring (again, no decision yet) using Spark + MariaDB replicas (instead of a MW endpoint) to produce the input stream for this enrichment job (records that had missing wikitext or inaccurate rows). That application would run on Hadoop and would (I assume) produce to jumbo. In this scenario we _could_ run the Flink job single-DC on DSE (see Slack thread).

Another point raised by @Milimetric is whether they'll be able to piggyback on the page change schema for emitting revision records that are missing from page change. Note that this _might_ break other assumptions we have about reusing mw-page-content-change-enrich logic.

If we go down the MW endpoint + EventBus + EventGate route we could produce to main. However, if the stream ends up being for Dumps only (and not a generic page change reconciliation one) jumbo might be an easier path (and in that case I wonder if wikikube is the right cluster to target).

There is a Miro board and slack thread to collect input about this for our next sync.

if the stream ends up being for Dumps only

So far, I think page_content_change is for dumps only, which is why we produced to jumbo. We'd produce to jumbo for reconciled stream too.

I suppose we should target running Flink reconciled enrichment job in dse-k8s at first anyway, even if we decide we want to run the job multi-DC in wikikube eventually. So let's go for it!

gmodena updated Other Assignee, added: gmodena.
gmodena updated Other Assignee, removed: gmodena.

I suppose we should target running Flink reconciled enrichment job in dse-k8s at first anyway, even if we decide we want to run the job multi-DC in wikikube eventually. So let's go for it!

Ack.

For documentation purposes: our current strategy regarding reconciled enrichment is focused on supporting the Dumps 2.0 use case, which relies on single-DC compute (Hadoop). A dse -> wikikube rollout path should fit nicely with the project roadmap.

I'll be picking up this task soon and will keep you posted on the progress.

Change #1070000 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] dse-k8s-eqiad: re-enable the Flink operator

https://gerrit.wikimedia.org/r/1070000

Change #1070000 merged by Brouberol:

[operations/deployment-charts@master] dse-k8s-eqiad: re-enable the Flink operator

https://gerrit.wikimedia.org/r/1070000

Change #1070597 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] ds8-k8s-service: add values for dumps2 job.

https://gerrit.wikimedia.org/r/1070597

Change #1074135 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] Create production and staging NS for mw-dump-rev-content-reconcile-enrich

https://gerrit.wikimedia.org/r/1074135

Change #1074136 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/puppet@production] deployment_server: create prod/staging users for mw-dump-rev-content-reconcile-enrich

https://gerrit.wikimedia.org/r/1074136

Change #1074136 merged by Brouberol:

[operations/puppet@production] deployment_server: create mw-dump-rev-content-reconcile-enrich users

https://gerrit.wikimedia.org/r/1074136

Change #1074135 merged by Brouberol:

[operations/deployment-charts@master] Create production and staging NS for mw-dump-rev-content-reconcile-enrich

https://gerrit.wikimedia.org/r/1074135

Change #1070597 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-service: add values for dumps2 job.

https://gerrit.wikimedia.org/r/1070597

Change #1075226 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] dse-k8s-services: fix values in dump enrichment app.

https://gerrit.wikimedia.org/r/1075226

Change #1075226 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: fix values in dump enrichment app.

https://gerrit.wikimedia.org/r/1075226

Change #1075537 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] flink-operator: specify a list of NS to watch in dse-k8s-eqiad

https://gerrit.wikimedia.org/r/1075537

Change #1075537 merged by Brouberol:

[operations/deployment-charts@master] flink-operator: specify a list of NS to watch in dse-k8s-eqiad

https://gerrit.wikimedia.org/r/1075537

Change #1075931 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] dse-k8s-service: add kafka-test brokers to flink app.

https://gerrit.wikimedia.org/r/1075931

Change #1076680 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] services: page-content-change-enrich: set deployment value.

https://gerrit.wikimedia.org/r/1076680

Change #1075931 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: dump-reconcile: add kafka-test brokers to flink app.

https://gerrit.wikimedia.org/r/1075931

Change #1077047 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] dse-k8s-services: content_history: update docker image.

https://gerrit.wikimedia.org/r/1077047

Change #1076680 merged by jenkins-bot:

[operations/deployment-charts@master] services: page-content-change-enrich: set deployment value.

https://gerrit.wikimedia.org/r/1076680

Change #1077047 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: content_history: update docker image.

https://gerrit.wikimedia.org/r/1077047

Change #1078923 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] dse-k8s-services: content_history: version bump image.

https://gerrit.wikimedia.org/r/1078923

Change #1078923 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: content_history: version bump image.

https://gerrit.wikimedia.org/r/1078923

Change #1080245 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] dse-k8s-services: content_history: version bump image.

https://gerrit.wikimedia.org/r/1080245

Change #1080245 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: content_history: version bump image.

https://gerrit.wikimedia.org/r/1080245