
Use kafka for communication from analytics cluster to elasticsearch
Closed, ResolvedPublic

Description

We have a data flow from the analytics cluster to elasticsearch to update page ranks, which requires an exception in our firewall rules. When the firewall was opened, the initial discussion (T120281) was that this flow would migrate to kafka at some point so the exception could be removed. We did not create a task to track this at the time, and unsurprisingly, we are still using a direct connection instead of kafka.

If kafka is still the correct long term solution, we should plan to implement it.

Event Timeline

I am curious, what would the data look like? Would it be possible to use something like https://github.com/confluentinc/kafka-connect-elasticsearch ? I don't know exactly how that works; perhaps it only allows appending new documents, not updating existing ones.

I don't know if we could use a generic endpoint, although we could look into it.

These updates are somewhat unique in that we send a script along with each update, and the script decides whether the update can be nooped, which drastically reduces the indexing load these updates create. Unfortunately I don't have the exact stats, but I would estimate two thirds of all updates coming from the analytics cluster get thrown out by this script since the value we have stored is already "good enough".
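For context, a minimal sketch of what such a noop'ing scripted update can look like, assuming a hypothetical popularity_score field, index name, and threshold (the real script lives in the mjolnir/CirrusSearch code and will differ):

```python
from elasticsearch import Elasticsearch  # pip install elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # placeholder endpoint

# Scripted update: the painless script sets ctx.op = 'noop' when the stored
# value is already close enough, so elasticsearch skips re-indexing the doc.
es.update(
    index="enwiki_content",  # hypothetical index
    doc_type="page",
    id=12345,
    body={
        "script": {
            "lang": "painless",
            "source": (
                "if (Math.abs(ctx._source.popularity_score - params.new_score)"
                " < params.threshold) { ctx.op = 'noop' }"
                " else { ctx._source.popularity_score = params.new_score }"
            ),
            "params": {"new_score": 0.00042, "threshold": 0.0001},
        }
    },
)
```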

In my mind a relatively simple java or rdkafka-based python client could be written to consume some arbitrary record format containing N popularity updates, and split it back out on the production side into our customized noop scripts. Unfortunately managing this kind of one-off small script is currently not worth the effort, imo, as the overhead to spin up a service in our system is still pretty huge.
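To make that concrete, a rough sketch of such a consumer using kafka-python; the topic name, broker address, and record format below are invented for the example:

```python
import json

from elasticsearch import Elasticsearch, helpers
from kafka import KafkaConsumer  # pip install kafka-python

# All names here are hypothetical placeholders.
consumer = KafkaConsumer(
    "eqiad.popularity_score_updates",
    bootstrap_servers=["kafka1001.eqiad.wmnet:9092"],
    group_id="popularity-updates",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)
es = Elasticsearch(["http://localhost:9200"])

NOOP_SCRIPT = (
    "if (Math.abs(ctx._source.popularity_score - params.new_score)"
    " < params.threshold) { ctx.op = 'noop' }"
    " else { ctx._source.popularity_score = params.new_score }"
)

def to_bulk_action(update):
    """Turn one popularity update into a scripted bulk update action."""
    return {
        "_op_type": "update",
        "_index": update["index"],
        "_type": "page",
        "_id": update["page_id"],
        "script": {
            "lang": "painless",
            "source": NOOP_SCRIPT,
            "params": {"new_score": update["score"], "threshold": 0.0001},
        },
    }

for message in consumer:
    # Each kafka record is assumed to carry N updates; split them back out
    # into individual scripted updates and ship them through the bulk API.
    helpers.bulk(es, (to_bulk_action(u) for u in message.value["updates"]),
                 raise_on_error=False)
```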

wait for kubernetes and stream processing? :D

The new parent task is tangentially related. To cleanly update our puppet we need to switch the TLS certs from the generic puppet cert to domain-specific certs. The analytics cluster, though, can't talk to those domains, so we would need some crazy hacks to send http requests to hosts by name while providing a different name for TLS verification.
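For illustration, that hack would look something like this at the socket level (host and service names here are examples, not the real ones):

```python
import socket
import ssl

# Open the TCP connection to the concrete host, but perform the TLS
# handshake (SNI + certificate verification) against the service name
# the domain-specific cert is actually issued for.
ctx = ssl.create_default_context()
sock = socket.create_connection(("elastic1030.eqiad.wmnet", 9243))
tls = ctx.wrap_socket(sock, server_hostname="search.svc.eqiad.wmnet")
print(tls.getpeercert()["subject"])  # verified against the service name
tls.close()
```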

While the overhead of an extra daemon isn't ideal, we do already have a very similar daemon written for mjolnir. We can make some minor adjustments to make that daemon do what we need, along with some minor puppet changes to deploy the daemon to all the elasticsearch servers.

The daemon will need some changes to handle multiple clusters per host, but those can be delayed a bit.

Change 445065 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[wikimedia/discovery/analytics@master] Convert transferToES to use kafka

https://gerrit.wikimedia.org/r/445065

Change 445254 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[operations/puppet@production] [WIP] Add mjolnir kafka daemon to primary elasticsearch clusters

https://gerrit.wikimedia.org/r/445254

Ran a quick test of the data volume we will be shipping over kafka; it looks like we will be generating around 2-3GB of compressed (~15GB uncompressed) data into kafka from the once-a-week batches.

Change 445065 merged by jenkins-bot:
[wikimedia/discovery/analytics@master] Convert transferToES to use kafka

https://gerrit.wikimedia.org/r/445065

For the bulk daemon deployment, and switching off the old transfer, we need to complete the following:

  1. Create the kafka topic for them to talk over (T200215). This is blocked on someone from ops running it. Actually, I can create the topic in eqiad, but I don't have access to any machines that can talk to the zookeeper in codfw. (A sketch of topic creation follows this list.)
  2. Once the kafka topic is created we need to merge and deploy https://gerrit.wikimedia.org/r/#/c/operations/puppet/+/445254/ to deploy the daemon
  3. Spot check a few elasticsearch instances to verify everything is set up properly. Logging should be showing up in logstash under the mjolnir type. It's possible a scap deploy will be needed, not sure. Check that relforge shut down the previously named systemd service and is only running the new msearch daemon (note that relforge runs the msearch daemon, not the bulk daemon being deployed here).
  4. Deploy the wikimedia/discovery/analytics repository to stat1005. The appropriate patches have already been merged.
  5. Shut down the previous popularity update oozie coordinator and start up the new one (per wikitech:Discovery/Analytics).
  6. Perhaps do a test produce of a few thousand scores from the spark shell to ensure everything flows through (see the sketch below).
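A rough sketch of steps 1 and 6, using kafka-python's admin and producer clients rather than the spark shell; the broker address, topic name, and partition counts are placeholders (the real topic details are tracked in T200215):

```python
import json

from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic  # pip install kafka-python

BROKERS = ["kafka1001.eqiad.wmnet:9092"]  # placeholder broker list

# Step 1: create the topic. This has to happen per cluster; codfw would
# need the same run against its own brokers.
admin = KafkaAdminClient(bootstrap_servers=BROKERS)
admin.create_topics([
    NewTopic(name="eqiad.popularity_score_updates",  # hypothetical name
             num_partitions=4, replication_factor=3),
])

# Step 6: produce a few thousand fake score updates and watch them flow
# through the bulk daemon into elasticsearch.
producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
for page_id in range(2000):
    producer.send("eqiad.popularity_score_updates", {
        "index": "testwiki_content",
        "page_id": page_id,
        "score": 0.001,
    })
producer.flush()
```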

Change 450291 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[search/MjoLniR@master] Support consuming multiple kafka topics

https://gerrit.wikimedia.org/r/450291

Change 450291 merged by jenkins-bot:
[search/MjoLniR@master] Support consuming multiple kafka topics

https://gerrit.wikimedia.org/r/450291

Change 445254 merged by Gehel:
[operations/puppet@production] Add mjolnir kafka daemon to primary elasticsearch clusters

https://gerrit.wikimedia.org/r/445254

This is all deployed, and a limited push from analytics to prod ran last weekend. This coming weekend it will do the full update.