
[SPIKE] Check the Wikimedia content history dataset
Closed, Resolved · Public

Description

The wmf_content.mediawiki_content_history_v1 Hive table is updated daily and might replace some critical monthly/weekly dependencies.
Docs at https://wikitech.wikimedia.org/wiki/Data_Platform/Data_Lake/Content/Mediawiki_content_history_v1.

Tasks

  • quickly explore the dataset
  • check how many dependencies it could replace, most notably:
    • wmf.mediawiki_wikitext_current
    • wmf.wikidata_item_page_link
    • wmf.wikidata_entity
    • structured_data.commons_entity
  • estimate work to let data pipelines consume the dataset

Event Timeline

Discussion thread about the differences between wmf_content.mediawiki_content_history_v1 and wmf.wikidata_entity: https://wikimedia.slack.com/archives/CSV483812/p1740173644584339

Replacing wmf.mediawiki_wikitext_current should be easy enough. Something like this will get only the latest revision:

WITH all_file_revisions AS (
        SELECT wiki_id, revision_id, page_id, page_title, page_redirect_target, 
        revision_content_slots['main'].content_body AS revision_text,
        ROW_NUMBER() OVER (PARTITION BY wiki_id,page_id ORDER BY revision_dt DESC) AS revision_number_desc
        FROM wmf_content.mediawiki_content_history_v1
        WHERE page_namespace_id=0
    )
    SELECT wiki_id, revision_id, page_id, page_title, page_redirect_target, revision_text
    FROM all_file_revisions WHERE revision_number_desc=1

Note that this table is used in section-topics and seal, and not in image-suggestions directly.
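The window-function deduplication in the query above can be illustrated in plain Python. The sample rows below are invented for illustration; they just mimic keeping the newest revision per (wiki_id, page_id):

```python
# Illustrative only: mimic ROW_NUMBER() OVER (PARTITION BY wiki_id, page_id
# ORDER BY revision_dt DESC) = 1 by keeping the max-revision_dt row per page.
rows = [
    {"wiki_id": "enwiki", "page_id": 1, "revision_id": 10, "revision_dt": "2024-01-01T00:00:00Z"},
    {"wiki_id": "enwiki", "page_id": 1, "revision_id": 11, "revision_dt": "2024-02-01T00:00:00Z"},
    {"wiki_id": "enwiki", "page_id": 2, "revision_id": 20, "revision_dt": "2024-01-15T00:00:00Z"},
]

latest = {}
for row in rows:
    key = (row["wiki_id"], row["page_id"])
    # keep the row with the most recent revision_dt for each page
    if key not in latest or row["revision_dt"] > latest[key]["revision_dt"]:
        latest[key] = row

print(sorted(r["revision_id"] for r in latest.values()))  # [11, 20]
```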

Here's how to replace structured_data.commons_entity ...

In entity_images.py, define a UDF:

import json

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

SCHEMA = StructType(
    [
        StructField('depicts', ArrayType(StringType()), False),
    ]
)

@udf(returnType=SCHEMA)
def parse_mediainfo(mediainfo: str) -> dict:

    depicted = []

    try:
        mediainfo_dict = json.loads(mediainfo)
        # P180 = "depicts", P6243 = "digital representation of"
        for property_id in ('P180', 'P6243'):
            for snak in mediainfo_dict["statements"].get(property_id, []):
                depicted.append(snak['mainsnak']['datavalue']['value']['id'])

    except (json.JSONDecodeError, KeyError, TypeError):
        # do nothing, just allow the empty array to be returned
        pass

    return {"depicts": sorted(set(depicted))}
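As an illustration, here is the same extraction logic run as plain Python against a hand-made, heavily simplified mediainfo JSON (the statement values below are invented; real mediainfo documents carry much more structure):

```python
import json

# Hypothetical, trimmed-down mediainfo JSON with two P180 ("depicts") statements.
mediainfo = json.dumps({
    "statements": {
        "P180": [
            {"mainsnak": {"datavalue": {"value": {"id": "Q42"}}}},
            {"mainsnak": {"datavalue": {"value": {"id": "Q1"}}}},
        ]
    }
})

depicted = []
mediainfo_dict = json.loads(mediainfo)
for property_id in ("P180", "P6243"):
    for snak in mediainfo_dict["statements"].get(property_id, []):
        depicted.append(snak["mainsnak"]["datavalue"]["value"]["id"])

print(sorted(set(depicted)))  # ['Q1', 'Q42']
```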

Register the UDF and replace get_depicts() with something like this:

def get_depicts(spark: SparkSession) -> DataFrame:
    return spark.sql("""
        WITH
        commons_mediainfo AS (
            SELECT
                page_id,
                page_title,
                revision_content_slots['mediainfo'].content_body AS mediainfo,
                revision_dt,
                ROW_NUMBER() OVER (PARTITION BY wiki_id, page_id ORDER BY revision_dt DESC) AS revision_number_desc
            FROM wmf_content.mediawiki_content_history_v1
            WHERE wiki_id="commonswiki"
            AND page_namespace_id=6
        ),
        latest_rev_with_depicts_array AS (
            SELECT page_id, parse_mediainfo(mediainfo).depicts AS depicts
            FROM commons_mediainfo
            WHERE revision_number_desc=1
        )
        SELECT page_id, item_id
        FROM latest_rev_with_depicts_array
        LATERAL VIEW explode(depicts) d AS item_id
    """).withColumn('depicts', F.lit(1))

Then in entity_images.py, in the get() function, replace depicts.property_id.isNotNull() with depicts.depicts.isNotNull().

I think that should do it

Cparle updated the task description.

The proposal on T385787#10652695 looks good to me.

Minor comment: for your row_number(), you want: ROW_NUMBER() OVER (PARTITION BY wiki_id, page_id ORDER BY revision_dt DESC, revision_id DESC) AS revision_number_desc. You also want a secondary order on revision_id DESC because the resolution of revision_dt is only to the second (this is a MW limitation); revision_id helps you disambiguate if there was a quick burst of revisions.
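The effect of that secondary sort key can be seen in plain Python, with invented sample values for two revisions saved within the same second:

```python
# Two revisions with the same second-resolution timestamp: revision_dt alone
# cannot order them, so revision_id breaks the tie.
revisions = [
    {"revision_id": 101, "revision_dt": "2024-03-01T12:00:00Z"},
    {"revision_id": 102, "revision_dt": "2024-03-01T12:00:00Z"},  # same second
]

# Equivalent of ORDER BY revision_dt DESC, revision_id DESC, taking the top row.
latest = max(revisions, key=lambda r: (r["revision_dt"], r["revision_id"]))
print(latest["revision_id"])  # 102
```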


Side note: I really wish we at least had wmf_content.mediawiki_content_current_v1 ready for you so that you didn’t need to do the row_number() at all. This will be pursued on T366544.

We can get equivalent data to that in wmf.wikidata_item_page_link and wmf.wikidata_entity by first defining a UDF:

import urllib.parse
import json

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, MapType, StringType, StructField, StructType


SCHEMA = StructType(
    [
        StructField('links', ArrayType(MapType(StringType(), StringType())), True),
        StructField('statements', MapType(StringType(), ArrayType(StringType())), True),
    ]
)

@udf(returnType=SCHEMA)
def parse_wikidata(wikidata_json: str) -> dict:

    links = []
    statements = {
        "P18": [],
        "P31": [],
        "P373": [],
    }

    try:
        wikidata_dict = json.loads(wikidata_json)
        for sitelink in wikidata_dict["sitelinks"].values():
            links.append({"wiki_db": sitelink["site"], "page_title": urllib.parse.unquote(sitelink["title"])})
        for property_id, property_claims in wikidata_dict["claims"].items():
            if property_id == "P31":
                # P31 ("instance of") values are items, so take the nested item id
                for claim in property_claims:
                    statements[property_id].append(claim["mainsnak"]["datavalue"]["value"]["id"])
            elif property_id in ("P18", "P373"):
                # P18 (image) and P373 (Commons category) values are plain strings
                for claim in property_claims:
                    statements[property_id].append(claim["mainsnak"]["datavalue"]["value"])

    except (json.JSONDecodeError, KeyError, TypeError):
        # wikidata json can't be decoded - do nothing, just allow empty data to be returned
        pass

    return {"links": links, "statements": statements}

spark.udf.register('parse_wikidata', parse_wikidata)
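As a sanity check, here is the same parsing logic as plain Python against a minimal, hypothetical entity JSON (all field values below are invented; the percent-encoded title is just there to show what unquote does):

```python
import json
import urllib.parse

# Hypothetical, heavily trimmed entity JSON mirroring the fields the UDF reads.
entity = json.dumps({
    "sitelinks": {
        "enwiki": {"site": "enwiki", "title": "Some%20Page"},
    },
    "claims": {
        "P31": [{"mainsnak": {"datavalue": {"value": {"id": "Q5"}}}}],
        "P18": [{"mainsnak": {"datavalue": {"value": "Some image.jpg"}}}],
    },
})

d = json.loads(entity)
links = [
    {"wiki_db": sl["site"], "page_title": urllib.parse.unquote(sl["title"])}
    for sl in d["sitelinks"].values()
]
p31 = [c["mainsnak"]["datavalue"]["value"]["id"] for c in d["claims"].get("P31", [])]
p18 = [c["mainsnak"]["datavalue"]["value"] for c in d["claims"].get("P18", [])]

print(links)  # [{'wiki_db': 'enwiki', 'page_title': 'Some Page'}]
print(p31)    # ['Q5']
print(p18)    # ['Some image.jpg']
```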

... and then a new function, probably in shared.py:

def load_parsed_wikidata(spark: SparkSession) -> DataFrame:
    return spark.sql("""
        WITH
        wikidata AS (
            SELECT
                page_id,
                page_title,
                revision_content_slots['main'].content_body AS item_json,
                ROW_NUMBER() OVER (PARTITION BY wiki_id, page_id ORDER BY revision_dt DESC, revision_id DESC) AS revision_number_desc
            FROM wmf_content.mediawiki_content_history_v1
            WHERE wiki_id="wikidatawiki"
            AND page_namespace_id=0
        ),
        wikidata_latest AS (
            SELECT page_id, page_title, item_json FROM wikidata WHERE revision_number_desc=1
        ),
        wikidata_parsed AS (
            SELECT page_title AS item_id, parse_wikidata(item_json) AS parsed FROM wikidata_latest
        )
        SELECT
            item_id,
            parsed.links AS links,
            parsed.statements.P18 AS p18,
            parsed.statements.P31 AS p31,
            parsed.statements.P373 AS p373
        FROM wikidata_parsed
    """).cache()

... and then

  1. update load_wikidata_items_with_p31(), load_wikidata_items_with_P18() and load_wikidata_items_with_P373() to call load_parsed_wikidata() (instead of doing their own queries) and return the relevant data from the dataframe
  2. update load_wikidata_item_page_links() to call load_parsed_wikidata() instead of doing its own queries and return relevant data from the dataframe, plus update everything that calls it to join on page_title rather than page_id

Re T385787#10657188, it does seem that you won't be able to leverage the cached dataframe, because the function returns a new dataframe object each time.

I think you want to call load_parsed_wikidata() once, keep the dataframe on a variable, and pass around the variable to the different load_wikidata_items_with_*() functions.

Also, I would check whether the results of load_parsed_wikidata() are actually cacheable: wikidatawiki is big! Maybe you can cache to disk instead if memory doesn't work.