Page MenuHomePhabricator
Paste P8862

TestSearchSatisfaction2_backfill_2019_08_01.py
ActivePublic

Authored by EBernhardson on Mon, Aug 5, 4:33 PM.
import json
from urllib.parse import quote, unquote
def decode_event(pair):
    """Decode one raw client-side record into a parsed event tuple.

    Written as a generator so it can be handed to ``flatMap``: it yields at
    most one item, and yields nothing for records that should be skipped.

    Args:
        pair: ``(seq_id, line)`` tuple. Only ``line`` is used; it must hold
            exactly six tab-separated fields, the first being the
            URL-encoded event.

    Yields:
        ``(event, host, unk, dt, client_ip, user_agent)`` where ``event`` is
        the decoded JSON object. Only records whose ``schema`` is
        ``TestSearchSatisfaction2`` are yielded.
    """
    _seq_id, line = pair
    try:
        raw_event, host, unk, dt, client_ip, user_agent = line.strip().split('\t')
    except ValueError:
        # Not exactly six tab-separated fields: skip the record.
        return
    try:
        # [1:-1] drops the first and last character of the decoded string
        # (presumably the '?' prefix and ';' suffix that fix_event re-adds).
        event = json.loads(unquote(raw_event)[1:-1])
    except json.decoder.JSONDecodeError:
        # Payload is not valid JSON: skip the record.
        return
    if not isinstance(event, dict):
        # Valid JSON that is not an object (list, number, string, null) has
        # no .get(); skip it instead of raising AttributeError.
        return
    if event.get('schema') != 'TestSearchSatisfaction2':
        return
    yield (event, host, unk, dt, client_ip, user_agent)
def fix_event(value):
    """Repair known validation problems in a decoded event and re-encode it.

    Written as a generator so it can be handed to ``flatMap``: it yields one
    re-encoded record when at least one repair was applied, and nothing
    otherwise.

    Repairs applied:
      * ``revision`` 19240637 is rewritten to 19250889.
      * A string ``sampleMultiplier`` inside ``event['event']`` is converted
        to a float.

    Args:
        value: ``(event, host, unk, dt, client_ip, user_agent)`` where
            ``event`` is a dict. The dict is modified in place.

    Yields:
        ``(fixed_event, host, unk, dt, client_ip, user_agent)`` where
        ``fixed_event`` is the URL-quoted JSON wrapped as ``'?...;'``.
        Events that needed no repair are not yielded. Events whose
        ``sampleMultiplier`` string cannot become a finite number are
        dropped, since they cannot be made to satisfy the schema.
    """
    # Function-scope import keeps this block self-contained.
    import math

    event, host, unk, dt, client_ip, user_agent = value
    fixed = False

    # .get() so an event without a revision is skipped, not a KeyError.
    if event.get('revision') == 19240637:
        # Bad schema had isForced as a required argument
        event['revision'] = 19250889
        fixed = True

    payload = event.get('event')
    if isinstance(payload, dict) and isinstance(payload.get('sampleMultiplier'), str):
        # Schema says sampleMultiplier has to be a number
        try:
            multiplier = float(payload['sampleMultiplier'])
        except ValueError:
            # Non-numeric string: unfixable, drop the event.
            return
        if not math.isfinite(multiplier):
            # json.dumps would emit NaN/Infinity, which is not valid JSON.
            return
        payload['sampleMultiplier'] = multiplier
        fixed = True

    if fixed:
        fixed_event = '?{};'.format(quote(json.dumps(event)))
        yield (fixed_event, host, unk, dt, client_ip, user_agent)
# Glob over the raw client-side EventLogging sequence files to scan.
path = '/wmf/data/raw/eventlogging_client_side/eventlogging-client-side/hourly/2019/08/02/*/*'

# `sc` is not defined in this file; presumably the SparkContext supplied by
# the pyspark shell or notebook this script is run in.
raw_records = sc.sequenceFile(path)

# Keep only the TestSearchSatisfaction2 events that needed a repair, and
# turn each repaired record back into a tab-separated line.
fixed_lines = (
    raw_records
    .flatMap(decode_event)
    .flatMap(fix_event)
    .map(lambda fields: '\t'.join(fields))
)

# Collapse to a single partition before writing the gzip-compressed text.
fixed_lines.repartition(1).saveAsTextFile(
    path='/user/ebernhardson/2019-08-01-TestSearchSatisfaction2',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
)