Page MenuHomePhabricator

Balance Kafka topic partitions on Kafka Jumbo to take advantage of the new brokers
Closed, ResolvedPublic


In T252675 we added 3 new Brokers to Kafka Jumbo, kafka-jumbo100[7,8,9]. In this task we should find a way to move topic partitions to the new brokers to better balance the cluster.

Progress: (topic: messages / second)

  • eqiad.mediawiki.job.ChangeDeletionNotification: 7.9
  • eventlogging_ExternalGuidance: 8.1
  • 8.883333333333333
  • eventlogging_FirstInputTiming: 10.35
  • eventlogging_TemplateDataApi: 10.433333333333334
  • eventlogging_CodeMirrorUsage: 11.016666666666667
  • codfw.mediawiki.revision-tags-change
  • eqiad.mediawiki.revision-tags-change: 11.633333333333333
  • codfw.mediawiki.job.wikibase-InjectRCRecords
  • eqiad.mediawiki.job.wikibase-InjectRCRecords: 11.8
  • codfw.mediawiki.job.ORESFetchScoreJob
  • eqiad.mediawiki.job.ORESFetchScoreJob: 12.766666666666667
  • codfw.mediawiki.job.categoryMembershipChange
  • eqiad.mediawiki.job.categoryMembershipChange: 12.983333333333333
  • eventlogging_MobileWikiAppDailyStats: 13.316666666666666
  • eventlogging_EditAttemptStep: 14.116666666666667
  • eventlogging_QuickSurveyInitiation: 16.766666666666666
  • 17.466666666666665
  • eventlogging_CpuBenchmark: 17.9
  • codfw.mediawiki.revision-score
  • eqiad.mediawiki.revision-score: 17.933333333333334
  • eventlogging_ResourceTiming: 18.866666666666667
  • eventlogging_RUMSpeedIndex: 18.933333333333334
  • codfw.mediawiki.revision-create
  • eqiad.mediawiki.revision-create: 21.033333333333335
  • eventlogging_NavigationTiming: 26.816666666666666
  • codfw.resource_change
  • eqiad.resource_change: 31.53333333333333
  • codfw.mediawiki.recentchange
  • eqiad.mediawiki.recentchange: 32.93333333333333
  • codfw.mediawiki.job.recentChangesUpdate
  • eqiad.mediawiki.job.recentChangesUpdate: 36.05
  • eventlogging_LayoutShift: 37.1
  • eventlogging_DesktopWebUIActionsTracking: 39.28333333333333
  • eventlogging_PaintTiming: 39.75
  • codfw.mediawiki.job.cdnPurge
  • eqiad.mediawiki.job.cdnPurge: 46.266666666666666
  • eventlogging_InukaPageView: 49.63333333333333
  • statsv: 54.9
  • eventlogging_CentralNoticeImpression: 58.86666666666667
  • codfw.wdqs-external.sparql-query
  • eqiad.wdqs-external.sparql-query: 73.2
  • codfw.mediawiki.job.htmlCacheUpdate
  • eqiad.mediawiki.job.htmlCacheUpdate: 76.65
  • __consumer_offsets: 94.89999999999999
  • codfw.wdqs-internal.sparql-query
  • eqiad.wdqs-internal.sparql-query: 105.98333333333333
  • codfw.mediawiki.job.refreshLinks
  • eqiad.mediawiki.job.refreshLinks: 112.6
  • codfw.mediawiki.job.RecordLintJob
  • eqiad.mediawiki.job.RecordLintJob: 113.61666666666666
  • eventlogging_MobileWikiAppLinkPreview: 117.36666666666666
  • codfw.mediawiki.job.wikibase-addUsagesForPage
  • eqiad.mediawiki.job.wikibase-addUsagesForPage: 130.48333333333332
  • eventlogging_MobileWikiAppSessions: 166.25
  • eqiad.resource_change
  • codfw.resource_change: 252.06666666666666
  • codfw.mediawiki.client.session_tick
  • eqiad.mediawiki.client.session_tick: 451.6666666666667
  • eventlogging_SearchSatisfaction: 529.0166666666667
  • eventlogging_VirtualPageView: 1206.0666666666666
  • codfw.resource-purge
  • eqiad.resource-purge: 1877.0166666666669
  • eventlogging-client-side: 1952.9166666666665
  • codfw.mediawiki.cirrussearch-request
  • eqiad.mediawiki.cirrussearch-request: 3334.116666666667
  • atskafka_test_webrequest_text: 3818.0333333333333
  • netflow: 4522.383333333333
  • codfw.mediawiki.api-request
  • eqiad.mediawiki.api-request: 8553.283333333333
  • webrequest_upload (24 partitions): 24/24: 73291.51666666666
  • webrequest_text (24 partitions): 5/24: 122753.08333333334

Event Timeline

I tried that command you mentioned @Ottomata by copying the self-contained binary on to kafka-jumbo. Passing --brokers -2 means to apply for all brokers in cluster.

razzi@kafka-jumbo1002:~$ ./topicmappr rebuild --force-rebuild --brokers -2 --topics webrequest_text --zk-addr conf1004.eqiad.wmnet --zk-prefix kafka/jumbo-eqiad


Broker change summary:
  New broker 1007
  New broker 1008
  New broker 1009
  Replacing 0, added 3, missing 0, total count changed by 3

  Expanding/rebalancing topic with 3 additional broker(s) (this is a no-op unless --force-rebuild is specified)
  Force rebuilding map

Partition map changes:
  webrequest_text p0: [1006 1005 1001] -> [1002 1004 1009] replaced broker
  webrequest_text p1: [1002 1003 1004] -> [1007 1008 1001] replaced broker
  webrequest_text p2: [1005 1003 1006] -> [1001 1006 1005] replaced broker
  webrequest_text p3: [1001 1004 1006] -> [1009 1004 1003] replaced broker
  webrequest_text p4: [1003 1006 1002] -> [1005 1009 1002] replaced broker
  webrequest_text p5: [1004 1002 1003] -> [1003 1001 1008] replaced broker
  webrequest_text p6: [1006 1002 1005] -> [1004 1006 1003] replaced broker
  webrequest_text p7: [1002 1005 1003] -> [1006 1003 1007] replaced broker
  webrequest_text p8: [1005 1001 1003] -> [1008 1007 1001] replaced broker
  webrequest_text p9: [1001 1003 1004] -> [1002 1008 1004] replaced broker
  webrequest_text p10: [1003 1004 1006] -> [1007 1002 1006] replaced broker
  webrequest_text p11: [1004 1006 1002] -> [1001 1005 1008] replaced broker
  webrequest_text p12: [1006 1004 1002] -> [1009 1004 1002] replaced broker
  webrequest_text p13: [1002 1006 1005] -> [1005 1002 1009] replaced broker
  webrequest_text p14: [1005 1002 1003] -> [1003 1006 1005] replaced broker
  webrequest_text p15: [1001 1005 1003] -> [1004 1001 1008] replaced broker
  webrequest_text p16: [1003 1001 1004] -> [1006 1003 1005] replaced broker
  webrequest_text p17: [1004 1003 1006] -> [1008 1007 1001] replaced broker
  webrequest_text p18: [1006 1003 1004] -> [1002 1005 1009] replaced broker
  webrequest_text p19: [1002 1004 1006] -> [1007 1009 1003] replaced broker
  webrequest_text p20: [1005 1006 1002] -> [1001 1008 1004] replaced broker
  webrequest_text p21: [1001 1005 1003] -> [1009 1004 1002] replaced broker
  webrequest_text p22: [1003 1005 1001] -> [1005 1006 1001] replaced broker
  webrequest_text p23: [1004 1001 1003] -> [1003 1007 1006] replaced broker

Broker distribution:
  degree [min/max/avg]: 4/5/4.33 -> 5/7/6.00
  Broker 1001 - leader: 3, follower: 6, total: 9
  Broker 1002 - leader: 3, follower: 5, total: 8
  Broker 1003 - leader: 3, follower: 5, total: 8
  Broker 1004 - leader: 2, follower: 6, total: 8
  Broker 1005 - leader: 3, follower: 5, total: 8
  Broker 1006 - leader: 2, follower: 6, total: 8
  Broker 1007 - leader: 3, follower: 4, total: 7
  Broker 1008 - leader: 2, follower: 6, total: 8
  Broker 1009 - leader: 3, follower: 5, total: 8


New partition maps:

So it looks like it comes up with a workable redistribution, and it's nice to have it enumerate all the changes. However, some reassignments don't share anything with the previous state:

webrequest_text p1: [1002 1003 1004] -> [1007 1008 1001] replaced broker

so it'd have to copy all the data to the new nodes, potentially even interrupting processing for any clients connected to the old nodes (I'm not sure if this is how clients work).

Wow that is very cool!

some reassignments don't share anything with the previous state:

Hm, I guess that's ok, data will be blasting around all over the place anyway.

potentially even interrupting processing for any clients connected to the old nodes

Naw, it should be fine. The replica mover just temporarily adds new replicas, so for a short time period this partition would have 6 replicas instead of 3. Once all the replicas are in sync, it will drop the original replicas, causing only the 3 new replicas to be in sync. Since the preferred leader replica will have changed, this should trigger a leader election and cause connected clients to reconnect to the new leader, in this case 1007.

@Ottomata when I tested topicmappr before, I uploaded the binary directly onto the host; when we do this in production, will it make sense to debianize

Hm, topicmappr just helps in generating the reassignment.json file, right? I think we can use it as a one off tool to generate the reassignment.json file without debianizing it.

Migration plan for partition rebalancing

Goal: all topics with > ~10 messages / second to have their partitions redistributed to include kafka-jumbo7-9.

This will be done in 3 parts: low traffic (< 100 m / s), medium traffic (100 - 1000 m / s), and high traffic (> 1000 m / s). This comment has only low traffic; whether medium and high traffic follow the same pattern will depend on how that goes.

To view kafka-jumbo topics sorted by traffic:

Metrics to watch:

  • Is the producer buffer filling up? Maxes out at 720k, after which messages will be dropped.

  • Is IO slowed down?

Part I: low-traffic topics
  1. Low-traffic topic with 1 partition

eqiad.mediawiki.revision-create 16.686670667466828 messages / second

Contents of eqiad.mediawiki.revision-create.json


Apply with:

kafka-reassign-partitions --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/jumbo-eqiad --reassignment-json-file eqiad.mediawiki.revision-create.json --execute --throttle 10000000

  1. Low traffic topic with 3 partitions

eqiad.resource_change 37.93333333333334 messages / second

Original json which migrates all partitions:


First, only migrating partition 0:

Contents of eqiad.resource_change.part1.json


kafka-reassign-partitions --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/jumbo-eqiad --reassignment-json-file eqiad.resource_change.part1.json --execute --throttle 10000000

Then migrating other 2:

Contents of eqiad.resource_change.part2.json


kafka-reassign-partitions --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/jumbo-eqiad --reassignment-json-file eqiad.resource_change.part2.json --execute --throttle 10000000

  1. Other low-traffic topics

Continue as above for the following low-traffic topics:

Namemessages / second

Json files for each have been written to my home directory on kafka-jumbo1002 via the following command:

./topicmappr rebuild --force-rebuild --brokers -2 --topics '^eqiad\.wdqs-internal\.sparql-query$|^eqiad\.wdqs-external\.sparql-query$|^eqiad\.mediawiki\.job\.wikibase-addUsagesForPage$|^eqiad\.mediawiki\.job\.htmlCacheUpdate$|^eqiad\.mediawiki\.job\.cdnPurge$|^statsv$|^codfw\.mediawiki\.client\.session_tick$|^codfw\.mediawiki\.api-request$|^eventlogging_PaintTiming$|^eventlogging_DesktopWebUIActionsTracking$|^eqiad\.mediawiki\.job\.LocalGlobalUserPageCacheUpdateJob$|^eqiad\.mediawiki\.recentchange$|^eqiad\.mediawiki\.job\.recentChangesUpdate$|^eventlogging_NavigationTiming$|^eventlogging_LayoutShift$|^eqiad\.mediawiki\.page-links-change$|^eventlogging_ResourceTiming$|^eqiad\.mediawiki\.revision-score$|^eventlogging_CpuBenchmark$|^eventlogging_InukaPageView$|^eventlogging_RUMSpeedIndex$|^eventlogging_MobileWikiAppDailyStats$|^eqiad\.mediawiki\.job\.ORESFetchScoreJob$|^eventlogging_QuickSurveyInitiation$|^eventlogging_EditAttemptStep$|^eventlogging_CodeMirrorUsage$|^wdqs_streaming_updater_test$|^codfw\.wdqs-external\.sparql-query$' --zk-addr conf1004.eqiad.wmnet --zk-prefix kafka/jumbo-eqiad

Nice plan, I like the amount of details! Going to add a few suggestions/questions for you:

  1. In the metrics to review I'd take into consideration clients that are not only varnishkafka, since the low traffic volume topics mentioned are mostly coming from Mirror Maker (so mirrored from the Kafka main cluster) and some of them directly from Eventlogging (on eventlog1002). It is very important in my opinion to know what are (some) producers/consumers of each topic that we'll change, so we'll have a quick way to judge if things go sideways for any reason.
  1. the kafka-reassign-partitions can be rewritten in kafka reassign-partitions --reassignment-json-file eqiad.mediawiki.revision-create.json --execute --throttle 10000000, Andrew wrote a nice script that removes the need for zookeeper boilerplate configs etc.. (you can use it also for other kafka-related commands, type kafka on any kafka node to see the list of commands available). I am also curious about the --throttle 10000000, is there a specific reason for such high value?
  1. What is the plan if when executing the topic partition moves something starts to error out (say a consumer etc..) ? Is there a rollback plan that we can quickly use? I am asking since if possible it would be nice to have it tested in kafka-test before starting the procedure :)
  1. As first step, I'd execute the procedure on one/two topics and leave them running for a couple of hours with the new config, watching metrics in the meantime, just to make sure that nothing horrible happens on clients/producers after the maintenance (some bugs might pop up after some time, rather than immediately).

Nice stuff!

In the metrics to review I'd take into consideration clients that are not only varnishkafka

There are 4 main producers of data we should watch: varnishkafka, eventgate-analytics-external and eventgate-analytics, and eventlogging-processor on eventlog1002.

We can watch eventlogging-processor logs on eventlog1002 like:

sudo journalctl -f   -u eventlogging-processor@client-side-*.service

And eventgate-* app logs can be watched in logstash (this URL might as they upgrade logstash...)

Aside from that I think we'll just have to watch the dashboards and especially pay attention to the Kafka broker iowait.

Json files for each have been written to my home directory on kafka-jumbo1002 via the following command:

OH! Cool topicmappr is nice. You've supplied all the topics but it generates individual reassignment files for each. I also just verified that the output for a given topic is the same whether or not you give it a list of topics or just one, so I assume the assignments it generates are deterministic every time.

To view kafka-jumbo topics sorted by traffic:

@razzi, if we balance an eqiad. prefixed topic, we should also balance its corresponding codfw. prefixed one. The codfw topic might not have any traffic in it now, but in the case of a datacenter switchover, it will then have the same throughput as the eqiad one.

Mentioned in SAL (#wikimedia-operations) [2021-01-19T21:46:20Z] <ottomata> wiping kafka-test cluster data and starting from scratch - T255973

With @ottomatta we came up with a way to rollback a partition migration.
When applying a migration, it prints the current state, which can be used to migrate the partitions back,
however while a migration is running, trying to start another gives the error "There is an existing assignment running."

The way to cancel an in-progress migration is to execute the following commands:

zookeeper-shell zookeeper-test1002.eqiad.wmnet/kafka/test-eqiad
rmr /admin/reassign_partitions
rmr /kafka/test-eqiad/controller

At this point the rollback state can be applied, and since the migrations work by adding new replicas to the set of replicas, it will apply instantaneously if nodes it attempts to add are still in the In-Sync Replica (ISR) set.

Attempting to change the throttle gives the same error when there is an existing assignment running; the throttle can be removed using the following commands for each node:

kafka configs --alter --entity-type brokers --entity-name 1006 --delete-config leader.replication.throttled.rate
kafka configs --alter --entity-type brokers --entity-name 1006 --delete-config follower.replication.throttled.rate

Migrated the following topics on kafka-jumbo:


The migrations still to be run are on kafka-jumbo1002 in /home/razzi/rebalance-json.


To start a migration (this uses a throttle of 10 MB/s):

kafka reassign-partitions --reassignment-json-file <topic>.json --execute --throttle 10000000

This will print the current partition assignment which can be used to rollback a migration.
Copy and paste this into ~/rebalance-json/reverts/REVERT-<topic>.json.

To check the progress, compare the topic size on disk on a node that was previously a replica with a node that is being added to be a replica. For example:

razzi@kafka-jumbo1002:~/rebalance-json$ du -sh /srv/kafka/data/eqiad.mediawiki.revision-create-0/
21G     /srv/kafka/data/eqiad.mediawiki.revision-create-0/
razzi@kafka-jumbo1009:~/rebalance-json$ du -sh /srv/kafka/data/eqiad.mediawiki.revision-create-0/
13G     /srv/kafka/data/eqiad.mediawiki.revision-create-0/

To check if a migration has finished (also removes the throttle if it has):

kafka reassign-partitions --reassignment-json-file eqiad.mediawiki.revision-create.json --verify

Once a topic is done, move it to ~/rebalance-json/done/. In this way the directory ~/rebalance-json/ serves as a sort of to-do list.

One more useful command: to change the throttle rate, run the on the node data is coming from and the node the data is going to. For example, if data is being copied from kafka-jumbo1003 to kafka-jumbo1009:

$ kafka configs --alter --entity-type brokers --entity-name 1003 --add-config leader.replication.throttled.rate=10000000,follower.replication.throttled.rate=10000000

$ kafka configs --alter --entity-type brokers --entity-name 1009 --add-config leader.replication.throttled.rate=10000000,follower.replication.throttled.rate=10000000

As we get into the higher-volume topics, we are seeing some alerts about replica max lag and under-replicated partions. As I continue to run migrations, those alerts should be disabled for a few hours at a time and the metrics should be observed manually in Grafana.

Ok! Now that we're on to the final and highest traffic topics, webrequest_upload and webrequest_text, we're switching to migrating one partition at a time. Here are the full migrations plans, in case they get modified in the process.





Each partition can be split into a migration such as:



I'm thinking of writing up the steps for rebalancing partitions in a wiki article such as, and I'm reminded of how I scp'd the topicmappr executable to kafka-jumbo1002 and how that's hacky. Should we make a plan to properly package topicmappr?

If it isn't hard it couldn't hurt!

Although, it is a go library, which I'm not sure we have much tooling around dealing with. I think maybe @ema has made some Go based .debs before?

Although, it is a go library, which I'm not sure we have much tooling around dealing with. I think maybe @ema has made some Go based .debs before?

Hey! Yes we do have some Golang programs debianized, see for example atskafka.

Thanks for your comment @ema. I'll see if I can do the same with kafka-kit.

kafka-kit depends on other packages such as, which is not available in Buster but is available in Bullseye. We could backport all these packages but it would be nontrivial and potentially duplicative of effort.

kafka-kit depends on other packages such as, which is not available in Buster but is available in Bullseye. We could backport all these packages but it would be nontrivial and potentially duplicative of effort.

For go packages we tend to follow this rule - use the Debian deps as much as possible, and include what's missing in the package itself. I agree that importing/packaging dependencies it is too much, so it is fine to just add them to the deb :)

Prometheus doesn't seem to like long range queries, so I can't show more than 30 days back, but we can see the topic data difference converging across all jumbo brokers:

Screen Shot 2021-04-19 at 11.22.14.png (2×4 px, 850 KB)

Very cool!