
Add support for active/active double compute streams in the EventStreams HTTP service
Closed, Resolved · Public

Description

Some streams have decided to adopt a double compute, active/active approach.
In such a scenario the Kafka topics defined are logical replicas of each other, rather than complementing each other as in most of our streams, which run single compute.
The EventStreams HTTP service was built with that complementary assumption and is thus not yet suited to serve double compute streams.

Proposal

Assuming a double compute stream defined as follows:

name: my-stream.v1
topics:
  - eqiad.my-stream.v1
  - codfw.my-stream.v1

The EventStreams HTTP service can allow overriding the stream definitions on a per-DC basis (values-codfw.yaml vs values-eqiad.yaml).
values-eqiad.yaml:

stream_config_defaults:
  my-stream.v1:
    topics:
       - eqiad.my-stream.v1
    topics_allowed:
       - codfw.my-stream.v1
       - eqiad.my-stream.v1

values-codfw.yaml:

stream_config_defaults:
  my-stream.v1:
    topics:
       - codfw.my-stream.v1
    topics_allowed:
       - codfw.my-stream.v1
       - eqiad.my-stream.v1
Operations

A client accessing the service running in eqiad will only consume eqiad.my-stream.v1; the client will receive a Last-Event-ID of the form:

[
 {"topic": "eqiad.my-stream.v1", "partition": 0, "timestamp": 1575906290000},
 {"topic": "eqiad.my-stream.v1", "partition": 1, "timestamp": 1575906290000}
]

(we use useTimestampForId)

The client uses this Last-Event-ID to resume consumption after the connection ends.
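Positioning a consumer from such a timestamp follows Kafka's offsetsForTimes semantics: the earliest offset whose message timestamp is at or after the requested timestamp. A minimal in-memory sketch (partitionLog is a hypothetical stand-in for a real partition, not actual EventStreams code):

```javascript
// Sketch of timestamp-based resume, mirroring Kafka's offsetsForTimes
// semantics: return the earliest offset whose message timestamp is >= the
// requested timestamp. `partitionLog` is a hypothetical in-memory stand-in
// for a partition; a real consumer would query the broker instead.
function offsetForTimestamp(partitionLog, timestamp) {
  // partitionLog: array of { offset, timestamp }, ordered by offset.
  const match = partitionLog.find((msg) => msg.timestamp >= timestamp);
  // No message at or after the timestamp: resume from the log end.
  // (A real consumer would instead fall back to its auto.offset.reset
  // policy, which is where the duplicate/lag discussion below comes in.)
  return match ? match.offset : partitionLog.length;
}

const log = [
  { offset: 0, timestamp: 1575906280000 },
  { offset: 1, timestamp: 1575906290000 },
  { offset: 2, timestamp: 1575906300000 },
];

offsetForTimestamp(log, 1575906290000); // → 1
```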

Resuming from the same DC is not an issue since the topics match.
When resuming from another DC, the service should continue to serve data from that topic even though it is not the default one, as long as it is listed in the topics_allowed array.

For instance, the client sends a request to codfw with:

[
 {"topic": "eqiad.my-stream.v1", "partition": 0, "timestamp": 1575906290000},
 {"topic": "eqiad.my-stream.v1", "partition": 1, "timestamp": 1575906290000}
]

The service should understand that given the configuration:

stream_config_defaults:
  my-stream.v1:
    topics:
       - codfw.my-stream.v1
    topics_allowed:
       - eqiad.my-stream.v1
       - codfw.my-stream.v1

eqiad.my-stream.v1 is allowed, so the service can continue consuming from this topic even though it is not the default topic a client should consume from when hitting codfw.
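The resume behaviour described above can be sketched as a small topic-resolution step. This is an illustrative sketch only: resolveConsumeTopics is a hypothetical helper, not the actual EventStreams/KafkaSSE code.

```javascript
// Sketch of the proposed resume logic (names are illustrative):
// - with no Last-Event-ID, serve the DC-local default `topics`;
// - with a Last-Event-ID, honour its topics as long as each one is listed
//   in `topics_allowed` (falling back to `topics` when unset).
function resolveConsumeTopics(streamConfig, lastEventId) {
  const allowed = streamConfig.topics_allowed || streamConfig.topics;
  if (!lastEventId) {
    return streamConfig.topics;
  }
  const requested = [...new Set(lastEventId.map((e) => e.topic))];
  const rejected = requested.filter((t) => !allowed.includes(t));
  if (rejected.length > 0) {
    throw new Error(`Topics not allowed for this stream: ${rejected.join(', ')}`);
  }
  return requested;
}

// codfw service config from the proposal:
const codfwConfig = {
  topics: ['codfw.my-stream.v1'],
  topics_allowed: ['eqiad.my-stream.v1', 'codfw.my-stream.v1'],
};

// A client that was consuming in eqiad reconnects to codfw:
const lastEventId = [
  { topic: 'eqiad.my-stream.v1', partition: 0, timestamp: 1575906290000 },
  { topic: 'eqiad.my-stream.v1', partition: 1, timestamp: 1575906290000 },
];

resolveConsumeTopics(codfwConfig, lastEventId); // → ['eqiad.my-stream.v1']
```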

AC:

  • a double compute stream can be configured in the eventstreams HTTP endpoint
  • a consumer should not fail if it tries to resume consumption on a topic that's a replica of the topic populated in that DC

Details

Related Changes in GitLab:
  • Fix "No topics available for consumption" (repos/data-engineering/eventstreams!14, dcausse, T382065-fix-no-topics-available -> master)
  • Inspect topics_allowed to extend the list of allowed topics (repos/data-engineering/eventstreams!13, dcausse, T382065-allowed-topic-replicas -> master)

Event Timeline

dcausse renamed this task from Add support for active/active double compute streams in the EventStreams HTTP servie to Add support for active/active double compute streams in the EventStreams HTTP service.Dec 12 2024, 1:47 PM

I like this proposal. And as long as clients are aware of its semantics, it LGTM.

Could eventual replication lag interfere with the timestamp from the Last-Event-ID a client should resume from? I don't have a good feel for our current cross-DC lag.

I like this proposal. And as long as clients are aware of its semantics, it LGTM.

Could eventual replication lag interfere with the timestamp from the Last-Event-ID a client should resume from? I don't have a good feel for our current cross-DC lag.

Unsure... I'm assuming that mirror-maker is propagating the original Kafka message timestamp? But if the lag introduced by mirror-maker is high enough that a client has time to consume events and then reconnects after a switchover before those events get propagated to the other Kafka cluster, then I suppose the timestamp we try to position the consumer at does not yet exist. I suspect that the Kafka consumer will reset to the latest offsets in that case, which will lead to duplicates being consumed.
The number of duplicates in such cases might largely depend on the mirror-maker lag.

This proposal LGTM. Nit: Instead of accepted_topic_replicas, how about just an allowed_topics that in code defaults to the value of topics if not set? KafkaSSE already has an allowedTopics concept, and EventStreams just always sets it to topics from ESC.

You'd have to put both topics in allowed_topics. You wouldn't need DC specific helmfile values overrides.

But, this all makes me wonder. Are we sure we shouldn't just use separate streams for double compute?

A stream is supposed to be composed of topics. Having two topics that really are distinct things in the same stream just feels wrong.

I know you want to abstract away the DC replication detail from clients, and that makes sense. Maybe you should do this proposal as written, but ALSO declare the streams as separate streams in ESC?

I like this proposal. And as long as clients are aware of its semantics, it LGTM.

Could eventual replication lag interfere with the timestamp from the Last-Event-ID a client should resume from? I don't have a good feel for our current cross-DC lag.

Unsure... I'm assuming that mirror-maker is propagating the original kafka message timestamp?

That would be my guess too, but TBH I did not validate it.

But if the lag introduced by mirror-maker is high enough that a client has time to consume events and then reconnects after a switchover before those events get propagated to the other Kafka cluster, then I suppose the timestamp we try to position the consumer at does not yet exist. I suspect that the Kafka consumer will reset to the latest offsets in that case, which will lead to duplicates being consumed.

Right. In this case the latest on the replica would precede the last event consumed by the client before the reconnect.
I think duplicates are fine, as long as we are explicit about at-least-once delivery semantics.

I know you want to abstract away the DC replication detail from clients, and that makes sense. Maybe you should do this proposal as written, but ALSO declare the streams as separate streams in ESC?

Having separate streams declared in ESC seems the right thing to do given the EP architecture. But in practice, what benefits would it bring? My first reaction is that we'd be over-abstracting an implementation detail exposed to clients by the EventStreams API boundary.

But in practice, what benefits would it bring?
[...]
exposed to clients by the EventStreams API boundary.

EventStreams API is only one boundary ;)

Event Platform tooling assumes that if you want to consume a stream, you want to consume all of its topics. Having the streams correctly declared separately would avoid having to add special cases to any tooling that might consume this data. Internal (Kafka) consumers are not automatically switched between DCs.

This proposal LGTM. Nit: Instead of accepted_topic_replicas, how about just an allowed_topics that in code defaults to the value of topics if not set? KafkaSSE already has an allowedTopics concept, and EventStreams just always sets it to topics from ESC.

You'd have to put both topics in allowed_topics

Sure, I wanted to avoid the possibility of configuring something inconsistent via distinct lists in topics and allowed_topics by taking the union of these two settings. I'm fine with allowed_topics if you want to stay closer to the KafkaSSE semantics.
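The union approach mentioned here could look like the following sketch; effectiveAllowedTopics is an illustrative name under the assumed config shape, not the actual EventStreams code:

```javascript
// Sketch of the union idea: the effective allowed list is always
// topics ∪ allowed_topics, so a config that forgets to repeat the default
// topics in allowed_topics cannot accidentally lock clients out of them.
// Function and field names are illustrative, not the real implementation.
function effectiveAllowedTopics(streamConfig) {
  return [...new Set([
    ...(streamConfig.topics || []),
    ...(streamConfig.allowed_topics || []),
  ])];
}

effectiveAllowedTopics({
  topics: ['codfw.my-stream.v1'],
  allowed_topics: ['eqiad.my-stream.v1'], // default topic omitted here...
});
// ...yet codfw.my-stream.v1 remains allowed via the union.
```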

But, this all makes me wonder. Are we sure we shouldn't just use separate streams for double compute?

A stream is supposed to be composed of topics. Having two topics that really are distinct things in the same stream just feels wrong.

Yes, I agree that something feels slightly odd, but I think I disagree that these are distinct things: conceptually there's a single stream, it's just that we compute it twice because we have two Kafka clusters. I'm not entirely sure, but I believe the oddity is rooted somewhere in the way we run mirror-maker and our multi-cluster Kafka setup.
The fact that we have to deal with multiple topics (eqiad.stream and codfw.stream) for a single stream is already far from ideal; it leaves consumers of these streams with the difficulty of consuming multiple topics (which is hard if you care about event ordering). The single compute processors we have take the easy approach and defer that difficulty to downstream consumers.
Additionally, our topic naming scheme forces producers to be aware of the DC they run in by being explicit about the topic they produce to (ESC's list of topics is primarily targeted at consumers). I wonder whether an approach where you don't have to prefix the topic you produce to would be better, doing something different with mirror-maker to clearly identify (via naming conventions or other config) that a mirrored topic is a complement for a single compute stream; for double compute streams we would not even have to mirror such topics...
But I think I digress... :)
All that to say that the statement "a stream is supposed to be composed of topics" is only true because of our multi-cluster & mirror-maker setup; would we need to aggregate multiple topics otherwise? I'm not convinced that two ESC streams is right; practically speaking I don't like that consumers would have to make a very arbitrary decision by choosing one or the other.

I know you want to abstract away the DC replication detail from clients, and that makes sense. Maybe you should do this proposal as written, but ALSO declare the streams as separate streams in ESC?

I'm not sure what having both approaches in place would buy us?

If you prefer, I can perhaps declare a stream with a single topic and decide that it's eqiad.stream; I can probably tell my producer to bypass the limitation that the topic it produces to must be part of the topics list?

the statement "a stream is supposed to be composed of topics" is only true because of our multi-cluster & mirror-maker setup; would we need to aggregate multiple topics otherwise?

Yes, you are right. The weirdness is indeed because of the way we handle replication and a limitation in MirrorMaker 1. See T277467: Upgrade to Kafka MirrorMaker 2 and revisit Kafka topic prefix convention.

In the MirrorMaker 2 world, we'd need to aggregate (for single compute), but the topics used for produce vs consume would just be different. E.g. in local DC A, the topic to produce to would be stream (without any prefix), and in remote DC B, the topics to consume from would be stream (the DC B produced topic) and A.stream, the MirrorMaker 2 auto-prefixed replicated topic. This could be abstracted via ESC if/when we did this.

But, for double compute...you are right. In that case, you'd always produce to stream and consume from stream. Would we even need to replicate double compute topics? We could, but the consumers wouldn't use the replicated topic (by default?).

If you prefer, I can perhaps declare a stream with a single topic and decide that it's eqiad.stream; I can probably tell my producer to bypass the limitation that the topic it produces to must be part of the topics list?

Actually, this might be a very good solution! Only topics that are prefixed with eqiad. or codfw. are replicated. If in each DC, you produced only to stream (no DC prefix):

  • it will not be replicated
  • the topic name will be the same in both DCs
  • you won't need any modifications to EventStreams to use it

Sure, I wanted to avoid the possibility of configuring something inconsistent via distinct lists in topics and allowed_topics by taking the union of these two settings. I'm fine with allowed_topics if you want to stay closer to the KafkaSSE semantics.

(In case we end up going with the ^ solution.) Yeah, good point. I'm okay with allowed_topic_replicas too. Maybe a good concept then is topic_alternates? Or something? Hm, but what if we had more than two topics as part of a stream (e.g. if we had more actively producing DCs/Kafka clusters)? I suppose we'd need some distinction to indicate that in this special case only a single topic should be consumed? Hm.

allowed_topics indeed could let folks mess up and consume both topics, but they'd have to do so very intentionally (by setting a custom Last-Event-ID). Hm, I think allowed_topics might be a more generic and more useful solution. But I'm okay with either :)

So I think we have the options below:
1/ Adapt the EventStreams HTTP service
Mainly the approach suggested in this ticket: the service config is adapted to serve the topics produced in the local DC, but is relaxed to allow consuming the mirrored topic produced from the other DC.
Pros: transparent to consumers; minimal lag; consumers consume the stream closest to where they query
Cons: pushes EP limitations regarding double compute streams into this service; the fail-over scenario (e.g. long maintenance on one DC's stream processor) is unclear; feels hackish

2/ Adapt double compute stream processors to produce to a topic without prefix
With double compute processors the data is already available in both kafka-main clusters, so the ESC could declare a single topic that is available in both kafka-main clusters. The EventStreams HTTP service would work similarly.
Pros: would possibly fit with the existing EP; would save space on kafka-main
Cons: non-negligible effort to migrate existing stream processors (needs a migration plan); makes a bet on future mirror-maker topic prefix conventions (T277467); the fail-over scenario is unclear; probably needs a switch to event time for the Kafka timestamp to ensure at-least-once semantics when failing over. This is a relatively new approach and it's possible that we have overlooked some issues.

3/ Configure the EventStreams HTTP service to consume from a single topic
The service configuration would override the ESC with a single topic (we would arbitrarily choose e.g. eqiad.topic).
Pros: no code change
Cons: depending on where you consume from, you might be consuming data that went through extra hops (extra lag); the fail-over scenario (e.g. long maintenance on the eqiad stream processor) is not great and requires a config change in the service to declare codfw.stream instead of eqiad.stream, which would cause existing consumers to fail and have to resume consuming without a Last-Event-ID; still feels hackish

Personally, at this point I think I prefer option 3; while option 2 seems the most ideal, I fear the extra work, especially if, once we tackle T277467, we learn new things and realize that the strategy we took does not fit.

Q, for option 2, you have:

making a bet on future mirrormaker topic prefix conventions (T277467)

For double compute, do the future topic prefix conventions matter? For producers, single compute and double compute are the same: produce to the non-prefixed stream topic. For consumers, it matters, but no more so than it does now: consumers have to be configured (or automatically configured via ESC?) to consume from the correct topics. In double compute, that is a single local topic; for single compute it is the local plus the replicated topic.

Cons: non-negligible effort to migrate existing stream processors (needs a migration plan),

Oh! Indeed if we changed this convention for all double compute streams sure. But I thought we were just talking about a new(?) stream you were making for T374921: Configure https://stream.wikimedia.org to expose rdf-streaming-updater.mutation. We could adopt this Option 2 convention for new streams, and just document that old streams aren't using it.

fail-over scenario is unclear

Is there fail over? I thought in double compute world there wasn't. Do you have consumers in codfw consume from replicated eqiad topic?

I thought fail over in this case was just traffic routing to a single DC, but that DC would still only consume its local topic?


Between Option 1 and 3, I really don't mind. The code changes needed for Option 1 are not large, and the topics_allowed or topic_alternates or even topic_replicas solutions are generic concepts that solve this problem. I don't think they are hacky. :) As long as they are clearly named and documented for what they do and what they are for, they are fine with me!

Q, for option 2, you have:

making a bet on future mirrormaker topic prefix conventions (T277467)

For double compute, do the future topic prefix conventions matter? For producers, single compute and double compute are the same: produce to the non-prefixed stream topic. For consumers, it matters, but no more so than it does now: consumers have to be configured (or automatically configured via ESC?) to consume from the correct topics. In double compute, that is a single local topic; for single compute it is the local plus the replicated topic.

I meant here all the possible talks & design choices that might happen in T277467 that could contradict what we decide here; it might be just fine, but it felt like we might need to have more thorough discussions about this :)

Cons: non-negligible effort to migrate existing stream processors (needs a migration plan),

Oh! Indeed if we changed this convention for all double compute streams sure. But I thought we were just talking about a new(?) stream you were making for T374921: Configure https://stream.wikimedia.org to expose rdf-streaming-updater.mutation. We could adopt this Option 2 convention for new streams, and just document that old streams aren't using it.

I want to expose existing Kafka topics produced by the flink job rdf-streaming-updater. Even just for this processor and these topics, the effort might be non-negligible since it's being used by all WDQS nodes internally. The migration path is unclear to me, but it basically involves two strategies:

  • no downtime (at the cost of space), by populating the data into new unprefixed topics
  • or with downtime, by renaming the topics if possible.

fail-over scenario is unclear

Is there fail over? I thought in double compute world there wasn't. Do you have consumers in codfw consume from replicated eqiad topic?

I thought fail over in this case was just traffic routing to a single DC, but that DC would still only consume its local topic?

client -> eventstreams@eqiad -> kafka-main@eqiad -> topic stream
client -> eventstreams@codfw -> kafka-main@codfw -> topic stream

If, say, we shut down all of codfw, then we only need proper Kafka timestamps that are coherent between the two active flink jobs (which is why I mention Kafka timestamps being assigned event times).
The problem is with more granular fail-over scenarios, e.g.:
if, say, we shut down only flink@codfw, we might need a config change to force eventstreams@codfw to consume from kafka-main@eqiad.

I want to expose existing Kafka topics produced by the flink job rdf-streaming-updater

Ah! For some reason I thought it was a special topic being produced for public consumption, not ones used by the blazegraph updater itself.

Okay. Then I agree, I don't think we should migrate wdqs-updater to a new topic layout just for this. Goodbye Option 2!

If say we shutdown all codfw then we only need proper kafka timestamps that are coherent between the two active flink jobs (why I mention kafka timestamps being assigned event times).

Ah yes, I see: event times. We prefer (and use?) system receive time for the Kafka timestamp (for reasons), which means the timestamps used when switching might get weird.

Okay!

Option 1 then?

Option 1 then?

Sounds good, thanks!
We can probably continue discussing naming & other details in https://gitlab.wikimedia.org/repos/data-engineering/eventstreams/-/merge_requests/13. I'll rename it to allowed_topics and possibly include some logic to merge the topics & allowed_topics arrays to limit config mistakes.

Gehel triaged this task as High priority.Jan 6 2025, 2:41 PM
Gehel moved this task from Incoming to Scaling on the Wikidata-Query-Service board.

Deployed Eventstreams v0.10.0 on beta and it throws this error when listening to a stream:

{"message":"No topics available for consumption. This likely means that the configured allowedTopics do not currently exist.","origin":"KafkaSSE","name":"ConfigurationError","allowedTopics":[null],"statusCode":500}

Deployed Eventstreams v0.10.0 on beta and it throws this error when listening to a stream:

{"message":"No topics available for consumption. This likely means that the configured allowedTopics do not currently exist.","origin":"KafkaSSE","name":"ConfigurationError","allowedTopics":[null],"statusCode":500}

Certainly my fault... this array with [null] is suspicious... will try to fix it, sorry about that.

Deployed v0.11.0 on beta and confirmed it's fixed.