
Create Airflow job that loads Spur data
Closed, Resolved · Public

Description

Per previous discussion with @kostajh and @TAdeleye_WMF, we are to consider our hypothesis done when we have a working Airflow job that loads data into OpenSearch.

Now that we have a working OpenSearch test cluster, we can start working on the Airflow job. Creating this ticket to:

  • Create an Airflow job that loads Spur data into OpenSearch (this will be a shared responsibility between Kosta, @brouberol and myself)
  • Verify operation

Details

Other Assignee
kostajh
Related Changes in Gerrit:
Repo                           Branch   Lines +/-
operations/deployment-charts   master   +4 -4
operations/deployment-charts   master   +10 -0
operations/deployment-charts   master   +8 -0
operations/deployment-charts   master   +11 -3
operations/deployment-charts   master   +11 -1
operations/deployment-charts   master   +4 -0
operations/deployment-charts   master   +1 -1
operations/deployment-charts   master   +1 -1
operations/deployment-charts   master   +5 -0
operations/deployment-charts   master   +1 -1
operations/deployment-charts   master   +4 -3
operations/deployment-charts   master   +29 -1
operations/deployment-charts   master   +5 -0
operations/deployment-charts   master   +4 -0
Related Changes in GitLab:
Title                                             Reference                                   Author     Source branch   Dest branch
ipoid: Set a version name for the index           repos/data-engineering/airflow-dags!1886    kharlan    ipoid-index     main
ipoid: Write to both data centers                 repos/data-engineering/airflow-dags!1872    kharlan    T408238         main
Add the `airflow-provider-opensearch` dependency  repos/data-engineering/airflow-dags!1786    brouberol  T408238         main

Event Timeline

Change #1200354 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: allow task pods to egress to the urldownloader hosts

https://gerrit.wikimedia.org/r/1200354

Change #1200357 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: define a connection to the spur.us API

https://gerrit.wikimedia.org/r/1200357

brouberol updated Other Assignee, added: brouberol.

Change #1200354 merged by jenkins-bot:

[operations/deployment-charts@master] airflow-platform-eng: allow task pods to egress to the urldownloader hosts

https://gerrit.wikimedia.org/r/1200354

Change #1200357 merged by jenkins-bot:

[operations/deployment-charts@master] airflow-platform-eng: define a connection to the spur.us API

https://gerrit.wikimedia.org/r/1200357

@kostajh all the network policies and connection configuration have been deployed to airflow-platform-eng. You should be able to retrieve the spur.us data via the following snippet (to be expanded upon and "packaged" as a DAG):

>>> from airflow.providers.http.hooks.http import HttpHook
>>> spurus_hook = HttpHook(http_conn_id="spurus", method="GET")
>>> spurus_conn = spurus_hook.get_connection(conn_id=spurus_hook.http_conn_id)
>>> spurus_hook.run(endpoint="/v2/anonymous-residential/latest", headers={"Token": spurus_conn.password})
[2025-11-03T15:33:10.759+0000] {base.py:84} INFO - Retrieving connection 'spurus'
<Response [200]>

@bking how is the DAG supposed to connect to the opensearch instance? Through basic auth? If so, we need to a) provision a user and b) create a connection for it in the airflow config

Yes, we can use basic auth or mTLS. I'm working on T408919 and I imagine we'll end up with a more generic way to add users from there. In the short term, I'm happy to take this ticket and create an airflow user for opensearch-test.

Nice! Re-assigning you as second owner.

Change #1202186 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow: define a network policy specific to task pods requiring egress to a proxy

https://gerrit.wikimedia.org/r/1202186

Change #1202187 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: enabled jobs properly labeled to egress to the urldownloader proxies

https://gerrit.wikimedia.org/r/1202187

OK, I have created a username/password combo for accessing https://opensearch-test.discovery.wmnet:30443 from airflow.

You should be able to verify access with something like curl -u ${USER}:${PW} https://opensearch-test.discovery.wmnet:30443/enwikibooks

The airflow user has full rw permissions on all indices, but no cluster access (so you can't make calls like /_cat/indices; you'll need the admin user for that).
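For anyone wiring this up outside of curl: HTTP basic auth is just the `user:password` pair base64-encoded into an Authorization header, which is what the import script further down this thread constructs by hand. A minimal sketch of that encoding (the credentials here are placeholders, not the real airflow user):

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build the HTTP Basic auth header that OpenSearch expects."""
    credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {credentials}"}


# Placeholder credentials for illustration only.
headers = basic_auth_header("airflow", "s3cret")
print(headers["Authorization"])
```

The same header works for both the curl check and any urllib/requests-based client.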

Assigning back to Balthazar to work on secrets. @brouberol feel free to hand this back to me if you aren't able to work on it; I'll try to follow the steps Ben suggested here.

bking updated Other Assignee, added: brouberol; removed: bking. (Nov 6 2025, 2:38 PM)

Change #1202186 merged by jenkins-bot:

[operations/deployment-charts@master] airflow: define a network policy specific to task pods requiring egress to a proxy

https://gerrit.wikimedia.org/r/1202186

Change #1202187 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: enabled jobs properly labeled to egress to the urldownloader proxies

https://gerrit.wikimedia.org/r/1202187

@kostajh FYI, I've deployed a change to airflow-platform-eng that only allows pods to egress to the urldownloader hosts (on port 8080) if they are labeled with proxy=urldownloader. This ensures that we don't give indirect internet access to all tasks on that instance.

To ensure that your tasks have this label, you'll need to structure your DAG in the following way:

task_flow_api_dag.py
from airflow import DAG
from airflow.decorators import task

from kubernetes.client import models as k8s


with DAG(
    dag_id="example",
    ...
) as dag:
    executor_config = {
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"proxy": "urldownloader"})),
    }

    @task(executor_config=executor_config)
    def python_operator_task():
        ...

or

traditional_api_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator

from kubernetes.client import models as k8s


with DAG(
    dag_id="example",
    ...
) as dag:
    executor_config = {
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"proxy": "urldownloader"})),
    }

    task = PythonOperator(executor_config=executor_config, ...)

See https://airflow.apache.org/docs/apache-airflow/2.10.5/tutorial/taskflow.html for the two different APIs for writing a DAG. If you're unsure, I suggest using the traditional API for your first DAG; it involves less magic.

Here's a script to download the latest Spur data. (Caveat: I used claude.ai to write it, but I've read the code and verified its functionality.)

#!/usr/bin/env python3
"""
Download latest Spur.us anonymous residential feed data.
"""

import os
import sys
import json
import urllib.request
import urllib.error
from datetime import datetime


def download_file(url, headers, output_path):
    """Download a file from URL to the specified path."""
    try:
        request = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(request) as response:
            # Read in chunks to handle large files efficiently
            chunk_size = 8192
            with open(output_path, 'wb') as f:
                while True:
                    chunk = response.read(chunk_size)
                    if not chunk:
                        break
                    f.write(chunk)
        return True
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
        return False
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}", file=sys.stderr)
        return False
    except Exception as e:
        print(f"Error downloading file: {e}", file=sys.stderr)
        return False


def main():
    api_key = os.environ.get('SPUR_API_KEY')
    if not api_key:
        print("Error: SPUR_API_KEY environment variable not set", file=sys.stderr)
        sys.exit(1)
    
    download_dir = os.environ.get('SPUR_DOWNLOAD_DIR', '.')
    if not os.path.exists(download_dir):
        try:
            os.makedirs(download_dir)
        except Exception as e:
            print(f"Error: Could not create download directory: {e}", file=sys.stderr)
            sys.exit(1)
    
    headers = {
        'Token': api_key
    }
    
    metadata_url = "https://feeds.spur.us/v2/anonymous-residential/latest"
    print(f"Fetching metadata from: {metadata_url}")
    
    try:
        request = urllib.request.Request(metadata_url, headers=headers)
        with urllib.request.urlopen(request) as response:
            metadata_content = response.read().decode('utf-8')
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.reason}", file=sys.stderr)
        if e.code == 401:
            print("Authentication failed. Please check your SPUR_API_KEY.", file=sys.stderr)
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error fetching metadata: {e}", file=sys.stderr)
        sys.exit(1)
    
    try:
        metadata = json.loads(metadata_content)
        metadata_json = metadata.get('json', {})
    except json.JSONDecodeError as e:
        print(f"Error parsing metadata JSON: {e}", file=sys.stderr)
        sys.exit(1)
    
    print("\nMetadata received:")
    print(json.dumps(metadata_json, indent=2))
    
    location = metadata_json.get('location')
    if not location:
        print("Error: No 'location' field found in metadata", file=sys.stderr)
        sys.exit(1)
    
    date = metadata_json.get('date', 'unknown')
    filesize = metadata_json.get('filesize', 0)
    uncompressed_filesize = metadata_json.get('uncompressed_filesize', 0)
    line_count = metadata_json.get('line_count', 0)
    
    print(f"\nFile details:")
    print(f"  Date: {date}")
    print(f"  Compressed size: {filesize:,} bytes ({filesize / (1024**3):.2f} GB)")
    print(f"  Uncompressed size: {uncompressed_filesize:,} bytes ({uncompressed_filesize / (1024**3):.2f} GB)")
    print(f"  Line count: {line_count:,}")
    
    file_url = f"https://feeds.spur.us/v2/anonymous-residential/{location}"
    
    filename = os.path.basename(location)
    output_path = os.path.join(download_dir, filename)
    
    print(f"\nDownloading file from: {file_url}")
    print(f"Saving to: {output_path}")
    
    start_time = datetime.now()
    success = download_file(file_url, headers, output_path)
    
    if success:
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        actual_size = os.path.getsize(output_path)
        print(f"\nDownload completed successfully!")
        print(f"  Time taken: {duration:.2f} seconds")
        print(f"  Downloaded size: {actual_size:,} bytes")
        
        if actual_size != filesize:
            print(f"  Warning: Downloaded size ({actual_size}) differs from expected size ({filesize})")
        
        sys.exit(0)
    else:
        print("\nDownload failed!", file=sys.stderr)
        if os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass
        sys.exit(1)


if __name__ == "__main__":
    main()
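For reference, the parsing above expects the /latest metadata response to look roughly like the sample below. The field names are the ones the script reads (`json`, `location`, `date`, `filesize`, `uncompressed_filesize`, `line_count`); the values are invented for illustration:

```python
import json

# Invented sample payload mirroring the fields the download script reads.
sample = json.dumps({
    "json": {
        "location": "20251103/feed.json.gz",
        "date": "20251103",
        "filesize": 1234567890,
        "uncompressed_filesize": 9876543210,
        "line_count": 25000000,
    }
})

# Same extraction steps as the script: unwrap the "json" object,
# then pull out the relative download path.
metadata_json = json.loads(sample).get("json", {})
location = metadata_json.get("location")
print(location)  # relative path appended to the feeds.spur.us base URL
```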

And here is a script (also written by claude.ai, but reviewed and verified by me) for importing into OpenSearch in batches of 10,000 records:

#!/usr/bin/env python3
"""
Process Spur.us feed data and index to OpenSearch.
"""

import os
import sys
import json
import gzip
import time
import urllib.request
import urllib.error
from datetime import datetime


def post_bulk_request(url, auth, bulk_data):
    """Send bulk request to OpenSearch."""
    headers = {
        'Content-Type': 'application/x-ndjson',
    }
    
    if auth:
        import base64
        credentials = base64.b64encode(auth.encode('utf-8')).decode('ascii')
        headers['Authorization'] = f'Basic {credentials}'
    
    data = ''.join(bulk_data).encode('utf-8')
    
    try:
        request = urllib.request.Request(url, data=data, headers=headers, method='POST')
        with urllib.request.urlopen(request) as response:
            return response.status, response.read().decode('utf-8')
    except urllib.error.HTTPError as e:
        error_body = e.read().decode('utf-8') if e.fp else ''
        return e.code, error_body
    except Exception as e:
        return None, str(e)


def process_file(file_path, opensearch_url, index_name, auth, batch_size=10000):
    """Process the gzipped JSON file and index to OpenSearch."""
    if not os.path.exists(file_path):
        print(f"Error: File not found: {file_path}", file=sys.stderr)
        return False
    
    bulk_url = f"{opensearch_url}/{index_name}/_bulk"
    
    print(f"Processing file: {file_path}")
    print(f"OpenSearch URL: {opensearch_url}")
    print(f"Index name: {index_name}")
    print(f"Batch size: {batch_size}")
    
    total_lines = 0
    successful_docs = 0
    failed_docs = 0
    batch_count = 0
    
    try:
        with gzip.open(file_path, 'rt', encoding='utf-8') as f:
            bulk_data = []
            line_count = 0
            
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                
                try:
                    doc = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: Failed to parse JSON at line {line_num}: {e}", file=sys.stderr)
                    failed_docs += 1
                    continue
                
                ip = doc.get('ip')
                if not ip:
                    print(f"Warning: No 'ip' field found at line {line_num}", file=sys.stderr)
                    failed_docs += 1
                    continue
                
                doc['id'] = ip
                doc['@timestamp'] = int(time.time())
                
                action = json.dumps({"index": {"_id": ip}}) + '\n'
                document = json.dumps(doc) + '\n'
                
                bulk_data.extend([action, document])
                line_count += 1
                total_lines += 1
                
                if line_count >= batch_size:
                    batch_count += 1
                    print(f"\nSending batch {batch_count} ({line_count} documents)...")
                    
                    status, response = post_bulk_request(bulk_url, auth, bulk_data)
                    
                    if status == 200 or status == 201:
                        try:
                            result = json.loads(response)
                            if result.get('errors'):
                                for item in result.get('items', []):
                                    if 'index' in item:
                                        if item['index'].get('status') in [200, 201]:
                                            successful_docs += 1
                                        else:
                                            failed_docs += 1
                                            error = item['index'].get('error', {})
                                            print(f"  Document failed: {error.get('type', 'Unknown error')}: {error.get('reason', '')}", file=sys.stderr)
                            else:
                                successful_docs += line_count
                        except json.JSONDecodeError:
                            print(f"  Warning: Could not parse bulk response", file=sys.stderr)
                            successful_docs += line_count
                        
                        print(f"  Batch {batch_count} completed")
                    else:
                        print(f"  Batch {batch_count} failed with status {status}", file=sys.stderr)
                        if response:
                            print(f"  Error: {response}", file=sys.stderr)
                        failed_docs += line_count
                    
                    bulk_data = []
                    line_count = 0
            
            if bulk_data:
                batch_count += 1
                print(f"\nSending final batch {batch_count} ({line_count} documents)...")
                
                status, response = post_bulk_request(bulk_url, auth, bulk_data)
                
                if status == 200 or status == 201:
                    try:
                        result = json.loads(response)
                        if result.get('errors'):
                            for item in result.get('items', []):
                                if 'index' in item:
                                    if item['index'].get('status') in [200, 201]:
                                        successful_docs += 1
                                    else:
                                        failed_docs += 1
                        else:
                            successful_docs += line_count
                    except json.JSONDecodeError:
                        successful_docs += line_count
                    
                    print(f"  Final batch completed")
                else:
                    print(f"  Final batch failed with status {status}", file=sys.stderr)
                    if response:
                        print(f"  Error: {response}", file=sys.stderr)
                    failed_docs += line_count
    
    except Exception as e:
        print(f"Error processing file: {e}", file=sys.stderr)
        return False
    
    print(f"\nProcessing complete:")
    print(f"  Total lines processed: {total_lines:,}")
    print(f"  Successfully indexed: {successful_docs:,}")
    print(f"  Failed: {failed_docs:,}")
    print(f"  Total batches: {batch_count}")
    
    return failed_docs == 0


def main():
    opensearch_url = os.environ.get('OPENSEARCH_URL')
    if not opensearch_url:
        print("Error: OPENSEARCH_URL environment variable not set", file=sys.stderr)
        sys.exit(1)
    
    index_name = os.environ.get('OPENSEARCH_INDEX', 'spur-feed')
    
    username = os.environ.get('OPENSEARCH_USERNAME')
    password = os.environ.get('OPENSEARCH_PASSWORD')
    auth = f"{username}:{password}" if username and password else None
    
    file_path = os.environ.get('SPUR_FEED_FILE')
    if not file_path:
        download_dir = os.environ.get('SPUR_DOWNLOAD_DIR', '.')
        import glob
        pattern = os.path.join(download_dir, '*feed.json.gz')
        files = glob.glob(pattern)
        if files:
            file_path = max(files, key=os.path.getmtime)
            print(f"Auto-detected feed file: {file_path}")
        else:
            print("Error: SPUR_FEED_FILE not set and no feed.json.gz files found", file=sys.stderr)
            sys.exit(1)
    
    try:
        batch_size = int(os.environ.get('BATCH_SIZE', '10000'))
    except ValueError:
        print("Warning: Invalid BATCH_SIZE, using default of 10000", file=sys.stderr)
        batch_size = 10000
    
    success = process_file(file_path, opensearch_url, index_name, auth, batch_size)
    
    if success:
        print("\nIndexing completed successfully!")
        sys.exit(0)
    else:
        print("\nIndexing completed with errors!", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
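The import script builds its payloads in OpenSearch's bulk NDJSON format: each document is preceded by an action line naming the operation and `_id`, and every line ends with a newline. A small illustration of the pairing, using a fabricated record (the field values are made up, not real feed data):

```python
import json

# Fabricated record shaped like one feed line: an object keyed by "ip".
doc = {"ip": "203.0.113.7", "client": {"concentration": "low"}}
ip = doc["ip"]

# One action line + one document line per record, newline-terminated,
# exactly as the import script assembles its bulk_data list.
action = json.dumps({"index": {"_id": ip}}) + "\n"
document = json.dumps(doc) + "\n"
body = action + document

print(body)
```

Concatenating many such pairs and POSTing them with Content-Type application/x-ndjson is what the script's batches of 10,000 amount to.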

Thanks @kostajh. I'll see how I can "package" this into an airflow DAG that we'll start testing on platform-eng. Probably next week.

Discussed with @brouberol; I'll take a first pass at this.

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1768 works for downloading the data, verified via airflow devenv. Thanks to @brouberol for your help. I'll make another DAG for importing into OpenSearch after https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1786 is merged.

Change #1207121 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow: update the base image to include the opensearch provider

https://gerrit.wikimedia.org/r/1207121

Change #1207121 merged by Brouberol:

[operations/deployment-charts@master] airflow: update the base image to include the opensearch provider

https://gerrit.wikimedia.org/r/1207121

Change #1207132 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: define the opensearch_test connection

https://gerrit.wikimedia.org/r/1207132

Change #1207132 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: define the opensearch_test connection

https://gerrit.wikimedia.org/r/1207132

Change #1207136 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: fix a tyop in the opensearch_test connection

https://gerrit.wikimedia.org/r/1207136

Change #1207136 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: fix a typo in the opensearch_test connection

https://gerrit.wikimedia.org/r/1207136

Change #1207137 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: fix a typo in the opensearch_test connection

https://gerrit.wikimedia.org/r/1207137

Change #1207137 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: fix a typo in the opensearch_test connection

https://gerrit.wikimedia.org/r/1207137

Change #1207140 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: configure SSL for opensearch API communication

https://gerrit.wikimedia.org/r/1207140

Change #1207140 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: configure SSL for opensearch API communication

https://gerrit.wikimedia.org/r/1207140

Change #1207166 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] opensearch-cluster: give 'opensearch' user access to bulk API

https://gerrit.wikimedia.org/r/1207166

Change #1207166 merged by Bking:

[operations/deployment-charts@master] opensearch-cluster: give 'opensearch' user access to bulk API

https://gerrit.wikimedia.org/r/1207166

btullis merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/1768 (OpenSearch IPoid: Set up DAG to download and import Spur data to OpenSearch)

Change #1217461 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] Replace temporary opensearch_test connection by staging/prod OS-ipoid

https://gerrit.wikimedia.org/r/1217461

Change #1217461 merged by Brouberol:

[operations/deployment-charts@master] Replace temporary opensearch_test connection by DC-scoped opensearch-ipoid

https://gerrit.wikimedia.org/r/1217461

Change #1217482 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: restore the opensearch_test connection

https://gerrit.wikimedia.org/r/1217482

Change #1217482 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: restore the opensearch_test connection

https://gerrit.wikimedia.org/r/1217482

Change #1217488 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] airflow-platform-eng: enable egress to dse-k8s-codfw HTTPS ingress

https://gerrit.wikimedia.org/r/1217488

Change #1217488 merged by Brouberol:

[operations/deployment-charts@master] airflow-platform-eng: enable egress to dse-k8s-codfw HTTPS ingress

https://gerrit.wikimedia.org/r/1217488

Change #1217630 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/deployment-charts@master] opensearch-ipoid: increase the cpu and memory of each master in the cluster

https://gerrit.wikimedia.org/r/1217630

Change #1217630 merged by Brouberol:

[operations/deployment-charts@master] opensearch-ipoid: increase the cpu and memory of each master in the cluster

https://gerrit.wikimedia.org/r/1217630