# Sample distinct SPARQL queries from one hour of `wmf.webrequest` rows tagged
# 'sparql' and dump them to a text file, one query per record, with records
# separated by a '---' line.
#
# NOTE: `spark` is not defined here; it is expected to be supplied by the
# surrounding session (e.g. a notebook or pyspark shell).
from urllib.parse import parse_qsl

from pyspark.sql import functions as F, types as T, Window

# Destination of the sampled queries and the maximum number of them to keep.
OUTPUT_PATH = '/home/dcausse/sparqlqueries.lst'
SAMPLE_SIZE = 10000
RECORD_SEPARATOR = '\n---\n'

# One hour of requests tagged 'sparql'; cached because it is scanned twice
# below (method histogram, then query extraction).
df = (
    spark.read.table("wmf.webrequest")
    .where(F.col('year') == '2019')
    .where(F.col('month') == '10')
    .where(F.col('day') == '08')
    .where(F.col('hour') == '11')
    .where(F.array_contains(F.col('tags'), 'sparql'))
    .cache()
)

# Quick look at the request count per HTTP method.
df.groupBy(df.http_method).agg(F.count(F.lit(1))).show(20)

# Only GET requests are considered: their SPARQL text is read from the URI
# query string.
queries = df.where(df.http_method == 'GET').select('uri_query')


def ext_query(uq):
    """Return the value of the first ``query`` parameter of a URI query string.

    Args:
        uq: Raw URI query string, optionally starting with ``?``. May be
            ``None`` or empty (Spark passes SQL NULL to Python UDFs as
            ``None``).

    Returns:
        The URL-decoded value of the first ``query`` parameter, or ``None``
        when the input is missing/empty or carries no such parameter.
    """
    if not uq:
        # NULL or empty column value: nothing to parse. Without this guard a
        # single NULL row would raise TypeError and fail the whole Spark job.
        return None
    if uq.startswith('?'):
        # Strip only a real leading '?'; blindly dropping the first character
        # would corrupt the first key when the '?' is absent.
        uq = uq[1:]
    return next(
        (value for key, value in parse_qsl(uq) if key == 'query'),
        None,
    )


# String is already the default UDF return type; it is spelled out for clarity.
ext_query_udf = F.udf(ext_query, T.StringType())

sparql_queries = (
    queries
    # Reference the column by name on `queries` itself rather than through
    # the parent `df` frame.
    .withColumn('sparql_query', ext_query_udf(F.col('uri_query')))
    .where(F.col('sparql_query').isNotNull())
    .select(F.col('sparql_query'))
)

# Random sample of distinct queries, collected to the driver as a pandas frame.
fetched_queries = (
    sparql_queries
    .distinct()
    .orderBy(F.rand())
    .limit(SAMPLE_SIZE)
    .toPandas()
)

with open(OUTPUT_PATH, 'w', encoding='utf-8') as f:
    # Iterate the column directly; iterrows() would build a Series per row
    # just to read this one field.
    for sparql_query in fetched_queries['sparql_query']:
        f.write(sparql_query)
        f.write(RECORD_SEPARATOR)