"""Prometheus exporter for Druid metrics.

Druid datapoints are accepted as JSON via HTTP POST and handed to a
``DruidCollector``; every other request is answered by the standard
``prometheus_client`` WSGI application, which renders the registry.
"""
import json
import logging
import sys

from wsgiref.simple_server import make_server

from prometheus_client import make_wsgi_app, Summary
from prometheus_client.core import (CounterMetricFamily, GaugeMetricFamily,
                                    REGISTRY)

logging.basicConfig()
log = logging.getLogger(__name__)

# WSGI application that serves the Prometheus exposition of the registry.
app = make_wsgi_app()

# Collector shared between main() (which creates and registers it) and
# druid_app() (which feeds it datapoints).
druid_collector = None


class DruidCollector(object):
    """Custom Prometheus collector fed by Druid datapoints.

    Datapoints are pushed in through ``register_datapoint`` and exposed to
    the Prometheus registry through ``collect``.
    """

    scrape_duration = Summary(
        'druid_scrape_duration_seconds', 'Druid scrape duration')

    def __init__(self):
        # Druid metric names this collector is willing to look at; anything
        # else is ignored by register_datapoint().
        self.supported_metric_names = [
            'query/time',
            'query/bytes',
            'query/node/time',
            'query/node/bytes',
            'query/cache/total/numEntries',
            'query/cache/total/sizeBytes',
            'query/cache/total/hits',
            'query/cache/total/misses',
            'query/cache/total/evictions',
            'query/cache/total/timeouts',
            'query/cache/total/errors',
            'jvm/pool/committed',
            'jvm/pool/init',
            'jvm/pool/max',
            'jvm/pool/used',
            'jvm/bufferpool/count',
            'jvm/bufferpool/used',
            'jvm/bufferpool/capacity',
            'jvm/mem/init',
            'jvm/mem/max',
            'jvm/mem/used',
            'jvm/mem/committed',
            'jvm/gc/count',
            'jvm/gc/time',
        ]
        # Upper bounds used for the 'le' label, keyed by Druid metric name.
        self.metric_buckets = {
            'query/time': [10, 100, 500],
        }
        # Metric families keyed by '<daemon>_<metric name with underscores>'.
        # Filled lazily on the first supported datapoint.
        self.metrics = {}

    @scrape_duration.time()
    def collect(self):
        """Return the metric families known so far.

        A list is returned instead of yielding: the timing decorator wraps
        this call, and a generator function would return before any work
        was done, leaving nothing to measure.
        """
        return list(self.metrics.values())

    @staticmethod
    def _build_metric_families():
        """Create the metric families for the historical and broker daemons.

        Returns:
            dict mapping '<daemon>_<metric>' keys (the same form that
            register_datapoint() computes) to metric family objects.
        """
        cache_gauges = [
            ('numentries', 'Number of cache entries.'),
            ('sizebytes', 'Size in bytes of cache entries.'),
            ('hits', 'Number of cache hits.'),
            ('misses', 'Number of cache misses.'),
            ('evictions', 'Number of cache evictions.'),
            ('timeouts', 'Number of cache timeouts.'),
            ('errors', 'Number of cache errors.'),
        ]
        families = {}
        for daemon in ['historical', 'broker']:
            families[daemon + '_query_time'] = CounterMetricFamily(
                'druid_' + daemon + '_query_time',
                'Milliseconds taken to complete a query.',
                labels=['datasource', 'le'])
            families[daemon + '_query_bytes'] = GaugeMetricFamily(
                'druid_' + daemon + '_query_bytes',
                'Number of bytes returned in query response.',
                labels=['datasource', 'le'])
            families[daemon + '_query_node_time'] = GaugeMetricFamily(
                'druid_' + daemon + '_query_node_time',
                'Milliseconds taken to query individual '
                'historical/realtime nodes.',
                labels=['datasource', 'server', 'le'])
            families[daemon + '_query_node_bytes'] = GaugeMetricFamily(
                'druid_' + daemon + '_query_node_bytes',
                'number of bytes returned from querying individual '
                'historical/realtime nodes.',
                labels=['datasource', 'server', 'le'])
            for suffix, description in cache_gauges:
                # The dict key and the exported name share the same suffix so
                # the key always matches what register_datapoint() derives
                # from the Druid metric name.
                name = daemon + '_query_cache_total_' + suffix
                families[name] = GaugeMetricFamily(
                    'druid_' + name, description, labels=[])
        return families

    @staticmethod
    def _label_value(datapoint, label):
        """Return the datapoint field that provides the value for *label*.

        The exact key is tried first, then a case-insensitive match, since
        the label names are lowercase while Druid is expected to emit
        camelCase keys such as 'dataSource' (assumption, confirm against
        the emitter payload).

        Raises:
            KeyError: if the datapoint has no matching field.
        """
        if label in datapoint:
            return datapoint[label]
        wanted = label.lower()
        for key, value in datapoint.items():
            if key.lower() == wanted:
                return value
        raise KeyError(label)

    def register_datapoint(self, datapoint):
        """Record one Druid datapoint in the matching metric family.

        Args:
            datapoint: dict with at least the 'metric', 'service' and
                'value' keys, plus one key per non-'le' label of the
                target metric family.

        Datapoints whose metric is not supported, or for which no metric
        family exists (unknown daemon, or a supported name that has no
        family yet), are ignored.

        Raises:
            KeyError: if a required key is missing from the datapoint.
            ValueError: if 'value' cannot be converted to a number.
        """
        metric = datapoint['metric']
        if metric not in self.supported_metric_names:
            return

        if not self.metrics:
            self.metrics.update(self._build_metric_families())

        # Transform the service and metric names into a self.metrics key,
        # e.g. 'druid/broker' + 'query/time' -> 'broker_query_time'.
        daemon_name = str(datapoint['service'].replace('druid/', '').lower())
        metric_name_key = str(
            daemon_name + '_' + metric.replace('/', '_').lower())

        family = self.metrics.get(metric_name_key)
        if family is None:
            log.debug("No metric family for %s, datapoint ignored",
                      metric_name_key)
            return

        # Every label except 'le' takes its value from the datapoint field
        # of the same name; 'le' is filled from the configured buckets.
        labels = family._labelnames
        label_values = [self._label_value(datapoint, label)
                        for label in labels if label != 'le']

        # NOTE(review): the families are created once and add_metric() is
        # called for every datapoint, so samples look like they pile up for
        # the life of the process. Confirm whether that is intended.
        if 'le' in labels:
            metric_value = int(datapoint['value'])
            # Metrics without configured buckets have nothing to record.
            for bucket in self.metric_buckets.get(metric, ()):
                if metric_value <= bucket:
                    family.add_metric(label_values + [str(bucket)], 1)
        else:
            family.add_metric(label_values, float(datapoint['value']))


def druid_app(environ, start_response):
    """WSGI entry point: ingest POSTed datapoints, serve metrics otherwise.

    A POST body must be JSON: either a single datapoint object or an array
    of them. The response is '200 OK' when every datapoint was processed
    and '400 Bad Request' (with the error logged) otherwise. Any other
    request method is delegated to the Prometheus WSGI application.
    """
    if environ['REQUEST_METHOD'] != 'POST':
        return app(environ, start_response)

    # Bound before the try block so the error handler can always log it.
    request_body = None
    try:
        # CONTENT_LENGTH may be absent or an empty string.
        request_body_size = int(environ.get('CONTENT_LENGTH') or 0)
        request_body = environ['wsgi.input'].read(request_body_size)
        payload = json.loads(request_body.decode('utf-8'))
        datapoints = payload if isinstance(payload, list) else [payload]
        for datapoint in datapoints:
            druid_collector.register_datapoint(datapoint)
        status = '200 OK'
    except Exception:
        log.exception(
            "Error while processing the following POST data: %s",
            request_body)
        status = '400 Bad Request'

    start_response(status, [('Content-Type', 'text/plain')])
    # WSGI requires an iterable of byte strings.
    return [b'']


def main():
    """Register the collector and serve on localhost:8000 until killed."""
    global druid_collector
    druid_collector = DruidCollector()
    REGISTRY.register(druid_collector)
    httpd = make_server('localhost', 8000, druid_app)
    httpd.serve_forever()


if __name__ == "__main__":
    sys.exit(main())