
mediawiki.job.cirrusSearchElasticaWrite topics need more partitions!
Closed, Resolved (Public)

Description

A recent change to the cirrusSearchElasticaWrite job caused it to submit a LOT more jobs at once. EventGate and Kafka are receiving the messages fine, but MirrorMaker lags when these jobs get submitted.

For high volume topics like this, we need more partitions. I believe EventGate's node-rdkafka producer will just do the right thing if we increase the partitions, but I want to check with JobQueue folks (@Pchelolo) first.

Additionally, Erik told me that change-prop is repartitioning this and other high volume topics by some key. Could we just partition by key in the first place? We'd need EventGate and stream config to support specifying the partition key from the event data, but I think we could do this!

Event Timeline

We'd need EventGate and stream config to support specifying the partition key from the event data, but I think we could do this!

That's the key to why we did it via repartitioning: we only have support for intelligent partitioning in change-prop. Performance-wise, it would definitely be better to partition upon producing, but it creates really leaky abstractions: if MW needs to specify the partition when producing a job, that's no good. EventGate shouldn't really know about specific partitioning either. Before we had the stream config service, we really didn't have anywhere to put the partitioning config logic except change-prop.

I'd be happy to move the smart partitioning from change-prop into EventGate and put the config into stream config. I think that would be the preferred solution.

Alternatively, we can make EventGate randomly assign partitions based on the number of partitions in the Kafka topic. That would just decrease the traffic within a single partition; change-prop would then repartition the messages in a smart way, as it does now. This would be easier to implement, but less efficient than the former approach.

What do you think?

we can make EventGate randomly assign partitions

That will happen by default if we just add the partitions to the topics.

I think for key-based partitioning, we can add stream configuration to specify a key field name. E.g.

mediawiki.job.cirrusSearchElasticaWrite:
  schema_title: mediawiki/job
  key_field: meta.domain # or whatever

If key_field is set, EventGate will produce the message partitioned by the value of that field in the message.
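
A rough sketch of what that could look like inside EventGate (illustrative only: key_field is the setting proposed in this comment, and only lodash's get() and node-rdkafka's produce() are real APIs):

// Hypothetical sketch of key_field support in EventGate -- not actual EventGate code.
// If the stream config names a key_field, pull that (dotted) field out of the event
// and pass it as the Kafka message key; librdkafka then chooses the partition by
// hashing the key (partition argument left as null).
import * as Kafka from 'node-rdkafka';
import { get } from 'lodash';

interface StreamConfig {
    schema_title: string;
    key_field?: string;  // e.g. 'meta.domain', as proposed above
}

function produceEvent(
    producer: Kafka.Producer,
    topic: string,
    event: Record<string, unknown>,
    config: StreamConfig
): void {
    const keyValue = config.key_field ? get(event, config.key_field) : undefined;
    const key = keyValue === undefined ? null : String(keyValue);
    producer.produce(topic, null, Buffer.from(JSON.stringify(event)), key, Date.now());
}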

If key_field is set, EventGate will produce the message partitioned by the value of that field in the message.

Unfortunately that's not enough for the use cases we use partitioning for. We need to specify an explicit mapping from field values to partition numbers (actually, we don't care about the partition numbers; we just need to group various field values into a determined partition). This is how it looks in change-prop.

The reason is that for some DB-heavy jobs we want to partition the jobs by DB shard, so that the processor doesn't switch between chunks of jobs belonging to different shards and move the load between the shards in a spiky, unpredictable way.
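
For illustration only (this is not change-prop's actual configuration format or code), the grouping described above boils down to a fixed mapping from field values to partitions, with a fallback for anything unmapped:

// Illustrative sketch of value-to-partition grouping -- not change-prop's real config.
// Jobs for a given DB shard always land in the same partition, so a consumer works
// through one shard's chunk at a time instead of bouncing load between shards.
const shardToPartition: Record<string, number> = {
    s1: 0,
    s2: 1,
    s3: 1,  // several shards can share a partition; only the grouping must be stable
    s4: 2,
};

function partitionFor(shard: string, defaultPartition = 0): number {
    return shardToPartition[shard] ?? defaultPartition;
}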

We can transfer this logic and configuration into stream config, but it's fairly complicated and I'm not sure you'd want to have this complexity inside EventGate. Would you? :)

Hm, could you just also emit the database shard as a field and use that as the partition key?

In event data:

database_shard: s1

In stream config:

key_field: database_shard

Hm, or we could use an HTTP header to specify the partition key value when POSTing to EventGate:

curl -X POST -H 'X-Message-Key: { "database_shard": "s1"}' ...

In event data: database_shard: s1

That could work. We didn't really want to do it initially since it's not really easy to get the DB shard in MW code. We could propose adding a method like that to the LBFactory interface, but it feels like an abstraction leak.

we could use an HTTP header to specify the partition key value when POSTing to EventGate

We can add a header to specify the partition too, but neither would really be very nice since we accept batches of events and they are not necessarily going to the same partition.
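
To make the batching point concrete, here is a made-up example (field names illustrative, reusing the database_shard field suggested above): one POST can carry events that belong on different partitions, so a single per-request header can't specify the key for all of them.

// Hypothetical batch POSTed to EventGate in one request. The two events would be
// keyed to different DB shards, so one X-Message-Key header can't describe both.
const batch = [
    { meta: { stream: 'mediawiki.job.cirrusSearchElasticaWrite' }, database_shard: 's1' },
    { meta: { stream: 'mediawiki.job.cirrusSearchElasticaWrite' }, database_shard: 's3' },
];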

neither would really be very nice since we accept batches of events

Oh, right.

I don't love the idea of explicitly specifying the partition number. The number of partitions is subject to change based on Kafka server-side configuration/ops. In dev, your Kafka probably only has 1 partition.

it's not really easy to get the DB shard in MW code

Is it easier to get in change-prop code? :p

Ok, I still like the idea of EventGate being able to partition by configured key, but it sounds like that doesn't really help us here, eh?

Ok, I still like the idea of EventGate being able to partition by configured key, but it sounds like that doesn't really help us here, eh?

I think it totally helps us in the CirrusSearch case, which is good. For the more complex cases, I think in the end you're right and adding the 'database_section' key in MW code is the best way after all, but we can do that as a last step.

Reviewing how our job queue usage is going, the pre-partitioned queue here backlogs fairly significantly, up to ~500k messages, while the post-partitioned queue only backlogs when the consumer decides to stop reading for 10-30 minutes (separate ticket, T224425). As some approximate stats: on 2020-02-20 08:00-09:00 UTC the committed offset of the partitioner advanced by ~3M, and the peak backlog over this period was ~500k jobs. This is around 850 jobs/sec, which puts 500k jobs at a 10-minute backlog. This happens for an hour every two hours when the scheduled jobs queue up.
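
For reference, the 850 jobs/sec and 10-minute figures follow directly from the numbers quoted above:

// Quick check of the backlog math, using only the figures from this comment.
const messagesPerHour = 3_000_000;                        // committed offset advance, 08:00-09:00 UTC
const peakBacklog = 500_000;                              // peak backlog in the same window
const jobsPerSecond = messagesPerHour / 3600;             // ~833 jobs/sec (~850 as stated)
const backlogMinutes = peakBacklog / jobsPerSecond / 60;  // ~10 minutes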

The primary concern here is the lag between edits and when they show up in search. Since the queue that is fed by this partitioner seems to keep up fairly well, our initial assumption is that partitioning this topic as well should lead to less lag between edits and writing those edits to the search engine.

More complex options, like moving partitioning into MW, don't seem particularly desirable. Can we simply add random partitioning to this topic, and keep the existing downstream re-partitioning?

Before we go and create yet more partitions, we should try increasing the concurrency for the partitioner topic itself. Currently it's just 10, since we thought repartitioning would be really quick and that would be enough. Nothing prevents us from increasing the concurrency almost indefinitely: the downstream service here is Kafka itself, so there's no concern about overloading it. The only real limit here is CPU, and I doubt we're hitting it. I'll bump it to 50 and we can see how it goes.

Oh, sorry. This is about MirrorMaker. Bumping change-prop concurrency will not help here.

Can we simply add random partitioning to this topic, and keep the existing downstream re-partitioning?

That will be OK with the JobQueue, and we wouldn't need to change anything on the job queue side. Everything should just work out of the box.

@Ottomata can you update the priority for this task?

@herron I can do this, but perhaps y'all could take this on? This is adding partitions to topics in the Kafka main clusters (we should probably do the same thing in Kafka jumbo too).

Hey @Ottomata, sure

How many partitions do folks think this should be increased to in kafka-main, 3?

Topic:eqiad.mediawiki.job.cirrusSearchElasticaWrite	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: eqiad.mediawiki.job.cirrusSearchElasticaWrite	Partition: 0	Leader: 1002	Replicas: 1002,1001,1003	Isr: 1003,1002,1001
Topic:codfw.mediawiki.job.cirrusSearchElasticaWrite	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: codfw.mediawiki.job.cirrusSearchElasticaWrite	Partition: 0	Leader: 1002	Replicas: 1002,1003,1001	Isr: 1003,1002,1001

Yeah I think 3 is a good start. We can always add more, but we can't remove them :)

Mentioned in SAL (#wikimedia-operations) [2020-04-07T17:40:56Z] <herron> increasing eqiad.mediawiki.job.cirrusSearchElasticaWrite to 3 partitions T240702

Mentioned in SAL (#wikimedia-operations) [2020-04-07T17:49:45Z] <ppchelko@deploy1001> Started restart [cpjobqueue/deploy@83c93d1]: Try to make it notice new partitions T240702

Mentioned in SAL (#wikimedia-operations) [2020-04-07T17:56:35Z] <herron> increasing codfw.mediawiki.job.cirrusSearchElasticaWrite to 3 partitions T240702

Partitions have been increased to 3 in kafka-main and kafka-jumbo. I think we're good here! Thanks for the help @Pchelolo and @Ottomata