These are my notes on what it might look like to have an ElasticSearch/OpenSearch-based alternative to the existing data pipeline and request serving app (iPoid-Service) for Spur data. This is a task to consider for future investments in hosting Spur data, and not something that is immediately actionable. See "Open questions" at the end.
Background
Spur provides a daily feed as a JSONL (newline-delimited JSON) file; the file contains about 38M lines.
The problem:
- Maintaining the volume of inserts/deletions/updates on a daily basis without error is hard
- Reconstructing the data into a relational database format is hard
- Reconstructing the data and managing updates to the data on a daily basis is harder
- Avoiding database drift while attempting to stay current with a daily file whose contents fluctuate from time to time is difficult
Spur's data feeds product page says:
Our feeds are designed to be easily ingested into most data-lake or cloud-native database solutions.
What if we tried to use a "data-lake" type application for this data, and dropped the relational database approach?
(The notes below reference ElasticSearch, but per @Gehel's feedback, we'd want to build using OpenSearch. These are more or less interchangeable for the purposes of this task.)
ElasticSearch experiment
I followed this guide: Run Elasticsearch locally. I copied the self-signed HTTP cert to my host machine for later use: docker cp elasticsearch:/usr/share/elasticsearch/config/certs/http_ca.crt .
I created a new index, search-ipoid, using the Kibana UI at http://localhost:5601
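For reference, the index could probably also be created via the API rather than the Kibana UI. A minimal sketch using the Python client (credentials are placeholders; the cert is the one copied out of the container above):

# Hypothetical equivalent of the Kibana step: create the index via the API.
from elasticsearch import Elasticsearch

es = Elasticsearch(
    "https://localhost:9200",
    ca_certs="./http_ca.crt",               # cert copied from the container earlier
    basic_auth=("elastic", "<password>"),    # placeholder credentials
)
es.indices.create(index="search-ipoid")      # dynamic mappings by default; explicit mappings optional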
Then I used a script of a few dozen lines to ingest the contents of the JSONL file from Spur in batches of 10,000 items. You throw the data at ElasticSearch, and it is clever enough to work out the semantics of each field (for example, it recognizes that ip is an IP address, that a date string is a date field, etc.). Custom mappings are also possible if we wanted them.
The only modification I made to the data object was to add a last_seen date field.
The initial import of 37,280,000 records took 50 minutes. It could probably go faster with some basic tuning of ElasticSearch settings and batch sizes.
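A sketch of what that ingest script might look like (file name is illustrative, and it reuses the es client constructed above); it adds the last_seen date and uses the IP as the document _id so that re-runs update existing records:

# Sketch of the ingest script described above.
# Reads the Spur JSONL feed and bulk-indexes it in batches of 10,000.
import json
from datetime import date

from elasticsearch import helpers  # `es` client as constructed above

def actions(path):
    today = date.today().isoformat()
    with open(path) as feed:
        for line in feed:
            doc = json.loads(line)
            doc["last_seen"] = today
            yield {"_index": "search-ipoid", "_id": doc["ip"], "_source": doc}

ok, errors = helpers.bulk(es, actions("spur-feed.jsonl"), chunk_size=10_000, raise_on_error=False)
print(f"indexed {ok} documents, {len(errors)} errors")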
Querying
Queries look like the examples below and are as fast as or faster than our MySQL implementation.
Results for an IP
{ "query": { "query_string": { "query": "+ip:216.195.64.46" } } }
{ "took": 15, "timed_out": false, "_shards": { "total": 2, "successful": 2, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 16.841856, "hits": [ { "_index": "search-ipoid", "_id": "REDACTED", "_score": 16.841856, "_source": { "tunnels": [ { "anonymous": false, "type": "PROXY" } ], "as": { "number": 62, "organization": "REDACTED" }, "infrastructure": "DATACENTER", "ip": "REDACTED", "organization": "REDACTED", "client": { "count": 12, "concentration": { "country": "REDACTED", "density": 0.3723, "city": "REDACTED", "geohash": "REDACTED", "state": "REDACTED", "skew": 30 }, "countries": 1, "spread": 79753 }, "location": { "country": "REDACTED", "city": "REDACTED", "state": "REDACTED" }, "update_date": "2024/02/08" } } ] } }
Query by tunnels type
"query": { "query_string": { "query": "tunnels.type:PROXY" } }
Takes 133ms to return 10,000+ results.
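For reference, this is roughly how the two example queries above would be issued from the Python client used in the import sketch (illustrative only):

# Issuing the example queries with the Python client (`es` as constructed earlier).
resp = es.search(
    index="search-ipoid",
    query={"query_string": {"query": "+ip:216.195.64.46"}},
)
print(resp["took"], resp["hits"]["total"])  # query time in ms and total hit count

resp = es.search(
    index="search-ipoid",
    query={"query_string": {"query": "tunnels.type:PROXY"}},
    size=100,  # only the first page; see the pagination note below
)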
The querying guide shows what is possible.
Observations
- Querying by IP or attribute of a record is really fast: 4-20ms when using ip:{ip}, under 50ms when querying by attribute
- Nested queries are easy using the ES - and + operators
- Automatic inference of types. We could probably adjust the mappings if we need to, but it mostly seems to "just work"
- The data pipeline ingestion process would become much simpler: download the file, post the contents to ElasticSearch. It will update as needed, and in real-time. We could do this a few times a day, if we wanted to. That sets us up nicely for possible future work that uses the real-time feed (100M+ daily items) that Spur offers.
- The querying code could probably just go away. One can use ElasticSearch queries to get a record for an IP, or get a list of VPNs. (See https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html about paging)
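As a concrete example of the pagination link in the last point, a sketch of paging through all proxy records with search_after (the process() handler and the choice of sort field are illustrative; `es` as constructed earlier):

# Sketch: page through a large result set (e.g. all tunnels.type:PROXY records)
# using search_after, per the pagination docs linked above.
query = {"query_string": {"query": "tunnels.type:PROXY"}}
sort = [{"ip": "asc"}]  # any deterministic sort works; ip fields support sorting
search_after = None

while True:
    kwargs = {"index": "search-ipoid", "query": query, "sort": sort, "size": 10_000}
    if search_after is not None:
        kwargs["search_after"] = search_after
    resp = es.search(**kwargs)
    hits = resp["hits"]["hits"]
    if not hits:
        break
    for hit in hits:
        process(hit["_source"])  # process() is a hypothetical downstream handler
    search_after = hits[-1]["sort"]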
How to handle deletions of stale data?
We would set or update a last_seen date field for an IP on each import. Then, the client is responsible for setting a range like last_seen:today-1d, or other ranges as they'd like. This would leave an IP in the index indefinitely, though maybe there would need to be a monthly cleanup (e.g. "delete all items where last_seen is older than 1 month"), depending on how large the index grows.
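That monthly cleanup could presumably be a single delete-by-query call; a minimal sketch, reusing the es client and last_seen field from the import sketch above:

# Sketch of the optional cleanup: delete records not seen for more than a month.
es.delete_by_query(
    index="search-ipoid",
    query={"range": {"last_seen": {"lt": "now-30d/d"}}},
)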
To get current data for queries, we'd:
- Add an updated field when adding items to the index, using the current day as the value
- Use a query filter to only return records whose updated value is the current date (see the sketch after this list)
- This also lets us preserve historical information: we can see, indefinitely, IPs that were in the dataset at some point but are not in today's file. It doesn't let us see changes in metadata over time for a particular IP, but that is probably OK.
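A sketch of that filter, reusing the es client from earlier and the last_seen field stamped in the import sketch (the list above calls this field updated; same idea):

# Sketch: combine the caller's lookup with a "current data only" filter.
resp = es.search(
    index="search-ipoid",
    query={
        "bool": {
            "must": [{"term": {"ip": "216.195.64.46"}}],
            "filter": [{"range": {"last_seen": {"gte": "now-1d/d"}}}],  # e.g. last_seen:today-1d
        }
    },
)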
How would updates work?
Just run the import script again. The id field is set to the IP, and ElasticSearch handles updating other fields for the record as needed.
Running an update locally took 47m.
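In other words, each record in a re-run amounts to something like the following (illustrative; updated_record stands in for a parsed feed line):

# Re-indexing with the IP as _id replaces/updates the existing document for that IP.
es.index(index="search-ipoid", id="216.195.64.46", document=updated_record)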
Remove the Kubernetes web app deployment, and query the ElasticSearch index directly
We could get rid of the Express/Node.js app and allow clients to query the index directly.
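A sketch of what a direct client integration might look like over the plain REST API, without the intermediate service (host, credentials, and CA path are placeholders):

# Sketch: a client querying the index directly instead of going through the app.
import requests

resp = requests.post(
    "https://opensearch.example.internal:9200/search-ipoid/_search",
    json={"query": {"term": {"ip": "216.195.64.46"}}},
    auth=("ipoid_reader", "<password>"),
    verify="/path/to/ca.crt",
    timeout=5,
)
record = resp.json()["hits"]["hits"][0]["_source"]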
Exports
We can use elasticsearch-dump (https://github.com/elasticsearch-dump/elasticsearch-dump) if we need to support exporting data for ingestion elsewhere.
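If we preferred a small script over elasticsearch-dump, a full export back to JSONL could probably be done with the scan helper; a sketch reusing the es client from earlier:

# Sketch: stream every document back out as JSONL (alternative to elasticsearch-dump).
import json
from elasticsearch import helpers

with open("export.jsonl", "w") as out:
    for hit in helpers.scan(es, index="search-ipoid", query={"query": {"match_all": {}}}):
        out.write(json.dumps(hit["_source"]) + "\n")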
Open questions and next steps
- Where would the OpenSearch instance live?
  - Existing production search cluster
    - Search team doesn't want to add more indices to the production cluster.
    - Possible solution: API feature usage and Toolforge/Toolhub indices move to a new "Misc" production cluster instance, and the ipoid index goes there too
  - A WikiKube deployment
    - Kubernetes docs for OpenSearch: https://opensearch.org/docs/2.0/clients/k8s-operator/
    - out of scope per https://wikitech.wikimedia.org/wiki/Kubernetes/Clusters#Goal, which says that stateful applications cannot be deployed in WikiKube
    - the https://wikitech.wikimedia.org/wiki/Kubernetes/Clusters#dse-k8s cluster might be a fit, however
  - A Ganeti VM in production, dedicated to an ipoid OpenSearch instance. This would be managed by puppet, and means managing the system on bare metal. Who would maintain it?
- How would we implement the data pipeline?
  - The code itself is a few dozen lines and could be written in any scripting language. We'd probably want to use OpenSearch Data Prepper (https://opensearch.org/docs/latest/data-prepper/).
  - Maybe we'd want to use Airflow
  - Or a Kubernetes cron job that manages the daily updates, in the same way we currently have it set up in #ipoid?
- What level of flexibility is possible in querying by IPv6 addresses? (T354758: Data not retrieved by IPInfo from IPoid for IPv6 addresses)
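Not an answer to T354758, but worth noting for that investigation: if the ip field is mapped as the ip type (which the automatic inference described above appears to do), term queries accept full IPv6 addresses and CIDR ranges. A sketch, reusing the es client from earlier:

# Sketch: term queries on an ip-typed field accept IPv6 addresses and CIDR notation.
resp = es.search(index="search-ipoid", query={"term": {"ip": "2001:db8::1"}})
resp = es.search(index="search-ipoid", query={"term": {"ip": "2001:db8::/32"}})  # whole range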