Page MenuHomePhabricator
Paste P13520

elasticsearch-pagecache-reporter.py
ActivePublic

Authored by EBernhardson on Dec 2 2020, 9:13 PM.
Tags
None
Referenced Files
F33932651: elasticsearch-pagecache-reporter.py
Dec 2 2020, 9:51 PM
F33932574: elasticsearch-pagecache-reporter.py
Dec 2 2020, 9:13 PM
Subscribers
None
from argparse import ArgumentParser
from contextlib import contextmanager
from ctypes import CDLL, POINTER, cast, c_int, c_size_t, c_ssize_t, c_ubyte, c_void_p
from glob import glob
import logging
from mmap import MAP_SHARED, PROT_READ, PAGESIZE
import os
import sys

import requests
libc = CDLL('libc.so.6')
assert libc
# mmap(2) returns MAP_FAILED ((void *) -1) on error; with restype c_void_p
# ctypes hands that back as a plain int, so callers must compare against
# MAP_FAILED.value, not this c_void_p instance.
MAP_FAILED = c_void_p(-1)
# mmap/munmap/mincore definitions mostly copied from
# https://github.com/spotify/mlockexec/blob/master/mlockexec/libc.py
# set correct parameter types
c_off_t = c_ssize_t
# void *mmap(void *addr, size_t length, int prot,
#            int flags, int fd, off_t offset);
mmap = libc.mmap
mmap.restype = c_void_p
# BUG FIX: the original argtypes had only five entries for mmap's six
# parameters -- the c_int for `fd` was missing, so prot/flags/fd/offset
# were converted with the wrong types.
mmap.argtypes = [c_void_p, c_size_t, c_int, c_int, c_int, c_off_t]
# int munmap(void *addr, size_t length);
munmap = libc.munmap
# BUG FIX: munmap returns int, not void *; with restype c_void_p a
# successful 0 return was converted to None, so `== 0` checks never worked.
munmap.restype = c_int
munmap.argtypes = [c_void_p, c_size_t]
# int mincore(void *addr, size_t length, unsigned char *vec);
libc.mincore.restype = c_int
libc.mincore.argtypes = [c_void_p, c_size_t, POINTER(c_ubyte)]
def mincore(addr, length):
    """Return the mincore(2) residency vector for a mapped region.

    addr: base address of an mmap'ed region (must be page aligned).
    length: size of the region in bytes.

    Returns a ctypes array of c_ubyte with one entry per page; the low bit
    of an entry is set when that page is resident in the page cache.
    Raises OSError when the mincore(2) call fails.
    """
    # One vector byte per page, rounding a trailing partial page up.
    pages = (length + PAGESIZE - 1) // PAGESIZE
    vec = (c_ubyte * pages)()
    res = libc.mincore(addr, length, cast(vec, POINTER(c_ubyte)))
    if res != 0:
        # Previously an assert: stripped under `python -O`, which would have
        # silently returned an all-zero vector on failure.
        raise OSError('mincore(2) failed for address {:#x}'.format(addr))
    return vec
@contextmanager
def open_fd(path: str, flags: int):
    """Yield a raw file descriptor for *path*, guaranteed closed on exit.

    flags are os.O_* constants passed straight through to os.open().
    """
    descriptor = os.open(path, flags)
    try:
        yield descriptor
    finally:
        os.close(descriptor)
@contextmanager
def mmap_cm(addr, length, prot, flags, fd, offset):
    """Map [offset, offset+length) of *fd*, yielding the base address.

    The mapping is always released with munmap on exit.
    Raises OSError when mmap(2) fails.
    """
    addr_mmap = mmap(addr, length, prot, flags, fd, offset)
    # BUG FIX: the previous `assert addr_mmap != MAP_FAILED` compared an int
    # (ctypes c_void_p restype) against a c_void_p instance, which is never
    # equal, so mmap failures went undetected. Compare the underlying value
    # instead; ctypes returns None for a NULL pointer.
    if addr_mmap is None or addr_mmap == MAP_FAILED.value:
        raise OSError('mmap(2) failed')
    try:
        yield addr_mmap
    finally:
        munmap(addr_mmap, length)
def file_mincore(path: str):
    """Return per-page residency flags for *path* as a list of ints.

    Each entry corresponds to one page of the file; a non-zero low bit
    means the page is resident in the OS page cache. Returns an empty
    list for zero-length or already-deleted files.
    """
    try:
        with open_fd(path, os.O_RDONLY) as fd:
            size = os.stat(fd).st_size
            if not size:
                # A zero-length file cannot be mmap'ed.
                return []
            with mmap_cm(0, size, PROT_READ, MAP_SHARED, fd, 0) as mapped:
                return list(mincore(mapped, size))
    except FileNotFoundError:
        # Most likely the file was deleted, at least with current callers
        return []
def file_pages_cached(path: str):
    """Count how many pages of *path* are resident in the page cache."""
    cached = 0
    for flag in file_mincore(path):
        if flag > 0:
            cached += 1
    return cached
def glob_pages_cached(glob_pattern):
    """Total resident page count across every file matching *glob_pattern*."""
    total = 0
    for match in glob(glob_pattern):
        total += file_pages_cached(match)
    return total
def request(url, params=None):
    """GET *url* from elasticsearch and return the decoded JSON body.

    params: optional dict of query-string parameters.
    Raises requests.exceptions.RequestException on connection problems and
    a generic Exception on any non-2xx status code.
    """
    # BUG FIX: `log` was never defined anywhere in this module, so both
    # error paths below raised NameError instead of logging the failure.
    log = logging.getLogger(__name__)
    try:
        response = requests.get(url, params=params, headers={
            'Accept': 'application/json',
        })
    except requests.exceptions.RequestException as e:
        log.error('Encountered %s communicating with elasticsearch at %s', e, url)
        raise
    if response.status_code < 200 or response.status_code >= 300:
        log.error('Status code %d returned from elasticsearch at %s', response.status_code, url)
        raise Exception('Non-2xx status code returned from elasticsearch')
    return response.json()
def shard_paths(es_url, indices):
    """Yield (index_name, shard_id, shard_path) for local shards of *indices*.

    es_url: base URL of the local elasticsearch node.
    indices: comma-separated index pattern passed to the /_cat/indices API.
    Only indices that actually have data under this node's path are yielded.
    """
    cluster_name = request(es_url)['cluster_name']
    # Replaced `.format(**locals())` with an explicit argument: the locals()
    # spread obscures which name the template actually uses.
    index_base_path = '/srv/elasticsearch/{}/nodes/0/indices'.format(cluster_name)
    # Replaced assert: it disappears under `python -O`, and a wrong data
    # path deserves an explicit, descriptive failure.
    if not os.path.exists(index_base_path):
        raise RuntimeError('elasticsearch data path not found: ' + index_base_path)
    cat_url = es_url + '/_cat/indices/' + indices
    for index_meta in request(cat_url, {'format': 'json', 'local': 'true'}):
        index_path = os.path.join(index_base_path, index_meta['uuid'])
        index_name = index_meta['index']
        if not os.path.exists(index_path):
            # The index exists in the cluster but holds no shards locally.
            continue
        for shard_path in glob(os.path.join(index_path, '*')):
            basename = os.path.basename(shard_path)
            if basename == '_state':
                # Per-index metadata directory, not a shard.
                continue
            # BUG FIX (tidiness): index_name was assigned but unused; the
            # yield re-read index_meta['index'] instead.
            yield index_name, int(basename), shard_path
def arg_parser():
    """Build the command-line parser for this script."""
    p = ArgumentParser()
    # One or more index name patterns, later joined for the _cat API.
    p.add_argument('indices', nargs='+')
    # HTTP port of the local elasticsearch node.
    p.add_argument('--port', type=int, default=9200)
    return p
def main(indices, port):
    """Print, per shard of the requested indices, the MB held in page cache."""
    es_url = 'http://localhost:{}'.format(port)
    joined = ','.join(indices)
    for index_name, shard_id, shard_path in shard_paths(es_url, joined):
        # Only Lucene segment files under the shard's index/ directory count.
        cached_pages = glob_pages_cached(os.path.join(shard_path, 'index/*'))
        cached_mb = cached_pages * PAGESIZE / 1024 / 1024
        print('{} {} {:.2f} MB'.format(index_name, shard_id, cached_mb))
if __name__ == '__main__':
    # vars() on the parsed Namespace yields keyword arguments matching
    # main()'s signature (indices, port).
    cli_args = vars(arg_parser().parse_args())
    sys.exit(main(**cli_args))