
Support multiple partitions per topic in EventBus
Closed, Declined · Public

Description

Some of the CP rules are already pretty high-volume, so they could benefit from partitioning the topic for a more even load distribution between workers. Since the topics are auto-created by the EventBus service, we need some kind of configuration there to create topics with more than one partition.

We can manually increase the number of partitions for already existing topics, but new topics should get created with a proper number of partitions.

Right now we don't care about the particular partitioning strategy to use, because ChangeProp is going to produce to the partitioned topics, but it would be nice to think about that in advance.
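As a sketch of what such configuration might look like on the service side (all names and the config shape are hypothetical, not EventBus's actual format):

```python
# Hypothetical per-topic partition counts; topics not listed fall
# back to a single partition, matching today's behaviour.
TOPIC_PARTITIONS = {
    "resource_change": 4,
}
DEFAULT_PARTITIONS = 1

def partitions_for_topic(topic: str) -> int:
    """Number of partitions to use when auto-creating a topic."""
    return TOPIC_PARTITIONS.get(topic, DEFAULT_PARTITIONS)
```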

Event Timeline

Pchelolo created this task. Feb 10 2017, 6:09 PM
Restricted Application added a subscriber: Aklapper. Feb 10 2017, 6:09 PM

Interesting! This would require a lot more configuration:

  • Per-topic partition counts (we'd have to bring back EventBus-based topic creation)
  • Per-topic schema field keys: we'd have to produce each message to Kafka using a key, and this would have to be configured per topic.

But, Q: I thought there were cross-DC issues with change-prop and multi-partition topics that prevented us from doing this, no?

> Per topic schema-field-key. We'd have to produce each message to Kafka using a key, and this would have to be configured per topic.

Right now this is not a huge priority. We can just use the default keyed partitioner and update the REST API to accept an optional 'key' parameter. Then the producer can decide on the partitioning strategy (as I think it should).
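A minimal sketch of what an optional key on the produce path could look like (the request shape and function name are assumptions, not the actual EventBus API):

```python
def build_message(topic: str, event: dict, key: str = None) -> dict:
    """Attach an optional partitioning key to an outgoing message.

    Without a key, Kafka distributes messages round-robin across
    partitions; with one, the default keyed partitioner routes by
    the key, so the producer alone decides the partitioning strategy.
    """
    msg = {"topic": topic, "value": event}
    if key is not None:
        msg["key"] = key
    return msg
```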

> I thought there were cross DC issues with change-prop and multi topic partitions that prevented us from doing this, no?

Hm, I'm not aware of this. I think there were questions about ordering, but those are irrelevant if we use a page title or some 'original-event-id' as the partitioning key.
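Why ordering concerns go away with a stable key: all events with the same key land in the same partition, and Kafka preserves order within a partition. A small illustration (crc32 stands in for Kafka's real default partitioner, which uses murmur2):

```python
import zlib

def assign(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's default keyed partitioner (murmur2).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Three edits to the same page title always map to one partition,
# so their relative order is preserved even with 4 partitions.
titles = ["Main_Page", "Foo", "Main_Page", "Bar", "Main_Page"]
placements = [assign(t, 4) for t in titles]
```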

The primary use case I have in mind for this is partitioning the transcodes-resource-change topic: it has quite a bit of traffic, and one of the CP workers already starts to max out on CPU (probably the one purging Varnishes). See the parent task for details.

It's not a high priority; I just keep filing tasks with ideas I have for the upcoming JobQueue migration project. It will become much more relevant a bit later.

Per discussion on the JobQueue meeting:

  1. We've established that ChangeProp can currently process a maximum of 350 jobs/s per worker (T181007), as the worker uses up all the CPU. There are some ideas for optimizing CPU usage, like using keep-alive, which would increase the maximum rate, but it cannot be optimized indefinitely.
  2. To be able to scale horizontally, multiple workers should be able to work on a single topic. This can be achieved by partitioning the topics.
  3. The perfect candidate for a partition key is the DB shard: job load is more or less equally distributed across the database shards.
  4. The EventBus service should support receiving a message key (probably as an optional path parameter on the /events endpoint), and the extension should supply the DB shard as the key.
  5. The EventBus extension should support setting the number of partitions per topic in the topic config.
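Points 3 and 4 above could be sketched like this (the request shape and names are assumptions, and crc32 stands in for Kafka's default keyed partitioner):

```python
import zlib

def shard_partition(shard: str, num_partitions: int) -> int:
    # What matters is determinism: one shard always maps to
    # exactly one partition.
    return zlib.crc32(shard.encode("utf-8")) % num_partitions

def produce_request(topic: str, event: dict, db_shard: str,
                    num_partitions: int) -> dict:
    """Build a keyed produce request: the extension supplies the
    DB shard as the key, so job load spreads across partitions
    roughly as evenly as the shards themselves."""
    return {
        "topic": topic,
        "key": db_shard,
        "partition": shard_partition(db_shard, num_partitions),
        "value": event,
    }
```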
Pchelolo added a subscriber: Joe. Nov 23 2017, 10:01 AM

One issue I've encountered is that I can't find an easy way to determine which shard a given domain belongs to from the extension code, or how to provide this configuration to the EventBus service. @Joe, do you know if it's possible to get the domain => shard mapping in MW extension code somehow?

> One issue I've encountered is that I can't find an easy way to find out which shard which domain belongs to from the extension code or how to provide this configuration to the EventBus service. @Joe do you know if it's possible to get the domain => shard mapping in the MW extension code somehow?

Looks like the mappings can be obtained from the $wgLBFactoryConf global variable.
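For illustration, the lookup is roughly this shape, modeled on $wgLBFactoryConf's sectionsByDB map (wiki DB name => section), with unlisted wikis falling into the default section. The example entries are assumptions, not the production mapping:

```python
# Modeled on $wgLBFactoryConf['sectionsByDB']; illustrative only.
SECTIONS_BY_DB = {
    "enwiki": "s1",
    "commonswiki": "s4",
}

def shard_for_domain(dbname: str) -> str:
    """Wikis absent from the map live in the default section."""
    return SECTIONS_BY_DB.get(dbname, "DEFAULT")
```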

After some discussion with @mobrovac we think that it's better to replicate the sharding mapping in the EventBus service instead of providing it together with the event.

The obvious disadvantage is that we're duplicating the configuration, but that's hardly an issue because the config almost never changes, and even if the copies go out of sync it's not a big deal, since we only need this config to evenly distribute jobs across partitions.

The advantages are:

  1. The shard name is an implementation detail, so providing it as part of the event seems like a violation of encapsulation.
  2. We might discover that some wiki that is small in size is nevertheless very heavy on the queue, so we might want to reshuffle the mapping independently of the DB shards.
  3. Since the number of partitions will be smaller than the number of shards, several shards will be assigned to each partition. As the shards are designed to be roughly equal in size, we can rely on automatic balancing by just providing the shard as the key. But if it turns out this doesn't produce a fair distribution of load, we can reshuffle things independently of the MW shards.
  4. This allows us to have different mappings for different jobs, as some job types might be heavier on a certain wiki.
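A sketch of such a service-side replicated mapping with per-job-type overrides; all names and numbers here are made up:

```python
# Hypothetical replicated mapping: shard -> partition. Several
# shards share a partition, since there are fewer partitions
# than shards.
DEFAULT_MAPPING = {"s1": 0, "s2": 0, "s3": 1, "s4": 1}

# Per-job-type overrides: a job type that is unusually heavy on
# one shard's wikis can get its own placement.
OVERRIDES = {
    "htmlCacheUpdate": {"s1": 2},
}

def partition_for_job(job_type: str, shard: str) -> int:
    """Resolve a (job type, shard) pair to a partition, preferring
    the job-type-specific override when one exists."""
    override = OVERRIDES.get(job_type, {})
    if shard in override:
        return override[shard]
    return DEFAULT_MAPPING.get(shard, 0)
```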
Restricted Application added a project: Analytics. Nov 23 2017, 10:58 AM

Although this is required to horizontally scale the Kafka-based queue, optimization work done in T181007 made it possible to sustain 900 events/s with a single partition (and that might not be the limit; perhaps we're now hitting concurrency limits), which is 5 times more than is currently required for the highest-traffic jobs. So this is no longer a pressing issue.

We're moving this to Radar. Ping us or move it back if it becomes high priority again. cc @Ottomata

Milimetric moved this task from Incoming to Radar on the Analytics board. Nov 27 2017, 5:20 PM
Pchelolo closed this task as Declined. Mar 21 2018, 6:21 PM
Pchelolo edited projects, added Services (done); removed Services (next).

We've decided to go with a different approach, where CP handles the partitioning. Declining.