Issue: I'm hitting an intermittent error while running the Lift Wing topic model. When it occurs, it breaks the loop and all of the results collected so far are lost. It doesn't happen on every run.
This work is for an upcoming quarterly learning session. Data will be presented on November 20th.
Updated per Luca's questions:
Tables I'm querying:
a) florez.search_results_for_topics_ja -- result row count: 1,528,720
b) florez.search_results_for_topics_en -- result row count: 7,085,715
Tools I'm using: JupyterLab, and Spark on the stat hosts to query Hive
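For a sense of scale: the loop sends one blocking request per row, so its runtime is roughly rows × per-request latency. Below is a minimal sketch of that arithmetic. It assumes about 0.1 s per request (not measured here) and one request per row counted above; the DISTINCT in the queries may make the real number of requests lower.

```python
def estimate_runtime_hours(n_requests: int, seconds_per_request: float) -> float:
    """Hours needed for a strictly sequential loop of blocking requests."""
    return n_requests * seconds_per_request / 3600


# Row counts from the tables above; 0.1 s per request is an assumed latency
ASSUMED_SECONDS_PER_REQUEST = 0.1
for table, n_rows in [
    ("florez.search_results_for_topics_ja", 1_528_720),
    ("florez.search_results_for_topics_en", 7_085_715),
]:
    hours = estimate_runtime_hours(n_rows, ASSUMED_SECONDS_PER_REQUEST)
    print(f"{table}: ~{hours:.1f} h (~{hours / 24:.1f} days)")
```

At that assumed latency the English table alone works out to roughly 197 hours (about 8 days) of continuous requests. Over a run that long, an occasional network error becomes likely, so the loop has to survive interruptions.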
- I ran code similar to the code below on 1-2 days' worth of topic data. Since that worked, I ran the code below on 7 days of data. It failed twice on the 7-day data with a KeyError on 'prediction':
```python
import wmfdata as wmf
from wmfdata import mariadb, hive, spark
from wmfdata.utils import pct_str, pd_display_all
import os
import json
import requests
from pandas import json_normalize
import pandas as pd
import datetime as dt

spark_session = wmf.spark.create_session(type='yarn-large')

select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
  ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()

# Bypass the HTTP proxy and trust the internal WMF CA for the internal endpoint
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)
os.environ['REQUESTS_CA_BUNDLE'] = "/etc/ssl/certs/wmf-ca-certificates.crt"

inference_url = 'https://inference.discovery.wmnet:30443/v1/models/outlink-topic-model:predict'
headers = {
    'Host': 'outlink-topic-model.articletopic-outlink.wikimedia.org',
    'Content-Type': 'application/json',
    'User-Agent': '<email> -- outlink example'
}

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, headers=headers,
                      data=json.dumps({"lang": language, "page_title": title}))
    results.append([response.json()['prediction'], language])
```
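The KeyError means some responses came back without a 'prediction' key, for example an error body or a non-2xx status. The task doesn't show what those responses contained. Below is a minimal sketch of a helper (the name `parse_prediction` is hypothetical) that records the status code and the start of the body instead of crashing, so the failing titles can be inspected afterwards.

```python
import json
from typing import Any, Optional, Tuple


def parse_prediction(status_code: int, body: str) -> Tuple[Optional[Any], Optional[str]]:
    """Return (prediction, None) when the response looks usable, otherwise
    (None, reason) with the status code or the first 200 characters of the body."""
    snippet = body[:200]
    if not 200 <= status_code < 300:
        return None, f"HTTP {status_code}: {snippet}"
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return None, f"non-JSON body: {snippet}"
    if not isinstance(payload, dict) or "prediction" not in payload:
        return None, f"no 'prediction' key: {snippet}"
    return payload["prediction"], None
```

Inside the loop it would be called as `prediction, error = parse_prediction(response.status_code, response.text)`, storing `error` (and `title`) next to `[prediction, language]`. That way, rows without a prediction can be traced back to a specific title and status.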
I then added a progress counter to the loop. When I reran it on 7 days of data with the counter, it completed.
```python
# Reuses the imports, inference_url and headers defined in the previous cell
spark_session = wmf.spark.create_session(type='yarn-large')

select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
  ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements
# max() avoids a ZeroDivisionError when the frame has fewer than 10 rows
print_every = max(1, len(cirrus_titles) // total_prints)

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, headers=headers,
                      data=json.dumps({"lang": language, "page_title": title}))
    results.append([response.json()['prediction'], language])

    # Increment the counter and print progress (roughly 10 lines regardless of row count)
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
Since that succeeded, I ran the same code for the select_en and select_ja queries below. I ran it twice for each query and every attempt failed, each time at a different point in the loop. So I adapted the code as shown below.
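Failures at a different point on each run suggest transient errors rather than specific bad titles. Below is a minimal sketch of retrying a single call with exponential backoff, using only the standard library. The name `call_with_retries` and its defaults (5 attempts, 1 s base delay) are illustrative assumptions, not values tuned for Lift Wing.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn() and return its result, retrying on the exception types in
    retry_on. Waits base_delay * 2**attempt seconds (plus up to 10% jitter)
    between attempts and re-raises the last error after max_attempts tries."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, 0.1 * delay))
    raise AssertionError("unreachable")
```

In the loop, the post call could be wrapped as `call_with_retries(lambda: s.post(inference_url, headers=headers, data=json.dumps({"lang": language, "page_title": title})), retry_on=(requests.exceptions.ConnectionError,))`. Restricting `retry_on` to connection errors keeps genuine bugs, such as a `KeyError`, from being retried silently.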
- This code was run once and failed after many hours:
```python
select_en = '''
SELECT DISTINCT(page_title), 'en' as language_code
FROM florez.search_results_for_topics_en
'''
en_results = spark_session.sql(select_en)
en_df = en_results.toPandas()
cirrus_titles = en_df.copy()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements
# max() avoids a ZeroDivisionError when the frame has fewer than 10 rows
print_every = max(1, len(cirrus_titles) // total_prints)

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, headers=headers,
                      data=json.dumps({"lang": language, "page_title": title}))
    results.append([response.json()['prediction'], language])

    # Increment the counter and print progress (roughly 10 lines regardless of row count)
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
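`RemoteDisconnected` is a connection-level failure that `requests` can retry on its own when a `urllib3` `Retry` policy is mounted on the session. The same mechanism also covers connection failures like `Connection refused`. Below is a minimal sketch. It assumes urllib3 1.26 or later (for `allowed_methods`), and it assumes the prediction call is safe to repeat because it only reads. It also builds one session for the whole loop instead of a new `requests.Session()` per title. The retry counts and status list are illustrative.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_session(total_retries: int = 5, backoff_factor: float = 1.0) -> requests.Session:
    """One shared Session whose adapter retries dropped connections and 429/5xx responses."""
    retry = Retry(
        total=total_retries,
        connect=total_retries,  # failures to connect, e.g. [Errno 111] Connection refused
        read=total_retries,  # errors after connecting, e.g. RemoteDisconnected
        status=total_retries,
        backoff_factor=backoff_factor,  # sleep between attempts grows exponentially
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset({"POST"}),  # POST is not retried by default
        raise_on_status=False,  # return the last response instead of raising RetryError
    )
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
```

Create `s = make_session()` once before the loop, then call `s.post(inference_url, headers=headers, data=json.dumps({"lang": language, "page_title": title}), timeout=30)` inside it. The `timeout` value is an assumption. Without a timeout, `requests` waits indefinitely, so a hung connection could stall the loop.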
- This version, adapted with an if/else check on 'prediction', was run once and also failed after many hours:
```python
select_ja = '''
SELECT DISTINCT(page_title), 'ja' as language_code
FROM florez.search_results_for_topics_ja
'''
ja_results = spark_session.sql(select_ja)
ja_df = ja_results.toPandas()
cirrus_titles = ja_df.copy()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements
# max() avoids a ZeroDivisionError when the frame has fewer than 10 rows
print_every = max(1, len(cirrus_titles) // total_prints)

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, headers=headers,
                      data=json.dumps({"lang": language, "page_title": title}))
    json_data = response.json()
    feature_value = json_data.get('prediction')  # Use get to avoid KeyError

    # Check if 'prediction' key is present in the response
    if feature_value is not None:
        results.append([feature_value, language])
    else:
        # Handle the case when 'prediction' key is not present
        results.append([None, language])

    # Increment the counter and print progress (roughly 10 lines regardless of row count)
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: HTTPSConnectionPool(host='inference.discovery.wmnet', port=30443): Max retries exceeded with url: /v1/models/outlink-topic-model:predict (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f0f5807dc90>: Failed to establish a new connection: [Errno 111] Connection refused'))
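As the `Max retries exceeded` message above shows, an outage that outlasts the retry budget will still end the loop. Writing each result to disk as soon as it arrives means a crash costs only the in-flight row, and a rerun can skip titles that are already done. Below is a minimal sketch using a JSON Lines file. The function names and the file path in the usage note are hypothetical.

```python
import json
import os
from typing import Callable, Iterable, Set, Tuple


def load_done_keys(path: str) -> Set[Tuple[str, str]]:
    """Return the (language, title) pairs already saved in the checkpoint file."""
    done: Set[Tuple[str, str]] = set()
    if not os.path.exists(path):
        return done
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue  # truncated line left by a crash; that row will be redone
            done.add((rec["language"], rec["title"]))
    return done


def run_with_checkpoint(
    pairs: Iterable[Tuple[str, str]],
    predict: Callable[[str, str], object],
    path: str,
) -> int:
    """Call predict(language, title) for every pair not already in `path`,
    appending one JSON line per result. Returns the number of new rows written.
    Exceptions from predict propagate; a rerun skips rows already saved."""
    done = load_done_keys(path)

    # If a crash left a partial last line, start the next record on a fresh line
    needs_newline = False
    if os.path.exists(path) and os.path.getsize(path) > 0:
        with open(path, "rb") as fb:
            fb.seek(-1, os.SEEK_END)
            needs_newline = fb.read(1) != b"\n"

    written = 0
    with open(path, "a", encoding="utf-8") as f:
        if needs_newline:
            f.write("\n")
        for language, title in pairs:
            if (language, title) in done:
                continue  # finished in an earlier run
            record = {"language": language, "title": title,
                      "prediction": predict(language, title)}
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            f.flush()  # hand each row to the OS right away so a kernel crash keeps it
            done.add((language, title))
            written += 1
    return written
```

For example, run `run_with_checkpoint(zip(cirrus_titles['language_code'], cirrus_titles['page_title']), predict_one, 'ja_predictions.jsonl')`, where `predict_one` is whatever function wraps the POST call. After a failure, rerunning the same line picks up where it stopped. The finished file can then be loaded with `pd.read_json('ja_predictions.jsonl', lines=True)`.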