
Increase partitions of mediawiki.content_history_reconcile.v1
Closed, ResolvedPublic

Description

The topic mediawiki.content_history_reconcile.v1 in Kafka Jumbo has a single partition.

Every month, the Airflow DAG mw_content_reconcile_mw_content_history_monthly emits a large batch of messages to the topic mediawiki.content_history_reconcile.v1

Because it sends a lot of messages in a short period of time, it triggers the alert MediawikiContentHistoryReconcileEnrichHighKafkaConsumerLag.
Nothing is actually wrong: the application eventually catches up with the pending messages and the alert resolves on its own, but that can take hours.

To speed up the process, we want to increase the number of partitions, as well as the number of parallel workers in the Flink application processing these messages.
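For reference, increasing the partition count is a small metadata change on the topic. A minimal sketch of what that looks like, assuming the kafka-python admin client (the bootstrap server and target count below are illustrative; in practice we'd use the usual Kafka admin tooling for Jumbo):

```
from kafka.admin import KafkaAdminClient, NewPartitions

# Placeholder bootstrap server; the real Jumbo brokers differ.
admin = KafkaAdminClient(bootstrap_servers="kafka-jumbo.example:9092")

# Raise the total partition count for both DC-prefixed topics.
# Kafka only allows increasing partitions, never decreasing them.
admin.create_partitions({
    "eqiad.mediawiki.content_history_reconcile.v1": NewPartitions(total_count=3),
    "codfw.mediawiki.content_history_reconcile.v1": NewPartitions(total_count=3),
})
```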

Task is done if:

  • eqiad.mediawiki.content_history_reconcile.v1 and codfw.mediawiki.content_history_reconcile.v1 have more partitions.
  • The Flink application can parallelize the work with the new partitions.

Event Timeline

A question we haven't solved yet is: How many partitions should we use?

It looks like one partition was decided at some point, and because this topic doesn't receive data every day, I believe we don't need a high number. Maybe we could start with 3 partitions for now? Any suggestions, @brouberol?

We had a discussion on Slack: we'll start with 3 partitions. It should help balance the storage across the brokers and speed up the process, and we don't need many partitions because this process runs only once a month.

because this process runs only once a month.

Daily reconcile does run every day, but it generates ~10k events.

Monthly reconcile, which goes back to the beginning of time for all wikis, indeed runs around the 2nd of the month, and it does generate ~1.2M events. It shouldn't generate that many at this point, though; in T385112 we have found and fixed several of the causes, but there is still more to fix.

To speed up the process, we want to increase the number of partitions, as well as the number of parallel workers in the Flink application processing these messages.

I actually like that this process takes multiple hours, because it means that the associated refine job, which lands this data on HDFS on an hourly basis, processes less data per hour than it otherwise would. We have indeed seen refine fail when it encounters lots of data.

Additionally, the Flink job hits the MediaWiki APIs to get the content, so we have to be mindful of the number of transactions per second we send their way. We have temporarily bumped the task managers to ~20 at times, which put about ~1000 TPS of pressure on MW, but we have now downsized back to 2 task managers.
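(Back-of-the-envelope: ~1000 TPS across ~20 task managers is roughly 50 TPS per task manager, so 2 task managers should translate to on the order of ~100 TPS toward MW, assuming per-task-manager throughput stays about the same.)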

So slower here is actually good!

That's interesting.
In general, I wouldn't keep the number of partitions of a topic low just to slow down a pipeline; I assume we could slow down the pipeline by not increasing the parallelism of the Flink application, or by adding some kind of back-off mechanism to the process. But I understand the concern. So, at the very least, it seems that it doesn't make sense to increase the number of workers of the PyFlink application, and if we don't need that, one of the reasons to increase partitions goes away as well.
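To illustrate the back-off idea, here is a generic sketch, not how the actual PyFlink enrichment job is written (fetch_content and the retry parameters are hypothetical):

```
import random
import time

def call_with_backoff(fetch_content, revision_id, max_retries=5):
    # fetch_content is a placeholder for whatever call hits the MediaWiki
    # API for a revision's content; it is assumed to raise when throttled.
    for attempt in range(max_retries):
        try:
            return fetch_content(revision_id)
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential back-off with jitter: ~1s, ~2s, ~4s, ...
            time.sleep((2 ** attempt) + random.random())
```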

In my opinion, in general, it looks potentially a bit dangerous to have a single partition on a topic; for example, if, due to an issue, we need to send 500M records to that topic for reconciliation, we'll need days to process them, and a single broker will suddenly receive a lot of data.

But if we think it works fine now and we don't need to change it, I'm ok with the current approach.

We thought this would be a quick improvement after seeing the pipeline for the first time during the Ops week, but we were missing that context.

In my opinion, in general, it looks potentially a bit dangerous to have a single partition on a topic; for example, if, due to an issue, we need to send 500M records to that topic for reconciliation, we'll need days to process them, and a single broker will suddenly receive a lot of data.

This sounds reasonable to me. +1 to adding the partitions so that, in the eventuality that we get slammed with events, it doesn't put unreasonable stress on the one broker.

I assume we could slow down the pipeline by not increasing the parallelism of the Flink application

+1 to not increase Flink parallelism now.

(Side note: I just bumped TaskManagers to 20 temporarily due to T411803).

Thanks for the info, @xcollazo. This is actually a good example for this ticket: even if we temporarily increase to 20 TaskManagers, consumption throughput is limited by the topic having only 1 partition. Having 3 partitions should help in similar scenarios.
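For context, a Kafka consumer group can use at most one active consumer per partition, so with 1 partition only one source subtask reads from Kafka no matter how many TaskManagers run. A generic PyFlink sketch of matching source parallelism to the new partition count (this is not the actual enrichment application; the broker, group id and topic handling are illustrative):

```
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()
# With 3 partitions, up to 3 source subtasks can consume in parallel.
env.set_parallelism(3)

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("kafka-jumbo.example:9092")  # placeholder
    .set_topics("eqiad.mediawiki.content_history_reconcile.v1")
    .set_group_id("content_history_reconcile_enrich")   # placeholder
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)

# Downstream enrichment and sinks would be attached to this stream.
stream = env.from_source(
    source, WatermarkStrategy.no_watermarks(), "reconcile-source"
)
```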

The topics now have 3 partitions. We are not changing the number of Flink tasks for now, but this change will allow us to parallelize better when needed.

Topic:codfw.mediawiki.content_history_reconcile.v1	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: codfw.mediawiki.content_history_reconcile.v1	Partition: 0	Leader: 1012	Replicas: 1012,1013,1018	Isr: 1012,1018,1013
	Topic: codfw.mediawiki.content_history_reconcile.v1	Partition: 1	Leader: 1011	Replicas: 1011,1013,1018	Isr: 1011,1013,1018
	Topic: codfw.mediawiki.content_history_reconcile.v1	Partition: 2	Leader: 1014	Replicas: 1014,1011,1018	Isr: 1014,1011,1018

Topic:eqiad.mediawiki.content_history_reconcile.v1	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: eqiad.mediawiki.content_history_reconcile.v1	Partition: 0	Leader: 1010	Replicas: 1010,1014,1017	Isr: 1010,1017,1014
	Topic: eqiad.mediawiki.content_history_reconcile.v1	Partition: 1	Leader: 1010	Replicas: 1010,1013,1018	Isr: 1010,1013,1018
	Topic: eqiad.mediawiki.content_history_reconcile.v1	Partition: 2	Leader: 1013	Replicas: 1013,1011,1018	Isr: 1013,1011,1018