Paste P7054

elastic_cluster_latency.py

Authored by EBernhardson on Apr 30 2018, 4:55 PM.
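
# Rough latency benchmark for Elasticsearch cluster-state updates: optionally
# duplicate indices from a saved cluster state, then repeatedly create/delete
# indices, move shards, and add/remove replicas while timing each operation
# and appending the results to a report file.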
from collections import OrderedDict
import copy
import json
import logging
from pprint import pprint
import random
import re
import requests
import sys
import time
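
# Only indices matching this pattern are treated as "ours" and safe to mutate
# or delete during the tests.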
ARCHIVE_INDEX_RE = re.compile(r'^.*_archive_\d+$')
LOG = logging.getLogger('dupe_cluster')
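
# Delete every index on the cluster before starting (refuses to run against
# anything other than localhost:9200).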
NUKE_CLUSTER = False
# Load cluster state from saved json and create
# indices found inside
CREATE_FROM_STATE = False
# Create archive indices matching all general indices.
# When false 10 duplicates of existing indices are created
# to use with mutation tests
CREATE_ARCHIVE = False
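
# Example invocation (hypothetical report path and index count):
#   python elastic_cluster_latency.py http://localhost:9200/ latency_report.txt 500
# argv[1] is the cluster URL, argv[2] the report file path, argv[3] the maximum
# number of indices to create from the saved cluster state.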

def clean_settings(settings, exclude=set(['provided_name', 'creation_date', 'uuid', 'version'])):
    settings = dict(settings)
    settings['index'] = {k: v for k, v in settings['index'].items() if k not in exclude}
    try:
        del settings['index']['routing']['allocation']['total_shards_per_node']
    except KeyError:
        pass
    return settings

def nuke_cluster():
    if 'localhost:9200' not in ES_URL:
        raise Exception('must be localhost:9200')
    requests.delete(ES_URL + '_all')

def wait_for_green():
    url = ES_URL + '_cluster/health'
    while True:
        res = requests.get(url).json()
        if res['status'] == 'green':
            return
        time.sleep(0.1)

def wait_for_recovery(index):
    url = ES_URL + index + '/_recovery'
    done = False
    while not done:
        res = requests.get(url).json()
        done = True
        for shard in res[index]['shards']:
            if shard['stage'] != 'DONE':
                done = False
                break
        if not done:
            time.sleep(0.1)

def dupe_cluster(remote_state, report, max_indices=None):
    LOG.info('Loading indices from state into cluster')
    total = len(remote_state['metadata']['indices'])
    if max_indices is not None:
        total = min(total, max_indices)
    processed = 0
    created = 0
    for index_name, config in remote_state['metadata']['indices'].items():
        processed += 1
        if config['state'] != 'open':
            continue
        index_url = ES_URL + index_name
        if requests.head(index_url).status_code == 200:
            LOG.info('[%5d/%5d] Already exists' % (processed, total))
            continue
        start = time.time()
        res = requests.put(index_url, json={
            "settings": clean_settings(config['settings']),
            "mappings": config['mappings'],  # {k: v for k, v in config['mappings'].items() if k != "archive"},
        }).json()
        took = time.time() - start
        report('dupe', took)
        LOG.info('[%5d/%5d] [%dms] Processed %s' % (processed, total, 1000 * took, index_name))
        if 'acknowledged' in res and res['acknowledged']:
            created += 1
            if config['aliases']:
                res = requests.post(ES_URL + '_aliases', json={
                    "actions": [
                        {"add": {"index": index_name, "alias": alias}}
                        for alias in config['aliases']
                    ]
                }).json()
                if 'acknowledged' not in res or not res['acknowledged']:
                    pprint(res)
            if max_indices is not None and created >= max_indices:
                break
        else:
            pprint(res)

def create_archive(state, report):
    LOG.info('Creating archive indices')
    wikis = {}
    for index_name, config in state['metadata']['indices'].items():
        if 'archive' not in config['mappings']:
            if 'general' in index_name:
                LOG.info(config['mappings'].keys())
            continue
        wiki = '_'.join(index_name.split('_')[:-2])
        wikis[wiki] = config
    total = len(wikis)
    processed = 0
    for wiki, config in wikis.items():
        processed += 1
        index_name = '%s_archive_%d' % (wiki, int(time.time()))
        index_url = ES_URL + index_name
        if requests.head(index_url).status_code == 200:
            continue
        settings = clean_settings(config['settings'])
        start = time.time()
        res = requests.put(index_url, json={
            "settings": settings,
            "mappings": {
                "archive": config['mappings']['archive']
            }
        })
        took = time.time() - start
        LOG.info('[%5d/%5d] [%dms] %s' % (processed, total, took * 1000, wiki))
        report('create_archive', took)
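
# Decorator returning (result, elapsed seconds) so callers can report latency.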
def wrap_timer(f):
    def inner(*args, **kwargs):
        start = time.time()
        res = f(*args, **kwargs)
        return res, time.time() - start
    inner.__name__ = f.__name__
    return inner

def get_mutable_indices(cluster_state):
    available = []
    for index_name in cluster_state['routing_table']['indices'].keys():
        if ARCHIVE_INDEX_RE.match(index_name):
            available.append(index_name)
    return available

def move_shard(cluster_state):
    # Fetch fresh cluster state
    cluster_state, _ = fetch_cluster_state()
    # Ensure we only mutate "our" archive indices instead of accidentally
    # moving a 40G production shard.
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    for _ in range(10):
        index = random.choice(mutable_indices)
        routing = cluster_state['routing_table']['indices'][index]
        index_config = cluster_state['metadata']['indices'][index]['settings']['index']
        if int(index_config['number_of_replicas']) > 2:
            continue
        shard_id, shard = random.choice(list(routing['shards'].items()))
        nodes = set(cluster_state['nodes'].keys())
        for instance in shard:
            nodes.remove(instance['node'])
        if not nodes:
            LOG.warning('No free nodes!')
            continue
        from_node = random.choice(shard)['node']
        to_node = nodes.pop()
        start = time.time()
        res = requests.post(ES_URL + '_cluster/reroute', json={
            "commands": [
                {"move": {
                    "index": index, "shard": shard_id,
                    "from_node": from_node, "to_node": to_node
                }}
            ]
        })
        took = time.time() - start
        parsed = res.json()
        if 'acknowledged' in parsed and parsed['acknowledged']:
            LOG.info('Moved shard %s of %s from %s to %s', shard_id, index, from_node, to_node)
            new_cluster_state = dict(cluster_state, **parsed['state'])
            wait_for_recovery(index)
        else:
            # Verbose and not too helpful
            # LOG.warning(parsed)
            # TODO: Some failure
            continue
        return new_cluster_state, took
    raise Exception('Could not find a shard to move')

@wrap_timer
def fetch_cluster_state(cluster_state=None):
    res = requests.get(ES_URL + '_cluster/state')
    return res.json()

def change_index_mappings(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    for _ in range(10):
        index_name = random.choice(mutable_indices)
        index_config = cluster_state['metadata']['indices'][index_name]
        if 'page' not in index_config['mappings']:
            continue
        fields = set()
        for type_config in index_config['mappings'].values():
            fields = fields.union(type_config['properties'].keys())
        # Alternatively, create field with random name?
        if 'zomg' in fields:
            continue
        LOG.info('Adding zomg field to %s', index_name)
        start = time.time()
        res = requests.put(ES_URL + index_name + '/_mapping/page', json={
            "properties": {
                "zomg": {
                    "type": "text"
                }
            }
        })
        took = time.time() - start
        parsed = res.json()
        if 'acknowledged' in parsed and parsed['acknowledged']:
            cluster_state['metadata']['indices'][index_name]['mappings']['page']['properties']['zomg'] = {"type": "text"}
        else:
            # TODO: Some failure
            raise Exception()
        return cluster_state, took
    raise Exception('Could not find index to add zomg field to')

@wrap_timer
def create_index(cluster_state):
    # Choose a random index to duplicate
    config = random.choice(list(cluster_state['metadata']['indices'].values()))
    index_name = 'erikfakewiki_archive_' + str(random.random())[2:]
    index_url = ES_URL + index_name
    res = requests.put(index_url, json={
        "settings": clean_settings(config['settings']),
        "mappings": {k: v for k, v in config['mappings'].items() if k != "archive"},
    })
    wait_for_green()
    # Going to pretend we don't need to update cluster state, and let the regular
    # fetch per-iteration get the new info
    return cluster_state

def delete_index(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if mutable_indices:
        index_name = random.choice(mutable_indices)
        LOG.info('Deleting index %s', index_name)
        start = time.time()
        requests.delete(ES_URL + index_name)
        took = time.time() - start
        # TODO: Remove from other parts of state we look at
        del cluster_state['metadata']['indices'][index_name]
        del cluster_state['routing_table']['indices'][index_name]
    else:
        took = float('nan')
    return cluster_state, took

def remove_replica(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    max_attempts = max(10, len(cluster_state['metadata']['indices']) // 10)
    for _ in range(max_attempts):
        index_name = random.choice(mutable_indices)
        config = cluster_state['metadata']['indices'][index_name]
        index_config = dict(config['settings']['index'])
        if 'auto_expand_replicas' not in index_config:
            continue
        low, high = map(int, index_config['auto_expand_replicas'].split('-', 1))
        if high < 2:
            continue
        auto_expand_replicas = '%d-%d' % (low, high - 1)
        LOG.info('Removing replica from %s', index_name)
        start = time.time()
        requests.put(ES_URL + index_name + '/_settings', json={
            "index": {
                "auto_expand_replicas": auto_expand_replicas,
            }
        })
        took = time.time() - start
        config['settings']['index']['auto_expand_replicas'] = auto_expand_replicas
        return cluster_state, took
    raise Exception('Could not find index to remove replica from')

def add_replica(cluster_state):
    mutable_indices = get_mutable_indices(cluster_state)
    if not mutable_indices:
        return cluster_state, float('nan')
    # Look for indices with < 2 replicas
    for index_name in mutable_indices:
        config = cluster_state['metadata']['indices'][index_name]
        if 'auto_expand_replicas' not in config['settings']['index']:
            continue
        if int(config['settings']['index']['auto_expand_replicas'].split('-', 1)[1]) < 2:
            break
    else:
        # Choose one randomly
        for _ in range(10):
            index_name = random.choice(mutable_indices)
            config = cluster_state['metadata']['indices'][index_name]
            if 'auto_expand_replicas' in config['settings']['index']:
                break
        else:
            raise Exception('Could not find index to add replica to')
    low, high = map(int, config['settings']['index']['auto_expand_replicas'].split('-', 1))
    auto_expand_replicas = "%s-%s" % (low, high + 1)
    start = time.time()
    requests.put(ES_URL + index_name + '/_settings', json={
        "index": {
            "auto_expand_replicas": auto_expand_replicas,
        }
    })
    # TODO: Does this work? Might be a race condition? It's also checking
    # something very generic.
    wait_for_green()
    took = time.time() - start
    LOG.info('Added replica to %s', index_name)
    config['settings']['index']['auto_expand_replicas'] = auto_expand_replicas
    return cluster_state, took
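
# Main measurement loop: each iteration re-fetches the full cluster state, then
# runs the mutation actions in random order and reports how long each took.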
def measure_latency(report, n_iterations=100):
    LOG.info('Start latency measurements')
    actions = [
        move_shard,  # change_index_mappings, change_index_settings,
        create_index, delete_index, remove_replica, add_replica,
    ]
    cluster_state = None
    for i in range(n_iterations):
        LOG.info('Starting iteration %d / %d', i, n_iterations)
        start = time.time()
        cluster_state, _ = fetch_cluster_state()
        report('fetch_cluster_state', time.time() - start)
        random.shuffle(actions)
        for action in actions:
            LOG.info('Running %s', action.__name__)
            cluster_state, took = action(cluster_state)
            report(action.__name__, took)
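
# Generator coroutine for the report file: prime it with send(None), then send
# (metric, seconds) tuples; each is written as "metric:seconds" and flushed.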
def make_reporter(path):
    with open(path, 'w') as f:
        while True:
            metric, took = yield
            f.write('%s:%f\n' % (metric, took))
            f.flush()

if __name__ == "__main__":
    logging.basicConfig(level='INFO')
    logging.getLogger("requests").setLevel(logging.WARNING)
    random.seed(0)
    ES_URL = sys.argv[1]
    if ES_URL[-1] != '/':
        ES_URL += '/'
    report_path = sys.argv[2]
    max_indices = int(sys.argv[3])
    reporter = make_reporter(report_path)
    reporter.send(None)

    def report(metric, took):
        reporter.send((metric, took))

    if NUKE_CLUSTER:
        nuke_cluster()
    if CREATE_FROM_STATE:
        cluster_state_path = 'cluster_state.json'
        LOG.info('Read cluster state: %s' % (cluster_state_path))
        with open(cluster_state_path, 'rb') as f:
            source_cluster_state = json.load(f)
        dupe_cluster(source_cluster_state, report, max_indices)
        LOG.info('Waiting for cluster in green state')
        wait_for_green()
        LOG.info('Green!')
    cluster_state, _ = fetch_cluster_state()
    if CREATE_ARCHIVE:
        create_archive(cluster_state, report)
    else:
        LOG.info('Ensuring at least 10 mutable indices')
        # We need some wikis for mutation tests to work with.
        available = len(get_mutable_indices(cluster_state))
        for _ in range(10 - available):
            create_index(cluster_state)
    LOG.info('Waiting for cluster in green state')
    wait_for_green()
    LOG.info('Green!')
    measure_latency(report)