We are embarking on a project to create more event streams (e.g. MW content). While designing these streams, we are also revisiting our [[ https://docs.google.com/presentation/d/1xxnFcxFJQGfbxnlmgwzCGyS8ULG0clP2QHcZ2OeeL1c/edit#slide=id.p | Multi-DC Kafka (main) and stream processing patterns ]].
I've recently learned that Kafka 'stretch' clusters are feasible in situations where cross-DC latency is relatively low (< 100ms). From a brief conversation with @BBlack and @ayounsi, it sounds like our eqiad <-> codfw latency averages around ~30ms, though this is not guaranteed.
A Kafka Stretch cluster is a single Kafka Cluster spanning multiple DCs. Using a single Kafka cluster instead of 2 clusters with MirrorMaker in between greatly simplifies the architecture of Multi DC streaming and event driven applications. However, it may result in increased latency for Kafka clients, as well as some tricky broker failover scenarios if we are not careful.
This task is about evaluating creating a new Kafka Stretch cluster in eqiad and codfw, likely with the intention of it eventually replacing the Kafka main clusters. Since the current DC-ops hardware request deadline is **this Friday May 13**, if we do want to do this, we should decide before then so we can request hardware for it.
As I do research, I will update this task description.
---
Best docs I've found so far:
- https://www.confluent.io/blog/multi-region-data-replication/
(We won't be able to use the 'Observer' feature, as that is in Confluent Platform, not Apache Kafka. Follower fetching is in Apache Kafka though.)
- http://mbukowicz.github.io/kafka/2020/08/31/kafka-in-multiple-datacenters.html
- https://downloads.ctfassets.net/oxjq45e8ilak/6icgmmPFCHtsw7KzUy9cKj/309ef5db510976b9fe018f3b5b5380a7/Two_and_The_Half_Datacenters_Foundations_of_Multi_DC_Kafka_Devoops.pdf
Notes:
###### Zookeeper
Zookeeper should be run in more than 2 DCs, e.g. '2.5' DCs. Kafka brokers are stretched between 2, but Zookeeper is 'stretched' between 3 so there is a tiebreaker ZK instance. Perhaps we'd run at least one ZK node in ulsfo?
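As a sketch of the '2.5 DC' idea (hostnames here are purely hypothetical), a 5-node ensemble in `zoo.cfg` form might look like:

```
# 2 ZK nodes in eqiad, 2 in codfw, 1 tiebreaker in ulsfo.
# Quorum (3 of 5) survives the loss of either primary DC.
server.1=zk1001.eqiad.wmnet:2888:3888
server.2=zk1002.eqiad.wmnet:2888:3888
server.3=zk2001.codfw.wmnet:2888:3888
server.4=zk2002.codfw.wmnet:2888:3888
server.5=zk4001.ulsfo.wmnet:2888:3888
```

The ulsfo node only needs to break ties; it would never serve Kafka metadata traffic under normal operation.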
###### Follower Fetching
With [[ https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica | follower fetching ]], consumers can read from partition replicas in their own DC.
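A sketch of the configuration involved (KIP-392 shipped in Apache Kafka 2.4+; the rack name `eqiad` here is an assumption about how we'd label DCs):

```
# broker side: tag each broker with its DC and enable rack-aware fetching
broker.rack=eqiad
replica.selector.class=org.apache.kafka.common.replication.RackAwareReplicaSelector

# consumer side: declare the consumer's DC so it fetches from a local replica
client.rack=eqiad
```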
###### Producers and partition leaders
IIUC, producers always have to produce to the leader of the partition, which is on only one broker, and therefore in only one DC, at a time. This will introduce latency to some produce requests, especially if `acks=all`: all in-sync replicas must then ACK the message before the Kafka producer client receives its ACK. If partition X has replicas on B1 (eqiad), B2 (codfw) and B3 (codfw), and I produce to partition X from codfw, my message will go Producer (codfw) -> B1 (eqiad) -> B2 (codfw) and B3 (codfw), and then ACK all the way back again, with the message itself crossing the DC boundary twice before the producer receives its ACK.
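For reference, the settings this paragraph is about (the `min.insync.replicas` value is illustrative, not a recommendation):

```
# producer config: wait for all in-sync replicas to ACK each message
acks=all

# topic/broker config: minimum number of in-sync replicas required for an
# acks=all produce to succeed (illustrative value for replication factor 4)
min.insync.replicas=3
```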
The number of times the message will be sent across the DC increases with the number of replicas we choose. We'll likely want more than 3 replicas in a stretch cluster.
In our current Multi DC MirrorMaker setup, each message only crosses the DC once, when MirrorMaker produces it to the leader in the remote cluster.
The leadership of a partition can change at any time, but will change most often (for us) during regular broker restarts. This means that regular rolling broker restarts will shuffle the cross-DC network connections. As each broker is restarted, producers to partition leaders on that broker will re-connect to whichever brokers are promoted to leaders for those partitions.
###### Replica placement
Replica placement matters. The default 'rack' aware placement algorithm can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment#KIP36Rackawarereplicaassignment-ProposedChanges
I'm trying to reason about replica assignment and cross-DC traffic. I'm not 100% sure what I'm about to describe is possible via the default replica assignment algorithm, but let's go with it for now.
Requirements:
1. Consumers tolerate 1 failure without having to consume cross DC
2. Producers tolerate 1 failure without having to change which DC they are producing to
3. Preferred replica leader is evenly spread among all brokers, and also all DCs.
Requirement 1. is possible via follower fetching if there are at least 2 replicas in the local DC.
Requirement 2. is possible if the first 2 replicas listed are in the same DC.
To do this, I think we need a replication factor of at least 4, and we need to be able to place replicas in a deterministic way. Let's say we have 4 brokers, 2 in each DC: A1, A2 (eqiad), B1, B2 (codfw), and we create a topic with 4 partitions. We want its replica assignment to be:
```
p0: [A1, A2, B1, B2]
p1: [B1, B2, A1, A2]
p2: [A2, A1, B2, B1]
p3: [B2, B1, A1, A2]
```
What matters is that the first 2 replicas in the list are in the same DC, so that when the preferred replica (the first one) goes down, another broker in the same DC takes over as leader. Actually, we could go farther than this, and say that a DC's replicas should always be grouped together in the list. If we had more than 4 replicas (and brokers), that would mean that e.g. A1, A2, A3 would always be grouped together in the list. I don't think we need more than 4 replicas though.
**//TODO: is the assumption of leader selection in order of replica list true????//**
Keeping the first potential leaders in the same DC is not actually a hard requirement, but it will reduce the variability in cross-DC traffic during broker restarts: producer clients will be less likely to have to change which DC they connect to. We'll have to see if there is a smart way to set the brokers' `broker.rack` to accomplish this with the default replica assignment algorithm. If there isn't, it is possible to manually reassign partitions and replicas when we create topics, although this would be pretty cumbersome.
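If we do end up assigning manually, generating the assignment is simple enough to script. A minimal sketch (broker and DC names follow the hypothetical example above; this is not the default Kafka algorithm):

```python
# Generate a replica assignment where, for every partition, the first two
# replicas (preferred leader + first failover) are in the same DC, leadership
# alternates between DCs, and preferred leadership rotates within each DC.
def assign(brokers_by_dc, num_partitions):
    dcs = list(brokers_by_dc)  # e.g. ["eqiad", "codfw"]
    assignment = []
    for p in range(num_partitions):
        leader_dc = dcs[p % len(dcs)]        # alternate which DC leads
        shift = p // len(dcs)                # rotate leader within that DC
        replicas = []
        # keep each DC's replicas grouped together, leader DC first
        for dc in [leader_dc] + [d for d in dcs if d != leader_dc]:
            group = brokers_by_dc[dc]
            replicas += [group[(shift + i) % len(group)] for i in range(len(group))]
        assignment.append(replicas)
    return assignment

brokers = {"eqiad": ["A1", "A2"], "codfw": ["B1", "B2"]}
for p, replicas in enumerate(assign(brokers, 4)):
    print(f"p{p}: {replicas}")
```

This reproduces the grouping above (p0: `[A1, A2, B1, B2]`, p1: `[B1, B2, A1, A2]`, ...), and each of the 4 brokers is the preferred leader for exactly one partition.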
Unfortunately, this does not give producers the ability to always produce to their nearest replica; that only works for consumers, via follower fetching. As Guozhang wrote, 'producer nearest partition' is possible, but we'd have to make a 'leader rack aware partitioner' for the producer to use, and doing so means we lose the ability to partition semantically (e.g. by page_id).
###### Cross DC throughput calculations
//**THIS SECTION IS STILL A DRAFT...CHECKING NUMBERS!!!!**//
We will always have some cross DC producer connections. And, with 4 replicas (2 in each DC), we will always have 2 replicas fetching across DCs. How many times will a message cross the DC boundary in this case?
Let's assume even leader placement, so that half of the leaders are in each DC. Let N be the input throughput (bytes, # messages, whatever). Half of producers will produce locally, and the other half remotely. In the local produce case, each message will cross the DC boundary 2 times: once for each replica in the remote DC. In the remote produce case, each message will cross the DC boundary 3 times: once for the produce request, and 2 more times for the replicas in the remote DC.
So. (2N + 3N) / 2 == 2.5N. (right?)
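A quick sanity check of that arithmetic, counting each time a message's bytes traverse the eqiad <-> codfw link (DC and broker placement as in the RF=4 example above):

```python
# Count cross-DC hops for one produced message, given the producer's DC, the
# leader's DC, and the DCs of the follower replicas (acks=all, so every
# follower fetches the message from the leader).
def cross_dc_hops(producer_dc, leader_dc, follower_dcs):
    hops = 0
    if producer_dc != leader_dc:   # the produce request crosses the link
        hops += 1
    for dc in follower_dcs:        # each remote follower fetch crosses it
        if dc != leader_dc:
            hops += 1
    return hops

# RF=4, 2 replicas per DC: leader + 1 local follower + 2 remote followers.
local = cross_dc_hops("codfw", "codfw", ["codfw", "eqiad", "eqiad"])   # 2
remote = cross_dc_hops("codfw", "eqiad", ["eqiad", "codfw", "codfw"])  # 3

# With leaders evenly spread across DCs, half of produces are local:
average = (local + remote) / 2
print(local, remote, average)  # 2 3 2.5
```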
Okay, now let's plug in some Ns!
I'll start with @milimetric's handy numbers from a comment below. For 2021, the average revision size was 6847 bytes; let's round this up and say 10000 bytes. In 2021 there were 533764132 revisions, so 533764132*10000/365/24/60/60 ≈ 169,255 bytes / second, i.e. about 0.17MB per second. Assuming growth, more use cases, replacing kafka-main (peaking 10-15KB per second in), etc., let's round this number up quite a bit to 5MB per second.
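Checking the arithmetic (note: the yearly-average input rate comes out to ~169,255 bytes/s, i.e. roughly 0.17 MB/s, so 5 MB/s is a very generous headroom figure):

```python
# Rough input-rate estimate from the revision numbers above.
revisions_per_year = 533_764_132   # 2021 revision count
bytes_per_revision = 10_000        # avg was 6847 bytes, rounded up
seconds_per_year = 365 * 24 * 60 * 60

input_rate = revisions_per_year * bytes_per_revision / seconds_per_year
print(f"{input_rate:,.0f} bytes/s")        # ~169,255 bytes/s

# Generous round-up for growth, more use cases, kafka-main replacement:
n = 5_000_000                              # assume N = 5 MB/s
print(f"cross-DC: {2.5 * n / 1e6} MB/s")   # 12.5 MB/s in normal operation
```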
N=5MB per second. 2.5 * 5MB == **12.5MB per second of cross DC traffic.**
This is in contrast to the current MirrorMaker based setup, where cross DC kafka traffic should only be 1*N, so 5MB per second.
Note that this is the normal operation scenario. If a replica is offline for a while, it will need to fetch data faster than the normal input rate when it comes back online. If all producers fail over to a single DC, then the formula becomes 3N instead of 2.5N during that time. **//TODO! This is not correct; also, in MW's active/passive case, the NORMAL situation is all producers in one DC. Accommodate this!//**
###### tl;dr
With a replication factor of 4, my over-estimate of cross DC traffic is:
- normal operation: 12.5MB per second
- full maintenance mode: 15MB per second
- recovery mode after a DC outage: a lot more? We can [[ https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas | throttle ]] this though, so it depends on how quickly we need to recover. Let's just x3 it and say 45MB / second.
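For reference, KIP-73 throttles are applied via dynamic broker configs (broker id and rate here are illustrative):

```
# Cap replication traffic to/from broker 1001 at ~45 MB/s
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type brokers --entity-name 1001 \
  --add-config 'leader.replication.throttled.rate=45000000,follower.replication.throttled.rate=45000000'
```

Note that the rate configs only take effect for replicas listed in the topic-level `leader.replication.throttled.replicas` / `follower.replication.throttled.replicas` configs.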