Issue: I'm hitting a transient error while running the Lift Wing topic model. It breaks the loop, and I lose all of the successful results collected up to that point. It happens intermittently.
This work is for an upcoming quarterly learning session. Data will be presented on November 20th.
Updated per Luca's questions:
Tables I'm querying:
1) florez.search_results_for_topics_ja -- result row count: 1,528,720
2) florez.search_results_for_topics_en -- result row count: 7,085,715
Tools I'm using:
JupyterLab and Spark on the stat hosts, to query Hive
1. I ran code similar to the below for 1-2 days' worth of topic data. As it worked, I tried it for 7 days of data using the code below. With 7 days of data it failed twice, both times with a KeyError on 'prediction':
```lang=python
import wmfdata as wmf
from wmfdata import mariadb, hive, spark
from wmfdata.utils import pct_str, pd_display_all
import os
import json
import requests
from pandas import json_normalize
import pandas as pd
import datetime as dt
spark_session = wmf.spark.create_session(type='yarn-large')
select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)
os.environ['REQUESTS_CA_BUNDLE'] = "/etc/ssl/certs/wmf-ca-certificates.crt"
inference_url = 'https://inference.discovery.wmnet:30443/v1/models/outlink-topic-model:predict'
headers = {
    'Host': 'outlink-topic-model.articletopic-outlink.wikimedia.org',
    'Content-Type': 'application/json',
    'User-Agent': '<email> -- outlink example'
}
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    results.append([response.json()['prediction'], language])
```
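To see why 'prediction' is sometimes missing, it may help to keep the whole response body whenever the key is absent instead of indexing into it directly. The following is only a minimal sketch: `split_response` is a hypothetical helper name, and the `{'error': ...}` payload in the demo is an assumed shape, not something I have confirmed the service returns.

```python
def split_response(payload):
    """Return (prediction, leftover) from an already-decoded JSON body.

    If the body has a 'prediction' key, leftover is None.
    Otherwise prediction is None and leftover is the whole body,
    so it can be inspected later.
    """
    if isinstance(payload, dict) and 'prediction' in payload:
        return payload['prediction'], None
    return None, payload


# Demo with made-up payloads (the error shape is an assumption).
ok_pred, ok_left = split_response({'prediction': {'article': 'x', 'results': []}})
bad_pred, bad_left = split_response({'error': 'some message'})
```

Inside the loop this would be called as `prediction, leftover = split_response(response.json())`, and `leftover` could be stored next to the language and title for the rows that failed.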
I updated the code to add a counter. When I reran this for 7 days of data with a counter, it worked.
```lang=python
spark_session = wmf.spark.create_session(type='yarn-large')
select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()
counter = 0 # Initialize the counter
total_prints = 10 # Define the total number of print statements
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")
```
As it was successful, I tried to run the above code for the select_en and select_ja queries noted below. I ran it twice for each query and all four attempts failed, each at a different point in the loop. So, I adapted the code as noted below.
2. This code ran once and failed after many hours:
```lang=python
select_en = '''
SELECT DISTINCT(page_title), 'en' as language_code
FROM florez.search_results_for_topics_en
'''
en_results = spark_session.sql(select_en)
en_df = en_results.toPandas()
cirrus_titles = en_df.copy()
counter = 0 # Initialize the counter
total_prints = 10 # Define the total number of print statements
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
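Since the connection drops look transient, one option I could try is retrying each call a few times with a growing pause before giving up. This is a rough sketch only, not something I have run against Lift Wing: `call_with_retries` and its parameters are hypothetical names, and the demo uses Python's built-in `ConnectionError` with a simulated failure instead of a real request.

```python
import time


def call_with_retries(fn, attempts=5, base_delay=1.0,
                      retry_on=(ConnectionError,), sleep=time.sleep):
    """Call fn(); on an exception listed in retry_on, wait and try again.

    The wait doubles after each failure (base_delay, 2x, 4x, ...).
    The last failure is re-raised so the caller still sees it.
    """
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Demo: a function that fails twice, then succeeds.
calls = {'n': 0}


def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise ConnectionError('simulated disconnect')
    return 'ok'


delays = []  # record the waits instead of actually sleeping
result = call_with_retries(flaky, sleep=delays.append)
```

With `requests`, the exception to pass in `retry_on` would be `requests.exceptions.ConnectionError`, because it is not a subclass of the built-in `ConnectionError`. The `fn` argument would be a small function or lambda wrapping the `s.post(...)` call.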
3. This version, adapted with an if/else clause to handle a missing 'prediction' key, ran once and failed after many hours:
```lang=python
select_ja = '''
SELECT DISTINCT(page_title), 'ja' as language_code
FROM florez.search_results_for_topics_ja
'''
ja_results = spark_session.sql(select_ja)
ja_df = ja_results.toPandas()
cirrus_titles = ja_df.copy()
counter = 0 # Initialize the counter
total_prints = 10 # Define the total number of print statements
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    json_data = response.json()
    feature_value = json_data.get('prediction')  # Use get to avoid KeyError
    # Check if 'prediction' key is present in the response
    if feature_value is not None:
        results.append([feature_value, language])
    else:
        # Handle the case when 'prediction' key is not present
        results.append([None, language])
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: HTTPSConnectionPool(host='inference.discovery.wmnet', port=30443): Max retries exceeded with url: /v1/models/outlink-topic-model:predict (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f0f5807dc90>: Failed to establish a new connection: [Errno 111] Connection refused'))
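Because every failure so far has thrown away the in-memory `results` list, another thing I could try is writing each result to disk as soon as it arrives, so a rerun can skip what is already done. The sketch below is an untested idea under assumptions: `run_with_checkpoint` is a hypothetical helper, the JSON Lines file layout is my own choice, and `fake_predict` stands in for the real request to the model.

```python
import json
import os
import tempfile


def run_with_checkpoint(items, predict, path):
    """Append one JSON line per finished item; skip items already in the file."""
    done = set()
    if os.path.exists(path):
        with open(path, encoding='utf-8') as f:
            for line in f:
                rec = json.loads(line)
                done.add((rec['lang'], rec['page_title']))
    with open(path, 'a', encoding='utf-8') as f:
        for lang, title in items:
            if (lang, title) in done:
                continue
            # predict may raise; everything written before this point stays on disk
            prediction = predict(lang, title)
            record = {'lang': lang, 'page_title': title, 'prediction': prediction}
            f.write(json.dumps(record) + '\n')
            f.flush()


# Demo: the stand-in predict function fails once on the second item.
items = [('ja', 'A'), ('ja', 'B'), ('ja', 'C')]
state = {'fail': True}
seen = []


def fake_predict(lang, title):
    seen.append(title)
    if title == 'B' and state['fail']:
        state['fail'] = False
        raise ConnectionError('simulated outage')
    return {'title': title}


path = os.path.join(tempfile.mkdtemp(), 'results.jsonl')
try:
    run_with_checkpoint(items, fake_predict, path)  # stops at 'B'
except ConnectionError:
    pass
run_with_checkpoint(items, fake_predict, path)      # resumes from 'B'

with open(path, encoding='utf-8') as f:
    saved = [json.loads(line)['page_title'] for line in f]
```

In the demo the second call does not repeat the item that had already succeeded. For the real tables, `items` would be the `zip(...)` of language codes and page titles, and `predict` would wrap the POST to the inference URL.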