Page MenuHomePhabricator

Build next iteration of IPoid using OpenSearch as backend
Open, Stalled, Needs TriagePublic

Description

These are my notes on what it might look like to have an ElasticSearch/OpenSearch-based alternative to the existing data pipeline and request serving app (iPoid-Service) for Spur data. This is a task to consider for future investments in hosting Spur data, and not something that is immediately actionable. See "Open questions" at the end.

Background

Spur provides a feed with a JSONL delimited file on a daily basis; the file contains about 38M lines.

The problem:

  • Maintaining the volume of inserts/deletions/updates on a daily basis without error is hard
  • Reconstructing the data into a relational database format is hard
  • Reconstructing the data and managing updates to the data on a daily basis is harder
  • Avoiding database drift while attempting to stay current with a daily file fluctuates from time to time is difficult

Spur's data feeds product page says:

Our feeds are designed to be easily ingested into most data-lake or cloud-native database solutions.

What if we tried to use a "data-lake" type application for this data, and dropped the relational database approach?

(The notes below reference ElasticSearch, but per @Gehel's feedback, we'd want to build using OpenSearch. These are more or less interchangeable for the purposes of this task.)

ElasticSearch experiment

I followed this guide: Run Elasticsearch locally. I copied the self-signed HTTP cert to my host machine for later use: docker cp elasticsearch:/usr/share/elasticsearch/config/certs/http_ca.crt .

I created a new index, search-ipoid, using the Kibana UI at http://localhost:5601

Then I used a few dozen line long script to ingest the contents of the JSONL file from Spur in batches of 10,000 items. You throw the data at ElasticSearch, and it is clever enough to work out the semantics of each field (for example it recognizes that ip is an IP address, that a date is a date field, etc). Custom mappings are also possible if we wanted.

The only modification I made to the data object was to add a last_seen date field.

Running the import took 50 minutes for initial import of 37,280,000 records. It could probably go faster with some basic optimizations of ElasticSearch and batch counts.

Querying

Queries look like this and are as fast or faster than our MySQL implementation

Results for an IP
{
  "query": {
	"query_string": {
  	"query": "+ip:216.195.64.46"
	}
  }
}
{
  "took": 15,
  "timed_out": false,
  "_shards": {
	"total": 2,
	"successful": 2,
	"skipped": 0,
	"failed": 0
  },
  "hits": {
	"total": {
  	"value": 1,
  	"relation": "eq"
	},
	"max_score": 16.841856,
	"hits": [
  	{
    	"_index": "search-ipoid",
    	"_id": "REDACTED",
    	"_score": 16.841856,
    	"_source": {
      	"tunnels": [
        	{
          	"anonymous": false,
          	"type": "PROXY"
        	}
      	],
      	"as": {
        	"number": 62,
        	"organization": "REDACTED"
      	},
      	"infrastructure": "DATACENTER",
      	"ip": "REDACTED",
      	"organization": "REDACTED",
      	"client": {
        	"count": 12,
        	"concentration": {
          	"country": "REDACTED",
          	"density": 0.3723,
          	"city": "REDACTED",
          	"geohash": "REDACTED",
          	"state": "REDACTED",
          	"skew": 30
        	},
        	"countries": 1,
        	"spread": 79753
      	},
      	"location": {
        	"country": "REDACTED",
        	"city": "REDACTED",
        	"state": "REDACTED"
      	},
      	"update_date": "2024/02/08"
    	}
  	}
	]
  }
}

Query by tunnels type

  "query": {
	"query_string": {
  	"query": "tunnels.type:PROXY"
	}
  }

Takes 133ms to return 10,000+ results.

The querying guide shows what is possible for querying.

Observations

  • Querying by IP or attribute of a record is really fast: 4-20ms when using ip:{ip}, under 50ms when querying by attribute
    • Nested queries are easy using ES - or + operators
  • Automatic inference of types. We could probably adjust the mappings if we need to, but it mostly seems to "just work"
  • The data pipeline ingestion process would become much simpler: download the file, post the contents to ElasticSearch. It will update as needed, and in real-time. We could do this a few times a day, if we wanted to. That sets us up nicely for possible future work that uses the real-time feed (100M+ daily items) that Spur offers.

How to handle deletions of stale date?

We would set or update a last_seen date field for an IP on each import. Then, the client is responsible for setting a range like last_seen:today-1d or setting other ranges as they'd like. This would leave an IP in the index indefinitely, though maybe there would need to be a monthly cleanup (e.g. "delete all items where last_seen is older than 1 month"), depending on how larger the index grows.

To get current data for queries, we'd:

  • Add an updated field when adding items to the index, using the current day as the value
  • Use a query filter to only get records with updated for current date
  • This also lets us preserve historical information--we can see IPs that were in the dataset but no longer in today's, indefinitely. It doesn't let us see changes in metadata over time for a particular IP, but that is probably OK.

How would updates work?

Just run the import script again. The id field is set to the IP, and ElasticSearch handles updating other fields for the record as needed.

Running an update locally took 47m.

Remove the Kubernetes web app deployment, and query the ElasticSearch index directly

We could get rid of the express node app and allow clients to query the index directly

Exports

We can use elasticsearchdump (https://github.com/elasticsearch-dump/elasticsearch-dump) if we need to support exporting data for ingestion elsewhere

Open questions and next steps

Event Timeline

kostajh renamed this task from ipoid-next using OpenSearch as backend to Consider building next iteration of ipoid using OpenSearch as backend.Feb 16 2024, 9:36 AM

Having chatted a bit with @Gehel and @akosiaris, it sounds like "A Ganetti VM in production, dedicated to an ipoid OpenSearch instance. This would be managed by puppet, and means managing the system on bare metal." would be the most realistic option.

I think a next step here would be to figure out roughly what level of resourcing (compute, and people time) commitments this entails, and what a maintenance plan would look like for e.g. updating the service, making sure it works in both data centers, has backups, etc.

Having chatted a bit with @Gehel and @akosiaris, it sounds like "A Ganetti VM in production, dedicated to an ipoid OpenSearch instance. This would be managed by puppet, and means managing the system on bare metal." would be the most realistic option.

I think a next step here would be to figure out roughly what level of resourcing (compute, and people time) commitments this entails, and what a maintenance plan would look like for e.g. updating the service, making sure it works in both data centers, has backups, etc.

I filed T359298: Request creation of ipoid-opensearch VPS project; once that is set up, I will set up the proof of concept importer and allow interested people to access the VPS for testing out queries.

I've set up a VPS on WMCS (T359298) and installed OpenSearch using https://opensearch.org/docs/latest/install-and-configure/install-opensearch/debian/.

I set up these field mappings on the index:

{
  "mappings" : {
    "properties" :  {
      "ip" : {
        "type" : "ip"
      },
      "tunnels.exits" : {
        "type" : "ip"
      },
      "tunnels.entries" : {
        "type" : "ip"
      },
      "client.concentration.geohash": {
        "type": "geo_point"
      }

    }
  }
}

I am not sure that the geohash one is necessary, as I don't think we have any plans to query by that dimension; it might make sense to drop that field.

Then, I installed Benthos and created a config file at $HOME/config.yaml. The relevant parts are:

input:
  label: ""
  read_until:
    idle_timeout: 600s
    input:
      file:
        paths: [ ./feed/feed.json ]
        scanner:
          lines: {}
pipeline:
  threads: -1
  processors:
  - awk:
      codec: none
      program: |
        json_set("@timestamp", timestamp_format(timestamp_unix()));
output:
  opensearch:
    urls: [ https://localhost:9200 ]
    index: "ipoid"
    tls:
      enabled: true
      skip_cert_verify: true
    max_in_flight: 10000
    basic_auth:
      enabled: true
      username: admin
      password: {REDACTED}
    action: "index"
    id: '${! json("ip") }'
    max_in_flight: 64
    batching:
      count: 100
      byte_size: 0
      period: ""
      check: ""

In the "processor" section, we're adding a @timestamp field which is the "IP was last seen at" field that clients can use when querying. (side note: it might make more sense to set this to e.g. "2024-03-07" for all records for a given import.)

I downloaded the daily feed file from Spur and extracted it to feed/feed.json.

Then I ran benthos -c ./config.yaml, and the import completed in about 50 minutes.

kharlan@ipoidopensearch:~$ curl -s --insecure -u $OPENSEARCH_AUTH -XGET "https://localhost:9200/_cat/indices?v"
health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .opensearch-observability    u1VoWihFQy2Bjd6j7Ke3RQ   1   0          0            0       208b           208b
green  open   .plugins-ml-config           PZNpiX67Q32UOiw8LTMSOQ   1   0          1            0      3.9kb          3.9kb
yellow open   security-auditlog-2024.03.07 vpR2a7_WQq-MYXrzs8hXwQ   1   1     149595            0     63.7mb         63.7mb
green  open   .opendistro_security         4rfhhnVHTq26lej7IUh_zA   1   0         10            0     77.7kb         77.7kb
yellow open   ipoid                        0_N5JD_2Qmmpqa4XAb3qDg   1   1   37015900            0      6.9gb          6.9gb

I haven't done any performance tuning.

The next step is to run the import with today's (March 8) feed, and to open up query access using basic auth so that folks can evaluate querying performance.

After running the import with today's data (62 minutes):

kharlan@ipoidopensearch:~$ curl -s --insecure -u $OPENSEARCH_AUTH -XGET "https://localhost:9200/_cat/indices?v"
health status index                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .opensearch-observability    u1VoWihFQy2Bjd6j7Ke3RQ   1   0          0            0       208b           208b
green  open   .plugins-ml-config           PZNpiX67Q32UOiw8LTMSOQ   1   0          1            0      3.9kb          3.9kb
yellow open   security-auditlog-2024.03.07 vpR2a7_WQq-MYXrzs8hXwQ   1   1     149595            0     63.7mb         63.7mb
green  open   .opendistro_security         4rfhhnVHTq26lej7IUh_zA   1   0         10            0     77.7kb         77.7kb
yellow open   ipoid                        0_N5JD_2Qmmpqa4XAb3qDg   1   1   39697418      5896303      8.6gb          8.6gb
kostajh changed the task status from Open to Stalled.May 20 2024, 7:26 PM

Once T362105: EPIC: Mutualized opensearch cluster is done, we can experiment with using that platform.

kostajh renamed this task from Consider building next iteration of ipoid using OpenSearch as backend to Build next iteration of IPoid using OpenSearch as backend.May 20 2024, 7:34 PM

Something else to consider: we could also load IPs from GeoLite2 into the OpenSearch index, and have a field like {"source": "maxmind"} or {"source": "spur"} (for Spur data). But we'd have to make sure that field names don't collide.

Something else to consider: we could also load IPs from GeoLite2 into the OpenSearch index, and have a field like {"source": "maxmind"} or {"source": "spur"} (for Spur data). But we'd have to make sure that field names don't collide.

Actually, we could just place the objects of the City, Country and ASN records for an IP in dedicated fields like geolite2_city, geolite2_country, and geolite2_asn.