Page MenuHomePhabricator
Paste P9228

Load a random sample of fulltext search queries
ActivePublic

Authored by dcausse on Oct 2 2019, 1:37 PM.
Tags
None
Referenced Files
F30524306: raw.txt
Oct 2 2019, 1:56 PM
F30524303: raw.txt
Oct 2 2019, 1:56 PM
F30524298: raw.txt
Oct 2 2019, 1:54 PM
F30524281: raw.txt
Oct 2 2019, 1:37 PM
Subscribers
None
# Load a random sample of fulltext search queries from the
# `event.mediawiki_cirrussearch_request` table: one randomly chosen request
# per (client IP, day) for a single wiki.
#
# Meant to be run where `spark` (a SparkSession) is already defined, e.g. a
# PySpark shell or notebook.
import calendar
import datetime

from pyspark.sql import functions as F, types as T, Window

# --- Parameters --------------------------------------------------------------
# Time window at hour granularity, half-open: [date_start, date_end).
date_start = datetime.datetime(2019, 9, 10, 10)
date_end = datetime.datetime(2019, 9, 10, 20)
# NOTE(review): max_res and sample_size are not referenced anywhere in this
# snippet -- presumably consumed by a later step; confirm or remove.
max_res = 2
wiki = 'enwiki'
sample_size = 5000
# Client IPs with more than this many matching requests in one day are dropped.
max_q_by_day = 30
random_seed = 42

# --- Refinery UDF ------------------------------------------------------------
spark.sql("ADD JAR hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar").collect()
spark.sql("CREATE TEMPORARY FUNCTION get_main_search_request AS 'org.wikimedia.analytics.refinery.hive.GetMainSearchRequestUDF'").collect()

# --- Time bounds -------------------------------------------------------------
# calendar.timegm interprets the (naive) datetimes as UTC.
ts_start = calendar.timegm(date_start.timetuple())
ts_end = calendar.timegm(date_end.timetuple())


def _two_digits(column_name):
    """Return the named column as a zero-padded, two-character string."""
    return F.lpad(F.col(column_name).cast('string'), 2, '0')


# Epoch seconds of the start of each row's hourly partition. Month, day and
# hour are zero-padded so the string matches unix_timestamp's default
# 'yyyy-MM-dd HH:mm:ss' pattern exactly instead of relying on lenient parsing.
# NOTE(review): unix_timestamp parses in the Spark session time zone; this
# assumes it is UTC so that it lines up with calendar.timegm -- confirm.
row_timestamp = F.unix_timestamp(F.concat(
    F.col('year'), F.lit('-'), _two_digits('month'), F.lit('-'), _two_digits('day'),
    F.lit(' '), _two_digits('hour'), F.lit(':00:00')))

# One window partition per client IP and calendar day.
w = Window.partitionBy(F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day'))

df = (
    spark.read.table('event.mediawiki_cirrussearch_request')
    # row_timestamp is the *start* of the hour, so the lower bound must be
    # inclusive or the whole date_start hour would be discarded.
    .where(row_timestamp >= ts_start)
    .where(row_timestamp < ts_end)
    # Keep only requests whose first elasticsearch request is a near_match.
    .where(F.col('elasticsearch_requests.query_type')[0] == 'near_match')
    .withColumn('areq', F.expr('get_main_search_request(database, elasticsearch_requests)'))
    .where(F.col('areq').isNotNull())
    # Count requests per IP and day. This runs before the wiki filter, so the
    # count spans every database, not just `wiki`.
    .withColumn('q_by_day', F.count(F.lit(1)).over(w))
    .where(F.col('q_by_day') <= max_q_by_day)
    .where(F.col('database') == wiki)
    # Gather each IP's requests for the day, then pick one of them at random.
    .groupBy(F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day'))
    .agg(F.collect_list(F.struct('database', 'areq.query', 'areq.hits_total')).alias('ip_requests'))
    # floor(rand * size) yields an index in [0, size).
    .withColumn('random_req_index', F.floor(F.rand(random_seed) * F.size(F.col('ip_requests'))))
    .select(F.col('ip_requests')[F.col('random_req_index')].alias('req'))
    .select(F.col('req.query'), F.col('req.hits_total'))
)
df.show(5)

Event Timeline

dcausse edited the content of this paste. (Show Details)