
Create and test a dump+filter+load process to reindex logstash data that is not ES 2.x safe
Closed, Resolved · Public

Description

We have kept logstash data going back to 2016-07-01 in Elasticsearch, but none of the current indices are actually clean enough for importing into Elasticsearch 2.x yet. We believe that once 1.28.0-wmf.10 is on all wikis we will have clean indices (expected starting with 2016-07-15).

We should be able to clean the data from the older indices with a combination of filtering of the data and reloading into Elasticsearch using our new default mapping. The process would be roughly:

  • Export records from an index using P3309 or something similar
  • Reformat keys with embedded . via P3357 or something similar
  • Discard mml records via some filtering script
  • Reimport into Elasticsearch via split -l 4000 --filter 'curl -s http://elastic:9200/{indexName}/_bulk --data-binary @- > /dev/null' or a similar loading script (see the loader sketch after this list)
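
For the reimport step, a small Python loader could do the same work as the split + curl pipeline above. The sketch below is only an illustration and not code from this task: the load/send names and chunked-upload structure are assumptions, and it presumes a bulk file with alternating action and source lines, which is what the dump script later in this task emits (an even lines-per-chunk value keeps each pair together).

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Hypothetical loader sketch: POST a bulk dump file to /{index}/_bulk in
# chunks, roughly what split -l 4000 --filter 'curl ...' does.
import httplib
import sys


def load(path, index, host='127.0.0.1', port=9200, lines_per_chunk=4000):
    conn = httplib.HTTPConnection(host=host, port=port)
    chunk = []
    with open(path) as bulk_file:
        for line in bulk_file:
            chunk.append(line)
            # 4000 lines == 2000 action/source pairs; an even count keeps
            # each pair together in the same request
            if len(chunk) >= lines_per_chunk:
                send(conn, index, chunk)
                chunk = []
    if chunk:
        send(conn, index, chunk)


def send(conn, index, chunk):
    # the bulk body must end with a newline or Elasticsearch rejects it;
    # lines read from the file keep their trailing newlines
    conn.request('POST', '/%s/_bulk' % index, ''.join(chunk))
    resp = conn.getresponse()
    resp.read()  # drain the response so the connection can be reused
    if resp.status >= 400:
        sys.stderr.write('bulk load failed: HTTP %d\n' % resp.status)


if __name__ == '__main__':
    load(sys.argv[1], sys.argv[2])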

Event Timeline

bd808 raised the priority of this task from Medium to High. Jul 13 2016, 6:21 PM
bd808 updated the task description.

Testing out this dump and clean script on the logstash-2016.07.01 index:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Bryan Davis and Wikimedia Foundation. All Rights Reserved.
# Released under the Apache 2.0 license.

import argparse
import httplib
import json
import sys


def fetch(index, host='127.0.0.1', port=9200):
    """Yield every document in the given index using the scroll API."""
    conn = httplib.HTTPConnection(host=host, port=port)
    conn.request('GET', '/%s/_search?scroll=1m' % index, json.dumps({
        'size': 1000,
        'sort': ['_doc'],
    }))
    data = json.loads(conn.getresponse().read())
    scroll_id = data['_scroll_id']
    found = len(data['hits']['hits'])
    while found != 0:
        for hit in data['hits']['hits']:
            yield hit
        # fetch the next set of scroll results
        conn.request('GET', '/_search/scroll?scroll=1m', scroll_id)
        data = json.loads(conn.getresponse().read())
        scroll_id = data['_scroll_id']
        found = len(data['hits']['hits'])
    # clear the scroll context on the server to be nice
    conn.request('DELETE', '/_search/scroll', json.dumps({
        'scroll_id': [scroll_id],
    }))


def de_dot(value):
    """Recursively replace '.' with '_' in dict keys so the documents are
    safe for Elasticsearch 2.x field name rules."""
    if isinstance(value, dict):
        return {k.replace('.', '_'): de_dot(v) for k, v in value.iteritems()}
    elif isinstance(value, list):
        return [de_dot(v) for v in value]
    else:
        return value


parser = argparse.ArgumentParser(
    description='Dump an Elasticsearch index as bulk load records')
parser.add_argument('index', help='index to dump')
parser.add_argument('-s', '--server', default='127.0.0.1',
                    help='Elasticsearch server')
parser.add_argument('-p', '--port', type=int, default=9200,
                    help='Elasticsearch port')
args = parser.parse_args()

for log in fetch(args.index, host=args.server, port=args.port):
    if log['_type'] == 'mml':
        # discard mml records entirely
        continue
    print '{"index":{"_index":"%s","_type":"%s","_id":"%s"}}' % (
        args.index, log['_type'], log['_id'])
    print json.dumps(de_dot(log['_source']))

All dumps completed. The 2016.07.0[123] indices have been reimported from the dumps and are showing clean. We can backfill the indices between 2016.07.04 and 2016.07.14 following the cluster upgrade.
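
As a hypothetical way to spot-check a reimported index (the task does not describe how the indices were verified as clean), one could compare the _count of the new index against the number of records in its dump file. The index_count/dump_count names below are assumptions for illustration only.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Hypothetical spot check, not from this task: compare the document count of
# a reimported index against the number of records in its bulk dump file.
import httplib
import json
import sys


def index_count(index, host='127.0.0.1', port=9200):
    conn = httplib.HTTPConnection(host=host, port=port)
    conn.request('GET', '/%s/_count' % index)
    return json.loads(conn.getresponse().read())['count']


def dump_count(path):
    # the dump alternates action and source lines, so two lines per record
    with open(path) as bulk_file:
        return sum(1 for _ in bulk_file) / 2


if __name__ == '__main__':
    print 'index: %d dump: %d' % (
        index_count(sys.argv[1]), dump_count(sys.argv[2]))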