
Make webperf eventlogging consumers use eventlogging on Kafka
Closed, Resolved · Public

Description

hafnium hosts eventlogging consumers that consume from eventlogging's :8600 ZMQ stream of valid eventlogging events. Analytics is in the process of replacing eventlogging's ZMQ backend with Kafka; the ZMQ stream will soon be turned off.

Consumers:

Event Timeline


Ah phoo, in order to consume from Kafka, Zookeeper ports on conf100x will have to be opened up to hafnium, which has a public IP. Hm.

Why does hafnium have a public IP anyway?

Change 237479 merged by Ottomata:
Remove eventlogging multiplexer, forward kafka eventlogging events to zmq port 8600

https://gerrit.wikimedia.org/r/237479

The current state of things:

I am running an eventlogging-forwarder on eventlog1001 that is consuming from Kafka and forwarding to the ZMQ port :8600 that y'all are using. If/when we modify your processes, we can turn off this forwarder.

Ottomata renamed this task from Make webperf eventlogging consumers use eventlogging on Kafka to Make webperf eventlogging consumers use eventlogging on Kafka {stag}. Sep 11 2015, 3:23 PM

@ori this might be worth putting into next up for performance team.

kevinator renamed this task from Make webperf eventlogging consumers use eventlogging on Kafka {stag} to Make webperf eventlogging consumers use eventlogging on Kafka. Oct 2 2015, 3:31 PM

I talked with @Krinkle on IRC yesterday and we came up with a plan of action. There are two parts to this. I will outline the code changes needed to happen for consumers here, and the infrastructure changes over on T131977.

For all eventlogging consumers:

  • import eventlogging instead of zmq+json.
  • add an endpoint or input_uri CLI option.
  • configure consumers via puppet to use the --input-uri option to consume from the relevant Kafka topics. https://gerrit.wikimedia.org/r/#/c/279280/ will make this a little easier. (A rough sketch of the resulting consumer follows this list.)
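
As a rough, non-authoritative sketch of what those bullets could look like in a consumer script (the --input-uri option name, the handle() placeholder, and the idea of puppet supplying the URI are assumptions for illustration, not the final configuration):

import argparse

import eventlogging


def handle(event):
    # Placeholder: real consumers (navtiming.py, ve.py) dispatch on event['schema']
    # and emit statsd metrics; printing stands in for that here.
    print(event)


ap = argparse.ArgumentParser(description='webperf EventLogging consumer (sketch)')
ap.add_argument('--input-uri', dest='input_uri', required=True,
                help='eventlogging reader URI, e.g. a kafka:/// URI with topics')
args = ap.parse_args()

# eventlogging.connect() wraps get_reader(uri), so the same script can read from
# the ZMQ forwarder or from Kafka depending on the URI it is given.
for event in eventlogging.connect(args.input_uri):
    handle(event)
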
Krinkle updated the task description.

With https://gerrit.wikimedia.org/r/279280 merged and webperf::statsv configured to use it, I've got most of this figured out, but there are still a few unanswered questions:

  1. How to connect to Kafka to consume EventLogging data using the useful eventlogging.connect() abstraction? I've been unable to find an example of this, and analytics/statsv unfortunately uses kafka-python directly, which I'd prefer to avoid.

After reading the code of eventlogging.connect/EventConsumer, I understand that there's a fair bit of abstraction. For the ZMQ stream the process is simple (connect to the eventlogging ZMQ host and filter results on the fly), but for Kafka I'm not sure.

  2. Will the Kafka URI be just the brokers list, or also topics? The example in the code comments at eventlogging/handlers.py looks like it requires a URI with topics in it, but looking at the code I don't think that's intended.

Whereas the ZMQ stream only contains EventLogging data (so the default subscribe='' works fine; we filter on the fly in puppet:/webperf/navtiming.py), Kafka will contain a lot more, and the topics parameter is required. It doesn't have an abstraction for schema name to topic name (which I'd expect from eventlogging.connect()).

  3. Should I use eventlogging_$schema as the topic and create two separate connections? Or subscribe to a topic that contains all EventLogging data and filter in-process (seems inefficient)? Perhaps there's a way to subscribe to two topics over one connection? That'd be preferred.
  4. Should I, and if so how, set a consumer group? It seems useful to have, but I'm not sure how to make it set group_id underneath. I guess via the identity parameter, except that seems to produce a (client_id, group_id) tuple, and I'm not sure what to set for client_id (statsv, for example, doesn't set a client_id).
  5. We should presumably disable enable_auto_commit at first, to match previous ZMQ behaviour (similar to statsv).

Perhaps it would make sense to first update statsv.py to use the eventlogging library on hafnium? That way I can convert the perf-team maintained eventlogging subscribers to use the same patterns.

Woo wee lots of questions, ok!

Perhaps it would make sense to first update statsv.py to use the eventlogging library on hafnium

Naw, I don't think statsv should depend on eventlogging. It could, but it is mostly unrelated.

  1. We should presumably disable enable_auto_commit
  2. Should I, and if so how, set a consumer group?

If you disable auto commit, you don't need a consumer group. But yes, you are correct, identity will set a consumer group. You don't need to worry about client_id.

It doesn't have an abstraction for schema name to topic name (which I'd expect from eventlogging.connect()).

Not sure what you mean by this. eventlogging.connect doesn't do anything special except wrap get_reader(uri) and provide a fancy filter function. As you say, you don't use the filter function yourself.

Should I use eventlogging_$schema as topic and create two separate connections?

You can subscribe to multiple topics with one connection. Use topics=eventlogging_NavigationTiming,eventlogging_SaveTiming

Will the Kafka URI be just the brokers list, or also topics? The example in the code comments at eventlogging/handlers.py looks like it requires a URI with topics in it, but looking at the code I don't think that's intended.

topics is needed. You can also pass any [[ https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html | kafka-python consumer parameters ]] directly in the kafka URI via query params.

Looking at navtiming.py, maybe you want something like this:

import eventlogging

kafka_eventlogging_uri = 'kafka:///kafka1012.eqiad.wmnet:9092?topics=eventlogging_NavigationTiming,eventlogging_SaveTiming&enable_auto_commit=False'
event_stream = eventlogging.connect(kafka_eventlogging_uri)

for event in event_stream:
    # event.schema_name() is a function that knows how to return the schema name from an event,
    # without hardcoding the dictionary lookup, e.g. event['schema'].
    # handlers is assumed to be navtiming.py's dict mapping schema name to handler function.
    f = handlers.get(event.schema_name())
    if f is not None:
        f(event)

Woo wee lots of questions, ok!

Perhaps it would make sense to first update statsv.py to use the eventlogging library on hafnium

Naw, I don't think statsv should depend on eventlogging. It could, but it is mostly unrelated.

Aye, statsv doesn't go through EventLogging (would defeat its lightweight purpose). I suppose it is possible (given connect supports passing any raw Kafka topic), but I fully agree this is unrelated. Not sure what I was thinking...

  1. We should presumably disable enable_auto_commit
  2. Should I, and if so how, set a consumer group?

If you disable auto commit, you don't need a consumer group. But yes, you are correct, identity will set a consumer group. You don't need to worry about client_id.

Makes sense. analytics/statsv.py does actually do both right now (sets group_id, and enable_auto_commit=False). I guess that's a (harmless) remnant from when it was enabled?

[..]

topics is needed. You can also pass any [[ https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html | kafka-python consumer parameters ]] directly in the kafka URI via query params.

Looking at navtiming.py, maybe you want something like this: [..]

Splendid. I'll see what I can make with this. Thanks!

Looks like it's not accepting the Kafka read method. At first I forgot to set PYTHONPATH (so it was using the old global install still), but after setting that, it still didn't work.

Two issues stand out:

  • It's trying to connect to 127.0.0.1:2181 for some reason.
  • It's using pykafka (not kafka-python).

Looking at the actual deployment, looks like it's using a version from October 2016. I guess it needs to be updated?

krinkle@hafnium:~$ export PYTHONPATH=/srv/deployment/eventlogging/eventlogging
krinkle@hafnium:~$ python nav-test.py --brokers kafka1012.eqiad.wmnet:9092,kafka1013.eqiad.wmnet:9092,kafka1014.eqiad.wmnet:9092,kafka1018.eqiad.wmnet:9092,kafka1020.eqiad.wmnet:9092,kafka1022.eqiad.wmnet:9092
2017-08-03 22:12:59,991 Successfully loaded pykafka.rdkafka extension.
2017-08-03 22:13:00,038 RequestHandler.stop: about to flush requests queue
2017-08-03 22:13:00,038 Discovered 6 brokers
2017-08-03 22:13:00,049 Discovered 317 topics
2017-08-03 22:13:00,055 RequestHandler.stop: about to flush requests queue
2017-08-03 22:13:00,055 Adding 1 partitions
2017-08-03 22:13:00,057 Connecting to 127.0.0.1:2181
2017-08-03 22:13:00,057 Connection dropped: socket connection error: Connection refused
2017-08-03 22:13:00,648 Connecting to 127.0.0.1:2181
2017-08-03 22:13:00,649 Connection dropped: socket connection error: Connection refused
2017-08-03 22:13:01,001 RequestHandler worker: exiting cleanly
2017-08-03 22:13:01,029 Connecting to 127.0.0.1:2181
2017-08-03 22:13:01,030 Connection dropped: socket connection error: Connection refused
2017-08-03 22:13:01,055 RequestHandler worker: exiting cleanly
2017-08-03 22:13:01,481 Connecting to 127.0.0.1:2181
2017-08-03 22:13:01,481 Connection dropped: socket connection error: Connection refused
^CTraceback (most recent call last):
  File "nav-test.py", line 59, in <module>
    for meta in events:
  File "/srv/deployment/eventlogging/eventlogging/eventlogging/utils.py", line 194, in __iter__
    for event in get_reader(self.url):
  File "/srv/deployment/eventlogging/eventlogging/eventlogging/factory.py", line 122, in get_reader
    iterator = handle(_readers, uri)
  File "/srv/deployment/eventlogging/eventlogging/eventlogging/factory.py", line 89, in handle
    return apply_safe(handler, kwargs)
  File "/srv/deployment/eventlogging/eventlogging/eventlogging/factory.py", line 73, in apply_safe
    result = f(*args, **kwargs)
  File "/srv/deployment/eventlogging/eventlogging/eventlogging/handlers.py", line 497, in kafka_reader
    **kafka_consumer_args)
  File "/usr/lib/python2.7/dist-packages/pykafka/topic.py", line 196, in get_balanced_consumer
    return BalancedConsumer(self, self._cluster, consumer_group, **kwargs)
  File "/usr/lib/python2.7/dist-packages/pykafka/balancedconsumer.py", line 237, in __init__
    self.start()

Looking at the actual deployment, looks like it's using a version from October 2016. I guess it needs to be updated?

Yaaaa: T131977#3084290, T118772#3084302

Makes sense. analytics/statsv.py does actually do both right now (sets group_id, and enable_auto_commit=False). I guess that's a (harmless) remnant from when it was enabled?

This is mostly about the behavior of monitoring. If we did auto commit, and statsv went offline for, say, an hour, when it started back up it would read the past hour from Kafka and immediately emit those events to statsd, causing a big erroneous spike in metrics. If we could emit those metrics to graphite with a timestamp set, it might be ok to do this, but statsd assumes everything it receives is for now.
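
To make that concrete, here is a minimal sketch of consumer settings that give this behaviour with kafka-python (the broker, topic, and group names are illustrative only, not the production values):

import json

from kafka import KafkaConsumer

# Sketch: with auto commit disabled and auto_offset_reset='latest', a restarted
# consumer begins at the tail of the topic instead of replaying the backlog into
# statsd (which would register as a spike of "now" metrics).
consumer = KafkaConsumer(
    'eventlogging_NavigationTiming',
    bootstrap_servers='kafka1012.eqiad.wmnet:9092',
    group_id='statsv',               # optional; see the consumer-group discussion below
    enable_auto_commit=False,        # never commit offsets
    auto_offset_reset='latest',      # with no committed offset, start at the end
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

for message in consumer:
    print(message.value)  # placeholder for emitting statsd metrics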

Makes sense. analytics/statsv.py does actually do both right now (sets group_id, and enable_auto_commit=False). I guess that's a (harmless) remnant from when it was enabled?

If we did auto commit, and statsv [..] statsd assumes everything it receives is for now.

Yeah, I understand enable_auto_commit=False :)

I was asking about the consumer group, as it seems analytics/statsv is configured contrary to your recommendation of not needing a consumer group if auto commit is disabled. Anyway, as mentioned, it's not a big deal, and just a harmless left-over, right?

  1. We should presumably disable enable_auto_commit
  2. Should I, and if so how, set a consumer group?

If you disable auto commit, you don't need a consumer group. But yes, you are correct, identity will set a consumer group. You don't need to worry about client_id.

Makes sense. analytics/statsv.py does actually do both right now (sets group_id, and enable_auto_commit=False). I guess that's a (harmless) remnant from when it was enabled?

analytics/statsv is configured contrary to your recommendation of not needing a consumer group if auto commit is disabled.

Ah true! I take it back a little bit. Setting a consumer group without offset commits also supports two things aside from offset resumption: identifying the client in the broker metrics, and consumer balancing. If you were consuming from a multi-partition topic, Kafka client processes in the same consumer group would automatically balance the partitions among themselves.

But ya, for your purposes, it is not functionally necessary, but you might want to set it anyway, just to be nice. :)
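
For reference, a sketch of how the earlier URI could carry the group, assuming identity is accepted as a query parameter like the other reader options and becomes the Kafka group_id underneath (the group name webperf_navtiming is made up):

import eventlogging

# Assumption: 'identity' can be passed in the query string like the other consumer
# parameters and ends up as the consumer group.
uri = ('kafka:///kafka1012.eqiad.wmnet:9092'
       '?topics=eventlogging_NavigationTiming,eventlogging_SaveTiming'
       '&enable_auto_commit=False'
       '&identity=webperf_navtiming')

events = eventlogging.connect(uri)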

Change 372483 had a related patch set uploaded (by Krinkle; owner: Krinkle):
[operations/puppet@production] webperf: Convert navtiming.py to use KafkaConsumer

https://gerrit.wikimedia.org/r/372483

Change 372483 merged by Ottomata:
[operations/puppet@production] webperf: Convert navtiming.py to use KafkaConsumer

https://gerrit.wikimedia.org/r/372483

Done! Looking ok!

You should really use base::service_unit in puppet to manage your systemd service templates, rather than doing so yourself. A systemctl daemon-reload needs to be run any time a systemd service unit file changes; if it isn't, systemd will keep using the old one. When puppet ran, it updated your files on hafnium, but systemd couldn't start navtiming.py because it was still using the old cached systemd service file without the --brokers flag. I had to manually run systemctl daemon-reload before it would work.

FYI, I modified https://gerrit.wikimedia.org/r/#/c/372483/ a bit to make it look up statsd and Kafka brokers from Hiera in the role, and pass them to the module classes via parameters. I did this for statsv as well. Whenever you get around to doing this for webperf::ve, I think you can do the same.

:)

@Ottomata Thanks a lot for doing that. Looking at the Navigation Timing metrics in Graphite through Grafana, everything looks good. Both the metric values as well as the hit rates don't seem to be affected.

The change was deployed around 13:30 UTC today (2017-08-29).

Navigation Timing:
https://grafana.wikimedia.org/dashboard/db/navigation-timing?refresh=5m&orgId=1&from=1504000621553&to=1504029651559&var-metric=domInteractive

  • Median, percentiles and hit rate: Unaffected.

Server Board (hafnium):
https://grafana.wikimedia.org/dashboard/file/server-board.json?refresh=1m&orgId=1&from=1504000621553&to=1504029651559&var-server=hafnium&var-network=eth0

  • Notable drop in eth0 traffic reception (download) from ~2Mbps to ~1Mbps. Presumably because the switch from ZMQ to Kafka also changed the subscription from all schemas to just the two relevant schemas.
  • Otherwise unaffected.

For clarification, can this task be removed as a blocker for sunsetting salt?

I saw in T118772#3529414 @Krinkle made the comment:

it probably doesn't make sense to set up as a "git-deployed service" from the deployment hosts with Scap3 and all.

For clarification, can this task be removed as a blocker for sunsetting salt?

Indeed. Instead of converting our use of Trebuchet-deployed python-eventlogging to Scap3, we're removing the use of python-eventlogging in favour of kafka-python directly.

The webperf/navtiming.py service has been converted.

The webperf/ve.py service still uses python-eventlogging, but presumably discontinuing Trebuchet won't affect the existing clone on hafnium while we finish the conversion.
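
As a rough illustration of what "converted" means here (this is not the actual navtiming.py code; the handlers and field access are placeholders), the kafka-python shape is roughly:

import json

from kafka import KafkaConsumer


def handle_navigation_timing(event):
    pass  # placeholder: emit NavigationTiming metrics to statsd


def handle_save_timing(event):
    pass  # placeholder: emit SaveTiming metrics to statsd


HANDLERS = {
    'NavigationTiming': handle_navigation_timing,
    'SaveTiming': handle_save_timing,
}

consumer = KafkaConsumer(
    'eventlogging_NavigationTiming',
    'eventlogging_SaveTiming',
    bootstrap_servers='kafka1012.eqiad.wmnet:9092',
    enable_auto_commit=False,
    auto_offset_reset='latest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

for message in consumer:
    event = message.value
    f = HANDLERS.get(event.get('schema'))
    if f is not None:
        f(event)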

Change 375105 had a related patch set uploaded (by Krinkle; owner: Krinkle):
[operations/puppet@production] webperf: Refactor ve.py and add unit tests

https://gerrit.wikimedia.org/r/375105

Change 375106 had a related patch set uploaded (by Krinkle; owner: Krinkle):
[operations/puppet@production] webperf: Convert ve.py from ZMQ to KafkaConsumer

https://gerrit.wikimedia.org/r/375106

Change 375105 merged by Ottomata:
[operations/puppet@production] webperf: Refactor ve.py and add unit tests

https://gerrit.wikimedia.org/r/375105

Change 375106 merged by Ottomata:
[operations/puppet@production] webperf: Convert ve.py from ZMQ to KafkaConsumer

https://gerrit.wikimedia.org/r/375106

Deployed. I think it looks good! Please double check.

The last one remaining is coal. It currently runs on a different host (graphite1001) and writes directly to Graphite's disk as a custom backend.

For T158837, we're also looking into removing coal in favour of a simple EventLogging subscriber (e.g. on hafnium) that writes to Prometheus instead. If that turns out to be easy and doable in the short term, I'll do that instead and close this after coal is decommissioned. If it turns out to be more involved, I'll convert it to Kafka as well.
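
Should the Prometheus route win out, one common pattern (sketched here with a hypothetical metric name, buckets, scrape port, and event field; nothing below is a committed design) is a small subscriber that observes values into a histogram and lets Prometheus scrape it:

import json

from kafka import KafkaConsumer
from prometheus_client import Histogram, start_http_server

# Hypothetical metric; coal's actual aggregation (moving medians) is more involved
# than a plain histogram.
save_timing = Histogram(
    'webperf_save_timing_seconds',
    'MediaWiki edit save duration as reported by the SaveTiming schema',
    buckets=(0.25, 0.5, 1, 2, 5, 10, 30),
)

start_http_server(9170)  # hypothetical scrape port

consumer = KafkaConsumer(
    'eventlogging_SaveTiming',
    bootstrap_servers='kafka1012.eqiad.wmnet:9092',
    enable_auto_commit=False,
    auto_offset_reset='latest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)

for message in consumer:
    # 'saveTiming' is an assumed field name holding a duration in milliseconds.
    duration_ms = message.value.get('event', {}).get('saveTiming')
    if duration_ms is not None:
        save_timing.observe(duration_ms / 1000.0)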

Krinkle changed the task status from Stalled to Open. Nov 20 2017, 9:29 PM

Un-blocking this from T175087 as that will take longer to complete. In the interim, I'll prioritise getting coal migrated to consume EventLogging from Kafka instead of ZMQ, so that Analytics can shut down ZMQ support.

Change 403560 had a related patch set uploaded (by Krinkle; owner: Krinkle):
[operations/puppet@production] [WIP] coal: Consume EventLogging from Kafka instead of ZMQ

https://gerrit.wikimedia.org/r/403560

Change 403560 abandoned by Krinkle:
[WIP] coal: Consume EventLogging from Kafka instead of ZMQ

Reason:
Closing, Ian to fork/re-upload.

https://gerrit.wikimedia.org/r/403560

Change 415218 had a related patch set uploaded (by Imarlier; owner: Imarlier):
[operations/puppet@production] [WIP] coal: Process from Kafka instead of from ZMQ

https://gerrit.wikimedia.org/r/415218

Change 415218 merged by Elukey:
[operations/puppet@production] coal: Process from Kafka instead of from ZMQ

https://gerrit.wikimedia.org/r/415218

Change 421573 had a related patch set uploaded (by Imarlier; owner: Imarlier):
[operations/puppet@production] coal: add logging, and handle ValueError case

https://gerrit.wikimedia.org/r/421573

Change 421573 merged by Dzahn:
[operations/puppet@production] coal: add logging, and handle ValueError case

https://gerrit.wikimedia.org/r/421573

Change 421933 had a related patch set uploaded (by Imarlier; owner: Imarlier):
[operations/puppet@production] [WIP] coal: be smarter about consuming from Kafka

https://gerrit.wikimedia.org/r/421933

Change 421933 merged by Elukey:
[operations/puppet@production] coal: be smarter about consuming from Kafka

https://gerrit.wikimedia.org/r/421933

Change 422926 had a related patch set uploaded (by Imarlier; owner: Imarlier):
[operations/puppet@production] coal: Need to use a unique consumer group in each data center

https://gerrit.wikimedia.org/r/422926

Change 422926 merged by Elukey:
[operations/puppet@production] coal: Need to use a unique consumer group in each data center

https://gerrit.wikimedia.org/r/422926

Coal has now been running for 4 days, and appears to be performing as expected.