Page MenuHomePhabricator
Paste P9281

extract wdqs sparql queries
ActivePublic

Authored by dcausse on Wed, Oct 9, 1:39 PM.
from pyspark.sql import functions as F, types as T, Window
df = (
spark.read.table("wmf.webrequest")
.where(F.col('year') == '2019')
.where(F.col('month') == '10')
.where(F.col('day') == '08')
.where(F.col('hour') == '11')
.where(F.array_contains(F.col('tags'), 'sparql'))
.cache()
)
df.groupBy(df.http_method).agg(F.count(F.lit(1))).show(20)
queries = df.where(df.http_method == 'GET').select('uri_query')
from urllib.parse import parse_qsl
def ext_query(uq):
uq = uq[1:]
parsed = parse_qsl(uq)
try:
matches = [q[1] for q in parsed if q[0] == 'query']
return matches[0]
except IndexError:
return None
ext_query_udf = F.udf(ext_query)
sparql_queries = (queries.withColumn('sparql_query', ext_query_udf(df.uri_query))
.where(F.col('sparql_query').isNotNull())
.select(F.col('sparql_query')))
fetched_queries = sparql_queries.distinct().orderBy(F.rand()).limit(10000).toPandas()
with open('/home/dcausse/sparqlqueries.lst', 'w', encoding='utf-8') as f:
for idx, q in fetched_queries.iterrows():
f.write(q['sparql_query'])
f.write('\n---\n')