
Deploy mjolnir msearch daemon to the elasticsearch clusters
Closed, Resolved (Public)


To close the firewall hole between analytics and the elasticsearch clusters, mjolnir needs another way to query elasticsearch. We implemented one method and used it with relforge previously. The method is essentially to generate elasticsearch queries in analytics, push them into kafka, and then read their responses back on a second topic. As one can imagine there is some extra trickery in knowing when we are done. Kafka isn't really made for this purpose; it's mostly a hack.
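A minimal sketch of the request/response pattern described above, with in-memory deques standing in for the two kafka topics. Tracking outstanding request ids is just one plausible way to know when a run is done; it is an assumption for illustration, not necessarily what mjolnir does. The function names and message fields are hypothetical.

```python
from collections import deque
import json
import uuid


def send_requests(request_topic, queries):
    """Publish each query with a unique id; return the ids awaiting a response."""
    pending = set()
    for query in queries:
        request_id = uuid.uuid4().hex
        request_topic.append(json.dumps({"id": request_id, "query": query}))
        pending.add(request_id)
    return pending


def daemon_step(request_topic, response_topic, run_query):
    """Stand-in for the daemon: consume one request, publish one response."""
    message = json.loads(request_topic.popleft())
    result = run_query(message["query"])
    response_topic.append(json.dumps({"id": message["id"], "result": result}))


def collect_responses(response_topic, pending):
    """Drain the response topic; the run is done once no ids remain pending."""
    results = {}
    while response_topic:
        message = json.loads(response_topic.popleft())
        if message["id"] in pending:
            pending.discard(message["id"])
            results[message["id"]] = message["result"]
    return results, not pending
```

With real kafka the producer side would also need a timeout for responses that never arrive, which this sketch leaves out.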

This works fine for its purpose: collecting feature logs for smaller data sets being experimented on. It was never scaled up enough to collect a full batch for training production models. Additionally the volume of data generated by this process is quite arbitrary. The typical monthly job to build production search models will be a fairly consistent ~100M (query,page) pairs, but any experimentation from an engineer will generate unknown amounts of data, depending entirely on whatever is being experimented on.

Some problems with the current deployment:

  • I don't have any hard numbers, but I imagine the data volume is significant while having little long term value. The topic used for query/response should probably have a limited history.
  • The daemon should fan out requests to multiple hosts. We won't likely have too many partitions, which will focus the load of query parsing/coordination on a few hosts. It would be better if we could spread that around the cluster. When running against relforge with a single partition topic the load difference between the two servers in the cluster is significant. We can instead use 20 or 30 partitions in kafka, which will spread the load similarly with less complexity.
  • We currently check the cluster load at the beginning of collection against production clusters to verify they are indeed idle and we aren't about to add a ton of load to the cluster serving production traffic. Without a direct connection to the cluster that's not too easy.

Potential solutions:

  • Query normalization simply needs to be ported to use the kafka transport. It has all the typical problems though.
  • If we want to reduce the data volume we would need to move more smarts into the daemon. Right now it has to be verbose as we are filling topics with elasticsearch requests and responses. If the daemon was smarter it could receive much simpler requests and send simpler responses.
  • Fanning out shouldn't be too hard. If we switch over to the elasticsearch-py client, already used by the bulk updates daemon, it has some built in smarts for this.
  • All consumers in both datacenters should subscribe to the same topic on the same cluster with the same group id. The consumer will monitor qps of the full_text stat group against a canary index (enwiki_content). The consumer will only subscribe to kafka when qps is below some threshold, and must stop consuming within a minute of traffic rising above the threshold. In this way only consumers in the idle DC will consume.
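The load check in the last bullet can be sketched as a small state machine. This is only an illustration: the class name, the threshold value, and how the qps sample is obtained are all assumptions, and the kafka subscribe/unsubscribe calls themselves are left to the caller.

```python
class LoadGate:
    """Decide whether a consumer should be subscribed, based on canary index qps."""

    def __init__(self, qps_threshold):
        # Hypothetical threshold; the task only says "some threshold".
        self.qps_threshold = qps_threshold
        self.subscribed = False

    def update(self, canary_qps):
        """Feed the latest qps sample; return 'subscribe', 'unsubscribe', or None."""
        idle = canary_qps < self.qps_threshold
        if idle and not self.subscribed:
            self.subscribed = True
            return "subscribe"
        if not idle and self.subscribed:
            self.subscribed = False
            return "unsubscribe"
        return None
```

To stop consuming within a minute of traffic rising, `update` would have to be called with a fresh sample more often than once a minute.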

There is probably more I haven't thought of.

Event Timeline

Topics created, all in kafka-jumbo as all consumers need to be in the same consumer group, and the msearch-prod topics have the potential for high data volume (to be measured before automating the job that generates messages).

mjolnir.msearch-prod-request and mjolnir.msearch-prod-response with 35 partitions each and 24 hours retention.
mjolnir.msearch-relforge-request and mjolnir.msearch-relforge-response with 6 partitions each and 24 hours retention.

And to update with solutions taken to problems in the task description:

  • Query normalization was ported to use the kafka based transport
  • Data volume is still to be evaluated. Topics were created with only 24 hours of history.
  • Instead of fanning out the requests explicitly we gave the topic 35 partitions which will allow consumers on many hosts.
  • Checking cluster load was resolved by having consumer daemons in both datacenters and setting them up to only subscribe to the kafka topic when there is little to no production search load on the cluster (by monitoring qps of enwiki_content searches in the full_text stats group).
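As a rough illustration of why 35 partitions spreads the load: within one consumer group each partition is read by at most one consumer, so the partitions get divided among whichever daemons are currently subscribed. The sketch below uses a simplified round-robin model; kafka's real assignors differ in detail, and the host names and counts are hypothetical.

```python
def assign_round_robin(num_partitions, consumers):
    """Simplified model: deal partitions out to consumers one at a time."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Hypothetical example: 35 partitions shared by 10 subscribed daemons.
hosts = ["host-%d" % i for i in range(10)]
assignment = assign_round_robin(35, hosts)
partitions_per_host = sorted(len(p) for p in assignment.values())
```

In this model every host gets 3 or 4 partitions, and any consumers beyond the 35th would sit idle.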

Change 454722 had a related patch set uploaded (by EBernhardson; owner: EBernhardson):
[operations/puppet@production] Deploy msearch daemon to cirrus servers

I've run some data sizing tests for the two processes that use msearch: query normalization and feature collection. The test was limited to enwiki and the dataset was sampled down to 98k normalized queries, which resulted in 1.5M datapoints. The estimated values are calculated by using the ratio of values in this dataset vs a typical full dataset. A typical full run is about 1.85M queries and 75M datapoints. Data sizes were determined by looking at kafka_log_Size{cluster="kafka_jumbo",kafka_cluster="jumbo-eqiad", topic="$topic"} in prometheus before and after each step.

                    req size  resp size  est req size  est resp size
norm query          22MB      30MB       415MB         570MB
feature collection  12MB      1072MB     600MB         53600MB

Estimated total: 60GB, or 180GB per run with replicas. I might be overestimating the feature collection response size, but even with that the data size seems low enough to not be of concern. We could shrink the data size a bit. Removing extra information from the response but keeping it in json looks to save about 15% of the space. If we fully parsed the features into a packed binary format, my tests show we could save perhaps 60% of the volume. At the moment I don't think the size is large enough to warrant the extra complexity, but it's an option if needed.
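The scaling behind these estimates can be reproduced with a few lines. The sketch below assumes query normalization scales with the number of queries and feature collection with the number of datapoints, which matches the figures reported above; the replication factor of 3 is inferred from the 60GB to 180GB step and is an assumption.

```python
SAMPLE_QUERIES = 98_000
FULL_QUERIES = 1_850_000
SAMPLE_DATAPOINTS = 1_500_000
FULL_DATAPOINTS = 75_000_000
REPLICATION_FACTOR = 3  # assumption, inferred from 60GB -> 180GB

# Measured topic sizes in MB for the sampled run, plus the scale factor to a full run.
measured_mb = {
    "norm query": {"req": 22, "resp": 30, "scale": FULL_QUERIES / SAMPLE_QUERIES},
    "feature collection": {"req": 12, "resp": 1072, "scale": FULL_DATAPOINTS / SAMPLE_DATAPOINTS},
}


def estimate_full_run(measured):
    """Scale each measured size by the full/sample ratio for that step."""
    return {
        step: {"req": m["req"] * m["scale"], "resp": m["resp"] * m["scale"]}
        for step, m in measured.items()
    }


estimates = estimate_full_run(measured_mb)
total_mb = sum(e["req"] + e["resp"] for e in estimates.values())
total_with_replicas_mb = total_mb * REPLICATION_FACTOR
```

The unrounded total comes out at roughly 55GB, so the 60GB figure looks like a round-up, and the feature collection response dominates everything else.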

I also noticed something odd. When I did a test run consuming the generated responses and calculating their decompressed (raw) and compressed sizes (via len(zlib.compress(record.value))) I came up with 8160MB raw and 389MB compressed, but kafka's on-disk size reports 1072MB. Additionally the network traffic reported in grafana while I consumed adds up to ~7800MB (I didn't figure out how to get that from the consumer side). Overall I'm not sure why all these numbers are different. After fixing a couple of bugs and re-running: 22381MB raw, 1026MB compressed, 392MB packed and compressed. Compressed is close enough to kafka's on-disk size that it seems correct.

binary compression was estimated via:

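import json
import struct
import zlib

def extract(response):
    # Yield (page_id, [(feature_name, value), ...]) for each hit in one msearch response.
    for hit in response['hits']['hits']:
        page_id = int(hit['_id'])
        features = [(v['name'], v['value']) for v in hit['fields']['_ltrlog'][0]['sltr_log']]
        yield page_id, features

# Run once per record_batch consumed from the response topic; gzip_bin_size starts at 0.
zz = [pair for record in record_batch for response in json.loads(json.loads(record.value.decode('utf8'))['text'])['responses'] for pair in extract(response)]

# Pack each hit as (page_id, feature count, float32 values), then compress the whole batch.
gzip_bin_size += len(zlib.compress(b'\0'.join([struct.pack('ii' + ('f' * len(features)), page_id, len(features), *[float(f[1]) for f in features]) for page_id, features in zz])))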
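For anyone wanting to try the comparison without access to the topics, here is a self-contained sketch on synthetic data. The hit layout, feature names, and random values are made up for illustration, and it uses an explicit little-endian format with no separator between records, unlike the snippet above. Real feature values will compress differently, so the ratio here says nothing about the 60% figure.

```python
import json
import random
import struct
import zlib


def pack_hit(page_id, values):
    """Pack one hit as int32 page_id, int32 feature count, then float32 values."""
    return struct.pack("<ii%df" % len(values), page_id, len(values), *values)


def unpack_hits(buf):
    """Inverse of pack_hit over a concatenated buffer."""
    offset = 0
    while offset < len(buf):
        page_id, count = struct.unpack_from("<ii", buf, offset)
        offset += 8
        values = struct.unpack_from("<%df" % count, buf, offset)
        offset += 4 * count
        yield page_id, list(values)


# Synthetic stand-in for feature logs: 1000 hits with 10 features each.
random.seed(0)
hits = [(page_id, [random.random() for _ in range(10)]) for page_id in range(1000)]

as_json = json.dumps([
    {"_id": str(page_id),
     "features": [{"name": "feature_%d" % i, "value": v} for i, v in enumerate(values)]}
    for page_id, values in hits
]).encode("utf8")
as_packed = b"".join(pack_hit(page_id, values) for page_id, values in hits)

json_compressed = len(zlib.compress(as_json))
packed_compressed = len(zlib.compress(as_packed))
```

The packed form drops feature names entirely, so a consumer would need to know the feature order from somewhere else. It also stores values as float32, which loses precision compared with the json text.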

Change 454722 merged by Gehel:
[operations/puppet@production] Deploy msearch daemon to cirrus servers