
Evaluate Kafka Stretch cluster potential, and if possible, request hardware ASAP
Closed, Resolved (Public)


We are embarking on a project to create more event streams (e.g. MW content). While designing these streams, we are also revisiting our Multi-DC Kafka (main) and stream processing patterns.

I've recently learned that Kafka 'stretch' clusters are feasible in situations where cross DC latency is relatively low (< 100ms). From a brief conversation with @BBlack and @ayounsi, it sounds like our eqiad <-> codfw latency is on average around ~30ms, but without guarantees on this.

A Kafka Stretch cluster is a single Kafka Cluster spanning multiple DCs. Using a single Kafka cluster instead of 2 clusters with MirrorMaker in between greatly simplifies the architecture of Multi DC streaming and event driven applications. However, it may result in increased latency for Kafka clients, as well as some tricky broker failover scenarios if we are not careful.

This task is about evaluating creating a new Kafka Stretch cluster in eqiad and codfw, likely with the intention of it eventually replacing the Kafka main clusters. Since the current DC-ops hardware request deadline is this Friday May 13, if we do want to do this, we should decide before then so we can request hardware for it.

As I do research, I will update this task description.

Best docs I've found so far:


Zookeeper or KRaft coordinator

Zookeeper should be run in more than 2 DCs, e.g. '2.5' DCs. Kafka brokers are stretched between 2, but Zookeeper is 'stretched' between 3 so there is a tiebreaker ZK instance. Perhaps we'd run at least one ZK node in ulsfo?

Newer versions of Kafka are removing the dependency on Zookeeper. A stretch cluster still needs a quorum tiebreaker. Kafka nodes can be run as brokers, controllers, or both. For our tiebreaker node, we'd put one controller-only (non-broker) node in a 3rd DC.
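If we go the KRaft route, the tiebreaker would just be a controller-only member of the quorum. A rough sketch of what the tiebreaker node's config might look like (node ids and hostnames are made up for illustration):

```properties
# Hypothetical tiebreaker node in a 3rd DC: controller role only, no broker.
process.roles=controller
node.id=3
# Quorum spans both main DCs plus the tiebreaker (hostnames are invented).
controller.quorum.voters=1@kafka-stretch1001.eqiad.wmnet:9093,2@kafka-stretch2001.codfw.wmnet:9093,3@kafka-tiebreaker4001.ulsfo.wmnet:9093
```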

Follower Fetching

With follower fetching, consumers can read from partition replicas in their own DC.
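Follower fetching (KIP-392) works by treating each DC as a 'rack'. Roughly, the configuration looks like this, on the broker side and the consumer side (values are illustrative):

```properties
# Broker side: tag each broker with its DC and enable rack-aware selection.
broker.rack=eqiad
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer side: declare the client's DC so fetches go to a local replica.
client.rack=eqiad
```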

Producers and partition leaders

IIUC, producers will always have to produce to the leader of the partition, which will only be on one broker at a time, which means only in one DC at a time. This will introduce latency to some produce requests, especially if acks=all. If acks=all, then at least min.insync.replicas (we'd probably set this to 2 or 3?) must ACK the message before the Kafka producer client will receive its ACK. If partition X has replicas in B1(eqiad), B2(codfw), B3(codfw), and I produce to partition X from codfw, my message will go: Producer(codfw) -> B1(eqiad) -> (B2(codfw) and B3(codfw)), and then the ACK travels all the way back again, with the message itself crossing the DC boundary twice before the producer receives its ACK.

The number of times the message will be sent across the DC increases with the number of replicas we choose. We'll likely want more than 3 replicas in a stretch cluster.
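The crossing count above can be modeled with a quick back-of-the-envelope sketch. This is a toy model, not Kafka code; it assumes every follower fetches directly from the leader:

```python
# Toy model: count how many times a produced message crosses the DC
# boundary before it is fully replicated. Assumes every follower
# fetches directly from the partition leader.

def dc_crossings(producer_dc, leader_dc, replica_dcs):
    """replica_dcs: DCs of the follower replicas (leader excluded)."""
    crossings = 1 if producer_dc != leader_dc else 0              # produce hop
    crossings += sum(1 for dc in replica_dcs if dc != leader_dc)  # fetch hops
    return crossings

# Example from above: leader B1 (eqiad), followers B2, B3 (codfw),
# producer in codfw:
print(dc_crossings("codfw", "eqiad", ["codfw", "codfw"]))  # -> 3
# Same partition, but producer in eqiad:
print(dc_crossings("eqiad", "eqiad", ["codfw", "codfw"]))  # -> 2
```

Adding a replica in the remote DC adds one more fetch hop, which is why the count grows with the replication factor.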

In our current Multi DC MirrorMaker setup, each message only crosses the DC once, when MirrorMaker produces it to the leader in the remote cluster.

The leadership of a partition can change at any time, but will change most often (for us) during regular broker restarts. This means that regular rolling broker restarts could shuffle the cross DC network connections. As each broker is restarted, producers to partition leaders on that broker will re-connect to whatever brokers are promoted to leaders for those partitions.

Replica placement

Replica placement matters. The default 'rack' aware placement algorithm can be found here:

I'm trying to reason about replica assignment and cross DC traffic. I'm not 100% sure what I'm about to describe is possible via the default replica assignment algorithm, but let's go with it for now. Requirements:


  1. Consumers tolerate 1 failure without having to consume cross DC
  2. Producers tolerate 1 failure without having to change which DC they are producing to
  3. Preferred replica leader is evenly spread among all brokers, and also all DCs.

Requirement 1. is possible via follower fetching if there are at least 2 replicas in the local DC.
Requirement 2. is possible if the first 2 replicas listed are in the same DC.

To do this, I think we need a replication factor of at least 4, and need to be able to place replicas in a deterministic way. Let's say we have
4 brokers, 2 in each DC: A1, A2 (eqiad), B1, B2 (codfw) and we create a topic with 4 partitions. We want its replica assignment to be:

p0: [A1, A2, B1, B2]
p1: [B1, B2, A1, A2]
p2: [A2, A1, B2, B1]
p3: [B2, B1, A1, A2]

What matters is that the first 2 replicas in the list are in the same DC, so that when the preferred replica (the first one) goes down, another broker in the same DC takes over as the leader. Actually, we could go farther than this, and say that a DC's replicas should always be grouped together in the list. If we had more than 4 replicas (and brokers), that would mean that e.g. A1, A2, A3 would always be grouped together in this list. I don't think we need more than 4 replicas though.
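The desired layout can be generated deterministically. A sketch (broker names and the 4-broker topology are just the illustration above; this reproduces the listing up to the ordering of the trailing remote replicas, which doesn't affect either requirement):

```python
# Sketch: build a replica assignment where the first two replicas of each
# partition live in the same DC, and preferred leadership is spread evenly
# across brokers and DCs.

DCS = {"A1": "eqiad", "A2": "eqiad", "B1": "codfw", "B2": "codfw"}

def stretch_assignment(brokers, num_partitions):
    """Rotate through brokers as preferred leaders; group each DC's
    replicas together, with the leader's DC first."""
    assignments = []
    for p in range(num_partitions):
        leader = brokers[p % len(brokers)]
        local = [b for b in brokers if DCS[b] == DCS[leader]]
        remote = [b for b in brokers if DCS[b] != DCS[leader]]
        # Put the preferred leader first within its DC group.
        local = [leader] + [b for b in local if b != leader]
        assignments.append(local + remote)
    return assignments

for p, replicas in enumerate(stretch_assignment(["A1", "B1", "A2", "B2"], 4)):
    print(f"p{p}: {replicas}")
# p0: ['A1', 'A2', 'B1', 'B2']
# p1: ['B1', 'B2', 'A1', 'A2']
# p2: ['A2', 'A1', 'B1', 'B2']
# p3: ['B2', 'B1', 'A1', 'A2']
```

Each broker is preferred leader for exactly one partition, and the first two replicas of every partition share a DC.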

TODO: is the assumption that leaders are chosen in replica-list order actually true????

Keeping the first potential leaders in the same DC is not actually a hard requirement, but it will reduce the variability in cross DC traffic during broker restarts. Producer clients will be less likely to have to change which DC they connect to. We'll have to see if there is a smart way to configure brokers to accomplish this with the default replica assignment algorithm. If there isn't, it is possible to manually reassign partitions and replicas when we create topics, although this would be pretty cumbersome.

Unfortunately, this does not support the ability to always have producers produce to their nearest replicas. That only works with follower fetching for consumers. As Guozhang wrote, 'producer nearest partition' is possible, but we'd have to make a 'leader rack aware partitioner' for the Producer to use, and doing so means we lose the ability to partition semantically (e.g. by page_id).

Cross DC throughput calculations


Let's assume we have cross DC producer connections. With 4 replicas (2 in each DC), we will always have 2 replicas fetching across DCs. How many times will a message cross the DC boundary in this case?

Let's assume even leader placement, so that half of the leaders are in each DC. Let N be the input throughput (bytes, # messages, whatever). Half of the messages will be produced to a local leader, and the other half to a remote leader. In the local produce case, each message will cross the DC boundary 2 times: once for each replica in the remote DC. In the remote produce case, each message will cross the DC boundary 3 times: once for the produce, and 2 more for the replicas in the remote DC.

(2N + 3N) / 2 == 2.5N

Okay, now let's plug in some Ns!

I'll start with @Milimetric's handy numbers from a comment below. For 2021, the average revision size was 20623 bytes. (The average is quite inflated by big outliers.) Let's just round this up and say 30000 bytes. In 2021 there were 533764132 revisions, so 533764132*30000/365/24/60/60 == 507767 bytes / second. Compared with what kafka main is already doing (<5 MB / second), adding a revision text event stream wouldn't be a significant addition of traffic. I suppose if some bot edits e.g. 2MB pages 10 or 100 times a second this could matter, though. Even so, we might be able to use kafka replica quotas to reduce the impact on the link.

Assuming growth, more use cases, replacing kafka-main, etc., let's just round this up to 1MB per second.

N=1MB per second. 2.5 * 1MB == 2.5MB per second of cross DC traffic.

This is in contrast to the current MirrorMaker based setup, where cross DC kafka traffic is 1*N, so 1MB per second.

Note that this is in the normal operation scenario. If a replica is offline for a while, it will need to fetch data faster than the normal input rate when it comes back online.

I believe this formula is valid whether producers are active/passive or active/active. In either case, half of the partition leaders will be in a remote DC.


With a replication factor of 4, my made-up over-estimate of cross DC traffic is

  • normal operation: 2.5MB per second
  • Recovery mode after a DC outage: A lot more? We can throttle this though, so it depends on how quickly we need to recover. Let's just x3 it and say 7.5MB / second.

Event Timeline

Ottomata renamed this task from "Evaluate Kafka Stretch cluster potential, and if possible, request hardware ASAP." to "Evaluate Kafka Stretch cluster potential, and if possible, request hardware ASAP". May 9 2022, 6:13 PM

Sorry, I don't quite get what you mean by this:

Zookeeper should be run in more than 3 DCs, e.g. '2.5' DCs.

I'm a bit confused by the reference to more than 3 DCs: does that mean that we would be better off having zookeeper stretched across 5 DCs?
Or would we be fine with zookeeper instances in 2 primary DCs and running a single tie-breaker instance in a third, smaller DC? Should we have three additional brokers in smaller DCs?

@BTullis oops, typo! Fixed. Should have said 'more than 2'.

Ah great, sorry I feel like a pedant now. :-)

I emailed the kafka mailing list with some of these questions and got a really nice response from Guozhang Wang of Confluent.

I wrote:

I'm evaluating the feasibility of setting up a cross datacenter Kafka 'stretch' cluster at The Wikimedia Foundation.

I've found docs here and there, but they are pretty slim. My biggest concern is the fact that while Follower Fetching helps with potential consumer latency in a stretch cluster, there is nothing that addresses producer latency. I'd have expected the docs I've read to mention this if it was a concern, but I haven't seen it.

Specifically, let's say I'm a producer in DC-A, and I want to produce to partition X with acks=all. Partition X has 3 replicas, on brokers B1 in DC A, B2 in DC-A and B3 in DC-B. Currently, the replica on B3(DC-B) is the partition leader. IIUC, when I produce my message to partition X, that message will cross the DC boundary for my produce request to B3(DC-B), then back again when replica B1(DC-A) fetches, and also when replica B2(DC-A) fetches, for a total of 3 times between DCs.


  • Am I correct in understanding that each one of these fetches contributes to the ack latency?
  • And, as the number of brokers and replicas increases, the number of times a message crosses the DC (likely) increases too?
  • When replicas are promoted to be a partition leader, producer clients will shuffle their connections around, often resulting in them connecting to the leader in a remote datacenter. Should I be worried about this unpredictability in cross DC network connections and traffic?

I'm really hoping that a stretch cluster will help solve some Multi DC streaming app architecture woes, but I'm not so sure the potential issues with partition leaders is worth it!

Thanks for any insight y'all have,

Guozhang's response:

Hello Andrew.

Just to answer your questions first, yes that's correct in your described
settings that three round-trips between DCs would incur, but since the
replica fetches can be done in parallel, the latency is not a sum of all
the round-trips. But if you stay with 2 DCs only, the number of round-trips
would only depend on the number of follower replicas that are on
different DCs with the leader replica.

Jumping out of the question and your described settings, there are a couple
of things you can consider for your design:

  1. For the producer -> leader hop, could you save the cross-DC network? For example, if your message can potentially go to any partition (such as when it is not key-ed), then you can customize your partitioner as a "rack-aware" one that would always try to pick the partition whose leader co-exists within the same DC as the producer client; even if your message's partition has to be determined deterministically by the key, in operations you can still see if most of your active producers are from one DC, then configure your topic partitions to be hosted by brokers within the same DC. Generally speaking, there are various ways you can consider saving this hop across DCs.
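Guozhang's first suggestion can be sketched as follows. This is a pure-Python illustration of the selection logic, not a real Kafka Partitioner implementation, and the metadata shape is invented for the example:

```python
# Hedged sketch of a "leader rack aware" partitioner: for un-keyed
# messages, prefer partitions whose current leader is in the producer's
# own DC. Keyed messages must still partition semantically.
import random

def pick_partition(local_dc, partition_leader_dcs, key=None):
    """partition_leader_dcs: {partition: dc_of_current_leader} (made up)."""
    if key is not None:
        # Keyed messages (e.g. by page_id) cannot be placed by locality.
        return hash(key) % len(partition_leader_dcs)
    local = [p for p, dc in partition_leader_dcs.items() if dc == local_dc]
    # Fall back to any partition if no leader is currently local.
    return random.choice(local or list(partition_leader_dcs))

leaders = {0: "eqiad", 1: "codfw", 2: "eqiad", 3: "codfw"}
print(pick_partition("codfw", leaders))   # 1 or 3 (a codfw-led partition)
```

Note the trade-off called out above: as soon as a key is supplied, locality goes out the window.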

  2. For the leader -> follower hop, you can start from first validating how many cross-DC failures you'd like to tolerate. For example, let's say you have 2N+1 replicas per partition, with N+1 replicas including the leader on one DC and N other replicas on the other DC; if we can set the acks to N+2 then it means we will have the data replicated to at least one remote replica before returning the request, and hence the data would not be lost if one whole DC fails, which could be sufficient for many stretching and multi-colo cases. Then in practice, since the cross-colo usually takes more latency, you'd usually get much fewer round-trips than N across DCs before satisfying the acks. And your average/p99 latencies would not increase much compared with just one cross-DC replica.
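Guozhang's arithmetic checks out. (In Kafka config terms, "acks to N+2" would in practice mean acks=all with min.insync.replicas=N+2.) A quick check:

```python
# With 2N+1 replicas (N+1 with the leader in one DC, N in the other),
# requiring N+2 acks guarantees at least one copy lands in the remote DC
# before the produce request returns.

def min_remote_copies(n, required_acks):
    local_replicas = n + 1                 # leader's DC holds N+1 replicas
    return max(0, required_acks - local_replicas)

for n in (1, 2, 3):
    print(n, min_remote_copies(n, n + 2))  # always >= 1 remote copy
```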

Quick stats check on revision sizes and diff sizes:

 select year(parse_datetime(event_timestamp, 'YYYY-MM-DD HH:mm:ss.s')) rev_year,
        max(revision_text_bytes) as maximum

   from mediawiki_history
  where snapshot='2022-04'
    and event_entity='revision'
  group by 1
rev_year | text bytes percentiles [0.5, 0.75, 0.95, 0.99] | max | diff bytes percentiles [0.1, 0.25, 0.5, 0.75, 0.99]
NULL | [8, 8, 8, 8] | 8 | [8, 8, 8, 8, 8]
1999 | [28660, 33471, 33471, 33471] | 33471 | [1425, 1425, 28660, 33471, 33471]
2001 | [925, 2511, 9727, 26367] | 248819 | [-69633, 6, 125, 530, 12479]
2002 | [1351, 2847, 13055, 33535] | 715477 | [-1025, 1, 39, 247, 5471]
2003 | [1606, 4703, 22527, 52223] | 1017601 | [-1025, 4, 37, 207, 9407]
2004 | [1855, 5975, 32767, 93183] | 5733712 | [-897, 3, 33, 166, 14079]
2005 | [2227, 8063, 43007, 131071] | 10245346 | [-1081, 2, 32, 169, 15871]
2006 | [2891, 10447, 52735, 145407] | 10597086 | [-1281, 1, 30, 176, 16383]
2007 | [3199, 11407, 56703, 141311] | 4117000 | [-1345, 1, 28, 166, 12415]
2008 | [3503, 12079, 61439, 151551] | 3970008 | [-1393, 0, 26, 154, 9599]
2009 | [3319, 11535, 63231, 153599] | 3916015 | [-1073, 0, 23, 122, 7807]
2010 | [3327, 11487, 63999, 157695] | 3934446 | [-865, 0, 23, 116, 6719]
2011 | [3199, 11023, 64255, 159743] | 4158061 | [-809, 0, 24, 120, 6143]
2012 | [2971, 10239, 63487, 161791] | 3972179 | [-769, 0, 23, 95, 4351]
2013 | [1815, 6207, 49151, 145407] | 9391200 | [-1313, 0, 40, 205, 3327]
2014 | [2383, 7327, 53247, 155647] | 5592485 | [-697, 1, 41, 252, 4479]
2015 | [3443, 8863, 53247, 163839] | 17086356 | [-769, 1, 46, 338, 4671]
2016 | [3957, 10943, 58623, 174079] | 13074523 | [-745, 0, 47, 345, 6015]
2017 | [5215, 14335, 55295, 165887] | 9626147 | [-853, 1, 83, 430, 23791]
2018 | [6143, 19391, 80895, 237567] | 6697208 | [-1025, 1, 77, 409, 25743]
2019 | [6911, 20191, 81919, 229375] | 6500321 | [-1281, 8, 79, 375, 20751]
2020 | [5563, 17375, 63999, 198655] | 17553256 | [-801, 18, 120, 516, 22815]
2021 | [6847, 19967, 81919, 212991] | 20326020 | [-801, 7, 86, 418, 12287]
2022 | [8767, 24063, 81919, 215039] | 4611948 | [-2049, 4, 84, 397, 12276]
2025 | [855, 1724, 6512, 6512] | 6512 | [129, 601, 855, 1724, 6512]

And average for 2021 for estimating above:

presto:wmf> select avg(revision_text_bytes) from mediawiki_history where snapshot='2022-04' and event_timestamp like '2021%' and event_entity = 'revision';

@Milimetric got me the avg revision byte size in 2021:

presto:wmf> select avg(revision_text_bytes) from mediawiki_history where snapshot='2022-04' and event_timestamp like '2021%' and event_entity = 'revision';

Hm, looks like there may be some bugs in Follower Fetching atm. Let's keep an eye on that.

Had a really helpful meeting today with @BBlack, @BTullis, @ayounsi, @cmooney and @NOkafor-WMF. Took some rough notes:

BB: very similar looking to cassandra. Will be good to look at that world too.
internal replication traffic, who cares.
concerning aspect: if necessity of reliable write queue could add latency.
Some producers may not be able to tolerate. Could cause backlog on e.g. MW.
Cache servers?
Been pushing to make MW active/active? Better to keep kafka leaders active/active too.

BT: What is the expected content? Migrating kafka main? Or new only?

AO: in terms of traffic?

AY: looks okay.
CM: longer term, maybe we need to prioritize traffic?
CM: Most things are 10 or 40Gig between DCs.
CM: Depends on failure scenarios. If latency shoots up, or if site goes offline.
Generally, the looser systems can be coupled, the better. Our latency is 30ms now, but things can happen
on the carrier side or ours; a different path could get longer. Would be hard to put constraints on it.

BB: we have 3 direct links. Even a 4th through chicago. Most realistic problem would be
saturating link with traffic, getting packet loss.

CM: links are on a fixed path. Can break. Not impossible for those paths to change. Carrier
can experience something and flip to a protected link, rerouting the traffic.

BB: If corner cases destroy everything, this might be a bad idea.

AY: Link failures are automated, but latency increases on a link are hard to be proactive about.

AO: Consumers will consume cross DC if local replicas fall out of sync.

AY: How does the failure risk change for other types of bugs, e.g. something on one DC, OS upgrades, etc.

BB: Clearly it is very complicated to evaluate the impact of this. Complicated and tricky. Not clearly wrong,
can't reject it out of hand. Doubt we can figure this out by just spending more time evaluating.
Should try it and gain operational experience. Maybe a year later we decide it's a dumb experiment.

BT: I have the same feeling. Are there other tools that can help us? App servers will be producing to it,
so we need to think carefully about why we are introducing additional latency.

BB: Could look at it in a capabilities sense. Not just about this one use case. As the owners of the kafka
infra, I would like to experiment to see what this can provide. If it doesn't work we can move on.

CM: Do we know people in other orgs that have done this? Real world experience. Find someone to ask.

AO: TODO reach out to mailing list folks.

AY: Zookeeper? We are trying to keep the POPs quite lean.

BB: Interesting question. What matters to us about edge sites is that they don't have PII.
Could maybe put ZK in chicago.

BT: only a tiebreaker role.

BT: we need to predict all the bad failure cases.

BB: this might push on netops to do traffic management and prioritization.

BB: 100MB/s is 800Mb/s, which is 1/10 of a link right now. Upgrading to 100Gb links.
BB: the saturation problem causes.

In summary, NetOps/Traffic folks are okay with this from the WAN perspective. We came to the conclusion that it is worth trying this. It'll be difficult to know how well or poorly this will work from just reading and thinking about it; we need to get our hands dirty.

So, we should go ahead and request hardware to try this out for T307959: [Event Platform] Design and Implement realtime enrichment pipeline for MW page change with content. Since we'll be in experimental mode, let's just go with 4 brokers for now, 2 in each datacenter.

We still have some outstanding research to do:

  • Reach out to mailing list to see if there is anyone with experience running a Kafka Stretch cluster we can talk to.
  • Understand leadership promotion better, make sure we know how this works in potential failure (and high latency) cases.
  • We still need to think about what the streaming app multi DC-ness would look like. @JAllemandou pointed out that we certainly won't run Flink itself in a 'stretch' mode. So, what does that mean for the output of the application? The easiest thing to do will be to run it active/passive. Failover will be much easier with a Kafka stretch cluster. We could figure out a fancy way to run an active/active single compute by partitioning the work between the two streaming app instances in each DC (e.g. one instance works odd numbered page_ids, the other even).

@gmodena @lbowmaker @dcausse, @JAllemandou, FYI I added a hardware request for 2 new nodes in each DC, slated for Q2 FY2022-2023. Let's discuss tomorrow.

Oh, hm, we need a new ZK cluster!

Or if we want to try KRaft, just another kafka node.

I'm inclined to try KRaft since this is a brand new (experimental) Kafka cluster, and will not require us to create a new ZK cluster. KRaft is not marked as 'production ready' but it is likely to be in Kafka 3.3, slated to be released in August 2022.

I added a request for a tiebreaker controller Kafka node in ulsfo (or eqord).

CC @herron and @elukey on this ticket too, since they also do Kafka SRE work.

I added a request for a tiebreaker controller Kafka node in ulsfo (or eqord).

If the tie-breaker node (whether it's zookeeper or a KRaft node) is in ulsfo, it could presumably be run on the ganeti400x cluster, couldn't it?

If we do go with eqord (for the lowest possible latency) then we would definitely need hardware because there is no ganeti there.
Just a thought.

Oh that is a great point. It certainly could. I'll note that in the request.

Ottomata added a subtask: Unknown Object (Task).Aug 15 2022, 2:44 PM
Papaul closed subtask Unknown Object (Task) as Resolved.Aug 19 2022, 9:25 PM