
Make oozie swift upload emit event to Kafka about swift object upload complete
Closed, Resolved · Public · 8 Estimated Story Points

Description

This will allow consumers to subscribe to Kafka topics and trigger actions when an object is ready for consumption and processing.

Event Timeline

I currently have three use cases for this functionality:

  1. Export bulk data updates for all wikis from analytics, import in production. ex: popularity export
  2. Export a single complete index as multiple files from analytics. Import to production and perform a promotion procedure once all files are imported. ex: pre-computed query suggestions
  3. Export an ML model and some metadata. Import to production APIs and potentially pass off to an external promotion procedure.

Considerations:

  • Messages have to be acked, or they will be retried. The amount of work performed for a single Kafka message should therefore ideally be in the single-digit-minutes range (see the sketch below).
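
To make the retry mechanics concrete, here is a minimal consumer sketch, assuming the kafka-python client; the topic, broker, group id, and import_objects function are illustrative placeholders, not part of this task:

import json
from kafka import KafkaConsumer  # kafka-python client

def import_objects(event):
    """Placeholder for the actual import work, e.g. fetching from swift."""
    pass

consumer = KafkaConsumer(
    'analytics.swift.upload-complete',               # hypothetical topic
    bootstrap_servers='kafka1001.eqiad.wmnet:9092',  # hypothetical broker
    group_id='swift-importers',                      # hypothetical group
    enable_auto_commit=False,  # don't ack until the work actually finished
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)

for message in consumer:
    import_objects(message.value)  # should ideally finish within minutes
    consumer.commit()  # ack; unacked messages are redelivered and retried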

This leads me to think we need two different ways of choosing what messages to send.

Use case 1 will take 30+ hours to perform a full import. It needs to be broken up into many small messages so they can have reasonable retry mechanics, and so multiple consumers can work on the queue. The messages in this case should be per-file, giving the export job control over how many messages to produce.

Use cases 2 and 3 perform much less time-intensive imports for the size of the data, but need to perform some action when the import is complete and ready to use inside elasticsearch. The message in this case should be a single Kafka message with a swift prefix that, when listed, returns the full directory upload.

I've implemented 1 & 2 in https://gerrit.wikimedia.org/r/521368 and https://gerrit.wikimedia.org/r/522190. Essentially they expect the following JSON schema. I haven't written anything to produce these messages yet. They have per-swift-container configuration that triggers different behaviour. object_prefix is expected to refer to either a directory or a single file. Any additional metadata, such as will be required for the ML models, will probably have to be a specifically named metadata.json file or some such, uploaded along with the model and referenced by the same object_prefix.

{
  "type": "object",
  "additionalProperties": false,
  "required": ["container", "object_prefix"],
  "properties": {
    "container": {"type": "string"},
    "object_prefix": {"type": "string"}
  }
}
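
As a quick illustration (the message values below are made up), a producer or consumer could validate a message against this schema with the jsonschema library:

import jsonschema

MESSAGE_SCHEMA = {
    "type": "object",
    "additionalProperties": False,
    "required": ["container", "object_prefix"],
    "properties": {
        "container": {"type": "string"},
        "object_prefix": {"type": "string"},
    },
}

message = {"container": "search_popularity", "object_prefix": "20190720"}
jsonschema.validate(message, MESSAGE_SCHEMA)  # raises ValidationError if invalid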
Milimetric moved this task from Incoming to Machine Learning Platform on the Analytics board.

Ok, from discussions with Erik today, we are going with an event like:

{
  "$schema": "/swift/upload/complete/1.0.0",

  "meta": {
    "dt": "2019-07-23T22:52:28.816876Z",
    "id": "6b13a837-b117-471e-a823-394a0c4c825f",
    "stream": "analytics.swift.upload-complete",
    "uri": "https://ms-fe.svc.eqiad.wmnet/v1/AUTH_analytics/ottotest9?prefix=location_test1"
  },
  "dataset_dt": "2019-07-23T00:00:00Z",
  "swift_account": "analytics",
  "swift_container": "ottotest9",
  "swift_object_prefix": "location_test1",
  "swift_object_urls": [
    "https://ms-fe.svc.eqiad.wmnet/v1/AUTH_analytics/ottotest9/location_test1/a",
    "https://ms-fe.svc.eqiad.wmnet/v1/AUTH_analytics/ottotest9/location_test1/b"
  ]
}

The swift_object_urls field will be optional, and only filled in if the oozie user opts into setting it. Otherwise the user can list all the objects at meta.uri. (Q: should we use meta.uri for this, or make a new field, e.g. swift_prefix_url?)
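
For consumers that don't get swift_object_urls, here is a rough sketch of listing the objects at the prefix URI; the URI is copied from the example event above, and the auth token is a placeholder:

import requests

prefix_listing_uri = (
    'https://ms-fe.svc.eqiad.wmnet/v1/AUTH_analytics/ottotest9'
    '?prefix=location_test1'
)
# Swift container listings return one object name per line for a plain GET.
resp = requests.get(prefix_listing_uri, headers={'X-Auth-Token': 'AUTH_tk...'})
resp.raise_for_status()
for object_name in resp.text.splitlines():
    print(object_name)  # e.g. location_test1/a, location_test1/b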

Let me catch up here: it seems that URLs should have versions, and not only be defined by a location and a set of artifacts in that location, right? Otherwise a re-run with the same parameters might overwrite the very object the client is about to download?

Since we are really using the cluster as a package builder and swift as "package storage", it seems the storage should have versions, right? I know, this is not so much a comment about the schema but rather about the flow. Let me know if you discarded this idea earlier.

The object URLs are totally up to the user; the script just uploads whatever is in the HDFS directory with a prefix. It looks like Erik is naming his e.g.

search_glent/20190720/part-00000-07245781-46f6-4e77-993c-6851a6c64d24-c000.txt.gz

But hm, I get your point. It might be nice if the upload script automated some versioning for you, even if that versioning was just a millisecond timestamp. e.g. search_glent/2019-07-22T12:43:12.1234Z/20190720/part-00000-07245781-46f6-4e77-993c-6851a6c64d24-c000.txt.gz with the 'versioning' here being the upload timestamp.

However, doing auto-versioning in the upload script would make constructing the event a little more difficult; we'd have to communicate the version of the upload to the event-producing script via oozie. I suppose I could combine the upload and event-emitting scripts; this makes things a little less modular, but would allow them to integrate a little better. Doing versioning like this might also make it a bit harder for consumers to discover data without the event. I was imagining this oozie swift upload to be usable without requiring that users use a Kafka consumer.

I'm not sure which is better here. I'd prefer this upload util workflow to be simple and allow users to build in more complexity if they want. The oozie upload script takes an object_prefix string parameter, which could include versioning info in it, but users would have to pass this in.
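
For illustration, a hypothetical helper (not the actual upload script) showing what timestamp-based auto-versioning of a user-supplied object_prefix could look like:

from datetime import datetime, timezone

def versioned_prefix(object_prefix):
    # Use the upload timestamp as the 'version' component of the prefix.
    version = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    return '{}/{}'.format(object_prefix, version)

print(versioned_prefix('search_glent/20190720'))
# e.g. search_glent/20190720/2019-07-23T22:52:28.816876Z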

@EBernhardson any preferences here?

@Ottomata I think for users' sake it is easier to do it the other way around, maybe? Provide versioning by default and let users override it, so artifacts could be uploaded to a fixed URL (thus removing the need for an event to rediscover the data). Could versioning be a simple md5(oozie job name + artifact name)?

CC @EBernhardson for user feedback about this
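
For comparison, a quick sketch of the md5 idea with illustrative names; note that an md5 of fixed names is deterministic, so unlike a timestamp it would not distinguish re-runs of the same job:

import hashlib

def md5_version(oozie_job_name, artifact_name):
    # Deterministic 'version' derived from job and artifact names.
    return hashlib.md5((oozie_job_name + artifact_name).encode('utf-8')).hexdigest()

print(md5_version('swift_upload-glent', 'part-00000.txt.gz'))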

I hadn't previously thought about re-publishing a new version of the same dataset. It does seem possible. For the purposes of my implementation the actual prefix is immaterial: the container is used to decide what kind of upload this is, and the exact prefix is only an implementation detail.

For my purposes a timestamp injected somewhere in the prefix makes plenty of sense.

Ya, if you needed to re-run a job due to data backfill, you might want to be able to do so without overwriting the previously uploaded data in swift.

Ok, will see what I can do...

Change 525435 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/refinery@master] [WIP] swift-upload.py to handle upload and event emitting

https://gerrit.wikimedia.org/r/525435

Change 525562 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[mediawiki/event-schemas@master] Add new swift/upload/complete schema

https://gerrit.wikimedia.org/r/525562

Change 525562 merged by jenkins-bot:
[mediawiki/event-schemas@master] Add new swift/upload/complete schema

https://gerrit.wikimedia.org/r/525562

Change 525617 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[mediawiki/event-schemas@master] Increase date-time maxLength to 128 for all schemas

https://gerrit.wikimedia.org/r/525617

Change 525617 merged by Ottomata:
[mediawiki/event-schemas@master] Increase date-time maxLength to 128 for all schemas

https://gerrit.wikimedia.org/r/525617

Change 525621 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/deployment-charts@master] Allow swift/upload/complete events in streams named .*swift.upload-complete

https://gerrit.wikimedia.org/r/525621

Change 525621 merged by Ottomata:
[operations/deployment-charts@master] Allow swift/upload/complete events in streams named swift.*.upload-complete

https://gerrit.wikimedia.org/r/525621

Change 525625 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/deployment-charts@master] eventgate-analytics stream config swift upload regex - escape .

https://gerrit.wikimedia.org/r/525625

Change 525625 merged by Ottomata:
[operations/deployment-charts@master] eventgate-analytics stream config swift upload regex - escape .

https://gerrit.wikimedia.org/r/525625

Change 525626 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[mediawiki/event-schemas@master] Fix swift/upload/complete examples

https://gerrit.wikimedia.org/r/525626

Change 525626 merged by Ottomata:
[mediawiki/event-schemas@master] Fix swift/upload/complete examples

https://gerrit.wikimedia.org/r/525626

Change 526231 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[mediawiki/event-schemas@master] Update swift/upload/complete/1.0.0 with use of swift_prefix_uri

https://gerrit.wikimedia.org/r/526231

Change 526231 merged by Ottomata:
[mediawiki/event-schemas@master] Update swift/upload/complete/1.0.0 with use of swift_prefix_uri

https://gerrit.wikimedia.org/r/526231

Ottomata set the point value for this task to 8. Jul 30 2019, 4:03 PM

Change 525435 merged by Ottomata:
[analytics/refinery@master] swift_upload.py to handle upload and event emitting

https://gerrit.wikimedia.org/r/525435