Page Menu · Home · Phabricator
Paste P13149

nel-kafkacat-exporter.py
ActivePublic

Authored by CDanis on Nov 3 2020, 9:18 PM.
Tags
None
Referenced Files
F32427626: nel-kafkacat-exporter.py
Nov 3 2020, 9:19 PM
F32427625: Command-Line Input
Nov 3 2020, 9:18 PM
Subscribers
None
#!/usr/bin/env python3
r"""
Pipe NEL kafkacat output into this and get some Prometheus counters as a textfile.

Usage:
kafkacat -C -b logstash1010.eqiad.wmnet -G cdanis-kafkacat-test \
  '^(codfw|eqiad)\.w3c\.reportingapi\.network_error$' \
  | nel-kafkacat-exporter.py
"""
# The docstring is raw (r""") so the regex's `\.` escapes and the shell
# line-continuation backslashes are kept literally instead of being treated
# as (invalid) Python string escapes.
__author__ = 'Chris Danis'
__version__ = '0.0.1'
__copyright__ = """
Copyright © 2020 Chris Danis & the Wikimedia Foundation
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the specific language governing permissions
and limitations under the License.
"""
import fileinput
import json
import logging
import time

import geoip2.database
import geoip2.errors
from prometheus_client import CollectorRegistry, Counter, write_to_textfile
# Prometheus textfile the counters are written to.
OUTFILE = 'test.prom'
# City-level lookups are disabled; only the Country and ISP databases are opened.
#city_db = geoip2.database.Reader("/usr/share/GeoIP/GeoIP2-City.mmdb")
country_db = geoip2.database.Reader("/usr/share/GeoIP/GeoIP2-Country.mmdb")
isp_db = geoip2.database.Reader("/usr/share/GeoIP/GeoIP2-ISP.mmdb")
# Dedicated registry holding just the counters defined here; it is what gets
# written out to OUTFILE.
registry = CollectorRegistry()
# Time of the most recent textfile write; read and updated by maybe_write().
last_written = 0


def _nel_counter(name, documentation, label):
    """Build a single-label Counter in the 'nel' namespace, attached to `registry`."""
    return Counter(
        name, documentation, [label],
        namespace='nel', registry=registry,
    )


by_type = _nel_counter(
    'reports_by_type', 'Reports received by type', 'type')
timeouts_by_country = _nel_counter(
    'tcptimeouts_by_country', 'tcp.timed_out reports received by country', 'country')
timeouts_by_continent = _nel_counter(
    'tcptimeouts_by_continent', 'tcp.timed_out reports received by continent', 'continent')
timeouts_by_asn = _nel_counter(
    'tcptimeouts_by_asn', 'tcp.timed_out reports received by AS number', 'asn')
def maybe_write(force=False, interval=10):
    """Write the metrics registry to OUTFILE, at most once per `interval` seconds.

    Args:
        force: if true, write regardless of when the last write happened
            (used for the final flush once input is exhausted).
        interval: minimum number of seconds between non-forced writes;
            defaults to 10, the previously hard-coded value.
    """
    global last_written
    # Monotonic clock: a backwards wall-clock step (e.g. an NTP correction)
    # must not stall writes until the wall clock catches up again.
    now = time.monotonic()
    if force or now - last_written > interval:
        # Routine progress message, so debug rather than error level.
        logging.debug("writing")
        write_to_textfile(OUTFILE, registry)
        last_written = now
# Consume NEL reports, one JSON object per line, from stdin or the files named
# on the command line. Update the counters, and periodically write OUTFILE.
try:
    with fileinput.input() as lines:
        for line in lines:
            try:
                report = json.loads(line)
                report_type = report['body']['type']
                by_type.labels(type=report_type).inc()
                # Only TCP timeouts are broken down by network and geography.
                if report_type != 'tcp.timed_out':
                    continue
                addr = report['http']['client_ip']
                # ISP and country lookups are independent, so an address that
                # is missing from one database is still counted by the other.
                try:
                    ip_asn = isp_db.isp(addr).autonomous_system_number
                except geoip2.errors.AddressNotFoundError:
                    logging.warning('client IP not found in ISP database')
                else:
                    timeouts_by_asn.labels(asn=ip_asn).inc()
                try:
                    # A single lookup serves both the country and continent counters.
                    geo = country_db.country(addr)
                except geoip2.errors.AddressNotFoundError:
                    logging.warning('client IP not found in Country database')
                else:
                    timeouts_by_country.labels(country=geo.country.iso_code).inc()
                    timeouts_by_continent.labels(continent=geo.continent.code).inc()
            except Exception:
                # Exception rather than a bare except, so that Ctrl-C
                # (KeyboardInterrupt) and SystemExit still stop the exporter.
                logging.exception('error processing a line')
            finally:
                maybe_write()
finally:
    # Flush the final counter values even if the loop ends abnormally.
    maybe_write(True)

Event Timeline

CDanis changed the title of this paste from Command-Line Input to nel-kafkacat-exporter.py. Nov 3 2020, 9:19 PM
CDanis edited the content of this paste. (Show Details)
CDanis updated the paste's language from autodetect to python. Apr 8 2022, 2:34 PM