T286562.py

# wikidata items that commons files are linked from are stored in hdfs:/user/cparle/commons_files_related_wikidata_items
#
# read the data from hdfs, and push it to the commons elasticsearch index on relforge
#
# for more info on the data, and the fields to which it is being pushed, see https://phabricator.wikimedia.org/T286562
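#
# each row of the data is expected to contain (schema inferred from the code below, not
# independently verified against the dataset):
#   page_id             - the commons page id of the file
#   reverse_p18         - list of wikidata Q-ids whose P18 (image) statement points at the file
#   reverse_p373        - list of 'Qid|pages_in_category' strings
#   container_page_qids - list of 'Qid|wiki|incoming_link_count' strings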
import json
import sys
import numpy as np
import requests
import logging
import pyspark.sql
spark = pyspark.sql.SparkSession.builder.getOrCreate()
def main():
    commonsDF = spark.read.load('hdfs:/user/cparle/commons_files_related_wikidata_items')
    url = 'https://relforge1003.eqiad.wmnet:9243/commonswiki_file_t286562/_bulk'
    count = 0
    allData = []
    for row in commonsDF.toLocalIterator():
        # each document becomes an action line plus a partial-doc line for the bulk API
        data1 = {'update': {'_type': 'page'}}
        data1['update']['_id'] = str(row['page_id'])
        allData.append(json.dumps(data1))
        weightedTags = []
        if row['reverse_p18'] is not None:
            for p18 in row['reverse_p18']:
                weightedTags.append('image.linked.from.wikidata.p18/' + p18)
        if row['reverse_p373'] is not None:
            for p373 in row['reverse_p373']:
                split = p373.split('|')
                # first part of the field is a wikidata item-id linked via P373 (commons category)
                # to a commons category that the commons file belongs to
                qid = split[0]
                # second part of the field is the number of pages in the commons category
                pages_in_category = int(split[1])
                if pages_in_category > 0:
                    # we want the score of this field to be inversely proportional to the number of
                    # pages in the category: if an image is one of 5 in a category, that category
                    # matters more for search than a category with 10k images. Take the log of
                    # (pages + 1) (the +1 avoids log(1) = 0 in the denominator), invert it, and
                    # multiply by 1000/1.443 (1.443 ~= 1/ln 2) so the maximum score, at 1 page, is 1000
                    pages_in_category_score = int(round(1 / np.log(pages_in_category + 1) * 1000 / 1.443))
                    weightedTags.append('image.linked.from.wikidata.p373/' + qid + '|' + str(pages_in_category_score))
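                    # worked example (category sizes invented for illustration):
                    #   1 page      -> int(round(1/np.log(2) * 1000/1.443))     == 1000 (the max)
                    #   5 pages     -> int(round(1/np.log(6) * 1000/1.443))     == 387
                    #   10000 pages -> int(round(1/np.log(10001) * 1000/1.443)) == 75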
        if row['container_page_qids'] is not None:
            # we want the score of this field to be proportional to how important the page
            # containing the image is; we measure importance by summing all the incoming links
            # to pages with the relevant Q-id across all wikis
            qids_with_incoming_links = {}
            for sitelink in row['container_page_qids']:
                split = sitelink.split('|')
                # first part of the field is a wikidata item-id for a page that contains the commons file
                qid = split[0]
                # second part is the wiki the page is on, third part is the number of incoming links to the page
                incoming_link_count = int(split[2])
                if qid not in qids_with_incoming_links:
                    qids_with_incoming_links[qid] = incoming_link_count
                else:
                    qids_with_incoming_links[qid] += incoming_link_count
            for qid in qids_with_incoming_links:
                # guard against log(0) for pages with no incoming links
                if qids_with_incoming_links[qid] > 0:
                    # take a log to dampen the raw link count, and cap the final score at 1000
                    score = min(1000, int(100 * round(np.log(qids_with_incoming_links[qid]), 3)))
                    weightedTags.append('image.linked.from.wikidata.sitelink/' + qid + '|' + str(score))
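            # worked example (link counts invented for illustration):
            #   150 incoming links       -> min(1000, int(100 * round(np.log(150), 3))) == 501
            #   1,000,000 incoming links -> capped at 1000 (100 * log(1e6) ~= 1382)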
allData.append('{"doc":{"weighted_tags":' + json.dumps(weightedTags) + '}}')
        count += 1
        # post the accumulated docs in batches of 100
        if count % 100 == 0:
            dataAsJson = "\n".join(allData) + "\n"
            response = requests.post(url, data=dataAsJson, headers={"Content-Type": "application/x-ndjson"})
            response.raise_for_status()
            logging.info('cormac: data sent for ' + str(count) + ' documents')
            logging.info('cormac: latest ' + str(row['page_id']))
            logging.info('cormac: ' + json.dumps(weightedTags))
            allData = []
    # flush any remaining docs that didn't fill a complete batch of 100
    if allData:
        dataAsJson = "\n".join(allData) + "\n"
        response = requests.post(url, data=dataAsJson, headers={"Content-Type": "application/x-ndjson"})
        response.raise_for_status()
        logging.info('cormac: data sent for all ' + str(count) + ' documents')
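    # NB raise_for_status() only catches HTTP-level failures; the elasticsearch bulk API can
    # also report per-item failures inside a 200 response (an 'errors' flag plus per-item
    # statuses), so a stricter version of the two POSTs above could also check, e.g.:
    #   if response.json().get('errors'):
    #       logging.warning('cormac: some bulk items failed')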
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sys.exit(main())
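
# NB this is a pyspark job, so one way to run it (an assumed invocation, not taken from the
# task) is via spark-submit, e.g.:
#   spark-submit T286562.py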
