Paste P6247

Authored by elukey on Nov 2 2017, 9:00 AM.
import json
import logging
import sys

from wsgiref.simple_server import make_server

from prometheus_client import make_wsgi_app, Summary
from prometheus_client.core import (CounterMetricFamily, GaugeMetricFamily,
                                    REGISTRY)

logging.basicConfig()
log = logging.getLogger(__name__)

# WSGI app from prometheus_client that renders the registry in the
# Prometheus exposition format (used for GET requests).
app = make_wsgi_app()

# Set in main(); shared with druid_app() so POSTed datapoints reach the
# collector.
druid_collector = None
class DruidCollector(object):

    # Track how long each Prometheus scrape of this collector takes.
    scrape_duration = Summary(
        'druid_scrape_duration_seconds', 'Druid scrape duration')

    def __init__(self):
        # Druid metric names that this collector knows how to translate
        # into Prometheus metrics; everything else is dropped.
        self.supported_metric_names = [
            'query/time',
            'query/bytes',
            'query/node/time',
            'query/node/bytes',
            'query/cache/total/numEntries',
            'query/cache/total/sizeBytes',
            'query/cache/total/hits',
            'query/cache/total/misses',
            'query/cache/total/evictions',
            'query/cache/total/timeouts',
            'query/cache/total/errors',
            'jvm/pool/committed',
            'jvm/pool/init',
            'jvm/pool/max',
            'jvm/pool/used',
            'jvm/bufferpool/count',
            'jvm/bufferpool/used',
            'jvm/bufferpool/capacity',
            'jvm/mem/init',
            'jvm/mem/max',
            'jvm/mem/used',
            'jvm/mem/committed',
            'jvm/gc/count',
            'jvm/gc/time',
        ]
        # Bucket upper bounds (in ms) for metrics carrying a 'le' label.
        self.metric_buckets = {
            'query/time': [10, 100, 500]
        }
        # metric_name_key -> MetricFamily, lazily initialized on the first
        # registered datapoint.
        self.metrics = {}

    @scrape_duration.time()
    def collect(self):
        for metric in self.metrics.values():
            yield metric
    def register_datapoint(self, datapoint):
        # Ignore any Druid metric that is not explicitly supported.
        if datapoint['metric'] not in self.supported_metric_names:
            return

        # Lazily create the metric families on the first datapoint.
        if not self.metrics:
            for daemon in ['historical', 'broker']:
                metrics = {
                    daemon + '_query_time': CounterMetricFamily(
                        'druid_' + daemon + '_query_time',
                        'Milliseconds taken to complete a query.',
                        labels=['datasource', 'le']),
                    daemon + '_query_bytes': GaugeMetricFamily(
                        'druid_' + daemon + '_query_bytes',
                        'Number of bytes returned in query response.',
                        labels=['datasource', 'le']),
                    daemon + '_query_node_time': GaugeMetricFamily(
                        'druid_' + daemon + '_query_node_time',
                        'Milliseconds taken to query individual '
                        'historical/realtime nodes.',
                        labels=['datasource', 'server', 'le']),
                    daemon + '_query_node_bytes': GaugeMetricFamily(
                        'druid_' + daemon + '_query_node_bytes',
                        'Number of bytes returned from querying individual '
                        'historical/realtime nodes.',
                        labels=['datasource', 'server', 'le']),
                    daemon + '_query_cache_total_numentries': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_numentries',
                        'Number of cache entries.',
                        labels=[]),
                    daemon + '_query_cache_total_sizebytes': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_sizebytes',
                        'Size in bytes of cache entries.',
                        labels=[]),
                    daemon + '_query_cache_total_hits': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_hits',
                        'Number of cache hits.',
                        labels=[]),
                    daemon + '_query_cache_total_misses': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_misses',
                        'Number of cache misses.',
                        labels=[]),
                    daemon + '_query_cache_total_evictions': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_evictions',
                        'Number of cache evictions.',
                        labels=[]),
                    daemon + '_query_cache_total_timeouts': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_timeouts',
                        'Number of cache timeouts.',
                        labels=[]),
                    daemon + '_query_cache_total_errors': GaugeMetricFamily(
                        'druid_' + daemon + '_query_cache_total_errors',
                        'Number of cache errors.',
                        labels=[]),
                }
                self.metrics.update(metrics)

        # Transform the service/metric names into the metrics dict key, e.g.
        # service 'druid/broker' + metric 'query/time' -> 'broker_query_time'.
        daemon_name = str(datapoint['service'].replace('druid/', '').lower())
        metric_name_key = str(
            daemon_name + '_' + datapoint['metric'].replace('/', '_').lower())

        # Guard against supported metrics (e.g. the jvm/* ones listed above)
        # that do not have a metric family registered yet.
        if metric_name_key not in self.metrics:
            return

        # If the metric has any labels associated, their names must match
        # keys in the datapoint; 'le' is special-cased for bucketing below.
        labels = self.metrics[metric_name_key]._labelnames
        # Default to no buckets for metrics without a configured bucket list.
        metric_buckets = self.metric_buckets.get(datapoint['metric'], [])
        metric_value = int(datapoint['value'])
        label_values = []
        for label in labels:
            if label == 'le':
                continue
            label_values.append(datapoint[label])

        if 'le' in labels:
            # Histogram-like handling: bump every bucket whose upper bound
            # the observed value does not exceed.
            for bucket in metric_buckets:
                if metric_value <= bucket:
                    self.metrics[metric_name_key].add_metric(
                        label_values + [str(bucket)], 1)
        else:
            self.metrics[metric_name_key].add_metric(
                [], float(datapoint['value']))
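

# For reference, a sketch of the single JSON datapoint register_datapoint()
# expects in a POST body. The field names ('service', 'metric', 'datasource',
# 'value') are inferred from the lookups above; the concrete values are
# hypothetical, not taken from a real Druid emitter payload:
#
#   {
#       "service": "druid/historical",
#       "metric": "query/time",
#       "datasource": "wikipedia",
#       "value": 42
#   }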
def druid_app(environ, start_response):
    if environ['REQUEST_METHOD'] == 'POST':
        request_body = b''
        try:
            request_body_size = int(environ.get('CONTENT_LENGTH', 0))
            request_body = environ['wsgi.input'].read(request_body_size)
            request_body_decoded = json.loads(request_body)
            druid_collector.register_datapoint(request_body_decoded)
            status = '200 OK'
            headers = [('Content-Type', 'text/plain')]
        except Exception:
            log.exception(
                "Error while processing the following POST data: {}"
                .format(request_body)
            )
            status = '400 Bad Request'
            headers = [('Content-Type', 'text/plain')]
        start_response(status, headers)
        # WSGI requires an iterable of bytes as the response body.
        return [b'']
    else:
        # Delegate GET requests to the prometheus_client WSGI app, which
        # renders the registered metrics in the exposition format.
        return app(environ, start_response)
def main():
    global druid_collector
    druid_collector = DruidCollector()
    REGISTRY.register(druid_collector)
    httpd = make_server('localhost', 8000, druid_app)
    httpd.serve_forever()


if __name__ == "__main__":
    sys.exit(main())
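

# A minimal smoke test for the exporter, assuming it is running on
# localhost:8000 as configured in main(). Only the standard library is used;
# the datapoint below is hypothetical, mirroring the sketch above:
#
#   import json
#   import urllib.request
#
#   datapoint = {"service": "druid/broker", "metric": "query/time",
#                "datasource": "wikipedia", "value": 35}
#   req = urllib.request.Request(
#       "http://localhost:8000/", method="POST",
#       data=json.dumps(datapoint).encode("utf-8"),
#       headers={"Content-Type": "application/json"})
#   urllib.request.urlopen(req)
#
#   # Any GET (e.g. /metrics) is served by the prometheus_client app:
#   print(urllib.request.urlopen(
#       "http://localhost:8000/metrics").read().decode())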