Page Menu · Home · Phabricator
Paste P3458

logstash-dump-and-clean.py
Active · Public

Authored by bd808 on Jul 15 2016, 8:04 PM.
Referenced Files
F4278249: logstash-dump-and-clean.py
Jul 15 2016, 8:06 PM
F4278239: logstash-dump-and-clean.py
Jul 15 2016, 8:04 PM
Subscribers
None
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Bryan Davis and Wikimedia Foundation. All Rights Reserved.
# Released under the Apache 2.0 license.
import argparse
import json
import sys

try:
    import httplib  # Python 2
except ImportError:  # Python 3 renamed the module to http.client
    import http.client as httplib
def _request_json(conn, method, path, body):
    """Send one request on *conn* and return its decoded JSON response.

    The response body is always read in full so that *conn* can be reused
    for the next request.

    :raises RuntimeError: if the server answers with a non-2xx status.
    """
    conn.request(method, path, body)
    response = conn.getresponse()
    payload = response.read().decode('utf-8')
    if not 200 <= response.status < 300:
        raise RuntimeError('%s %s failed with HTTP %d: %s' % (
            method, path, response.status, payload[:500]))
    return json.loads(payload)


def _clear_scroll(conn, scroll_id):
    """Ask the server to release the scroll context *scroll_id* (best effort)."""
    try:
        conn.request('DELETE', '/_search/scroll', json.dumps({
            'scroll_id': [scroll_id],
        }))
        conn.getresponse().read()
    except (httplib.HTTPException, EnvironmentError) as exc:
        # The context is only kept alive for the requested 1m anyway, so a
        # failed cleanup is reported but must not mask an earlier error.
        sys.stderr.write('warning: could not clear scroll context: %s\n' % exc)


def fetch(index, host='127.0.0.1', port=9200):
    """Yield every hit of *index* using the Elasticsearch scroll API.

    Hits are requested 1000 at a time in ``_doc`` order, and the scroll
    context is kept alive for 1m between requests. When iteration finishes,
    is abandoned early, or fails, the scroll context is cleared
    (best effort) and the connection is closed.

    :param index: index name, interpolated into the ``/<index>/_search`` path
    :param host: Elasticsearch host
    :param port: Elasticsearch HTTP port
    :returns: generator of raw hit dicts taken from ``hits.hits``
    :raises RuntimeError: if a search/scroll request gets a non-2xx status
    """
    conn = httplib.HTTPConnection(host=host, port=port)
    scroll_id = None
    try:
        data = _request_json(conn, 'GET', '/%s/_search?scroll=1m' % index,
                             json.dumps({
                                 'size': 1000,
                                 'sort': ['_doc'],
                             }))
        while True:
            scroll_id = data['_scroll_id']
            hits = data['hits']['hits']
            if not hits:
                break
            for hit in hits:
                yield hit
            # fetch the next set of scroll results; the scroll id is sent
            # as the raw request body
            data = _request_json(conn, 'GET', '/_search/scroll?scroll=1m',
                                 scroll_id)
    finally:
        # Runs on exhaustion, on generator close() and on errors alike, so
        # the server-side context and the socket are never leaked.
        if scroll_id is not None:
            _clear_scroll(conn, scroll_id)
        conn.close()
def de_dot(value):
    """Return a copy of *value* with ``.`` replaced by ``_`` in every dict key.

    Dicts (including dict subclasses) and lists are walked recursively. All
    other values, including strings that contain dots, are returned unchanged.
    Keys that differ only by ``.`` vs ``_`` (e.g. ``a.b`` and ``a_b``) map
    to the same new key, so only one of their values survives.

    NOTE(review): presumably needed because the target Elasticsearch rejects
    field names containing dots -- confirm against the target cluster.
    """
    if isinstance(value, dict):
        # .items() (rather than the Python 2-only .iteritems()) works on 2 and 3.
        return {k.replace('.', '_'): de_dot(v) for k, v in value.items()}
    if isinstance(value, list):
        return [de_dot(v) for v in value]
    return value
# Command-line entry point: stream every hit of the given index to stdout as
# Elasticsearch bulk-API record pairs (action line, then source line),
# leaving out events whose logstash ``type`` is "mml".
parser = argparse.ArgumentParser(
    description='Dump an Elasticsearch index as bulk load records')
parser.add_argument('index', help='index to dump')
parser.add_argument('-s', '--server', default='127.0.0.1',
                    help='Elasticsearch server')
parser.add_argument('-p', '--port', type=int, default=9200,
                    help='Elasticsearch port')
args = parser.parse_args()

for hit in fetch(args.index, host=args.server, port=args.port):
    # A search hit wraps the stored document: the event's ``type`` field is
    # inside ``_source``, not at the top level of the hit (which only has
    # metadata such as ``_type``/``_id``), so the check has to look there.
    if hit.get('_source', {}).get('type') == 'mml':
        continue
    # Bulk action line. Each value is JSON-encoded instead of being pasted
    # into the template, so quotes/backslashes cannot produce invalid JSON.
    print('{"index":{"_index":%s,"_type":%s,"_id":%s}}' % (
        json.dumps(args.index), json.dumps(hit['_type']),
        json.dumps(hit['_id'])))
    # Bulk source line: the document with dotted keys rewritten.
    print(json.dumps(de_dot(hit['_source'])))