
Introduce Spicerack.kafka module, along with the method to transfer offset state between consumer groups and clusters
Closed, Resolved, Public

Description

A problem statement
The new Flink-based WDQS updater uses Kafka to communicate between parts of the process. To bootstrap new WDQS instances effectively, we need to be able to copy data from already bootstrapped instances to the new ones. This process requires copying both the Blazegraph journal data and the Kafka offsets. We already have the first part, but we also need to be able to transfer offsets between different Kafka consumer groups and clusters (for a cross-DC transfer).

This proposal is about introducing a Spicerack module to do just that. The process itself is independent of our use case and can be reused by anybody, hence the idea to make it part of Spicerack. The module will use Puppet-generated config to handle any potential changes in the Kafka configuration.

The new module (Kafka) will introduce the method:
transfer_kafka_position(self, topics: List[str], from_site: str, from_cluster: str, from_consumer_group: str, to_site: str, to_cluster: str, to_consumer_group: str)

which will allow transferring the position from a given site, cluster and consumer group to another. For transfers within a single cluster, the offset itself is transferred. For cross-cluster transfers, the offset is approximated from the timestamp of the original message.
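To make the cross-cluster case concrete, here is a minimal sketch that needs no broker. It is not the proposed implementation: it models one target partition as a plain list of message timestamps (list index standing in for the offset, which is a simplification) and picks the earliest offset whose timestamp is at or after the source message's timestamp, which is how Kafka's offsets-for-timestamps lookup behaves. The function name and all data are hypothetical.

```python
from bisect import bisect_left
from typing import List, Optional


def approximate_offset(target_timestamps: List[int], source_timestamp: int) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= source_timestamp.

    target_timestamps[i] is the timestamp (ms) of the message at offset i in
    the target partition; the list is assumed to be non-decreasing.
    Returns None when no message in the target is that recent.
    """
    index = bisect_left(target_timestamps, source_timestamp)
    return index if index < len(target_timestamps) else None


# Hypothetical data: the same messages exist in both clusters, but the target
# partition holds two older messages, so the raw offsets do not line up.
source_log = [1000, 1010, 1020, 1030]
target_log = [900, 950, 1000, 1010, 1020, 1030]

source_offset = 2                              # position of the source consumer group
source_timestamp = source_log[source_offset]   # timestamp of the message at that position
target_offset = approximate_offset(target_log, source_timestamp)
```

Copying the raw offset (2) would point the target group at an older message, while the timestamp lookup lands on the matching one. The result is still an approximation: if several messages share a timestamp, the lookup returns the first of them.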

Third party dependencies
python-kafka - version 1.4.3 (available in Buster) is enough.

Relevant task
More complete story on our needs - T276469

Event Timeline

@Zbyszko thanks a lot for being the first user of our new process as outlined in https://wikitech.wikimedia.org/wiki/Spicerack#How_to_contribute

New module (Kafka) will introduce the method:

I see no problem for the new module, dependencies are fine.

transfer_kafka_position(self, topics: List[str], from_site: str, from_cluster: str, from_consumer_group: str, to_site: str, to_cluster: str, to_consumer_group: str)

I have some questions and comments for this method:

  • Does this method return anything?
  • Would it make sense to rename it to transfer_position, given that it is part of a Kafka class? The repetition seems unnecessary.
  • I see that you're passing quite a few parameters and that they are duplicated between source and destination. I'm wondering if it would make sense to add a small KafkaConsumer @dataclass that holds the site, cluster and consumer group (sorry for the name, I'm not familiar with Kafka, so it probably makes no sense; it's just to explain my point). Basically something like:
@dataclass
class KafkaConsumer:
    site: str
    cluster: str
    group: str  # or consumer_group, whatever feels more right in Kafka terms

transfer_position(self, topics: List[str], from: KafkaConsumer, to: KafkaConsumer)

I also see that the proposed patch now has a set_kafka_position_by_timestamp method, which could benefit from the above dataclass; I would propose renaming it as well, dropping the kafka part.
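A runnable variant of the dataclass idea might look like the sketch below. All names are hypothetical and nothing here talks to Kafka. The parameters are called source and target because `from` is a reserved word in Python and cannot be used as a parameter name. The sketch also assumes that two consumers are in the same cluster when both site and cluster match, and that problems are reported by raising exceptions rather than through a return value.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ConsumerLocation:
    """Hypothetical name: the three values that identify a consumer group."""

    site: str
    cluster: str
    group: str


def transfer_strategy(source: ConsumerLocation, target: ConsumerLocation) -> str:
    """Return 'offset' within one cluster and 'timestamp' across clusters."""
    # Assumption: a cluster is identified by the (site, cluster) pair.
    if (source.site, source.cluster) == (target.site, target.cluster):
        return "offset"
    return "timestamp"


def transfer_position(topics: List[str], source: ConsumerLocation, target: ConsumerLocation) -> None:
    """Placeholder that only validates its input; it returns nothing and raises on problems."""
    if not topics:
        raise ValueError("at least one topic is required")
    if source == target:
        raise ValueError("source and target are the same consumer group")
    # A real implementation would read and commit offsets in Kafka here.


# Illustrative values only, not real site, cluster or group names.
src = ConsumerLocation(site="site-a", cluster="main", group="group_a")
same_cluster = ConsumerLocation(site="site-a", cluster="main", group="group_b")
other_cluster = ConsumerLocation(site="site-b", cluster="main", group="group_b")

transfer_position(["some_topic"], src, other_cluster)
```

Grouping the three values also means a method such as set_kafka_position_by_timestamp could take the same object instead of three separate strings.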

  • Does this method return anything?

No, there's no need: if there are issues, exceptions will be raised; otherwise the transfer succeeded.

  • Would it make sense to rename it to transfer_position, given that it is part of a Kafka class? The repetition seems unnecessary.

I like that suggestion. I modified it a little so that it makes more sense in a Kafka context (that was missing from the original as well).

  • I see that you're passing quite a few parameters and that they are duplicated between source and destination.

Makes sense, I modified the code accordingly.

joanna_borun triaged this task as Medium priority.