"""Repair TestSearchSatisfaction2 EventLogging events that failed validation.

Reads the hourly raw client-side EventLogging partitions under INPUT_PATH and
keeps only TestSearchSatisfaction2 events. It rewrites two known defects: the
bad schema revision and a string-typed ``sampleMultiplier``. It then writes
*only the repaired events* back out, in the same tab-separated raw format.
(NOTE(review): presumably so they can be re-ingested — confirm.)

Intended to be run from a pyspark shell, where ``sc`` (the SparkContext) is
already defined; this script does not create one.
"""
import json
import math
from urllib.parse import quote, unquote

SCHEMA = 'TestSearchSatisfaction2'
# Revision whose (bad) schema had isForced as a required argument.
BAD_REVISION = 19240637
# Revision that events with BAD_REVISION are rewritten to.
FIXED_REVISION = 19250889

INPUT_PATH = '/wmf/data/raw/eventlogging_client_side/eventlogging-client-side/hourly/2019/08/02/*/*'
# NOTE(review): the output name says 2019-08-01 but INPUT_PATH reads the
# 2019/08/02 partitions — confirm which day is intended.
OUTPUT_PATH = '/user/ebernhardson/2019-08-01-TestSearchSatisfaction2'


def decode_event(pair):
    """Parse one raw EventLogging record and keep it if it is of SCHEMA.

    Args:
        pair: ``(key, value)`` tuple as produced by ``sc.sequenceFile``. The
            key is ignored. The value is a line of six tab-separated fields:
            raw_event, host, unk, dt, client_ip, user_agent.

    Yields:
        At most one tuple ``(event, host, unk, dt, client_ip, user_agent)``
        where ``event`` is the decoded JSON object (a dict). Nothing is
        yielded for lines without exactly six fields, undecodable JSON,
        non-object JSON, or events of another schema.
    """
    _, line = pair
    # Strip only the line terminator: a full strip() would also remove a
    # trailing tab when the last field (user_agent) is empty, leaving five
    # fields and silently dropping the record.
    fields = line.rstrip('\r\n').split('\t')
    if len(fields) != 6:
        return
    raw_event, host, unk, dt, client_ip, user_agent = fields
    # raw_event is url-encoded JSON wrapped as ``?<json>;`` (the same form
    # fix_event writes back); [1:-1] drops that wrapper.
    try:
        event = json.loads(unquote(raw_event)[1:-1])
    except json.JSONDecodeError:
        return
    # Non-object JSON would make event.get() raise and abort the whole job.
    if not isinstance(event, dict) or event.get('schema') != SCHEMA:
        return
    yield (event, host, unk, dt, client_ip, user_agent)


def fix_event(value):
    """Repair known validation defects of a decoded event.

    Args:
        value: tuple ``(event, host, unk, dt, client_ip, user_agent)`` as
            yielded by ``decode_event``. ``event`` is modified in place.

    Yields:
        At most one tuple of six strings, with ``event`` re-encoded into the
        raw ``?<url-encoded json>;`` form. Events that needed no repair, or
        whose ``sampleMultiplier`` cannot be turned into a finite number,
        yield nothing. The output therefore holds only repaired events.
    """
    event, host, unk, dt, client_ip, user_agent = value
    fixed = False
    if event.get('revision') == BAD_REVISION:
        # Bad schema had isForced as a required argument
        event['revision'] = FIXED_REVISION
        fixed = True
    payload = event.get('event')
    if isinstance(payload, dict) and isinstance(payload.get('sampleMultiplier'), str):
        # Schema says sampleMultiplier has to be a number
        try:
            multiplier = float(payload['sampleMultiplier'])
        except ValueError:
            # Not numeric: drop the event instead of failing the whole job.
            return
        if not math.isfinite(multiplier):
            # json.dumps would emit NaN/Infinity, which is not valid JSON.
            return
        payload['sampleMultiplier'] = multiplier
        fixed = True
    if fixed:
        fixed_event = '?{};'.format(quote(json.dumps(event)))
        yield (fixed_event, host, unk, dt, client_ip, user_agent)


def run(spark_context, input_path=INPUT_PATH, output_path=OUTPUT_PATH):
    """Decode and repair the events under input_path, then write them out.

    The result is written to output_path as gzip-compressed text, one
    tab-joined record per line. It is repartitioned to a single partition,
    so it lands in one part file.
    """
    (
        spark_context.sequenceFile(input_path)
        .flatMap(decode_event)
        .flatMap(fix_event)
        .map(lambda fields: '\t'.join(fields))
        .repartition(1)
        .saveAsTextFile(
            path=output_path,
            compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec",
        )
    )


if __name__ == '__main__':
    # ``sc`` is the SparkContext predefined by the pyspark shell.
    run(sc)  # noqa: F821