Paste P76406

Search weighted tags from search index dumps

Authored by dcausse on May 22 2025, 1:38 PM.
Referenced Files
F60378770: Search weighted tags from search index dumps
May 22 2025, 1:38 PM
import wmfdata
from pyspark.sql.functions import udf
from pyspark.sql import functions as F, types as T, Window
from pyspark.sql.types import BooleanType, MapType, StructType, FloatType, StructField, StringType, ArrayType

spark = wmfdata.spark.create_session()


# Return type: map of tag family -> list of {tag, score} structs.
@udf(returnType=MapType(StringType(), ArrayType(StructType([
    StructField("tag", StringType(), True),
    StructField("score", FloatType(), True),
]))))
def unpack_tags(tags):
    """Group raw "family/tag|score" strings by family."""
    if not tags:
        return {}
    unpacked = {}
    for raw_tag in tags:
        family, tag_score = raw_tag.split('/', 1)
        # Map the classification.ores.* prefixes to classification.prediction.*
        if family == 'classification.ores.drafttopic':
            family = 'classification.prediction.drafttopic'
        if family == 'classification.ores.articletopic':
            family = 'classification.prediction.articletopic'
        if '|' in tag_score:
            # The stored score is an integer; divide by 1000 to get a float.
            name, score = tag_score.split('|', 1)
            score = int(score) / 1000
        else:
            # No explicit score: default to 1.0.
            name = tag_score
            score = 1.0
        unpacked.setdefault(family, []).append({"tag": name, "score": score})
    return unpacked


@udf(returnType=BooleanType())
def has_tag(tags, family, tag):
    """True if the unpacked tags contain `tag` under `family`."""
    if tags and family in tags:
        for t in tags[family]:
            if t["tag"] == tag:
                return True
    return False


@udf(returnType=ArrayType(StringType()))
def get_tags(tags, family):
    """Tag names (without scores) stored under `family`."""
    if tags and family in tags:
        return [t["tag"] for t in tags[family]]
    return []


pages_with_tags = (
    spark.read.table("discovery.cirrus_index_without_content")
    .where("cirrus_replica = 'codfw' AND snapshot = '20250420' AND wiki = 'enwiki' AND namespace = 0")
    .filter(F.size(F.col("weighted_tags")) > 0)
    .select("page_id", "title", "weighted_tags")
    .withColumn("weighted_tags", unpack_tags(F.col("weighted_tags")))
)
pages_with_tags.show(10, False)

# Top 20 topics from pages in enwiki NS_MAIN with link recommendation
(pages_with_tags
    .filter(has_tag("weighted_tags", F.lit("recommendation.link"), F.lit("exists")))
    .withColumn("topics", F.explode(get_tags("weighted_tags", F.lit("classification.prediction.articletopic"))))
    .groupBy("topics")
    .agg(F.count("*").alias("cnt"))
    .orderBy(F.col("cnt").desc())
    .show(20, False))
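
To check the parsing logic without a Spark session, here is a minimal plain-Python sketch of the string handling done in unpack_tags above. The helper name parse_weighted_tag and the sample strings are made up for illustration (they are not taken from a real dump), and the sketch leaves out the classification.ores.* renaming.

```python
def parse_weighted_tag(raw):
    """Split one raw weighted tag into (family, tag, score)."""
    family, tag_score = raw.split("/", 1)
    if "|" in tag_score:
        tag, score = tag_score.split("|", 1)
        # Same scaling as in the UDF: integer score divided by 1000.
        return family, tag, int(score) / 1000
    # No explicit score: same 1.0 default as in the UDF.
    return family, tag_score, 1.0


# Hypothetical sample values, for illustration only.
samples = [
    "classification.prediction.articletopic/Culture.Sports|750",
    "recommendation.link/exists",
]
for raw in samples:
    print(parse_weighted_tag(raw))
```

A tag without a "|score" suffix, such as the recommendation.link one, comes out with the default score of 1.0, so it behaves as a plain flag.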

Event Timeline

Hi @dcausse,

Thank you very much for sharing this. Works like a charm!
Looking at the data, we have article topic predictions with scores, plus a boolean indicating whether the article has a link recommendation.
This is awesome.
So, to find the add-a-link recommendation scores, we should look either in MariaDB directly or in another index.

I think we have the add-a-link recommendation scores here for shown recommendations, but not for all available ones.

I'll investigate further.
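
For reference, a rough plain-Python sketch of the split described above: topics carry scores, while the link recommendation is only a flag. The row below is hypothetical (invented tags and scores, shaped like the output of unpack_tags), and the helper names are made up for illustration.

```python
ARTICLETOPIC = "classification.prediction.articletopic"


def has_link_recommendation(tags):
    """The link recommendation is only a flag: present or absent."""
    return any(t["tag"] == "exists" for t in tags.get("recommendation.link", []))


def topics_above(tags, threshold, family=ARTICLETOPIC):
    """(tag, score) pairs at or above `threshold`, highest score first."""
    kept = [(t["tag"], t["score"]) for t in tags.get(family, [])
            if t["score"] >= threshold]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)


# Hypothetical unpacked row, for illustration only.
row_tags = {
    ARTICLETOPIC: [
        {"tag": "Culture.Sports", "score": 0.91},
        {"tag": "Geography.Regions.Europe", "score": 0.62},
    ],
    "recommendation.link": [{"tag": "exists", "score": 1.0}],
}

print(has_link_recommendation(row_tags))
print(topics_above(row_tags, 0.7))
```

The 1.0 on the recommendation.link entry is just the default filled in when no score is present, not a confidence value.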

The search index does indeed only store a flag for the recommendation; if there are confidence scores attached to it, they're not sent to us.
Sadly, I'm not sure whether or where this data is stored.
The flag used to get injected into the search index via the stream mediawiki.revision-recommendation-create, but nowadays I believe these are pushed directly via LinkRecommendationUpdater; it's better to ask the Growth team what this does. The confidence scores could be part of the search index if this is useful, but I think we should first discuss why you would need this: the search index is useful for producing ranked lists of recommendations, and we would not want it to become a generic feature store.

Thank you @dcausse,

Indeed, for now I'm only investigating how add-a-link works end-to-end, in the scope of this goal.
Exactly, I'm following up with the PHP implementation and the related databases.
Thank you very much for your help.