
EventBus / Change Propagation DC Failover Scenario
Closed, Resolved · Public

Description

T123954 had a lot of discussion about how to set up replication of Kafka topics between datacenters. This ticket continues that discussion while focusing on the Change Propagation use case. The main question to debate here is A: merged topics in aggregate Kafka clusters (e.g. mediawiki.revision_create with data from all datacenters) vs. B: separate topics per datacenter (e.g. eqiad.mediawiki.revision_create).

There may be an edge case where certain failover scenarios will lose messages in A (merged topics).

Event Timeline

With per-DC processing it is fairly straightforward to checkpoint offsets (or timestamps) per DC in something like etcd. In the case of a fail-over, those can then be used to continue processing of replicated events in another DC, and also to cleanly switch back once the original DC returns.
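
To make the fail-over side of this concrete, here is a rough sketch, assuming kafka-python and python-etcd3 as clients; the etcd key layout, topic names, and the assumption that the checkpointed offsets are valid for the replica topic in the surviving DC are all illustrative, not an actual implementation:

```python
# Sketch only: resume processing of a replicated per-DC topic from a
# checkpoint stored in etcd. Assumes the checkpointed offsets refer to the
# replica topic in this DC (otherwise a timestamp-based seek is needed, as
# discussed further down in this task).
import json

import etcd3
from kafka import KafkaConsumer, TopicPartition

etcd = etcd3.client(host='etcd.example.org')  # hypothetical etcd endpoint

def resume_dc(consumer: KafkaConsumer, source_dc: str, topic: str):
    """Continue consuming <source_dc>.<topic> from its last checkpoint."""
    value, _ = etcd.get(f'/changeprop/checkpoints/{source_dc}/{topic}')
    checkpoint = json.loads(value)  # e.g. {"0": 1234, "1": 5678}, keyed by partition
    partitions = [TopicPartition(f'{source_dc}.{topic}', int(p)) for p in checkpoint]
    consumer.assign(partitions)
    for tp in partitions:
        consumer.seek(tp, checkpoint[str(tp.partition)])
```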

The same is a lot harder on a merged topic, as it contains events from multiple DCs interleaved in a semi-random fashion. For example, there is no easy way to know whether all replicated events from a given DC have been drained.

Another issue is that the merge order will differ between DCs. This means that clients following a merged event stream are likely to miss events in the case of a DC fail-over. With separate per-DC topics, deterministic merging can happen in a service providing something like a websocket interface. Offsets per DC partition can be checkpointed to etcd, and then used to cleanly fail over the stream to another DC, without dropping messages.

Still need to think about this, but a quick question.

Why the suggestion to use etcd? What does checkpointing offsets in etcd gain over using the usual Kafka offset commit protocol?

mmm.. that topics need to know about the DC setup seems not so optimal, but I also need to think about this a bit more. Kafka clients need to be able to deal with duplicated and out-of-order events regardless, as that is bound to happen somewhat if we have guaranteed delivery.

Why the suggestion to use etcd? What does checkpointing offsets in etcd gain over using the usual Kafka offset commit protocol?

My understanding is that etcd will be the authoritative data source for DC fail-over decisions. It will be set up with multi-DC replication to support this.

This gives us two checkpoint systems: Zookeeper for frequent local offset storage, and etcd for less frequent checkpointing and DC fail-over.
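
For illustration, a rough sketch of that two-tier setup for one consumer, assuming kafka-python and python-etcd3 (the handler, interval, and key names are made up; whether the frequent commits land in ZooKeeper or Kafka depends on the client/broker version):

```python
# Sketch only: commit offsets locally after every message, and write a much
# less frequent fail-over checkpoint to etcd (a timestamp could be stored
# alongside the offsets in the same way).
import json
import time

import etcd3
from kafka import KafkaConsumer

etcd = etcd3.client(host='etcd.example.org')            # hypothetical endpoint
consumer = KafkaConsumer('mediawiki.revision_create',
                         bootstrap_servers='kafka-main.eqiad.example.org:9092',
                         group_id='change-prop',
                         enable_auto_commit=False)

CHECKPOINT_INTERVAL = 60                                 # seconds between etcd writes
last_checkpoint = 0.0
positions = {}                                           # partition -> next offset to consume

for message in consumer:
    process(message)                                     # hypothetical handler
    positions[str(message.partition)] = message.offset + 1
    consumer.commit()                                    # frequent, local offset storage
    if time.time() - last_checkpoint > CHECKPOINT_INTERVAL:
        etcd.put('/changeprop/checkpoints/eqiad/mediawiki.revision_create',
                 json.dumps(positions))                  # infrequent, cross-DC checkpoint
        last_checkpoint = time.time()
```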

Even if we go the route of per-DC topics (which seems odd to me), the public names should not be eqiad.mediawiki.revision_create, but rather mediawiki.revision_create. Internally, requests for this topic could be routed via the load balancer to eqiad.mediawiki.revision_create (we are assuming HTTP POSTs, right?).

Ok HMM! I just had a chance to think about this a bunch and talk to Marko about it.

So, Mediawiki will only ever be primary in a single datacenter at once. That means that for Mediawiki generated events, messages will only be produced to the main (local) Kafka cluster in the primary datacenter at any given time. The change propagation service will run in each DC, and consume messages only from the local Kafka cluster. When mw_primary is switched, the new primary DC will start producing messages to its main (local) Kafka cluster, and the change propagation service jobs will start getting new events there.
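
For illustration, a minimal sketch of that per-DC setup (kafka-python assumed; broker addresses, topic name, and handler are made up):

```python
# Sketch only: each DC runs change propagation against its local main Kafka
# cluster. Only the mw_primary DC receives new messages, so the consumer in
# the standby DC simply idles until mw_primary is switched.
from kafka import KafkaConsumer

site = 'eqiad'  # in practice this would come from puppet ($::site)

consumer = KafkaConsumer('mediawiki.revision_create',
                         bootstrap_servers=f'kafka-main.{site}.example.org:9092',
                         group_id='change-prop')

for message in consumer:
    process(message)  # hypothetical handler; messages only arrive while this DC is primary
```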

This doesn't fully cover all cases. If eqiad blows up before all messages in the main-eqiad Kafka cluster are processed, then yes, messages will be lost. But, this is an extreme edge case. If a primary DC explodes, there will be more things to worry about than 100% MW/RESTBase consistency, no?

In most fail or switch over scenarios, change propagation will still be active in the standby DC, and will be able to drain and process any remaining messages in the main (local) Kafka cluster. Once it gets through them, it will be inactive until mw_primary is changed.
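
The "drained" condition could be checked roughly like this (kafka-python assumed; just a sketch):

```python
# Sketch only: the consumer in the former primary DC keeps processing until
# its position has reached the end offset of every assigned partition in the
# local cluster; after that there is nothing to do until mw_primary flips back.
from kafka import KafkaConsumer

def is_drained(consumer: KafkaConsumer) -> bool:
    partitions = consumer.assignment()
    end_offsets = consumer.end_offsets(partitions)  # latest offsets in the local cluster
    return all(consumer.position(tp) >= end_offsets[tp] for tp in partitions)
```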

Am I missing anything? Is there a case where change propagation in one DC needs to know about events in the other DCs?

This doesn't fully cover all cases. If eqiad blows up before all messages in the main-eqiad Kafka cluster are processed, then yes, messages will be lost. But, this is an extreme edge case. If a primary DC explodes, there will be more things to worry about than 100% MW/RESTBase consistency, no?

We often have backlogs of millions of jobs, which can be triggered by a single template edit. I would expect this to be the case in EventBus as well. Losing the entire event history / backlog in any DC switch-over would not be ideal. If we set things up that way, we might as well not bother to replicate.

The other aspect is consistent merging for public consumer fail-over. If we expose random merges per DC, then there will be no way to switch DCs without losing messages for consumers of the merged stream.

Losing the entire event history / backlog in any DC switch-over would not be ideal.

This is still an extreme case. This would only happen if we lost a full Kafka cluster in a datacenter for good. In the normal switchover and failover case, the access to the Kafka cluster might be down for a period of time, but when that access is restored, messages would still be consumable, and change propagation jobs would pick up where they left off.

If we set things up that way, we might as well not bother to replicate.

If we were to completely lose a whole Kafka cluster, then the replication will fail us anyway as there will be messages that would never be replicated. We want to replicate for reasons other than HA change propagation. However, since your use case writes to Cassandra which is already replicated cross DC, then yes, I agree. I'm not sure it makes sense to use Kafka cross DC replication for change propagation jobs.

The other aspect is consistent merging for public consumer fail-over. If we expose random merges per DC, then there will be no way to switch DCs without losing messages for consumers of the merged stream.

This is true even with DC-named topics instead of merged topics. The offsets will be totally different, and we don't yet have a way to consume consistently based on content timestamps only.

Let me rephrase to describe how this sounds to me: You seem to be arguing that we don't need cross-DC replication, as DCs aren't likely to fail (for long), and switching consumers of a merged stream from one DC to another without dropping messages is not straightforward to support with Kafka 0.8. Is this a fair summary?

If so, then the logical conclusion to me would be to drop replication, and just make EventBus DC-local only. However, that does not satisfy our requirements:

  • We would like to do better than delaying or even losing potentially millions of updates in the case of an actual DC outage.
  • We would like to (eventually) have a sane and reliable way of switching consumers of a merged stream from one DC to another, or even load-balancing stream requests between DCs (active-active). (While I agree that this is not convenient with Kafka 0.8, it should be fairly straightforward with 0.10.)
  • We would like to be able to gradually move consumers of events between DCs, for example for load balancing purposes, or to gradually switch datacenters (see also this paper).

All of these things are a lot easier if topics are replicated in a way that allows regular processing in remote DCs.

It is not clear to me why we would want to forgo all of this for a marginal simplification in topic naming. Could you expand on this?

You seem to be arguing that we don't need cross-DC replication, as DCs aren't likely to fail (for long), and switching consumers of a merged stream from one DC to another without dropping messages is not straightforward to support with Kafka 0.8. Is this a fair summary?

Sort of, yes.

  • We would like to do better than delaying or even losing potentially millions of updates in the case of an actual DC outage.

This is the main point I don't understand. We'd only lose messages if an entire Kafka cluster implodes and is unrecoverable. If that happened, we'd lose messages if topics were named in either of the suggested forms. If access to a Kafka cluster is lost (via some network failure perhaps?), message consumption would be delayed, as you say, but that seems totally fine, no?

  • ...switching consumers of a merged stream from one DC to another, or even load-balancing stream requests between DCs...
  • We would like to be able to gradually move consumers of events between DCs, for example for load balancing purposes, or to gradually switch datacenters (see also this paper).

That is a really nice paper, thanks! Their system sounds pretty amazing. It'd be really nice if we could build a multi-homed system here, but I don't think we have Google's technology!

With the input logs available in multiple datacenters, Photon workers are able to run independently in each of these datacenters to join the same input event, but workers coordinate their output to guarantee that each input event is joined and outputted at-most-once. The critical state shared between the workers consists of the set of event ids that have already been joined.

(I see why you mentioned etcd before :) ) Correct me if I'm wrong here (and I very well may be), but in order to support gradual failover and multi-homed load balancing, they are processing each event in every DC, and then committing in only one DC. Before committing an output (in change propagation, I suppose this would be storing a render in Cassandra?), the worker checks to see if the output has already been committed by workers elsewhere. If it has, it drops the output.
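
If I understand it right, that coordination could be sketched roughly like this (python-etcd3 assumed; the key layout, event id field, and helper functions are made up to illustrate the idea, not how Photon actually does it):

```python
# Sketch only: workers in every DC process the same event, but only the first
# one to register the event id in shared storage (etcd here) commits its output.
import etcd3

etcd = etcd3.client(host='etcd.example.org')  # hypothetical cross-DC etcd

def try_claim(event_id: str) -> bool:
    """Atomically record the event id; True only for the first claimer."""
    key = f'/changeprop/joined/{event_id}'
    succeeded, _ = etcd.transaction(
        compare=[etcd.transactions.version(key) == 0],  # key must not exist yet
        success=[etcd.transactions.put(key, '1')],
        failure=[],
    )
    return succeeded

def handle(event):
    output = render(event)               # hypothetical: compute the update
    if try_claim(event['meta']['id']):
        commit(output)                   # e.g. write the render to Cassandra
    # else: another DC already committed this event, so drop the output
```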

Our experience has been that bolting failover onto previously singly-homed systems has not worked well. These systems end up being complex to build, have high maintenance overhead to run, and expose complexity to users. Instead, we started building systems with multi-homing designed in from the start, and found that to be a much better solution. Multi-homed systems run with better availability and lower cost, and result in a much simpler system overall.

Kafka is not a multi-homed system (yet?). Using DC named topics and Kafka offset commits feels like trying to bolt on cross DC failover.

It is not clear to me why we would want to forgo all of this for a marginal simplification in topic naming. Could you expand on this?

It isn't yet clear to me that using DC named topics actually gets you what you want. Ah! But I need to think a little bit more about this. I'd take that time before I submit this comment...but we have our meeting starting in 7 minutes! I'm not 100% opposed to DC named topics, just mostly opposed. But I can be convinced. Let's touch on this (briefly) in the meeting, and then I'll keep thinking about it and post again today.

Ok!

Done a lot more thinking about this, and also read over https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message in much more detail.

I'm coming around! The use case that convinced me is the ability to process a change propagation job in either DC. Since your Cassandra cluster is already replicated between DCs, it doesn't matter which DC runs a given job. I can see how it would be very handy to be able to disable change propagation processors in eqiad and have those jobs picked up by processors in codfw. I think this usefulness is unique to Cassandra at WMF, as I don't know of any other cross-DC master/master state store that we use.

So ja! IF we are able to consume based on message timestamp (which will be possible in 0.10), and IF the topics are named per DC, and IF you have a way of checkpointing the latest consumed timestamp (via etcd?) across DCs, then I think this is possible! It may be possible to do this with Kafka 0.8 now, much less reliably, by [[ https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest? | asking brokers for offsets at a timestamp ]]. This might not be worth relying on, as the offsets you get are based on local broker log segments, which will differ on each broker, and will differ even more between brokers of different clusters.
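
With 0.10 timestamps this could look roughly like the following (kafka-python and python-etcd3 assumed; the topic, etcd key, and checkpointed timestamp are made up):

```python
# Sketch only: in the fail-over DC, turn the last processed timestamp for the
# eqiad stream into offsets on the local replica of the eqiad.* topic.
# offsets_for_times() needs Kafka 0.10+ (KIP-32 message timestamps).
import etcd3
from kafka import KafkaConsumer, TopicPartition

etcd = etcd3.client(host='etcd.example.org')
TOPIC = 'eqiad.mediawiki.revision_create'  # DC-named topic, mirrored to codfw

def resume_from_timestamp(consumer: KafkaConsumer):
    value, _ = etcd.get('/changeprop/last-processed-ts/eqiad')
    ts_ms = int(value)
    partitions = [TopicPartition(TOPIC, p)
                  for p in consumer.partitions_for_topic(TOPIC)]
    consumer.assign(partitions)
    offsets = consumer.offsets_for_times({tp: ts_ms for tp in partitions})
    for tp, offset_and_ts in offsets.items():
        if offset_and_ts is not None:
            consumer.seek(tp, offset_and_ts.offset)  # first offset with timestamp >= ts_ms
```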

By going with DC named topics, we can also at least postpone T124469 if we want. We could easily replicate these topics directly between the main Kafka clusters in each DC, since we can guarantee that producers in eqiad will never produce to the 'codfw.*' topics, and vice versa.

One more thing though!

What y'all are proposing sounds really complicated. While balancing these job processors between DCs is very cool and could be handy, I'm not sure the complexity that you need to add is worth it, especially since Mediawiki is only master/slave. WMF's whole multi datacenter design is based around this. Cassandra allows you to be more flexible than Mediawiki does, so what you want to do is technically possible. So if you think it's really worth the effort then I'm fine with it, but it seems a little crazy to me!

Mediawiki is only master/slave. WMF's whole multi datacenter design is based around this.

Actually, new infrastructure is supposed to do better than that. The fact that MediaWiki was not designed with multi-DC operation in mind makes it very hard to even do a basic fail-over test. Eventually, real fail-overs or even active-active operation should be routine.

Regarding the complexity, I think it is actually relatively limited:

  • Prefix all topics with the DC name (site in puppet).
  • Prefix local consumption with the DC name (also site).
  • In multi-DC clients like ChangePropagation, support consuming events from multiple DCs.

We don't have a use case for a merged event stream right now. If we had one, it would essentially be a merge sort over several topics:

  • Get the next message from each DC topic.
  • Return the one with the lowest (oldest) timestamp, then fetch the next message from that DC's topic.
  • Repeat.
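
As a rough sketch of that merge (kafka-python consumers assumed, already positioned on their per-DC topics; this relies on KIP-32 style message timestamps):

```python
# Sketch only: merge several per-DC event streams into one, ordered by
# message timestamp. At most one buffered message per DC is kept on the heap.
import heapq

def merged_stream(consumers):
    """Yield messages from {dc_name: KafkaConsumer} in timestamp order."""
    heap = []
    iterators = {dc: iter(consumer) for dc, consumer in consumers.items()}

    def pull(dc):
        msg = next(iterators[dc], None)
        if msg is not None:
            heapq.heappush(heap, (msg.timestamp, dc, msg))

    for dc in iterators:
        pull(dc)                       # prime the heap with one message per DC
    while heap:
        _, dc, msg = heapq.heappop(heap)
        yield msg                      # oldest message across all DCs
        pull(dc)                       # refill from the DC we just emitted from
```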

Ha, ja this is the complex part:

In multi-DC clients like ChangePropagation, support consuming events from multiple DCs.

You're going to have to build global checkpointing and consumer balancing yourself, instead of letting Kafka deal with it for you.

But anyway, I'm fine with naming the topics based on DCs. This will let you do multi DC consumption if you build it, whereas I'm convinced that merged topics would make that much harder.

Change 278752 had a related patch set uploaded (by Ottomata):
Add DC named topics to event bus topic config

https://gerrit.wikimedia.org/r/278752

Change 278752 abandoned by Ottomata:
Add DC named topics to event bus topic config

Reason:
eventlogging-service-eventbus is now configured to prefix DC names before event production, but after topic config checking. This is no longer needed.

https://gerrit.wikimedia.org/r/278752

Ottomata claimed this task.