Authored by EBernhardson, Aug 5 2019, 4:33 PM
import json
from urllib.parse import quote, unquote
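
# Decode one raw eventlogging client-side record. The sequence file value is a
# TSV line of (url-encoded JSON event, host, unknown field, dt, client ip,
# user agent). Keep only events for the TestSearchSatisfaction2 schema and
# silently drop anything that fails to parse.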
def decode_event(pair):
    seq_id, line = pair
    try:
        raw_event, host, unk, dt, client_ip, user_agent = line.strip().split('\t')
    except ValueError:
        return
    try:
        event = json.loads(unquote(raw_event)[1:-1])
    except json.decoder.JSONDecodeError:
        return
    if event.get('schema') != 'TestSearchSatisfaction2':
        return
    yield (event, host, unk, dt, client_ip, user_agent)
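
# Predicate matching decoded events whose event.query field is not a string
# (e.g. was logged as an array); not applied in the job below.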
def filter_query_array(value):
    event = value[0]
    return 'query' in event['event'] and not isinstance(event['event']['query'], str)
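
# Repair the known problems with a decoded event: point revision 19240637 at
# the corrected revision 19250889, and coerce a string sampleMultiplier back
# into a number. Only events that actually needed a fix are re-encoded as
# '?<url-encoded json>;' and re-emitted.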
def fix_event(value):
    event, host, unk, dt, client_ip, user_agent = value
    fixed = False
    if event['revision'] == 19240637:
        # Bad schema had isForced as a required argument
        event['revision'] = 19250889
        fixed = True
    if isinstance(event['event'].get('sampleMultiplier'), str):
        # Schema says sampleMultiplier has to be a number
        event['event']['sampleMultiplier'] = float(event['event']['sampleMultiplier'])
        fixed = True
    if fixed:
        fixed_event = '?{};'.format(quote(json.dumps(event)))
        yield (fixed_event, host, unk, dt, client_ip, user_agent)
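
# All hours of raw client-side eventlogging data for 2019-08-02.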
path = '/wmf/data/raw/eventlogging_client_side/eventlogging-client-side/hourly/2019/08/02/*/*'
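
# Decode, filter, and fix the events, then write them back out as a single
# gzip-compressed TSV file.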
(
    sc.sequenceFile(path)
    .flatMap(decode_event)
    .flatMap(fix_event)
    .map(lambda x: '\t'.join(x))
    .repartition(1)
    .saveAsTextFile(
        path='/user/ebernhardson/2019-08-01-TestSearchSatisfaction2',
        compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
)
