
Create Kafka topic for mjolnir bulk daemon and decide on cluster
Closed, Resolved · Public

Description

This Kafka topic will be produced to once a week from an oozie workflow. It will be consumed by two separate consumer groups, one for each datacenter. Data size is estimated at ~3GB per week (with gzip compression enabled), but all generated at once.

Kafka cluster: main-{eqiad,codfw}
Topic name: (eqiad|codfw).cirrussearch.page-index-update
Number of partitions: 3

Event Timeline

Change 447468 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[wikimedia/discovery/analytics@master] Point transfer to es at main-eqiad kafka cluster

https://gerrit.wikimedia.org/r/447468

I don't have anything against this but please also sync with @mobrovac and ops :)

fdans triaged this task as Medium priority. Jul 26 2018, 3:30 PM
fdans added projects: Services, SRE.
fdans moved this task from Incoming to Kafka Work on the Analytics board.

As requested, I've sent an email to the ops list, cc'd to mobrovac, giving a high-level overview of the solution and why it is needed.

If my reading of puppet is right, what we need will be:

From kafkamon1001.eqiad.wmnet:

kafka-topics.sh --zookeeper conf1004:2181/kafka/main-eqiad --create --topic eqiad.cirrussearch.analytics-updates --partitions 3
kafka-topics.sh --zookeeper conf2001:2181/kafka/main-codfw --create --topic eqiad.cirrussearch.analytics-updates --partitions 3

As far as I can tell our MirrorMaker doesn't use a white/blacklist. I could be mistaken, but I think the data will simply flow between them without any changes. (n.b. had to change the topic name)

Do you need the data mirrored between the two different Kafka clusters? If so, you'll need to prefix the topics with the datacenter names. MirrorMaker only mirrors eqiad.* topics from main-eqiad -> main-codfw, and codfw.* topics from main-codfw -> main-eqiad.

Yes, it will need to be mirrored, to be consumed in each datacenter separately. I'll readjust the naming and also switch _ to . to match how existing topics are named:

eqiad.cirrussearch.analytics-updates

Change 447468 merged by jenkins-bot:
[wikimedia/discovery/analytics@master] Point transfer to es at main-eqiad kafka cluster

https://gerrit.wikimedia.org/r/447468

I assume the task description implies the topic would get multiple messages every week, and that the total data size would be ~3GB (as opposed to one 3GB message). If so, LGTM. Note that we have snappy compression enabled in main, so the producer can simply send plain messages and they would be compressed on the fly. One question here: instead of bursting 3GB of data into Kafka in one go, is there a possibility of spacing the messages out a bit to ensure the normal functioning of the Kafka cluster?

Regarding topic mirroring, here's the current situation in the main cluster. The topics are prefixed with eqiad. and codfw. and designate the origin DC of the message being produced to Kafka, i.e. if messages are sent to main-eqiad they should end up in an eqiad.-named topic and idem for codfw. These are mirrored to the other DC for fault tolerance. This means that we have same-name topics with different prefixes in both DCs: eqiad.topic1 and codfw.topic1. At any given moment, the assumption is that only the active-DC-named topic will have traffic, implying that when we do a DC switch the other topic starts getting traffic. I assume that you need the data replicated in both DCs in order to keep both ES clusters up to date. Because of the aforementioned assumption wrt topic activity, the solution is to have consumers reacting to both topics (eqiad.topic1 and codfw.topic1) in both DCs. That way, regardless of the currently-active DC, both ES clusters will read all of the messages being produced by the oozie workflow.

This, however, does mean that, for consistency, during a DC switch-over, the oozie workflow needs to be redirected to send traffic to codfw.topic1. Looking at the above patch, I see the prefix being hard-coded in the configuration. @Ottomata is there a way we could introduce a variable there with the active DC notion and template the config file with it?

> so the producer can simply send plain messages and they would be compressed on the fly

compression.type is a producer setting. We usually use snappy, but if you prefer gzip I believe it should work.
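
For illustration, a minimal producer sketch with client-side compression, assuming the kafka-python client; the broker address is a placeholder, and snappy additionally requires the python-snappy package:

import json
from kafka import KafkaProducer

# Minimal sketch: compression_type is set on the producer, so plain JSON
# values are compressed client-side before reaching the brokers.
producer = KafkaProducer(
    bootstrap_servers='kafka-main.example.wmnet:9092',  # placeholder
    compression_type='snappy',  # or 'gzip'
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('eqiad.cirrussearch.page-index-update',
              {'_index': 'hewiki_content', '_id': 654321,
               '_source': {'popularity_score': 0.000001234}})
producer.flush()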

> This, however, does mean that, for consistency, during a DC switch-over, the oozie workflow needs to be redirected to send traffic to codfw.topic1

Eric can correct me if I'm wrong, but I believe the reason he requested just an eqiad.* prefixed topic is that these messages are generated from the Hadoop cluster which only exists in eqiad. Even during a datacenter switch, there will never be any messages written directly to main-codfw, since there are no codfw producers of this data.

> Eric can correct me if I'm wrong, but I believe the reason he requested just an eqiad.* prefixed topic is that these messages are generated from the Hadoop cluster which only exists in eqiad. Even during a datacenter switch, there will never be any messages written directly to main-codfw, since there are no codfw producers of this data.

Ah, you have made me realise I omitted one part of the explanation wrt the active DC and switch-overs. The point is that regardless of where the job originates (in this case the oozie workflow), if the active DC is set to codfw, all entities should send their messages to the codfw Kafka cluster. The idea behind the switch-over is to have all the moving pieces in place for if/when an actual switch-over is really needed. In other words, relying on the assumption that eqiad is the only active DC should be avoided.

I mentioned this to Marko in IRC, but I'm not sure if his previous statement is quite right. For analytics/Hadoop stuff, we don't support the concept of datacenter switches, since Hadoop doesn't exist in codfw. We don't want to directly produce cross DC, so I'd say the eqiad topic is the only one we'd need. But, this might need more discussion...

Indeed, the discussion is probably out of the scope of this ticket.

That said, it would be good to have the topic created with both prefixes even if only one will be used for now to keep the symmetry with other existing topics in the main Kafka cluster.

> I assume the task description implies the topic would get multiple messages every week, and that the total data size would be ~3GB (as opposed to one 3GB message). If so, LGTM. Note that we have snappy compression enabled in main, so the producer can simply send plain messages and they would be compressed on the fly. One question here: instead of bursting 3GB of data into Kafka in one go, is there a possibility of spacing the messages out a bit to ensure the normal functioning of the Kafka cluster?

I ran some quick tests to see how much we will be pushing. The below is all limiting the data to hewiki, which produces ~537k records. The full dataset last week was 120.8M records, so hewiki represents ~0.4% of the total volume. This produces the records over 41 seconds if using a single producer, or 20 seconds using 3 producers (the limit we set on the previous import process). Extrapolating to the full dataset we end up somewhere around 1.5-3 hours to produce the full dataset with a single producer. Is spreading the 3GB over 2.5 hours sufficient? If not, I can add some intentional delays into the system, or maybe split it into a few jobs that run a few hours apart.
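
As a quick sanity check on that extrapolation, using only the figures quoted above:

# Extrapolating the hewiki test run to the full dataset.
hewiki_records = 537000.0        # records produced in the hewiki test
full_records = 120.8e6           # records in last week's full dataset
single_producer_seconds = 41.0   # hewiki production time, one producer

rate = hewiki_records / single_producer_seconds   # ~13k records/s
hours = full_records / rate / 3600                # ~2.6 hours
print(round(rate), round(hours, 1))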

> Regarding topic mirroring, here's the current situation in the main cluster. The topics are prefixed with eqiad. and codfw. and designate the origin DC of the message being produced to Kafka, i.e. if messages are sent to main-eqiad they should end up in an eqiad.-named topic and idem for codfw. These are mirrored to the other DC for fault tolerance. This means that we have same-name topics with different prefixes in both DCs: eqiad.topic1 and codfw.topic1. At any given moment, the assumption is that only the active-DC-named topic will have traffic, implying that when we do a DC switch the other topic starts getting traffic. I assume that you need the data replicated in both DCs in order to keep both ES clusters up to date. Because of the aforementioned assumption wrt topic activity, the solution is to have consumers reacting to both topics (eqiad.topic1 and codfw.topic1) in both DCs. That way, regardless of the currently-active DC, both ES clusters will read all of the messages being produced by the oozie workflow.

Right, our use case is to ensure updates are applied to the clusters in each datacenter. I'll set up the consumers to read from both topics, as that is very straightforward. I agree with the general summary above that what the producers should do in a datacenter switchover is still unclear. Until that is clearer, limiting the oozie job to only produce to the Kafka cluster in the local DC seems sane.
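
For illustration, a rough sketch of that consumer setup with kafka-python (broker address, group name, and the handler are placeholders, not the actual daemon configuration): each datacenter runs its own consumer group subscribed to both DC-prefixed topics, so it sees the data regardless of which main cluster the oozie workflow produced to.

import json
from kafka import KafkaConsumer

def apply_update(doc):
    # Placeholder for pushing the update to the local elasticsearch cluster.
    print(doc['_index'], doc['_id'])

consumer = KafkaConsumer(
    'eqiad.cirrussearch.page-index-update',
    'codfw.cirrussearch.page-index-update',
    bootstrap_servers='kafka-main.example.wmnet:9092',   # placeholder
    group_id='cirrussearch_page_index_update_eqiad',     # one group per DC
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for message in consumer:
    apply_update(message.value)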

@EBernhardson, do you have an idea of how large your individual messages will be? I know CirrusSearch job queue stuff has been quite large, and this has caused some problems.

> @EBernhardson, do you have an idea of how large your individual messages will be? I know CirrusSearch job queue stuff has been quite large, and this has caused some problems.

These messages are actually exceptionally tiny. They amount to the following (times 120M):

{
    "_index": "hewiki_content",
    "_id": 654321,
    "_source": {
        "popularity_score": 0.000001234,
    }
}

The main difference is the CirrusSearch job queue contains the text content of wiki pages, which is highly variable and expands into the multi-MB range. Here we are sending a bunch of floats.
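
Back-of-the-envelope per-record size, assuming the ~3GB/week compressed figure from the task description:

compressed_bytes = 3e9      # ~3 GB per weekly run, gzip-compressed
records = 120.8e6           # records per run
print(compressed_bytes / records)   # ~25 bytes per record on the wire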

I ran a slightly longer test using 5M records; this allows it to run long enough to collect some information in the grafana dashboards. Using a single producer, the steady state looks to be about 12k messages/s and 250 kB/s.

https://grafana.wikimedia.org/dashboard/db/kafka-by-topic?orgId=1&var-datasource=eqiad%20prometheus%2Fops&var-kafka_cluster=jumbo-eqiad&var-kafka_broker=All&var-topic=test_ebernhardson0&from=1533240244315&to=1533241130293

Hm ok. We can probably handle that in main-eqiad, but it would be very bursty and dominate the Kafka cluster during its run. We're only pushing around 2.5K / second in main right now: https://grafana.wikimedia.org/dashboard/db/kafka?refresh=5m&orgId=1

Could you throttle the production of this data?

Change 450158 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[wikimedia/discovery/analytics@master] transfer_to_es kafka tweaks

https://gerrit.wikimedia.org/r/450158

> Hm ok. We can probably handle that in main-eqiad, but it would be very bursty and dominate the Kafka cluster during its run. We're only pushing around 2.5K / second in main right now: https://grafana.wikimedia.org/dashboard/db/kafka?refresh=5m&orgId=1
>
> Could you throttle the production of this data?

I've added some rate limiting and tested it at 1k messages/sec/producer and limited to a single producer:

https://grafana.wikimedia.org/dashboard/db/kafka-by-topic?orgId=1&var-datasource=eqiad%20prometheus%2Fops&var-kafka_cluster=jumbo-eqiad&var-kafka_broker=All&var-topic=test_ebernhardson0&from=1533246840728&to=1533247460261

For the job I set it at 1400/s, which should spread the message production out over ~24 hours.
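
For illustration, a minimal sleep-based throttle of the kind described; the function and producer here are hypothetical sketches, not the actual workflow code:

import time

def produce_throttled(producer, topic, records, max_per_sec=1400):
    # Pace sends so the average rate never exceeds max_per_sec.
    # At 1400 msg/s the ~120.8M-record dataset takes about
    # 120.8e6 / 1400 ~= 86000 s, i.e. roughly 24 hours.
    start = time.time()
    for sent, record in enumerate(records, start=1):
        producer.send(topic, record)
        expected_elapsed = sent / float(max_per_sec)
        actual_elapsed = time.time() - start
        if actual_elapsed < expected_elapsed:
            time.sleep(expected_elapsed - actual_elapsed)
    producer.flush()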

+1, this rate looks acceptable to me. Thank you, @EBernhardson! @Ottomata, do we also have to tweak something in MirrorMaker to ensure the doubled rate will be supported (just checking to make sure)?

Not 100% sure, but I believe MirrorMaker will handle the additional throughput. If it doesn't, it will start lagging, and we can adjust things then (like adding more instances).

@EBernhardson, the topic name should be (eqiad|codfw).cirrussearch.page-index-updates now?

> Not 100% sure, but I believe MirrorMaker will handle the additional throughput. If it doesn't, it will start lagging, and we can adjust things then (like adding more instances).
>
> @EBernhardson, the topic name should be (eqiad|codfw).cirrussearch.page-index-updates now?

Almost, the patches I've put up use the singular (rather than plural): `(eqiad|codfw).cirrussearch.page-index-update`

I've edited the task description to match.

Oh yes, singular is better, that was a typo. Great.

Change 450158 merged by jenkins-bot:
[wikimedia/discovery/analytics@master] transfer_to_es kafka tweaks

https://gerrit.wikimedia.org/r/450158