Kasocki can be configured to register callbacks as node-rdkafka event handlers. One of the more useful events is `event.stats` (not `event.status`), which librdkafka emits for statistics reporting. When statistics are enabled (by setting `statistics.interval.ms` to a non-zero value in the Kafka client configuration), the handler receives a stats object like the one pasted below.
We should write a small Node.js module that provides a callback which uses a node-statsd client instance to report the useful stats from this node-rdkafka `event.stats` event.
Perhaps it could be named node-rdkafka-statsd?
{
"topics": {
"test4": {
"partitions": {
"-1": {
"rx_ver_drops": 0,
"msgs": 0,
"txbytes": 0,
"txmsgs": 0,
"consumer_lag": -1,
"hi_offset": -1001,
"lo_offset": -1001,
"eof_offset": -1001,
"committed_offset": -1001,
"xmit_msgq_bytes": 0,
"xmit_msgq_cnt": 0,
"msgq_bytes": 0,
"msgq_cnt": 0,
"unknown": false,
"desired": false,
"leader": -1,
"partition": -1,
"fetchq_cnt": 0,
"fetchq_size": 0,
"fetch_state": "none",
"query_offset": 0,
"next_offset": 0,
"app_offset": -1001,
"stored_offset": -1001,
"commited_offset": -1001
},
"0": {
"rx_ver_drops": 0,
"msgs": 0,
"txbytes": 0,
"txmsgs": 0,
"consumer_lag": 0,
"hi_offset": 34784,
"lo_offset": -1001,
"eof_offset": 34784,
"committed_offset": -1001,
"xmit_msgq_bytes": 0,
"xmit_msgq_cnt": 0,
"msgq_bytes": 0,
"msgq_cnt": 0,
"unknown": false,
"desired": true,
"leader": 0,
"partition": 0,
"fetchq_cnt": 30582,
"fetchq_size": 6224678,
"fetch_state": "active",
"query_offset": 0,
"next_offset": 34784,
"app_offset": 6143,
"stored_offset": 6143,
"commited_offset": -1001
}
},
"metadata_age": 5995,
"topic": "test4"
}
},
"brokers": {
"mediawiki-vagrant.dev:9092/0": {
"toppars": {
"test4": {
"partition": 0,
"topic": "test4"
}
},
"throttle": {
"cnt": 0,
"sum": 0,
"avg": 0,
"max": 0,
"min": 0
},
"rtt": {
"cnt": 0,
"sum": 0,
"avg": 0,
"max": 0,
"min": 0
},
"rxpartial": 3,
"rxcorriderrs": 0,
"waitresp_msg_cnt": 0,
"waitresp_cnt": 1,
"outbuf_msg_cnt": 0,
"outbuf_cnt": 0,
"stateage": 7023265,
"state": "UP",
"nodeid": 0,
"name": "mediawiki-vagrant.dev:9092/0",
"tx": 57,
"txbytes": 3609,
"txerrs": 0,
"txretries": 0,
"req_timeouts": 0,
"rx": 56,
"rxbytes": 3406226,
"rxerrs": 0
},
"localhost:9092/bootstrap": {
"toppars": {},
"throttle": {
"cnt": 0,
"sum": 0,
"avg": 0,
"max": 0,
"min": 0
},
"rtt": {
"cnt": 0,
"sum": 0,
"avg": 0,
"max": 0,
"min": 0
},
"rxpartial": 0,
"rxcorriderrs": 0,
"waitresp_msg_cnt": 0,
"waitresp_cnt": 0,
"outbuf_msg_cnt": 0,
"outbuf_cnt": 0,
"stateage": 7058566,
"state": "UP",
"nodeid": -1,
"name": "localhost:9092/bootstrap",
"tx": 5,
"txbytes": 172,
"txerrs": 0,
"txretries": 0,
"req_timeouts": 0,
"rx": 5,
"rxbytes": 8146,
"rxerrs": 0
}
},
"name": "rdkafka#consumer-1",
"type": "consumer",
"ts": 69158506047,
"time": 1472147268,
"replyq": 30583,
"msg_cnt": 0,
"msg_max": 100000,
"simple_cnt": 0
}