Issue: I'm facing a transient error while running the Lift Wing topic model. The error breaks the loop, so all of the results collected up to that point are lost. It happens intermittently.
This work is for an upcoming quarterly learning session. Data will be presented on November 20th.
1. This worked on the first try for 1-2 days' worth of data. When I reran it for 7 days of data, I had to run the code three times before it succeeded; the first two attempts failed with a KeyError on 'prediction':
```
import json
import os

import requests
import wmfdata as wmf

spark_session = wmf.spark.create_session(type='yarn-large')
select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()
# Connect directly (no proxy) to the internal endpoint and verify TLS with the WMF CA bundle
os.environ.pop('HTTP_PROXY', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)
os.environ['REQUESTS_CA_BUNDLE'] = "/etc/ssl/certs/wmf-ca-certificates.crt"
inference_url = 'https://inference.discovery.wmnet:30443/v1/models/outlink-topic-model:predict'
headers = {
    'Host': 'outlink-topic-model.articletopic-outlink.wikimedia.org',
    'Content-Type': 'application/json',
    'User-Agent': '<email> -- outlink example'
}
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()  # a new Session (and connection pool) for every request
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    # Raises KeyError when the response body has no 'prediction' key
    results.append([response.json()['prediction'], language])
```
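The KeyError means the service did answer, but with a JSON body that had no 'prediction' key, most likely an error response. A minimal sketch, assuming the body is usually JSON: the hypothetical helper `parse_prediction` below returns the prediction when it is present and otherwise keeps the HTTP status and the raw payload, so the loop can continue and the failing titles can be inspected afterwards instead of being lost. The error body format of the outlink model is not shown in this task, so the whole payload is stored rather than any specific field.

```python
import json
from typing import Any, Optional, Tuple


def parse_prediction(status_code: int, body: str) -> Tuple[Optional[Any], Optional[dict]]:
    """Return (prediction, None) on success, or (None, error_info) otherwise.

    Hypothetical helper: pass `response.status_code` and `response.text`.
    """
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        # Non-JSON body, e.g. an HTML error page from something in front of the model
        return None, {"status": status_code, "body": body}
    if isinstance(payload, dict) and "prediction" in payload:
        return payload["prediction"], None
    # JSON without 'prediction': keep the whole payload for later inspection
    return None, {"status": status_code, "body": payload}


# Hypothetical usage inside the loop (`errors` is a new list created before the loop):
# prediction, error = parse_prediction(response.status_code, response.text)
# if error is None:
#     results.append([prediction, language])
# else:
#     errors.append([language, title, error])
```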
I then added a progress counter and reran the code for 7 days of data; this time it succeeded.
```
spark_session = wmf.spark.create_session(type='yarn-large')
select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()
# inference_url, headers and the proxy/CA settings are the same as in the first block
counter = 0  # Initialize the counter
total_prints = 10  # Define the approximate number of progress lines
# max(1, ...) avoids a ZeroDivisionError when there are fewer rows than total_prints
print_every = max(1, len(cirrus_titles) // total_prints)
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress about total_prints times, regardless of row count
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
Because that run succeeded, I ran the same code for the select_en and select_ja queries below. I ran it twice for each query and all four attempts failed, each at a different point in the loop. So I adapted the code as shown below.
2. This version, adapted with a progress counter, ran once for many hours before failing:
```
select_en = '''
SELECT DISTINCT(page_title), 'en' as language_code
FROM florez.search_results_for_topics_en
'''
en_results = spark_session.sql(select_en)
en_df = en_results.toPandas()
cirrus_titles = en_df.copy()
counter = 0  # Initialize the counter
total_prints = 10  # Define the approximate number of progress lines
# max(1, ...) avoids a ZeroDivisionError when there are fewer rows than total_prints
print_every = max(1, len(cirrus_titles) // total_prints)
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress about total_prints times, regardless of row count
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
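This ConnectionError is raised by `s.post()` itself, before `.json()` is reached, so handling a missing 'prediction' key cannot prevent it. A minimal sketch, assuming the outages are short-lived: the hypothetical helper `call_with_retries` retries a call with exponential backoff and re-raises once a fixed number of attempts is used up. It is plain Python so it can be tested without the network; the usage comment shows how it might wrap the POST, with one `requests.Session` created before the loop (the timeout value there is an assumption).

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying on the given exception types with exponential backoff.

    Sleeps base_delay, 2 * base_delay, 4 * base_delay, ... between attempts and
    re-raises the last exception after max_attempts failed attempts.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up: propagate the last error
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Hypothetical usage with the names from this task (requires `requests`):
# session = requests.Session()  # created once, before the loop
# for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
#     response = call_with_retries(
#         lambda: session.post(inference_url, headers=headers,
#                              data=json.dumps({"lang": language, "page_title": title}),
#                              timeout=60),
#         retry_on=(requests.exceptions.ConnectionError, requests.exceptions.Timeout),
#     )
```

Note that `requests.exceptions.ConnectionError` is not a subclass of Python's built-in `ConnectionError`, so it has to be listed explicitly. Mounting a `requests.adapters.HTTPAdapter` configured with urllib3's `Retry` is an alternative; `Retry` does not retry POST after read errors such as `RemoteDisconnected` unless POST is included in `allowed_methods`.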
3. This version adds the progress counter and an if/else check for a missing 'prediction' key. It ran once and also failed after many hours:
```
select_ja = '''
SELECT DISTINCT(page_title), 'ja' as language_code
FROM florez.search_results_for_topics_ja
'''
ja_results = spark_session.sql(select_ja)
ja_df = ja_results.toPandas()
cirrus_titles = ja_df.copy()
counter = 0  # Initialize the counter
total_prints = 10  # Define the approximate number of progress lines
# max(1, ...) avoids a ZeroDivisionError when there are fewer rows than total_prints
print_every = max(1, len(cirrus_titles) // total_prints)
results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url,
                      headers=headers,
                      data=json.dumps({"lang": language, "page_title": title})
                      )
    json_data = response.json()
    # .get() avoids the KeyError, but a ConnectionError raised by s.post() still stops the loop
    feature_value = json_data.get('prediction')
    if feature_value is not None:
        results.append([feature_value, language])
    else:
        # 'prediction' key missing: record a placeholder so the row is kept
        results.append([None, language])
    # Increment the counter and print progress about total_prints times, regardless of row count
    counter += 1
    if counter % print_every == 0:
        print(f"{counter} items done")
```
ERROR: ConnectionError: HTTPSConnectionPool(host='inference.discovery.wmnet', port=30443): Max retries exceeded with url: /v1/models/outlink-topic-model:predict (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f0f5807dc90>: Failed to establish a new connection: [Errno 111] Connection refused'))
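Both failures end the loop after many hours, and because `results` only lives in memory, everything collected so far is lost. A minimal sketch of checkpointing, assuming one JSON record per (language, title) pair is enough: the hypothetical `run_with_checkpoint` appends each result to a JSON Lines file as soon as it arrives and, when rerun, skips pairs already in the file, so a crash only costs the request in flight. `predict` stands in for a function that performs the POST and returns the prediction; its name and signature are assumptions.

```python
import json
import os
from typing import Any, Callable, Iterable, Set, Tuple


def load_done(path: str) -> Set[Tuple[str, str]]:
    """Return the (language, title) pairs already saved in the checkpoint file."""
    done: Set[Tuple[str, str]] = set()
    if not os.path.exists(path):
        return done
    with open(path, encoding="utf-8") as f:
        for line in f:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # blank or truncated line from an interrupted write
            done.add((record["language"], record["title"]))
    return done


def run_with_checkpoint(
    items: Iterable[Tuple[str, str]],
    predict: Callable[[str, str], Any],
    path: str,
) -> int:
    """Call predict(language, title) for each pair not yet saved in `path`.

    Each result is appended to `path` (JSON Lines) immediately.
    Returns the number of new records written in this run.
    """
    done = load_done(path)
    written = 0
    with open(path, "a", encoding="utf-8") as f:
        for language, title in items:
            if (language, title) in done:
                continue  # processed in an earlier run (or earlier in this one)
            prediction = predict(language, title)
            record = {"language": language, "title": title, "prediction": prediction}
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
            f.flush()  # hand the line to the OS so it is kept if the process dies
            done.add((language, title))
            written += 1
    return written


# Hypothetical usage (`fetch_prediction` wraps the POST from this task):
# run_with_checkpoint(zip(cirrus_titles['language_code'], cirrus_titles['page_title']),
#                     fetch_prediction, "ja_predictions.jsonl")
```

If a run fails, rerunning the same call resumes where it stopped. The saved records can be loaded with `pd.read_json("ja_predictions.jsonl", lines=True)`. Pairs are requested only once, so keep `search_id` in the record if the plugin query's per-search mapping matters.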