Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F30524298
raw.txt
No One
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Authored By
dcausse
Oct 2 2019, 1:54 PM
2019-10-02 13:54:22 (UTC+0)
Size
1 KB
Referenced Files
None
Subscribers
None
raw.txt
View Options
# Load the refinery Hive JAR and register its GetMainSearchRequestUDF class as
# the temporary SQL function `get_main_search_request`.
# `spark` is not defined in this snippet; presumably it is the SparkSession
# supplied by the notebook/shell this is pasted into.
for _setup_sql in (
    "ADD JAR hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar",
    "CREATE TEMPORARY FUNCTION get_main_search_request AS 'org.wikimedia.analytics.refinery.hive.GetMainSearchRequestUDF'",
):
    # .collect() forces each statement to run immediately.
    spark.sql(_setup_sql).collect()
import datetime
# --- Sampling parameters -------------------------------------------------

# Bounds of the time window to read. These are naive datetimes; further down
# they are converted with calendar.timegm(), which interprets them as UTC.
date_start = datetime.datetime(year=2019, month=9, day=10, hour=10)
date_end = date_start.replace(hour=20)

# Target wiki database name.
wiki = 'enwiki'

# Upper bound on requests per client IP per day; busier IPs are filtered out
# by the query below.
max_q_by_day = 30

# Seed passed to F.rand() so the per-IP random pick is repeatable.
random_seed = 42

# NOTE(review): the two values below are not referenced anywhere in the
# visible script; kept in case other code reads them.
max_res = 2
sample_size = 5000
from pyspark.sql import functions as F, types as T, Window
import calendar
# Window bounds as epoch seconds. calendar.timegm() reads the time tuple as
# UTC, so the naive date_start/date_end are treated as UTC instants.
ts_start, ts_end = (
    calendar.timegm(bound.timetuple()) for bound in (date_start, date_end)
)

# Column expression giving each row's partition hour as epoch seconds, built
# by gluing the year/month/day/hour columns into a "Y-M-D H:00:00" string and
# parsing it with unix_timestamp()'s default format.
# NOTE(review): month/day/hour are concatenated as-is (no zero padding) and
# the parse uses the Spark session time zone - confirm both behave as expected
# on the Spark version in use.
_date_parts = [F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day')]
_time_parts = [F.lit(' '), F.col('hour'), F.lit(':00:00')]
row_timestamp = F.unix_timestamp(F.concat(*_date_parts, *_time_parts))
# Keys identifying "one client IP on one calendar day". Defined once and
# shared by the window and the groupBy so the two cannot drift apart.
ip_day_keys = [F.col('http.client_ip'), F.col('year'), F.col('month'), F.col('day')]

# Window over all rows of the same IP/day, used to count requests per IP per day.
w = Window.partitionBy(*ip_day_keys)

# Build a DataFrame holding one randomly chosen query string per (IP, day).
df = (
    spark.read.table('event.mediawiki_cirrussearch_request')
    # Restrict to the requested time window.
    # NOTE(review): both comparisons are strict and row_timestamp has hour
    # granularity, so a row whose hour equals date_start or date_end is
    # excluded - confirm that the start hour is meant to be dropped.
    .where(row_timestamp > ts_start)
    .where(row_timestamp < ts_end)
    # Keep requests whose first elasticsearch request is a 'near_match' query.
    .where(F.col('elasticsearch_requests.query_type')[0] == 'near_match')
    # Extract the main search request via the Hive UDF registered earlier and
    # drop rows for which it yields NULL.
    .withColumn('areq', F.expr('get_main_search_request(database, elasticsearch_requests)'))
    .where(F.col('areq').isNotNull())
    # Count remaining requests per IP/day and discard IPs above the threshold.
    # This runs before the wiki filter, so the count spans every wiki.
    .withColumn('q_by_day', F.count(F.lit(1)).over(w))
    .where(F.col('q_by_day') <= max_q_by_day)
    # Use the `wiki` parameter rather than a hard-coded database name.
    .where(F.col('database') == wiki)
    # Collect every request of an IP/day into one array ...
    .groupBy(*ip_day_keys)
    .agg(F.collect_list(F.struct('database', 'areq.query', 'areq.hits_total', 'areq.suggestion')).alias('ip_requests'))
    # ... then pick a single element at a seeded random index, so each IP/day
    # contributes exactly one query regardless of how many it issued.
    .withColumn('random_req_index', F.floor(F.rand(random_seed) * F.size(F.col('ip_requests'))))
    .select(F.col('ip_requests')[F.col('random_req_index')].alias('req'))
    .select(F.col('req.query'))
)

# Preview the first few sampled queries.
# NOTE(review): `sample_size` is never applied in the visible script; add a
# .limit()/.sample() step if a fixed-size sample is required.
df.show(5)
File Metadata
Details
Attached
Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
8063809
Default Alt Text
raw.txt (1 KB)
Attached To
Mode
P9228 Load a random sample of fulltext search queries
Attached
Detach File
Event Timeline
Log In to Comment