
Replace kafka-node with more mature bindings, ideally using librdkafka
Closed, Duplicate · Public

Description

The kafka-node library lacks thorough error handling & recovery, and does not support Kafka 0.10+. Shortcomings in fault handling have contributed to at least one outage.

To address this, we should look into replacing it with a better kafka binding. The librdkafka C library is well-maintained, and is what we are investigating for Python as well. There are node bindings at node-kafka-native, which look like a good starting point. There is active development, but we'll have to check how mature it really is.

Requirement: Load balancing between consumers in a single-partition consumer group

For ChangeProp, we have a specific requirement for balancing single-partition topics across many consumers in the same consumer group. We have implemented this on top of kafka-node with a randomized take-over algorithm, which talks directly to Zookeeper. We'll have to provide similar functionality in the new driver.

Event Timeline

GWicke updated the task description.

Yeehaw! I don't have much time to help at the moment, but I am invested in this too.

I spent an hour hacking with node-kafka-native once, and it successfully builds against later librdkafka versions. It will need modifications to work with the newer librdkafka APIs, especially those around high-level consumer groups and offset management. At the moment, node-kafka-native does a lot of this work itself in Javascript after it gets data back from librdkafka. Now that most of this logic lives inside librdkafka itself, I hope we will mostly just need to write bindings for the newer librdkafka APIs, and can scrap a fair bit of the custom Javascript.

I looked a bit into options for load balancing single-partition consumer groups. The algorithm specified by Kafka &amp; implemented by librdkafka (see here and here) runs client-side and is deterministic, based on the sort order of consumers by consumer id. This means we should be able to evenly distribute consumption of single-partition topics by randomizing consumer ids.

We already assign random consumer ids in kafka-node, but it seems that what was missing to make this work was sorting by consumer id in the rebalance implementation. The list returned from Zookeeper's getChildren method is ["not sorted and no guarantee is provided as to its natural or lexical order"](https://zookeeper.apache.org/doc/r3.3.3/api/org/apache/zookeeper/ZooKeeper.html#getChildren(java.lang.String, org.apache.zookeeper.Watcher)).
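To make the sorting point concrete, here is a minimal sketch of the range-assignment idea. It is illustrative only; the function name and data shapes are made up, and this is neither kafka-node's nor librdkafka's actual code:

```javascript
'use strict';

// Illustration of Kafka's client-side range assignment. Partitions are
// handed out only after sorting consumers by id, so with a single-partition
// topic the winner is simply the first consumer in sorted order; this is
// why randomized consumer ids spread single-partition topics evenly.
function assignPartitions(consumerIds, numPartitions) {
    // Every consumer computes the same sorted order, so the result is
    // deterministic across the group without any coordination.
    var sorted = consumerIds.slice().sort();
    var perConsumer = Math.floor(numPartitions / sorted.length);
    var extra = numPartitions % sorted.length;
    var assignment = {};
    var next = 0;
    sorted.forEach(function(id, i) {
        var count = perConsumer + (i < extra ? 1 : 0);
        assignment[id] = [];
        for (var j = 0; j < count; j++) {
            assignment[id].push(next++);
        }
    });
    return assignment;
}

// With one partition, only the first consumer in sorted order gets it:
var who = assignPartitions(['consumer-b', 'consumer-a'], 1);
console.log(who); // consumer-a gets partition 0, consumer-b gets nothing
```

If every consumer sorts the same (unsorted) ZK child list differently, different consumers can each believe they own the partition, which is exactly the bug described below.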

> I looked a bit into options for load balancing single-partition consumer groups. The algorithm specified by Kafka &amp; implemented by librdkafka is client-side, and is a deterministic algorithm based on the sort order of consumers, by consumer id. This means that we should be able to evenly distribute the consumption of single-partition topics by randomizing consumer ids.

Indeed. When I initially thought of random IDs, I dismissed the approach because it doesn't strongly guarantee that all rules won't end up processed by the same worker; but given that we create a new client for each rule, we can dismiss that edge case.

I have prototyped the change-propagation service using the kafka-native driver. The prototype doesn't really work (obviously), but the exercise gave me some understanding of the scope of work required.

Here's the list of issues I found with the kafka-native driver:

  1. It is bound to an older librdkafka version, so it doesn't support the newer features from 0.9 and 0.10. Worse, it doesn't support consumer groups properly, and the owner of the package explicitly states that he will not implement them.
  2. It implicitly assumes topic autocreation, which is not the case in production. Also, in tests we want to create topics manually, which isn't possible either.
  3. I have managed to SIGABRT it in a couple of tests. The most obvious and worrying case: it fails with SIGABRT if I try to specify the client id in the consumer.
  4. Its whole commit-management code is useless for us. Not only does it store commits locally (which is easily fixable by replacing the OffsetManager implementation with one that uses ZK), but the whole logic differs from what we need. The current logic is: fetch a batch of messages, call the message callback with all of them, and once the receive_callback promise resolves, commit all the offsets. In our case we want to control the commit process manually, since we use the TaskQueue for concurrency limiting.
  5. It has quite poor error handling; I didn't find a way to properly handle some error types.
  6. The project seems to be abandoned by its owner: the last release was 5 months ago, with no activity since.
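To spell out the commit flow we actually need, here is a rough sketch. Every name in it (`consume`, `commit`, `enqueue`, the shape of `TaskQueue`) is a hypothetical placeholder, not a real kafka-native or change-prop API:

```javascript
'use strict';

// Sketch of the desired flow: take messages one at a time, push processing
// through a concurrency-limited queue, and only commit an offset after its
// handler has actually finished. All interfaces here are hypothetical.
function runConsumeLoop(consumer, taskQueue, handler) {
    return consumer.consume().then(function(message) {
        // Hand processing to the TaskQueue so it can bound how many
        // messages are in flight, instead of processing a batch inline.
        taskQueue.enqueue(function() {
            return handler(message).then(function() {
                // Manual, per-message commit after successful processing,
                // rather than auto-committing whole batches up front.
                return consumer.commit(message.topic, message.partition, message.offset);
            });
        });
        return runConsumeLoop(consumer, taskQueue, handler);
    });
}
```

The recursion through `.then` doesn't grow the stack, since each continuation runs on a fresh microtask.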

Basically, after playing with it for a little while in the context of change-prop, it seems like 70% of the JS code is useless for us. The native bindings code might be a nice start, but we want to use a newer librdkafka version, so it will need modification too.

Overall, a significant amount of work is required to adapt the kafka-native driver to our use case, so I'm wondering whether it would be wiser to create our own driver, using the kafka-native bindings code as inspiration.

Hey! I've actually been working on this! We need a sync up!

librdkafka actually implements most of what I need, especially around auto-balanced consumers. I've been able to scrap a lot of code since the functionality is now in librdkafka. I've been focusing on the high-level consumer, but I think it should be possible to mostly feature-port https://github.com/confluentinc/confluent-kafka-python to nodejs. I'll put something up on github in a bit.

@Ottomata Awesome. We have a sync-up meeting on July 11, so it's not too long to wait. Tomorrow I will try to formalise a set of requirements we need from the driver in change-prop so that we could share the implementation.

Awesome. Yeah, I'll need lots of help on this. I'm dancing in a new world here with all this Nan + nodejs + librdkafka + callbacks in 3 languages + Promises stuff. :)

Here's my super WIP attempt:
https://github.com/ottomata/node-kafka-native/tree/new_consumer

Currently:

```javascript
var kafka_native = require('./index');

var consumer = new kafka_native.Consumer({
    driver_options: {
        'bootstrap.servers': 'localhost:9092',
        'group.id': new Date().toString(),
        'enable.auto.commit': false,
    },
    topic_options: {
        'auto.offset.reset': 'smallest',
        'auto.commit.enable': false,
    }
});

consumer.subscribe(['test4']);

function recv(m) {
    // Don't print end of topic/partition errors
    // (-191 is librdkafka's RD_KAFKA_RESP_ERR__PARTITION_EOF)
    if (!('errcode' in m) || m.errcode != -191) {
        console.log(m);
    }
}

while (true) {
    recv(consumer.poll());
}
```

Innnteresting. At the Kafka Summit back in May, I met a guy from Blizzard who said they had an in house node Kafka client. I asked him about open sourcing it, and he said he wanted to, but wasn't sure if Blizzard would let them.

Today he emailed me about this:
https://github.com/Blizzard/node-rdkafka

@Ottomata Haha, they came up with the same name as I did. This actually complicates things quite a bit, because now we have to decide what to do again... I didn't look at their code too closely yet, but it seems (obviously) way more mature than my work.

I can see a couple of things they didn't do right from my perspective, like packing the messages object using Nan::Set, which from my testing is about 5 times slower than ObjectWrap, but overall it seems very nice; the best software is the one not written by you.

So, the question is - should we try to use my client, or join forces with blizzard? I will play with their client for some time today to gather some info on performance and stability and will report here.
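For comparison, the consume path with node-rdkafka's high-level KafkaConsumer looks roughly like the sketch below. It is based on that library's documented API, so treat option names and method signatures as approximate; the kafka module is injected as a parameter purely so the sketch can be exercised without a broker (in real code it would be `require('node-rdkafka')`), and the group name is made up:

```javascript
'use strict';

// Rough sketch of consuming with node-rdkafka's high-level KafkaConsumer.
// The `kafka` module is passed in only so this can be driven by a fake in
// tests; in real use it would be require('node-rdkafka'). The option names
// are librdkafka config properties, passed through as-is.
function startConsumer(kafka, brokers, topic, onMessage) {
    var consumer = new kafka.KafkaConsumer({
        'metadata.broker.list': brokers,
        'group.id': 'change-prop-test',  // hypothetical group name
        'enable.auto.commit': false      // we commit manually below
    }, {
        'auto.offset.reset': 'smallest'
    });

    consumer.connect();

    consumer.on('ready', function() {
        consumer.subscribe([topic]);
        // No-argument consume() puts the consumer in flowing mode,
        // emitting a 'data' event per message.
        consumer.consume();
    });

    consumer.on('data', function(message) {
        onMessage(message);
        // Manual, per-message commit once the handler has run.
        consumer.commitMessage(message);
    });

    return consumer;
}
```

Notably, the rebalance and consumer-group logic lives entirely in librdkafka here, which is exactly the code we would otherwise have to write ourselves.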

Yeah, tough question! I don't know the answer. I'd defer to whatever you think is best here, since you know much more about the node binding world than I do. They do have assign though :)