As a deployed service I need to continue working after moving between hosts so i can continue providing live elasticsearch data to ML training processes.
mjolnir daemons (two separate but related daemons) were moved from elasticsearch hosts (35 per dc) to a single dedicated vm per dc. Since moving the mjolnir-kafka-*msearch*-daemon has been dropping messages and the downstream datasets the depend on the messages produced by this daemon have been incomplete.
Possible causes:
- Most obvious guess is from messages logged when the daemon is busy: Failed to send a message to the broker: KafkaTimeoutError: Batch for TopicPartition(topic='mjolnir.msearch-prod-response', partition=23) containing 1 record(s) expired: 30 seconds have passed since last append
- While experimenting with parallelism the internal thread pool was increased from 2 when it was deployed to all elasticsearch servers (25 kafka partitions, gave ~50 threads across the cluster making parallel requests) to 10 on a single instance, and then 25. 10 made some increases in throughput, 25 has same throughput as 10 but higher resource utilization. almost certainly this should be scaled back.
What does the daemon do:
- These daemons run in the production network and perform various actions in response to events produced in the analytics network.
- Two parts of the ML training pipeline need to perform requests against the production search indices, but they live in separate networks and cannot directly talk to the elasticsearch clusters. They typically run once a week, but can be manually triggered.
- The msearch daemon monitors metrics in the elasticsearch cluster related to cirrussearch, and only subscribes to the appropriate kafka topic if the cluster it's pointed at is idle. Effectively that means only the msearch daemon in codfw does anything under normal operations.
- Analytics side generates fully formed queries for the elasticsearch _msearch api endpoint and puts them into separate messages in a kafka topic
- Daemons on the production side read the events, perform the relevant http request, and put the response in a second topic.
- Mostly this means the daemon sits around and waits on IO. For each message recieved from kafka it makes an http request and waits 100-200ms for elasticsearch to respond. Per record processing within the daemon is minimal.
- Analytics side reads back the results. There are some complications in the daemon that help the analytics side figure out where to stop reading, but they shouldn't be relevant to this task.
- There are two separate parts of the training process that cause the msearch daemon to run. The first one runs typical search queries and has small responses, the second stage collects feature vectors from the search clusters and has responses from 100-800kB per record. Connections to elasticsearch and messages sent to kafka should all be compressed as this content is highly duplicative.
Related:
- grafana dashboard for msearch
- Search host:search-loader2001 in logstash for related logs
- If my memory is correct the reason we send 1 message per produce request in the msearch is because kafka had issues in the past due to requests being too large. May or may not still be relevant.
- Reproducing timeout issues on a local instance is going to be difficult at best. Easiest way to reproduce on the production instance would be to directly produce messages to the topic, likely a few MB of real previous messages can be put in a file and re-produced with the kafkacat utility. I've recorded 10k requests from this weeks run at stat1006.eqiad.wmnet:/home/ebernhardson/mjolnir.msearch-prod-request.20200812.ndjson
- Testing on the msearch daemon in production isn't particularly risky, it's hardcoded to only hit the /_msearch endpoint which is read only, and the daemon only operates against the idle cluster. If both clusters are somehow busy, it simply doesn't run.