
Investigate proper set up for using Kafka MirrorMaker with new main Kafka clusters.
Closed, ResolvedPublic

Description

We currently (or will soon) have 3 distinct Kafka clusters:

  • analytics-eqiad - This is the large Kafka cluster in the Analytics VLAN.
  • main-eqiad - kafka100[12], used for 'main' (AKA production) Kafka messages in eqiad
  • main-codfw - kafka200[12], used for 'main' (AKA production) Kafka messages in codfw. (As of 2015-01 this is blocked.)

Messages in the 2 main Kafka clusters will be mirrored to the analytics-eqiad Kafka cluster for ingestion into Hadoop and whatever other analytics things we got going. This will be done using Kafka MirrorMaker.

Over in T114191, @aaron and @ori describe wanting to consume in eqiad from main-codfw, and vice versa, for cache invalidation. In T117933, the Services team is working on change propagation for RESTBase (which I think is essentially cache invalidation). I believe they also want RESTBase clusters in both eqiad and codfw to consume the same messages. I.e. codfw RESTbase will need change propagation events from both eqiad and codfw main Kafka clusters (Services folks, correct me if I am wrong).

There are two ways to accomplish this:

  1. Cross Datacenter Consumption
  2. Kafka MirrorMaker

As suggested in T114191, consumers can consume messages from Kafka cross datacenters. This may work well enough for our use cases, but it is not the recommended way to do things.

For analytics-eqiad, using MirrorMaker to consume from both of the main Kafka clusters is easy. Producers will either produce main messages to a local main cluster, OR they will be analytics producers and produce directly to the analytics-eqiad cluster (e.g. varnishkafka). MirrorMaker will not consume from analytics-eqiad.

But since the 2 main Kafka clusters are functionally the same, in order to use MirrorMaker between them we have to ensure that they don't mirror topics in a feedback loop. If we want to mirror messages between main-eqiad and main-codfw so that consumers local to each DC only have to consume from their local main cluster, we have to make sure that producers in each DC produce to a topic that only receives messages for that DC. E.g. topic main-eqiad.mediawiki.page_edit would be produced to by producers in eqiad, and topic main-codfw.mediawiki.page_edit would be produced to by producers in codfw. MirrorMaker could then mirror main-eqiad.mediawiki.page_edit to main-codfw cluster, and main-codfw.mediawiki.page_edit to main-eqiad cluster. Using the same topic name in each cluster would lead to a feedback loop, where the MirrorMaker in each cluster would re-produce the mirrored message back to the original cluster.
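To make the DC-named-topics idea concrete, here is a minimal sketch of the eqiad-side mirror. It is written against the kafka-python client purely for illustration (MirrorMaker itself is configured differently), and the broker addresses and group name are made up:

```
# Eqiad-side mirror under DC-named topics: it only subscribes to topics that
# carry the codfw name, so whatever it re-produces into main-eqiad can never be
# picked up again by the codfw-side mirror (which only subscribes to
# eqiad-named topics). That is what breaks the feedback loop.
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    bootstrap_servers="kafka-main-codfw:9092",   # hypothetical remote (source) cluster
    group_id="mirrormaker-codfw-to-eqiad",       # hypothetical group name
)
consumer.subscribe(pattern=r"main-codfw\..*")    # only codfw-originated topics

producer = KafkaProducer(bootstrap_servers="kafka-main-eqiad:9092")  # local cluster

for msg in consumer:
    # The topic name is kept as-is, so the copy in main-eqiad is still clearly
    # marked as codfw-originated.
    producer.send(msg.topic, key=msg.key, value=msg.value)
```

The codfw-side mirror would be the same with the clusters and the topic pattern swapped.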

https://engineering.linkedin.com/kafka/running-kafka-scale is a good reference for how LinkedIn does mirroring. In short, they have local and aggregate clusters in each DC. Producers in each DC produce to their local cluster. MirrorMaker in each DC consumes from the local clusters in every DC and produces to that DC's aggregate cluster. In this setup, the local clusters don't have all messages from all DCs, but each DC's aggregate cluster does, which allows all Kafka consumption in each DC to be done locally from the DC's aggregate cluster, instead of reaching out across DCs.
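As a rough illustration of that topology (the cluster names and broker addresses below are hypothetical placeholders, not anything we have provisioned), the set of mirroring flows would look something like this:

```
# Sketch of the LinkedIn-style local/aggregate topology: every DC's aggregate
# cluster is fed by one MirrorMaker flow per local cluster, across all DCs.
LOCAL_CLUSTERS = {
    "eqiad": "kafka-local-eqiad:9092",       # hypothetical address
    "codfw": "kafka-local-codfw:9092",       # hypothetical address
}
AGGREGATE_CLUSTERS = {
    "eqiad": "kafka-aggregate-eqiad:9092",   # hypothetical address
    "codfw": "kafka-aggregate-codfw:9092",   # hypothetical address
}

def mirror_flows():
    """Yield one (source DC, source address, destination DC, destination address)
    tuple per MirrorMaker flow."""
    for agg_dc, agg_bootstrap in AGGREGATE_CLUSTERS.items():
        for local_dc, local_bootstrap in LOCAL_CLUSTERS.items():
            yield local_dc, local_bootstrap, agg_dc, agg_bootstrap

for src_dc, src, dst_dc, dst in mirror_flows():
    print(f"MirrorMaker in {dst_dc}: {src_dc} local ({src}) -> {dst_dc} aggregate ({dst})")
```

Each aggregate cluster then holds every DC's messages, while the local clusters stay DC-local.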

We don't have plans to set up more Kafka clusters, so we need to figure out if we are ok with consuming cross-DC, or if we should go the route of naming topics after DCs so we can mirror messages between main Kafka clusters.

Event Timeline

Ottomata claimed this task.
Ottomata raised the priority of this task from to Needs Triage.
Ottomata updated the task description.
Ottomata added subscribers: Ottomata, GWicke, aaron and 6 others.

So we are basically aiming at a master-master Kafka set-up? Regardless of different queues for the two DCs, the trickiest part here is event de-duplication in the sense of consumer awareness. If consumers exist in both DCs, then we should ensure they are not both reading the same events and acting on them. As in, if there is a consumer in eqiad that consumes events from main-eqiad.*, the one in codfw shouldn't be doing so.

In the case of RESTBase, our actions are idempotent, but they do consume resources and disk space if handled multiple times: RESTBase is stateless per se, but it relies on Cassandra, for which a cross-DC replication policy is already in place (you can write data to either DC, and it'll show up in queries on both sides). That basically means we don't care as much about having all of the events on both sides at the same time; all we need is to make sure that event consumption happens in at least one DC.

One last question. If we do plan to go for a master-master(-like) set-up, why is it important to differentiate the source of the events? In my mind, the DCs are equivalent and will be serving the same content and performing the same actions. E.g. reqA happens in eqiad, while reqB is handled in codfw. As a consumer, I don't care about the source of the, say, edit event, but I do care that I don't process the event twice.

RESTBase is stateless per se, but it relies on Cassandra, for which a cross-DC replication policy is already in place (you can write data to either DC, and it'll show up in queries on both sides).

Oh, right, interesting. Since Cassandra is cross-DC already, yall don't need cross-DC consumption, right? A propagation event in codfw will be picked up by the codfw consumer, make a change in Cassandra, and then that change will be replicated to eqiad via Cassandra cross-DC support, correct?

If that is the case, then master-master isn't helpful for you.

One last question. If we do plan to go for a master-master(-like) set-up, why is it important to differentiate the source of the events?

The reason is how master-master is done. In this case, a MirrorMaker instance would run in both eqiad and codfw. Each MirrorMaker would consume from the opposite DC's cluster and produce to the local cluster.

Let's assume topics are named the same in both clusters. Say reqA is produced to topicA in main-eqiad. MirrorMaker in codfw would consume reqA from main-eqiad and produce it to main-codfw. But MirrorMaker in eqiad is configured to consume from main-codfw and produce to main-eqiad. It would consume reqA from topicA in main-codfw and produce it again to main-eqiad. And so on, and so on.

MirrorMaker isn't much more than a glorified auto consumer->producer. It doesn't have any way of knowing whether a message in a given topic was produced by MirrorMaker or by some other real producer.
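To put that in concrete terms, the whole data path is roughly the following (a kafka-python sketch with placeholder cluster addresses; the real tool is the stock MirrorMaker, not this script):

```
# A mirror is just consume-then-produce of the identical bytes. Nothing in the
# message tells the next mirror whether it was originally produced on this
# cluster or was already mirrored once, hence the loop when both clusters use
# identical topic names.
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(bootstrap_servers="source-cluster:9092", group_id="mirror")
consumer.subscribe(pattern=".*")   # mirror every topic
producer = KafkaProducer(bootstrap_servers="destination-cluster:9092")

for msg in consumer:
    producer.send(msg.topic, key=msg.key, value=msg.value)
```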

If Services doesn't need events from eqiad to show up in codfw (and vice versa), then perhaps we can revisit this later. Unless @aaron needs it for cache invalidation. @aaron?

Cross-DC consumption without replication would mean that events from an unavailable DC would also be unavailable, which I think is not acceptable. We should replicate events to other DCs with limited delay.

There are quite a few use cases that can benefit from a separation of replicated vs. locally produced messages, by writing replicated messages to source-DC-prefixed topics. I believe this is also standard practice in Kafka land & easy to support robustly. For example, the topics for a returning DC can be caught up to the latest state on its return fairly easily, while doing the same in a merged topic would be impossible after the fact. It is still possible to produce a merged & order-preserving stream of all events across the DCs in a consumer.
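For example, a consumer that wants the merged view could subscribe to both DC-named topics on its local cluster. A kafka-python sketch, with the broker address, group name, and topic names as placeholder assumptions (strict cross-topic ordering would still need extra work, e.g. ordering by event timestamp):

```
# A single consumer on the local cluster sees locally produced events plus the
# mirrored copies of the other DC's events, and the topic name preserves the
# source DC for consumers that care about provenance.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers="kafka-main-eqiad:9092",   # hypothetical local cluster
    group_id="merged-page-edit-consumer",        # hypothetical group name
)
consumer.subscribe(pattern=r"(main-eqiad|main-codfw)\.mediawiki\.page_edit")

for msg in consumer:
    print(msg.topic, msg.value)   # msg.topic tells you which DC produced it
```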

So, my vote would be to keep things simple & replicate to dc-prefixed topics in the other dcs.

The point for me here is not the names, but rather the ability of consumers in different DCs to discern whether they should consume events from the other DC as well. In normal-operation mode, each consumer (group) would consume only messages from its local DC. But the problem arises when consumption stops in one of the DCs. In such a scenario, the consumers in the other DC should consume foreign topics as well. But how will they know that is the case?

I guess in a first phase we could manually instruct them to do so, but automatising consumption fail-over would be a pretty neat thing to do.

I guess in a first phase we could manually instruct them to do so, but automatising consumption fail-over would be a pretty neat thing to do.

I think the plan in the multi-DC working group is to have this information in etcd.
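As a sketch of what that fail-over could look like on the consumer side (the etcd lookup is only a hypothetical placeholder here; the actual key layout and client would come out of the multi-DC work, and the broker address, group name, and topic names are likewise made up):

```
# DC-aware consumption with a fail-over switch: normally consume only the
# local DC's topic, and take over the other DC's topic when the flag is set.
from kafka import KafkaConsumer

LOCAL_TOPIC = "main-eqiad.mediawiki.page_edit"
REMOTE_TOPIC = "main-codfw.mediawiki.page_edit"   # mirrored into the local cluster

def read_failover_flag() -> bool:
    """Hypothetical: would read e.g. an etcd key that says whether eqiad
    consumers should also handle codfw's events."""
    return False

consumer = KafkaConsumer(
    bootstrap_servers="kafka-main-eqiad:9092",   # hypothetical local cluster
    group_id="changeprop-eqiad",                 # hypothetical group name
)

topics = [LOCAL_TOPIC]
if read_failover_flag():
    topics.append(REMOTE_TOPIC)
consumer.subscribe(topics=topics)

for msg in consumer:
    print(msg.topic, msg.value)
```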

Ok, so it sounds like the master-master with topics named after DCs is needed then, yes?

@Ottomata, I think in conventional replication terminology the mirrormaker stuff is all master-slave, not master-master. Mirrormaker consumes from a master & writes to a slave.

Are there any common naming / namespacing schemes for replicated topics in Kafka?

Indeed you are right, it is much more like master-slave since the topics are distinct. The setup we are describing feels a little like master-master just because both are accepting writes, and both are 'replicating' to each other.

I don't know of any naming schemes. Most information I can find about this is put out by LinkedIn, and they suggest using local and aggregate clusters, instead of direct mirroring between local clusters. I suppose we should just prefix or suffix the topics with the Kafka cluster name. I think I'd prefer suffixing, but I'm not sure. E.g. mediawiki.page_edit.main-eqiad

I think I'd prefer suffixing, but I'm not sure. E.g. mediawiki.page_edit.main-eqiad

Sounds good. My suggestion would be to pick e.g. mediawiki.page_edit.main_eqiad, mediawiki.page_edit.main.eqiad, or mediawiki.page_edit.eqiad instead (consistent separator character), because - (hyphen) is imho not obvious as a special character, the way . (dot) is.

main-eqiad is the name (I have chosen :?) for the Kafka cluster, so I'd like to keep it consistent with that. This is also the path used in ZooKeeper, i.e. /kafka/main-eqiad. In this case the hyphen isn't really a special character; the full name of this Kafka cluster is 'main-eqiad'.

main-eqiad is the name for the Kafka cluster, so I'd like to keep it consistent with that. [..] the hyphen isn't really a special character, the full name of this Kafka cluster is 'main-eqiad'.

Okay :)

I do think that we DEFINITELY want to relay events to active listeners in both datacenters. What we don't want is for things that imply state changes (e.g. in the case of RESTBase, or most other jobs) to be run twice, once per datacenter. This probably holds true for any mediawiki-related "job".

What we *really* want is to be able to easily tell our services which of the clusters to actually use.