
Transient error while running lift wing topic model
Closed, Resolved, Public

Description

Issue: I'm hitting a transient error while running the Lift Wing topic model. The error breaks the loop, so all of the successful results collected up to that point are lost. It happens intermittently.

This work is for an upcoming quarterly learning session. Data will be presented on November 20th.

Updated per Luca's questions:
Tables I'm querying:
a) florez.search_results_for_topics_ja -- result row count: 1,528,720
b) florez.search_results_for_topics_en -- result row count: 7,085,715
Tools I'm using: JupyterLab, Spark on stat hosts to query hive

  1. I ran code similar to the below for 1-2 days' worth of topic data. As it worked, I tried it for 7 days of data using the code below. The 7-day run failed twice with a KeyError on 'prediction':
import wmfdata as wmf
from wmfdata import mariadb, hive, spark
from wmfdata.utils import pct_str, pd_display_all

import os
import json
import requests

from pandas import json_normalize 
import pandas as pd
import datetime as dt

spark_session = wmf.spark.create_session(type='yarn-large') 

select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id 
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()

os.environ.pop('HTTP_PROXY', None)
os.environ.pop('HTTPS_PROXY', None)
os.environ.pop('http_proxy', None)
os.environ.pop('https_proxy', None)
os.environ['REQUESTS_CA_BUNDLE'] = "/etc/ssl/certs/wmf-ca-certificates.crt"

inference_url = 'https://inference.discovery.wmnet:30443/v1/models/outlink-topic-model:predict'
headers = {
    'Host': 'outlink-topic-model.articletopic-outlink.wikimedia.org',
    'Content-Type': 'application/json',
    'User-Agent': '<email> -- outlink example'
}

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, 
                             headers=headers, 
                             data=json.dumps({"lang": language, "page_title": title})
                            )
    results.append([response.json()['prediction'], language])

I updated the code to add a counter. When I reran this for 7 days of data with a counter, it worked.

spark_session = wmf.spark.create_session(type='yarn-large') 

select_plugin = '''
SELECT DISTINCT(page_title), language_code, search_id 
FROM florez.search_results_for_topics_plugin AS p
LEFT JOIN canonical_data.wikis AS w
ON LEFT(p.database, 2) = w.language_code
'''
plugin_results = spark_session.sql(select_plugin)
cirrus_titles = plugin_results.toPandas()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, 
                             headers=headers, 
                             data=json.dumps({"lang": language, "page_title": title})
                            )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")

As it was successful, I tried to run the above code for the select_en and select_ja queries noted below. I ran this code twice for each query and all attempts were unsuccessful. Each time, the failure occurred at a different point. So, I adapted the code as noted below.

  2. This code was run once and failed after many hours of running:
select_en = '''
SELECT DISTINCT(page_title), 'en' as language_code
FROM florez.search_results_for_topics_en
'''
en_results = spark_session.sql(select_en)
en_df = en_results.toPandas()
cirrus_titles = en_df.copy()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, 
                             headers=headers, 
                             data=json.dumps({"lang": language, "page_title": title})
                            )
    results.append([response.json()['prediction'], language])
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")

ERROR: ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

  3. This code, adapted with an if/else clause, was run once and failed after many hours:
select_ja = '''
SELECT DISTINCT(page_title), 'ja' as language_code
FROM florez.search_results_for_topics_ja
 '''
ja_results = spark_session.sql(select_ja)
ja_df = ja_results.toPandas()
cirrus_titles = ja_df.copy()

counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements

results = []
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    response = s.post(inference_url, 
                             headers=headers, 
                             data=json.dumps({"lang": language, "page_title": title})
                            )
    
    json_data = response.json()
    feature_value = json_data.get('prediction')  # Use get to avoid KeyError
    # Check if 'prediction' key is present in the response
    if feature_value is not None:
        results.append([feature_value, language])
    else:
        # Handle the case when 'prediction' key is not present
        results.append([None, language])
        
        
    
    # Increment the counter and print progress, ensuring only 10 print statements for a df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")

ERROR: ConnectionError: HTTPSConnectionPool(host='inference.discovery.wmnet', port=30443): Max retries exceeded with url: /v1/models/outlink-topic-model:predict (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f0f5807dc90>: Failed to establish a new connection: [Errno 111] Connection refused'))

Event Timeline

I spoke with Tobias K over Slack today. He recommended the following which I'm going to try:

....wrap the inner part of the loop in a try: except: block that would log what specific request failed. That way, it could continue on (presuming the request is the issue, and not API downtime). Those requests might be retried later....

Here's my draft code for Tobias' suggestion above:

import logging

# Set up logging
logging.basicConfig(filename='error_log.txt', level=logging.ERROR)

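counter = 0  # Initialize the counter
total_prints = 10  # Define the total number of print statements

results = []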
for language, title in zip(cirrus_titles['language_code'], cirrus_titles['page_title']):
    s = requests.Session()
    try:
        response = s.post(inference_url, 
                          headers=headers, 
                          data=json.dumps({"lang": language, "page_title": title})
                         )
        response.raise_for_status()  # Raise an HTTPError for bad responses
        json_data = response.json()
        feature_value = json_data.get('prediction')  # Use get to avoid KeyError
        
        # Check if 'prediction' key is present in the response
        if feature_value is not None:
            results.append([feature_value, language])
        else:
            # Handle the case when 'prediction' key is not present
            results.append([None, language])

    except requests.exceptions.RequestException as e:
        # Log the error details
        logging.error(f"Error in request for language: {language}, title: {title}. Error: {e}")
        
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error(f"Unexpected error in request for language: {language}, title: {title}. Error: {e}")

    # Increment the counter and print progress, ensuring only 10 print statements for df, regardless of row/item count
    counter += 1
    if counter % (len(cirrus_titles) // total_prints) == 0:
        print(f"{counter} items done")

Luca has raised a few questions that may reveal relevant information:

Where is the code running from?
How many requests are issued? and how many of them in parallel?
Spark is mentioned, does it run on the Hadoop cluster or from a jupyter notebook? It is relevant since the code uses "Requests" that is not async, but multiple requests may be fired from multiple hadoop workers at the same time (and outlink topic model may suffer).

If I understand the code correctly, the queries to the model are not done in parallel (and they should not be, as the model can't handle (much) parallel requests). Looking at Grafana[1], there is some traffic and scaling happening, which leads me to believe that just retrying the failing requests a few times (with maybe a little increasing delay/backoff) might solve the issues.

[1] https://grafana.wikimedia.org/d/-D2KNUEGk/kubernetes-pod-details?from=now-2d&orgId=1&to=now&var-container=All&var-datasource=eqiad%20prometheus%2Fk8s-mlserve&var-namespace=articletopic-outlink&var-pod=All
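
The retry-with-backoff idea above could be sketched roughly as follows. This is a minimal illustration, not code from the task: the helper name `call_with_backoff`, its parameters, and the default values are all assumptions, and the delays would need tuning against what the service can actually handle.

```python
import time


def call_with_backoff(func, max_attempts=4, base_delay=0.5,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on failure wait base_delay, 2*base_delay, ... and retry.

    All names and defaults are illustrative assumptions, not values
    recommended by the ML platform team.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up so the caller can log the failed title
            # Delay doubles after every failed attempt.
            sleep(base_delay * 2 ** (attempt - 1))
```

In the loop from the description, `func` would wrap the `s.post(...)` call plus `response.raise_for_status()`, with `retry_on=(requests.exceptions.RequestException,)`, so that only request failures are retried and anything still failing after the last attempt can be logged and skipped.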

Luca has raised a few questions that may reveal relevant information:

Where is the code running from?
How many requests are issued? and how many of them in parallel?
Spark is mentioned, does it run on the Hadoop cluster or from a jupyter notebook? It is relevant since the code uses "Requests" that is not async, but multiple requests may be fired from multiple hadoop workers at the same time (and outlink topic model may suffer).

The code is running from a Jupyter notebook on a stat100X node. Spark is used to query Hive and results in a dataset with 75092 rows. The loop attempts to retrieve topic predictions for those 75K rows one by one.

cirrus_titles['page_title'].nunique()
# 33960

@Iflorez: you could halve the number of requests you're making by getting the topic predictions for pages and then joining the results with the table that has the search IDs. I think I remember Chris, Aiko, or Tobias mentioning that prediction caching is not yet implemented but is coming eventually, so not asking the model to make predictions for the same page multiple times would help a lot.
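
A rough sketch of that idea in plain Python, assuming the rows are available as (language_code, page_title, search_id) tuples. The function name `predict_unique` and the `predict` callable (a stand-in for the Lift Wing request) are hypothetical. With pandas, the same effect could be had with `drop_duplicates` on the title columns followed by a `merge` back onto the search IDs.

```python
def predict_unique(rows, predict):
    """Request one prediction per distinct (language, title) pair.

    rows: iterable of (language_code, page_title, search_id) tuples.
    predict: callable(language, title) -> prediction; a hypothetical
             stand-in for the request to the topic model.
    Returns one (search_id, language, title, prediction) tuple per row.
    """
    rows = list(rows)
    cache = {}
    for language, title, _search_id in rows:
        key = (language, title)
        if key not in cache:
            # First time this page is seen: ask the model once.
            cache[key] = predict(language, title)
    # Join the cached predictions back onto every row, including repeats.
    return [(sid, lang, title, cache[(lang, title)])
            for lang, title, sid in rows]
```

With 75,092 rows but only 33,960 unique page titles, this would bring the request count down to roughly the number of distinct pages.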

@klausman: would introducing a slight delay (e.g. 0.5s) between each of the 34K requests help?

@Iflorez: alternatively you could add a "predicted" column of NAs/nulls to the dataset and modify the code so that it only requests predictions when predicted is NA/null, and make it so that exceptions are handled gracefully (that is, the loop goes on). After the first run most of the NAs/nulls should be gone and replaced with outputs from the model. Then, when you run the same code a second time, it only tries to get predictions where there are none yet. In theory it should take 1-3 runs of the code to fully fill out the predictions. Then store the data either in Hive or in a file.
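
A minimal sketch of that fill-in approach, assuming the dataset is held as a list of dicts with a "predicted" field. The function name `fill_missing` and the `predict` callable are hypothetical stand-ins, not part of the original code.

```python
def fill_missing(rows, predict):
    """Request predictions only for rows whose 'predicted' value is None.

    rows: list of dicts with 'language_code', 'page_title', 'predicted'.
    predict: callable(language, title) -> prediction; a hypothetical
             stand-in for the request to the topic model.
    Returns the number of rows still missing a prediction after this pass.
    """
    remaining = 0
    for row in rows:
        if row["predicted"] is not None:
            continue  # filled in by an earlier pass
        try:
            row["predicted"] = predict(row["language_code"], row["page_title"])
        except Exception:
            pass  # leave as None so the next pass retries this row
        if row["predicted"] is None:
            remaining += 1
    return remaining
```

Calling it repeatedly until it returns 0 (or stops improving) corresponds to the "1-3 runs" described above; persisting `rows` between passes keeps the successful results even if a later pass is interrupted.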

@mpopov thank you for your suggestions, I'm mulling them over and considering integrating with the below.

Fabian K recommends using a UDF and Spark cache to speed up the process. A test of the code below was successful, taking 1.6 hours to run (considerably less time than previous attempts, including the successful ones). @klausman How many workers can I set for this sort of task? The next and final pull has ~4 times as many titles to query.

select_ja = '''
SELECT DISTINCT(page_title), 'ja' as language_code
FROM florez.search_results_for_topics_ja
 '''
cirrus_titles = spark_session.sql(select_ja)
cirrus_titles= cirrus_titles.cache()

#%%
# Set up logging
import logging
from pyspark.sql import functions as F  # needed for the @F.udf decorator below
logging.basicConfig(filename='error_log.txt', level=logging.ERROR)

inference_url = 'https://inference.discovery.wmnet:30443/v1/models/outlink-topic-model:predict'
headers = {
    'Host': 'outlink-topic-model.articletopic-outlink.wikimedia.org',
    'Content-Type': 'application/json',
    'User-Agent': '<email> -- outlink example'
}

@F.udf(returnType="string")
def query_topics(language, title):
    import json
    import requests
    s = requests.Session()
    feature_value=None
    try:
        response = s.post(inference_url,
                          headers=headers,
                          data=json.dumps({"lang": language, "page_title": title})
                         )
        response.raise_for_status()  # Raise an HTTPError for bad responses
        json_data = response.json()
        feature_value = json_data.get('prediction')  # Use get to avoid KeyError


    except requests.exceptions.RequestException as e:
        # Log the error details
        logging.error(f"Error in request for language: {language}, title: {title}. Error: {e}")

    except Exception as e:
        # Log any other unexpected exceptions
        logging.error(f"Unexpected error in request for language: {language}, title: {title}. Error: {e}")


    return feature_value

#%%

# coordinate with ML platform to get their input
number_of_workers_querying_liftwing = 40

results = (cirrus_titles
    .repartition(number_of_workers_querying_liftwing)
    .withColumn("prediction",query_topics("language_code","page_title"))
)

@klausman: would introducing a slight delay (e.g. 0.5s) between each of the 34K requests help?

This is hard to say. On the one hand, a slight delay would surely help, but we also don't want to make the eventual big job Irene mentioned take days and days. From our graphs, it looks like with no delay, the code produces about 240qps, so if you add a 500ms delay, it will slow to a crawl. I'd start with something smaller, maybe 50ms, see how much progress it makes over, say, 10 minutes, and then use that info to see if it is still fast enough. At the same time, count how many Connection Resets you get, to see if it helps with the error rate.
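
To put numbers on that trade-off: a fixed pause after each of the 33,960 unique titles adds about 4.7 hours at 500ms, but only about 28 minutes at 50ms. The sketch below shows that arithmetic plus a short timed trial that counts successes and connection errors. The names `added_hours`, `timed_trial`, and `send` are hypothetical, and `send` stands in for one request to the model.

```python
import time


def added_hours(n_requests, delay_s):
    """Extra wall-clock hours spent sleeping if delay_s follows every request."""
    return n_requests * delay_s / 3600


def timed_trial(items, send, delay_s, budget_s,
                sleep=time.sleep, clock=time.monotonic):
    """Send items one at a time with a fixed pause until budget_s has elapsed.

    send: hypothetical callable performing one request. OSError is caught
          because requests.exceptions.RequestException derives from it.
    Returns (succeeded, errors) so the error rate can be compared per delay.
    """
    ok = errors = 0
    start = clock()
    for item in items:
        if clock() - start >= budget_s:
            break  # trial window is over
        try:
            send(item)
            ok += 1
        except OSError:
            errors += 1
        sleep(delay_s)
    return ok, errors
```

Running `timed_trial` with `budget_s=600` at a couple of delay values would give the progress-per-10-minutes and error counts suggested above, which can then be extrapolated to the full job.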

@klausman How many workers can I set for this sort of task? the next and final pull has ~4 times as many titles to query.

@achou mentioned that in the past, she used Medium sized jobs with this model, but I think yours here was Large? Medium size might be a better fit, if it still works (re: memory etc) for this use case.

It is of course an option to do both (less parallelism and the delay Mikhail mentioned), but make sure it still fits within your deadline.

Ah, correction: Mikhail mentions that the actual querying of LW is only done one request at a time (the Spark/Large bits are for getting the input data, IIUC). So the delay option seems the most promising.

In the meantime I will try to figure out why the Connection Resets happen, since obviously this should not be the case. I presume you are still using the User-Agent string from the code above (...outlink example)? It helps a lot with weeding out the log lines from other services like EventGate.

calbon triaged this task as Medium priority. Nov 28 2023, 3:51 PM