Page MenuHomePhabricator

mjolnir-kafka-msearch-daemon dropping produced messages after move to search-loader[12]001
Open, Needs TriagePublic8 Estimated Story Points

Description

As a deployed service I need to continue working after moving between hosts so i can continue providing live elasticsearch data to ML training processes.

mjolnir daemons (two separate but related daemons) were moved from elasticsearch hosts (35 per dc) to a single dedicated vm per dc. Since moving the mjolnir-kafka-*msearch*-daemon has been dropping messages and the downstream datasets the depend on the messages produced by this daemon have been incomplete.

Possible causes:

  • Most obvious guess is from messages logged when the daemon is busy: Failed to send a message to the broker: KafkaTimeoutError: Batch for TopicPartition(topic='mjolnir.msearch-prod-response', partition=23) containing 1 record(s) expired: 30 seconds have passed since last append
  • While experimenting with parallelism the internal thread pool was increased from 2 when it was deployed to all elasticsearch servers (25 kafka partitions, gave ~50 threads across the cluster making parallel requests) to 10 on a single instance, and then 25. 10 made some increases in throughput, 25 has same throughput as 10 but higher resource utilization. almost certainly this should be scaled back.

What does the daemon do:

  • These daemons run in the production network and perform various actions in response to events produced in the analytics network.
  • Two parts of the ML training pipeline need to perform requests against the production search indices, but they live in separate networks and cannot directly talk to the elasticsearch clusters. They typically run once a week, but can be manually triggered.
  • The msearch daemon monitors metrics in the elasticsearch cluster related to cirrussearch, and only subscribes to the appropriate kafka topic if the cluster it's pointed at is idle. Effectively that means only the msearch daemon in codfw does anything under normal operations.
  • Analytics side generates fully formed queries for the elasticsearch _msearch api endpoint and puts them into separate messages in a kafka topic
  • Daemons on the production side read the events, perform the relevant http request, and put the response in a second topic.
  • Mostly this means the daemon sits around and waits on IO. For each message recieved from kafka it makes an http request and waits 100-200ms for elasticsearch to respond. Per record processing within the daemon is minimal.
  • Analytics side reads back the results. There are some complications in the daemon that help the analytics side figure out where to stop reading, but they shouldn't be relevant to this task.
  • There are two separate parts of the training process that cause the msearch daemon to run. The first one runs typical search queries and has small responses, the second stage collects feature vectors from the search clusters and has responses from 100-800kB per record. Connections to elasticsearch and messages sent to kafka should all be compressed as this content is highly duplicative.

Related:

  • grafana dashboard for msearch
  • Search host:search-loader2001 in logstash for related logs
  • If my memory is correct the reason we send 1 message per produce request in the msearch is because kafka had issues in the past due to requests being too large. May or may not still be relevant.
  • Reproducing timeout issues on a local instance is going to be difficult at best. Easiest way to reproduce on the production instance would be to directly produce messages to the topic, likely a few MB of real previous messages can be put in a file and re-produced with the kafkacat utility. I've recorded 10k requests from this weeks run at stat1006.eqiad.wmnet:/home/ebernhardson/mjolnir.msearch-prod-request.20200812.ndjson
  • Testing on the msearch daemon in production isn't particularly risky, it's hardcoded to only hit the /_msearch endpoint which is read only, and the daemon only operates against the idle cluster. If both clusters are somehow busy, it simply doesn't run.

Event Timeline

EBernhardson updated the task description. (Show Details)

Adding also another note. The configuration of the msearch response topic is the following:

Topic:mjolnir.msearch-prod-response     PartitionCount:35       ReplicationFactor:3     Configs:retention.ms=86400000

35 partitions probably made sense when the whole ES cluster was consuming/producing, but not now that we have search loader nodes. Kafka as far as I know, doesn't support decreasing partitions for a given topic, so one solution (if data is droppable) is to delete/re-create the topic with less partitions.

The error

Failed to send a message to the broker: KafkaTimeoutError: Batch for TopicPartition(topic='mjolnir.msearch-prod-response', partition=23) containing 1 record(s) expired: 30 seconds have passed since last append

should indicate, IIRC/IIUC, that mjolnir queues messages to produce to the topic in the local buffer for more than 30s, and this causes data to not be sent. Given the exception I suppose that mjolnir uses Kafka Python, that in turn doesn't use librdkafka underneath (that has some transparent retry mechanism for these use cases). Kafka Python has the possibility to retry (https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html), but it is disabled by default to prevent duplicates.

The suspicion that I have is that mjolnir on search-loader cannot keep up producing to 35 partitions, given that it should be probably use round robin to all of them.

Other bits i forgot:

repository: search/MjoLniR
deploy repository: search/MjoLniR-deploy

deploy process:

  1. Commit to primary repo
  2. Increase submodule in deploy repository to match primary repo
  3. scap deploy .. from deployment.eqiad.wmnet in /srv/deployment/search/mjolnir/deploy
  4. sudo service restart mjolnir-kafka-msearch-daemon from host (ex: search-loader2001.codfw.wmnet)
CBogen set the point value for this task to 8.Aug 17 2020, 5:12 PM

Quick test showed that reducing the number of partitions does not fix the issue. Our current suspicion is that current parallelism (done via Python GIL) is lacking. Idea to fix:

  • limit per service workers to 2
  • launch more msearch daemons per loader (similar to carbon cache configuration)

Change 621988 had a related patch set uploaded (by ZPapierski; owner: ZPapierski):
[operations/puppet@production] Multiple instances of msearch_daemon

https://gerrit.wikimedia.org/r/621988

Change 622355 had a related patch set uploaded (by ZPapierski; owner: ZPapierski):
[operations/puppet@production] Remove unnecessary daemon definitions

https://gerrit.wikimedia.org/r/622355

Change 621988 merged by Elukey:
[operations/puppet@production] Multiple instances of msearch_daemon

https://gerrit.wikimedia.org/r/621988

Mentioned in SAL (#wikimedia-operations) [2020-09-08T18:22:35Z] <elukey> rm /srv/prometheus/ops/targets/mjolnir_msearch_eqiad.yaml on prometheus100[3,4] as cleanup after https://gerrit.wikimedia.org/r/621988 - T260305

Change 625952 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] mjolnir: fix syslog identifier in the msearch systemd unit template

https://gerrit.wikimedia.org/r/625952

Change 625952 merged by Elukey:
[operations/puppet@production] mjolnir: fix syslog identifier in the msearch systemd unit template

https://gerrit.wikimedia.org/r/625952

Change 625954 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] mjolnir: fix syslog identifier of the msearch instances

https://gerrit.wikimedia.org/r/625954

Change 625954 merged by Elukey:
[operations/puppet@production] mjolnir: fix syslog identifier of the msearch instances

https://gerrit.wikimedia.org/r/625954

Change 626055 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::mjolnir::kafka_msearch_daemon: clean up old code

https://gerrit.wikimedia.org/r/626055

Change 626055 merged by Elukey:
[operations/puppet@production] profile::mjolnir::kafka_msearch_daemon: clean up old code

https://gerrit.wikimedia.org/r/626055

Change 626056 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] role::elasticsearch::relforge: add missing hiera config for mjolnir

https://gerrit.wikimedia.org/r/626056

Change 626056 merged by Elukey:
[operations/puppet@production] role::elasticsearch::relforge: add missing hiera config for mjolnir

https://gerrit.wikimedia.org/r/626056

Change 626058 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::mjolnir::kafka_msearch_daemon_instance: fix enable value

https://gerrit.wikimedia.org/r/626058

Change 626058 merged by Elukey:
[operations/puppet@production] profile::mjolnir::kafka_msearch_daemon_instance: fix enable value

https://gerrit.wikimedia.org/r/626058

Change 626060 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] role::elasticsearch::cirrus: add missing mjolnir hiera value

https://gerrit.wikimedia.org/r/626060

Change 626060 merged by Elukey:
[operations/puppet@production] role::elasticsearch::cirrus: add missing mjolnir hiera value

https://gerrit.wikimedia.org/r/626060

Change 626105 had a related patch set uploaded (by ZPapierski; owner: ZPapierski):
[operations/puppet@production] Bump msearch daemon parallelism

https://gerrit.wikimedia.org/r/626105

Change 626105 merged by Elukey:
[operations/puppet@production] mjolnir: Bump msearch daemon parallelism

https://gerrit.wikimedia.org/r/626105

I noticed that kafka-python==1.4.3 in the venv, can we try to upgrade it to 1.4.7? https://pypi.org/project/kafka-python/#history

Change 622355 abandoned by ZPapierski:
[operations/puppet@production] Remove unnecessary daemon definitions

Reason:
Already removed

https://gerrit.wikimedia.org/r/622355

I noticed that kafka-python==1.4.3 in the venv, can we try to upgrade it to 1.4.7? https://pypi.org/project/kafka-python/#history

Erik quickly checked and it seems that the last version, 2.0.1, could be a better candidate (the major bump is related to dropping old consumer code).

The next steps that I see are:

  1. Try kafka-python 2.0.1 to see if the consumer errors get fixed (if any, I still didn't get if the problem is there or not).
  2. Test performances after T262385 and decide if the VMs are now ok as replacement for the previous settings (to unblock Analytics that needs to deploy iptables/Ferm rules to Kafka Jumbo)

Try kafka-python 2.0.1 to see if the consumer errors get fixed (if any, I still didn't get if the problem is there or not).

Not seeing a repeat of the consumer errors from before. We should still upgrade, but it doesn't look to be needed before closing this task.

Test performances after T262385 and decide if the VMs are now ok as replacement for the previous settings (to unblock Analytics that needs to deploy iptables/Ferm rules to Kafka Jumbo)

Metrics for this weeks run look acceptable. It's currently about half the throughput of the prevision deployment, but that isn't enough of a change to cause issues in my estimation.

Thanks a lot for all the work on this, really appreciated. As far as I can see it seems that the new search-loader environment seems stable and working, even if not as performant as what it was the old setting on the ES cluster. If needed we can try to increase the number of vcores/ram on the loader hosts, and possibly (long term) think about bare metal hw for next fiscal.