
[Shared Event Platform] Investigate Stream Processing Platforms
Closed, Resolved · Public

Description

User Story
As a platform engineer, I need to evaluate various stream processing platforms so that as a group we can decide on the best solution for our needs
Timebox:
  • 2 weeks
Done is:
  • Decision made on solution
  • Evaluation criteria and decisions are documented here
Expected Spikes/Subtasks:
  • Flink
  • Kafka Streams
  • Knative

Event Timeline

@gmodena I heard you are about to go on vaca! Can you link any of your previous work on this here so we can use it as a starting point?

Hey @Ottomata,

the only recent bits of work I have is this PoC from our last code jam: https://gitlab.wikimedia.org/repos/generated-data-platform/event-driven-poc.
This branch https://gitlab.wikimedia.org/repos/generated-data-platform/event-driven-poc/-/tree/kafka-stream-wip contains some kafka streams experiments.

One bit I have not documented is the approach to async operations in Flink vs Kafka Streams (you'll see examples in code).

Flink has the concept of async data streams and semantics for async io https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio/
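
For reference, a minimal sketch of what that looks like in the DataStream API, assuming a 1:1 enrichment against some external lookup. The function name, the placeholder lookup, and the timeout/capacity values below are made up for illustration; only the AsyncDataStream / RichAsyncFunction API itself is from the Flink docs linked above.

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical 1:1 enrichment that calls an external service without blocking a task slot.
class AsyncEnrichFunction extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
        // fetchContentAsync stands in for any non-blocking client returning a CompletableFuture.
        fetchContentAsync(event).thenAccept(content ->
            resultFuture.complete(Collections.singletonList(event + ":" + content)));
    }

    private CompletableFuture<String> fetchContentAsync(String event) {
        return CompletableFuture.completedFuture("enriched");
    }
}

class AsyncEnrichPipeline {
    // orderedWait preserves input order; unorderedWait trades ordering for throughput.
    static DataStream<String> enrich(DataStream<String> events) {
        return AsyncDataStream.orderedWait(
            events, new AsyncEnrichFunction(), 5, TimeUnit.SECONDS, 100);
    }
}
```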

Kafka Streams - to the best of my knowledge - does not support async processing:
https://issues.apache.org/jira/browse/KAFKA-6989
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
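
For contrast, here is a minimal sketch of what per-record enrichment looks like in Kafka Streams without async support: the lookup has to run synchronously on the stream thread, so throughput is bounded by lookup latency times the number of stream threads. Topic names are borrowed from the enrichment example later in this task; the lookup function is hypothetical.

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class BlockingEnrichTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("eqiad.mediawiki.revision-create");
        // The enrichment call blocks the stream thread for every record.
        events.mapValues(BlockingEnrichTopology::fetchContentBlocking)
              .to("eqiad.mediawiki.page_wikitext_change");
        return builder.build();
    }

    // Placeholder for a synchronous lookup (e.g. an HTTP GET for revision content).
    private static String fetchContentBlocking(String event) {
        return event;
    }
}
```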

We should talk to @dcausse and @akosiaris about their experience deploying Flink and what they would do differently.

I just spent some time thinking about how we might revisit our Multi DC Replication setup for Kafka. Confluent Platform has some really fancy Multi-Region Kafka Cluster stuff, but unfortunately this isn't upstreamed into Apache Kafka.

I think we're stuck with multiple DC prefixed Kafka topics for now. MirrorMaker 2 would help us automate the creation and replication of topics (producers would not have to be DC aware), but I don't think upgrading to it now is necessary for our current use cases, as the ultimate topic layout would be approximately the same.

This means that consumers of these topics that very much care about e.g. page revision order will have to engage in some explicit reordering, especially in the case of MediaWiki DC failover (or if MW ever goes active-active). That is, unless we couple the MW state change event streams directly to MariaDB (CDC, as described in T120242: Eventually-Consistent MediaWiki state change events | MediaWiki events as source of truth).

The WDQS Updater in Flink has logic to handle reordering, so perhaps we can factor that out into a Flink-based reordering library that downstream Flink application consumers can use if they need to.
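
Very roughly, such a reordering operator could look something like the sketch below: buffer events per page in keyed state and flush them in rev_id order once the event-time watermark has passed a lateness bound. The event shape and the 10s bound are made-up placeholders for illustration, not what the WDQS Updater actually does.

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event shape; in reality this would be one of our page state change schemas.
class PageEvent {
    public long pageId;
    public long revId;
    public long timestamp; // event time, ms
}

// Keyed by page_id: buffer events and emit them in rev_id order after a lateness bound,
// so the out-of-order sibling events from the other DC have a chance to arrive.
class ReorderByRevision extends KeyedProcessFunction<Long, PageEvent, PageEvent> {
    private static final long LATENESS_MS = 10_000; // illustrative bound
    private transient ListState<PageEvent> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
            new ListStateDescriptor<>("buffer", PageEvent.class));
    }

    @Override
    public void processElement(PageEvent event, Context ctx, Collector<PageEvent> out) throws Exception {
        buffer.add(event);
        ctx.timerService().registerEventTimeTimer(event.timestamp + LATENESS_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageEvent> out) throws Exception {
        List<PageEvent> ready = new ArrayList<>();
        List<PageEvent> keep = new ArrayList<>();
        for (PageEvent e : buffer.get()) {
            if (e.timestamp + LATENESS_MS <= timestamp) { ready.add(e); } else { keep.add(e); }
        }
        ready.sort((a, b) -> Long.compare(a.revId, b.revId));
        for (PageEvent e : ready) { out.collect(e); }
        buffer.update(keep);
    }
}
```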

> We should talk to @dcausse and @akosiaris about their experience deploying Flink and what they would do differently.

Actually, you want @JMeybohm more than me. I can tell you though that it wasn't a walk in the park.

Here are some thoughts we compiled while figuring out the possible deployment options on the wikikube k8s cluster: https://wikitech.wikimedia.org/wiki/Wikidata_Query_Service/Flink_On_Kubernetes .

@JMeybohm I wonder if the main pain points were more around the fact that the WDQS Updater is stateful. If there is no state (other than Kafka consumer offsets, which are stored in Kafka), perhaps multi DC k8s deployment won't be as difficult.

I think the use cases we are targeting atm are stateless.

> @JMeybohm I wonder if the main pain points were more around the fact that the WDQS Updater is stateful. If there is no state (other than Kafka consumer offsets, which are stored in Kafka), perhaps multi DC k8s deployment won't be as difficult.

> I think the use cases we are targeting atm are stateless.

There were multiple pain points IIRC:

  • Flink storing state (references to tombstones in Swift, among other things) in Kubernetes Configmaps. For one, this means Flink needs permission to modify all Configmaps in its namespace, and also those Configmaps are not part of the helm chart, meaning they would be lost/need to be reconstructed somehow in case of disaster or when switching DCs.
  • The actual application/job is not part of the deployment. This means the artifact that gets deployed is not self-contained (e.g. the docker image can't run on its own without further human interaction), and it needs domain knowledge to deploy "the right thing" to Flink (after Flink has been deployed). See docs at https://wikitech.wikimedia.org/wiki/Wikidata_Query_Service/Streaming_Updater#Kubernetes_setup

All that makes Flink (at least in the WDQS incarnation) very special compared to all other services running in Kubernetes.

> Flink storing state [...] in Kubernetes Configmaps

I think in our current use cases, we won't need this. I believe all state will be handled by Kafka Offset commits. We'll be doing mostly just 1:1 event enrichment jobs.

> The actual application/job is not part of the deployment

I was surprised about this too! I had some recollection that WDQS Updater was an application mode deployment, not session mode. @dcausse was this done to more easily accommodate WCQS?

For lack of a better place, I want to write down some Multi DC thoughts from a long discussion @JAllemandou and I just had.

We need some shared terminology to talk about these concepts. Here are some suggestions:

Availability concepts:

  • active / active - This means that the streaming application itself is alive and receives events in both DCs.
  • active / passive - Only one DC instance of a streaming application runs (or is pooled) at any given time.

Stream input concepts:

  • double (or multi) compute - Each individual event is processed by every streaming application instance, meaning each event itself is processed multiple times.
  • single compute - Each individual event is only ever processed by one instance of a streaming application. This could mean that each streaming application only sees a subset of all events in the stream (e.g. eqiad events only in eqiad), or it could mean that only one instance ever sees all events at any given time (active/passive).

(Please offer other suggestions for these concepts if you have ideas on how to make them clearer.)

These can be combined. WDQS is active / active + double compute. Change Prop is active / active + single compute. Our stream enrichment apps (for MW state comprehensiveness, e.g. content in streams) will probably also be active / active + single compute.

The WDQS updater is active / active + double compute because it needs all the RDF updates from both DCs in proper order in a single topic, so that Blazegraph can be updated consistently.

We aren't sure how far we need to go to make our derived state change streams easy to use. It certainly would be nice if all state changes could be consumed in proper order using a single topic. WDQS updater would be much simpler if this were true.
Joseph and I imagined what it would look like if we were to accomplish that. I can't at the moment see any way to do this without double compute. Double compute isn't ideal, because consistency of the two outputs will surely drift over time.

We explored the following, and I just wanted to park these thoughts somewhere.

1. Enrichment Stream Processor(s)

active / active + single compute

These would create new entity-based (e.g. Page) state change streams keyed by entity id (e.g. page_id). The eqiad processors would only consume and produce eqiad topics, and vice versa in codfw. DC failover here is tied to MediaWiki failover. If MediaWiki is failed over to codfw, events will stop being produced in eqiad, and the eqiad processor will receive no input.

Example:
Produce a mediawiki.page_wikitext_change stream:

  • In eqiad: consume eqiad.mediawiki.revision-create (or whatever is needed), produce eqiad.mediawiki.page_wikitext_change.
  • In codfw: consume codfw.mediawiki.revision-create, produce codfw.mediawiki.page_wikitext_change.

(This is almost certainly what we will do for the events experiment we are currently embarking on.)
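
A minimal sketch of what the eqiad instance of such a job might look like with Flink's Kafka connectors (the 1.14-style KafkaSource/KafkaSink builders). The topic names come from the example above; the DATACENTER env var, broker addresses, consumer group id, and the enrichment function itself are assumptions for illustration.

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PageWikitextChangeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The DC prefix is the only thing that differs between the eqiad and codfw deployments.
        String dc = System.getenv().getOrDefault("DATACENTER", "eqiad");

        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka-" + dc + ":9092")   // assumed broker naming
            .setTopics(dc + ".mediawiki.revision-create")
            .setGroupId("page_wikitext_change_enrich")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("kafka-" + dc + ":9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(dc + ".mediawiki.page_wikitext_change")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "revision-create")
            .map(PageWikitextChangeJob::enrichWithWikitext)  // 1:1, stateless enrichment
            .sinkTo(sink);

        env.execute("page_wikitext_change-" + dc);
    }

    // Placeholder: would fetch the wikitext for the revision (see the async I/O notes above).
    private static String enrichWithWikitext(String revisionCreateEvent) {
        return revisionCreateEvent;
    }
}
```

The only per-DC difference is the topic/broker prefix, so failover follows MediaWiki failover with no coordination needed by the job itself.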

2. Ordered Entity Change Stream Processor(s)

active / passive + single compute

These create the 'nice to have' single topic of entity_id-keyed, ordered changes that WDQS and other downstream services might need in order to easily and confidently consume state change events. We'd make these active / passive so that we can create a single topic that contains all state changes. This topic would be mirrored one way from e.g. eqiad -> codfw, so that codfw consumers can also use it.

During streaming application failover, the mirroring replication direction would need to be switched. Doing this would require some coordination, but should be possible.

(EDIT: I realized that it is possible to do active/passive + single topic compute. We just need to change the replication direction on failover.)

> I just spent some time thinking about how we might revisit our Multi DC Replication setup for Kafka. Confluent Platform has some really fancy Multi-Region Kafka Cluster stuff, but unfortunately this isn't upstreamed into Apache Kafka.

You know, we might want to think about this a little more. Confluent Platform does have some extra stuff to support this (namely observer replicas) that Apache Kafka does not have. But recent Apache Kafka does have support for follower fetching, which helps a bit.
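
For reference, a sketch of what follower fetching (KIP-392, Kafka >= 2.4) looks like from the client side, assuming brokers in a stretched cluster set broker.rack per DC and replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector. The bootstrap address, group id, and rack value here are illustrative, not our actual config.

```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FollowerFetchingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-main:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer");
        // The consumer advertises its "rack" (here: the DC it runs in) and will fetch
        // from an in-rack follower replica instead of crossing DCs to the partition leader.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "codfw");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```

Note this only helps consumer fetch traffic; produces still go to the partition leader, wherever it lives.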

Been reading some blogs and talks about this 'stretched' Kafka cluster idea, and it sure would make Multi DC apps much easier. The linked Kafka talk, though, recommends not doing it unless you can guarantee latency between DCs of <= 100ms. I have a feeling that our SREs would not like to guarantee that.

> Been reading some blogs and talks about this 'stretched' Kafka cluster idea, and it sure would make Multi DC apps much easier. The linked Kafka talk, though, recommends not doing it unless you can guarantee latency between DCs of <= 100ms. I have a feeling that our SREs would not like to guarantee that.

Yeah, I don't think it can be guaranteed 100% (as in it will never ever go above 100ms). The median is in the 30-40ms range[1], with peaks around 35ms. But that is aggregated statistical data; we can't guarantee that we won't have peaks well above 100ms in e.g. a re-routing situation or some cable cut. That being said, if there is an error budget it might be more feasible.

[1] https://smokeping.wikimedia.org/?target=codfw.Core

Oh, just saw this response, was just chatting with @BBlack about this in IRC.

I'd guess if > 100ms is a rare occasion, Kafka stretch would still be fine.

> Oh, just saw this response, was just chatting with @BBlack about this in IRC.

> I'd guess if > 100ms is a rare occasion, Kafka stretch would still be fine.

It would be interesting to document what would break and how in that rare occasion.

Ya, will do a lot of research and get back to ya. Hadn't expected this to be an actual option. Research time!

Ottomata renamed this task from [Shared Event Platform] Investigate Event Service Platforms to [Shared Event Platform] Investigate Stream Processing Platforms. Mon, Apr 15, 2:42 PM