Page MenuHomePhabricator

elasticsearch-pagecache-reporter.py

Authored By
EBernhardson
Dec 2 2020, 9:13 PM
Size
3 KB
Referenced Files
None
Subscribers
None

elasticsearch-pagecache-reporter.py

from argparse import ArgumentParser
from contextlib import contextmanager
from glob import glob
import os
import sys
import elasticsearch
from ctypes import CDLL, POINTER, cast, c_int, c_size_t, c_ssize_t, c_ubyte, c_void_p
from mmap import MAP_SHARED, PROT_READ, PAGESIZE
# Handle to the C library: Python's mmap module does not expose mincore(),
# so the raw syscall wrappers are bound through ctypes.
libc = CDLL('libc.so.6')
assert libc

# mmap(2) returns MAP_FAILED ((void *) -1) on error, never NULL.
MAP_FAILED = c_void_p(-1)

# mmap/munmap/mincore definitions mostly copied from
# https://github.com/spotify/mlockexec/blob/master/mlockexec/libc.py

# set correct parameter types; off_t is a signed size on Linux
c_off_t = c_ssize_t

# void *mmap(void *addr, size_t length, int prot,
#            int flags, int fd, off_t offset);
mmap = libc.mmap
mmap.restype = c_void_p
# BUG FIX: the original argtypes listed only five entries for the
# six-argument mmap(2) -- the `int fd` entry was missing. ctypes happened
# to accept the extra argument as a variadic, but the trailing off_t was
# then converted with the default int rules, which is wrong wherever
# off_t is wider than int.
mmap.argtypes = [c_void_p, c_size_t, c_int, c_int, c_int, c_off_t]

# int munmap(void *addr, size_t length);
munmap = libc.munmap
# BUG FIX: munmap(2) returns int (0 or -1), not a pointer.
munmap.restype = c_int
munmap.argtypes = [c_void_p, c_size_t]

# int mincore(void *addr, size_t length, unsigned char *vec);
libc.mincore.restype = c_int
libc.mincore.argtypes = [c_void_p, c_size_t, POINTER(c_ubyte)]
def mincore(addr, length):
    """Return the mincore(2) residency vector for [addr, addr + length).

    The result is an array of one byte per page; the low bit of each byte
    is set when that page is resident in the page cache.
    """
    page_count = -(-length // PAGESIZE)  # ceiling division
    residency = (c_ubyte * page_count)()
    status = libc.mincore(addr, length, cast(residency, POINTER(c_ubyte)))
    assert status == 0
    return residency
@contextmanager
def open_fd(path: str, flags: int):
    """Context manager yielding a raw OS file descriptor for *path*.

    The descriptor is closed on exit, even if the body raises.
    """
    descriptor = os.open(path, flags)
    try:
        yield descriptor
    finally:
        os.close(descriptor)
@contextmanager
def mmap_cm(addr, length, prot, flags, fd, offset):
    """Context manager around raw mmap(2)/munmap(2).

    Yields the mapped address as an integer; the mapping is released on
    exit, even if the body raises.
    """
    addr_mmap = mmap(addr, length, prot, flags, fd, offset)
    # BUG FIX: with restype c_void_p, ctypes returns a plain int (or None
    # for NULL), while MAP_FAILED is a c_void_p instance, so the original
    # `addr_mmap != MAP_FAILED` was always true and failures went
    # undetected. Compare against the integer value instead.
    assert addr_mmap is not None and addr_mmap != MAP_FAILED.value
    try:
        yield addr_mmap
    finally:
        munmap(addr_mmap, length)
def file_mincore(path: str):
    """Return the page-cache residency vector of *path* as a list of ints.

    Zero-length and already-deleted files yield an empty list.
    """
    try:
        with open_fd(path, os.O_RDONLY) as fd:
            size = os.stat(fd).st_size
            # cannot mmap a zero size file
            if not size:
                return []
            with mmap_cm(0, size, PROT_READ, MAP_SHARED, fd, 0) as mapped:
                return list(mincore(mapped, size))
    except FileNotFoundError:
        # Most likely the file was deleted, at least with current callers
        return []
def file_pages_cached(path: str):
    """Count the pages of *path* currently resident in the page cache."""
    cached = 0
    for flag in file_mincore(path):
        if flag > 0:
            cached += 1
    return cached
def glob_pages_cached(glob_pattern):
    """Sum the cached-page counts of every file matching *glob_pattern*."""
    total = 0
    for fname in glob(glob_pattern):
        total += file_pages_cached(fname)
    return total
def shard_paths(client, indices):
    """Yield (index_name, shard_id, shard_path) for local shards of *indices*.

    *indices* is a comma-separated index pattern accepted by the _cat API.
    Assumes the standard single-node-per-host data layout under
    /srv/elasticsearch.
    """
    cluster_name = client.cluster.health()['cluster_name']
    # IMPROVED: explicit format argument instead of the fragile
    # `.format(**locals())` idiom.
    # NOTE(review): hard-codes node ordinal 0 -- a multi-node-per-host
    # deployment would need this parameterized.
    index_base_path = '/srv/elasticsearch/{}/nodes/0/indices'.format(cluster_name)
    assert os.path.exists(index_base_path)
    for index_meta in client.cat.indices(indices, format='json', local=True):
        index_path = os.path.join(index_base_path, index_meta['uuid'])
        index_name = index_meta['index']
        # The index may have no shards allocated on this node.
        if not os.path.exists(index_path):
            continue
        for shard_path in glob(os.path.join(index_path, '*')):
            basename = os.path.basename(shard_path)
            # Skip index-level metadata stored alongside the shards.
            if basename == '_state':
                continue
            shard_id = int(basename)
            # BUG FIX (minor): the original assigned index_name but then
            # re-read index_meta['index'] here, leaving the local unused.
            yield index_name, shard_id, shard_path
def arg_parser():
    """Build the CLI parser: one or more index patterns, plus an HTTP port."""
    cli = ArgumentParser()
    cli.add_argument('indices', nargs='+')
    cli.add_argument('--port', type=int, default=9200)
    return cli
def main(indices, port):
    """Print the page-cache footprint, in MB, of each local shard."""
    client = elasticsearch.Elasticsearch('localhost:{}'.format(port))
    pattern = ','.join(indices)
    for index_name, shard_id, shard_path in shard_paths(client, pattern):
        cached_pages = glob_pages_cached(os.path.join(shard_path, 'index/*'))
        megabytes = cached_pages * PAGESIZE / 1024 / 1024
        print('{} {} {} MB'.format(index_name, shard_id, megabytes))
if __name__ == '__main__':
    # vars() already returns the Namespace's attribute dict, ready to be
    # splatted into main() as keyword arguments.
    sys.exit(main(**vars(arg_parser().parse_args())))

File Metadata

Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
8795557
Default Alt Text
elasticsearch-pagecache-reporter.py (3 KB)

Event Timeline