Page Menu · Home · Phabricator

Search weighted tags from search index dumps

Authored By
dcausse
May 22 2025, 1:38 PM
Size
2 KB
Referenced Files
None
Subscribers
None

Search weighted tags from search index dumps

import wmfdata
from pyspark.sql.functions import udf
from pyspark.sql import functions as F, types as T, Window
from pyspark.sql.types import BooleanType, MapType, StructType, FloatType, StructField, StringType, ArrayType
# Spark session created through wmfdata's helper; every query below reads through it.
spark = wmfdata.spark.create_session()
# Legacy ORES-prefixed families mapped onto their "prediction" names, so both
# spellings end up grouped under a single family key.
_LEGACY_FAMILY_RENAMES = {
    'classification.ores.drafttopic': 'classification.prediction.drafttopic',
    'classification.ores.articletopic': 'classification.prediction.articletopic',
}


@udf(returnType=MapType(
    StringType(),
    ArrayType(StructType([
        StructField("tag", StringType(), True),
        StructField("score", FloatType(), True),
    ])),
))
def unpack_tags(tags):
    """Group raw weighted-tag strings by family.

    Each entry of ``tags`` is parsed as ``family/tag`` or ``family/tag|score``
    (split on the first ``/`` and then the first ``|``). A present ``score`` is
    parsed as an integer and divided by 1000; entries without ``|`` get 1.0.
    Legacy ``classification.ores.*`` families are renamed via
    ``_LEGACY_FAMILY_RENAMES``.

    Args:
        tags: sequence of raw tag strings; may be None or empty.

    Returns:
        dict mapping family -> list of {"tag": str, "score": float}, keeping
        input order within each family. Empty dict for None/empty input.
        Null entries and entries without a ``/`` separator are skipped.
    """
    if not tags:
        return {}
    unpacked = {}
    for raw in tags:
        if raw is None:
            # Spark arrays may hold null elements; nothing to attribute.
            continue
        family, slash, tag_score = raw.partition('/')
        if not slash:
            # No family separator: skip instead of raising, since an exception
            # here would fail the whole Spark task for one malformed value.
            continue
        family = _LEGACY_FAMILY_RENAMES.get(family, family)
        tag_name, bar, score_str = tag_score.partition('|')
        score = int(score_str) / 1000 if bar else 1.0
        unpacked.setdefault(family, []).append({"tag": tag_name, "score": score})
    return unpacked
@udf(returnType=BooleanType())
def has_tag(tags, family, tag):
    """Return True if ``tags`` holds an entry named ``tag`` under ``family``.

    Args:
        tags: mapping family -> list of entries indexable by "tag" (the shape
            produced by ``unpack_tags``). A null map, or a null list under
            ``family``, counts as having no tags.
        family: tag family to search in.
        tag: tag name to look for.

    Returns:
        bool; False when the family is absent.
    """
    if not tags:
        return False
    return any(entry["tag"] == tag for entry in tags.get(family) or ())
@udf(returnType=ArrayType(StringType()))
def get_tags(tags, family):
    """Return the tag names stored under ``family``, in stored order.

    Args:
        tags: mapping family -> list of entries indexable by "tag" (the shape
            produced by ``unpack_tags``). A null map, or a null list under
            ``family``, yields an empty result.
        family: tag family whose tag names are wanted.

    Returns:
        list of tag names; empty when the family is absent.
    """
    if not tags:
        return []
    return [entry["tag"] for entry in tags.get(family) or ()]
# enwiki main-namespace rows from the codfw replica of the 20250420 index dump
# that carry at least one weighted tag, with the raw tag strings grouped by
# family via unpack_tags.
pages_with_tags = (
    spark.read.table("discovery.cirrus_index_without_content")
    .where("cirrus_replica='codfw' AND snapshot='20250420' and wiki == 'enwiki' AND namespace=0")
    .where(F.size("weighted_tags") > 0)
    .select("page_id", "title", "weighted_tags")
    .withColumn("weighted_tags", unpack_tags("weighted_tags"))
)
pages_with_tags.show(n=10, truncate=False)
# Top 20 article topics among enwiki NS_MAIN pages that have a link recommendation.
# Each page contributes one row per articletopic tag (explode); pages without any
# articletopic tag contribute nothing. Ties on the count are broken by topic name,
# so the rows shown at the top-20 cutoff are the same on every run.
(pages_with_tags
    .filter(has_tag("weighted_tags", F.lit("recommendation.link"), F.lit("exists")))
    .withColumn("topics", F.explode(get_tags("weighted_tags", F.lit("classification.prediction.articletopic"))))
    .groupBy("topics")
    .agg(F.count("*").alias("cnt"))
    .orderBy(F.col("cnt").desc(), F.col("topics").asc())
    .show(20, False))

File Metadata

Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
20226308
Default Alt Text
Search weighted tags from search index dumps (2 KB)

Event Timeline