
Write node-rdkafka event.stats callback that reports stats to statsd
Closed, Resolved · Public · 5 Estimated Story Points

Description

Kasocki can be configured to register callbacks for node-rdkafka event handlers. One of the useful events is event.stats, which librdkafka emits for stats reporting. When statistics.interval.ms is set, the callback receives a stats object like the one pasted below.

We should write a micro Node module that provides an event.stats callback which uses a node-statsd client instance to report useful stats from node-rdkafka.

node-rdkafka-statsd perhaps?

{
  "topics": {
    "test4": {
      "partitions": {
        "-1": {
          "rx_ver_drops": 0,
          "msgs": 0,
          "txbytes": 0,
          "txmsgs": 0,
          "consumer_lag": -1,
          "hi_offset": -1001,
          "lo_offset": -1001,
          "eof_offset": -1001,
          "committed_offset": -1001,
          "xmit_msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "msgq_bytes": 0,
          "msgq_cnt": 0,
          "unknown": false,
          "desired": false,
          "leader": -1,
          "partition": -1,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": 0,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001
        },
        "0": {
          "rx_ver_drops": 0,
          "msgs": 0,
          "txbytes": 0,
          "txmsgs": 0,
          "consumer_lag": 0,
          "hi_offset": 34784,
          "lo_offset": -1001,
          "eof_offset": 34784,
          "committed_offset": -1001,
          "xmit_msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "msgq_bytes": 0,
          "msgq_cnt": 0,
          "unknown": false,
          "desired": true,
          "leader": 0,
          "partition": 0,
          "fetchq_cnt": 30582,
          "fetchq_size": 6224678,
          "fetch_state": "active",
          "query_offset": 0,
          "next_offset": 34784,
          "app_offset": 6143,
          "stored_offset": 6143,
          "commited_offset": -1001
        }
      },
      "metadata_age": 5995,
      "topic": "test4"
    }
  },


  "brokers": {
    "mediawiki-vagrant.dev:9092/0": {
      "toppars": {
        "test4": {
          "partition": 0,
          "topic": "test4"
        }
      },
      "throttle": {
        "cnt": 0,
        "sum": 0,
        "avg": 0,
        "max": 0,
        "min": 0
      },
      "rtt": {
        "cnt": 0,
        "sum": 0,
        "avg": 0,
        "max": 0,
        "min": 0
      },
      "rxpartial": 3,
      "rxcorriderrs": 0,
      "waitresp_msg_cnt": 0,
      "waitresp_cnt": 1,
      "outbuf_msg_cnt": 0,
      "outbuf_cnt": 0,
      "stateage": 7023265,
      "state": "UP",
      "nodeid": 0,
      "name": "mediawiki-vagrant.dev:9092/0",
      "tx": 57,
      "txbytes": 3609,
      "txerrs": 0,
      "txretries": 0,
      "req_timeouts": 0,
      "rx": 56,
      "rxbytes": 3406226,
      "rxerrs": 0
    },
    "localhost:9092/bootstrap": {
      "toppars": {},
      "throttle": {
        "cnt": 0,
        "sum": 0,
        "avg": 0,
        "max": 0,
        "min": 0
      },
      "rtt": {
        "cnt": 0,
        "sum": 0,
        "avg": 0,
        "max": 0,
        "min": 0
      },
      "rxpartial": 0,
      "rxcorriderrs": 0,
      "waitresp_msg_cnt": 0,
      "waitresp_cnt": 0,
      "outbuf_msg_cnt": 0,
      "outbuf_cnt": 0,
      "stateage": 7058566,
      "state": "UP",
      "nodeid": -1,
      "name": "localhost:9092/bootstrap",
      "tx": 5,
      "txbytes": 172,
      "txerrs": 0,
      "txretries": 0,
      "req_timeouts": 0,
      "rx": 5,
      "rxbytes": 8146,
      "rxerrs": 0
    }
  },

  "name": "rdkafka#consumer-1",
  "type": "consumer",
  "ts": 69158506047,
  "time": 1472147268,
  "replyq": 30583,
  "msg_cnt": 0,
  "msg_max": 100000,
  "simple_cnt": 0
}
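The stats object nests per-partition counters under topics.<topic>.partitions.<id>. A minimal sketch of pulling a few of them out as flat metric names follows; the field list and naming scheme are assumptions for illustration. It skips partition "-1" (librdkafka's internal unassigned partition) and negative values, treating a consumer_lag of -1 as "not known yet".

```javascript
// Sketch only: the field list and the metric naming scheme are assumptions.
const PARTITION_FIELDS = ['consumer_lag', 'fetchq_cnt', 'fetchq_size', 'msgq_cnt'];

// Returns a flat { metricName: number } map of per-partition stats.
function partitionMetrics(stats) {
  const metrics = {};
  for (const [topicName, topic] of Object.entries(stats.topics || {})) {
    for (const [partitionId, partition] of Object.entries(topic.partitions || {})) {
      // "-1" is librdkafka's internal unassigned (UA) partition; skip it.
      if (partitionId === '-1') continue;
      for (const field of PARTITION_FIELDS) {
        const value = partition[field];
        // Negative values such as consumer_lag = -1 are treated as "unknown".
        if (typeof value !== 'number' || value < 0) continue;
        metrics[`topics.${topicName}.partitions.${partitionId}.${field}`] = value;
      }
    }
  }
  return metrics;
}
```

Each resulting entry could then be reported with client.gauge(name, value).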

Event Timeline

@Ottomata, actually, could you make this a separate micro-module on npm so that ChangeProp could do the same thing?

or as a separate node module for use with node-rdkafka,

Sounds like you like option 2 the best, I think I concur! :)

This could borrow (for inspiration) from existing varnishkafka code that parses the stats JSON into flat keys and sends them to statsd.
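In the same spirit as that varnishkafka code, a rough JavaScript sketch (written from scratch, not ported from it) could flatten the nested stats object into dot-separated keys, keep only numeric leaves, and send each one as a gauge. The key-sanitizing rule is an assumption: broker names such as localhost:9092/bootstrap contain characters that are special in the statsd line format or would split the metric hierarchy, so they are replaced with underscores.

```javascript
// Sketch: flatten nested JSON into dot-separated keys, keeping only numbers.
function flatten(obj, prefix = '', out = {}) {
  for (const [key, value] of Object.entries(obj)) {
    // Assumed policy: replace characters that break statsd/Graphite keys.
    const safeKey = String(key).replace(/[.:|@\/#\s]/g, '_');
    const path = prefix ? `${prefix}.${safeKey}` : safeKey;
    if (typeof value === 'number') {
      out[path] = value;
    } else if (value !== null && typeof value === 'object') {
      flatten(value, path, out);
    }
    // Strings and booleans (e.g. "state": "UP") are skipped.
  }
  return out;
}

// Sends every numeric leaf as a gauge; returns how many were sent.
function sendGauges(client, obj, prefix = '') {
  const flat = flatten(obj, prefix);
  for (const [name, value] of Object.entries(flat)) {
    client.gauge(name, value);
  }
  return Object.keys(flat).length;
}
```

Sending everything this way produces many metrics per broker and partition, which is one reason to prefer a whitelist of fields in practice.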

Milimetric changed the point value for this task from 0 to 8. Sep 15 2016, 4:16 PM

@Ottomata: in what repo does this code go? I imagine it no longer goes into kasocki.

Nuria, a new git repo. This should be a standalone node module.

node-rdkafka-statsd or perhaps node-rdkafka-stats (if we want to abstract out the statsd-specific part; not sure if we do) are possible names.

<notes>
Gerrit module.
Node module to talk to statsd?
ES6 syntax?
Mocha for testing (see kasocki)
As minimal as possible.
</notes>
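Whatever test runner is used (the notes suggest Mocha, as in kasocki), a test double that records calls instead of sending UDP packets lets a stats callback be exercised without a statsd server. A minimal sketch; the class and its last() helper are hypothetical, and only the method names gauge, increment and timing mirror node-statsd.

```javascript
// Hypothetical test double: records metric calls instead of sending them.
class RecordingMetricsClient {
  constructor() {
    this.calls = [];
  }

  gauge(name, value) {
    this.calls.push({ type: 'gauge', name, value });
  }

  increment(name, value = 1) {
    this.calls.push({ type: 'increment', name, value });
  }

  timing(name, value) {
    this.calls.push({ type: 'timing', name, value });
  }

  // Returns the most recently recorded value for a metric, or undefined.
  last(name) {
    for (let i = this.calls.length - 1; i >= 0; i--) {
      if (this.calls[i].name === name) return this.calls[i].value;
    }
    return undefined;
  }
}
```

In a Mocha it() block, an instance would be passed to the code under test and the assertions made on client.calls.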

Some similar code:

https://github.com/wikimedia/kasocki
https://github.com/wikimedia/operations-puppet-varnishkafka/blob/master/files/varnishkafka_ganglia.py

Every time I have created a Gerrit repository, I have requested it here: https://www.mediawiki.org/wiki/Git/New_repositories/Requests

Let me know if you can create it; otherwise I will request it.

Ah! I have been reminded that the more recent way we get stats from varnishkafka (and librdkafka) is via logster. I wrote the JsonLogster parser long ago to do this:

https://github.com/wikimedia/operations-debs-logster/blob/master/logster/parsers/JsonLogster.py

It might be easier to follow that code than the ganglia stuff.

@Ottomata, we have a nice metric reporting abstraction in service-runner. I would recommend accepting a metrics reporter with this interface.

+1, this interface is basically the same as node-statsd, right? So yeah, this should likely work like this:

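var StatsD = require('node-statsd');
var client = new StatsD({ host: 'statsd.eqiad.wmnet' });  // or metrics-reporter

var rdkafkaStatsdCb = require('node-rdkafka-statsd')(client);

var kafka = require('node-rdkafka');
var kafkaConsumer = new kafka.KafkaConsumer({
    // ... (other consumer config)
    // Have librdkafka emit stats every 30 seconds.
    'statistics.interval.ms': 30000
});

// node-rdkafka emits librdkafka's stats as the 'event.stats' event.
kafkaConsumer.on('event.stats', rdkafkaStatsdCb);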
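Assuming the factory shape in the snippet above (require('node-rdkafka-statsd')(client) returns the stats callback), the module side might look roughly like this. The whitelists, metric names and payload handling are hypothetical: node-rdkafka is assumed to pass either the parsed stats or a wrapper whose message property holds the JSON string, so the sketch accepts both. Bootstrap connections (nodeid -1) are skipped and brokers are named by node id rather than host:port.

```javascript
// Sketch only: whitelists and metric names are assumptions.
const TOP_LEVEL_FIELDS = ['replyq', 'msg_cnt'];
const BROKER_FIELDS = ['tx', 'txbytes', 'txerrs', 'rx', 'rxbytes', 'rxerrs',
  'outbuf_cnt', 'waitresp_cnt'];

// Factory: takes a statsd-like client, returns an event.stats callback.
function rdkafkaStatsd(client) {
  return function onStats(event) {
    // Accept a parsed object or a { message: '<json>' } wrapper (assumed shape).
    const stats = event && typeof event.message === 'string'
      ? JSON.parse(event.message)
      : event;
    for (const field of TOP_LEVEL_FIELDS) {
      if (typeof stats[field] === 'number') client.gauge(field, stats[field]);
    }
    for (const broker of Object.values(stats.brokers || {})) {
      // Bootstrap connections have nodeid -1; skip them.
      if (broker.nodeid < 0) continue;
      for (const field of BROKER_FIELDS) {
        if (typeof broker[field] === 'number') {
          client.gauge(`brokers.${broker.nodeid}.${field}`, broker[field]);
        }
      }
    }
  };
}

// module.exports = rdkafkaStatsd;
```

Keeping the whitelist as a plain array makes it easy to expand later without touching the traversal logic.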

@Ottomata, it is almost identical to node-statsd, but also adds some minor API extensions like batching support. We aren't using node-statsd any more, and instead switched the statsd backend to https://github.com/brightcove/hot-shots for its support of batching.

As long as the interface is the same, it should work. Cool.
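Since compatibility rests on the interface rather than on a particular library, the module could check the client up front and fail fast. A minimal sketch; the helper name and the assumption that only gauge() is required are hypothetical. Clients such as node-statsd, hot-shots or a service-runner metrics reporter should pass, assuming they expose gauge().

```javascript
// Hypothetical guard: throws if the client lacks any required method.
function assertMetricsClient(client, methods = ['gauge']) {
  if (client === null || typeof client !== 'object') {
    throw new TypeError('metrics client must be an object');
  }
  const missing = methods.filter((m) => typeof client[m] !== 'function');
  if (missing.length > 0) {
    throw new TypeError(`metrics client is missing: ${missing.join(', ')}`);
  }
  return client;
}
```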

Nuria changed the point value for this task from 8 to 5. Nov 7 2016, 8:44 PM

Change 324277 had a related patch set uploaded (by Nuria):
Consolidate docs on README

https://gerrit.wikimedia.org/r/324277

Change 324327 had a related patch set uploaded (by Ottomata):
Expand rdkafka stats whitelist

https://gerrit.wikimedia.org/r/324327

Change 324327 merged by Nuria:
Expand rdkafka stats whitelist

https://gerrit.wikimedia.org/r/324327

Change 324277 merged by Ottomata:
Consolidate docs on README

https://gerrit.wikimedia.org/r/324277