
Implementing the reliable event bus using Kafka
Closed, Resolved (Public)

Description

Implementation sketch for a reliable event bus with both task queuing and pub/sub functionality, originally based on this restbase doc.

Client connection

A popular and well-supported option is to use WebSockets or long polling. Falling back to polling is common in WebSocket libraries.

Using Kafka

for queuing

Goal: deliver each message at least once while avoiding duplicate processing.

Kafka splits a topic into many partitions.

  • Each partition's *commit* offset is stored in ZooKeeper
  • Reading from a partition is separate from committing the offset; the client keeps track of its read offset
  • The offset can be *committed* once acks are received from service clients (auto.commit.enable = false)
  • Clients rebalance partitions on join; we need more (~2x?) partitions than clients
Error / timeout handling
  • Each message has an associated timeout
    • configured per queue
    • can be increased by client up to 12 hours (AWS) in poll request or per message ('heartbeat')
  • If no ack is received within this timeout window:
    • message is placed in internal queue & handed out again *once*; messages are only committed up to the failed message
    • if retry succeeds, all is well
    • else: place message(s) into retry queue, ack primary queue
      • dequeue messages from retry queue as well, keep counter of retries
      • once max number of retries reached, place message in separate dead-letter queue for inspection (ideally with some debugging info)
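
A minimal consumer-side sketch of this flow, assuming the newer kafka-python KafkaConsumer/KafkaProducer API; the topic names and retry limit are invented, and the single immediate re-delivery described above is elided:

# Sketch: at-least-once task queue consumption with manual offset commits,
# a retry topic and a dead-letter topic. Assumes kafka-python's newer
# KafkaConsumer / KafkaProducer API; topic names and MAX_RETRIES are made up.
from kafka import KafkaConsumer, KafkaProducer
import json

MAX_RETRIES = 3

consumer = KafkaConsumer('tasks', 'tasks-retry',
                         group_id='task-workers',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)   # auto.commit.enable = false
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handle(task):
    """Deliver the task to a service client; raise on failure / timeout."""
    ...

for msg in consumer:
    task = json.loads(msg.value)
    try:
        handle(task)
    except Exception:
        retries = task.get('retries', 0) + 1
        task['retries'] = retries
        # Route to the retry topic, or to the dead-letter topic once the
        # retry budget is exhausted, then ack the original message.
        target = 'tasks-retry' if retries <= MAX_RETRIES else 'tasks-dead-letter'
        producer.send(target, json.dumps(task).encode('utf-8'))
    # Commit the offset only after the message was handled or re-routed, so
    # an unacked message is re-delivered after a crash (at-least-once).
    consumer.commit()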

for pub/sub

Goal: Deliver each message to all consumers.

  • use one consumer group id per pub/sub client
  • each consumer group maintains its own commit offset, so speed independent
  • error handling as in queuing
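
A sketch of the pub/sub variant (same assumptions as the queuing sketch above): each subscriber uses its own consumer group id, so every group sees the full stream and tracks its own offset at its own pace:

# Sketch: pub/sub on Kafka. Each logical subscriber gets its own consumer
# group id, so all of them see every message independently.
from kafka import KafkaConsumer

def subscribe(client_name, topic='page-edits'):
    return KafkaConsumer(topic,
                         group_id='pubsub-%s' % client_name,  # one group per client
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)

changeprop = subscribe('change-propagation')
analytics = subscribe('analytics')
# Each consumer iterates and commits independently; error handling can follow
# the same retry / dead-letter pattern as in the queuing sketch.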

Open questions

  • rebalance periodically based on committed offsets in each partition?

Similar services

Event Timeline


@Ottomata, the Confluent proxy seems to cover most of what we need, especially on the producer side.

A few nitpicks I noticed while browsing the docs:

  • No direct support for regular JSON messages / only supports Avro-flavored JSON. This is fairly easy to add in a frontend, but means that using this API directly from various services might be a bit ugly.
  • No direct support for low-latency streaming (can use native Kafka clients to get this, perhaps expose Websocket interface).
  • Consumer instances are tied to a specific proxy instance, would need to play some load balancing tricks without persistent connections.
  • Only a single subscription per consumer instance; this can make consuming many topics expensive, as the client needs to poll all of those topics.
  • Apparently no support for maintaining the offset(s) entirely on the client side, without being tied to a single consumer instance.
  • No direct support for access control (especially needed on the producer side) and message filtering.
  • The producer API isn't as clean as it could be. Producers need to know schema ids returned from previous requests & can't simply produce conforming messages to a topic.
    • But, there *is* a schema registry, and schema migrations can be restricted.

We already have a lot of commitment to Python + Kafka via the recent EventLogging on Kafka project. I wonder if it might be worth considering creating our own REST endpoint built on the EventLogging codebase. The code there is quite slick, and already includes easy JSON Schema validation in conjunction with a schema store on meta.wm.org.

I still want Avro, but sticking with JSON Schema might give us a faster path to MVP, especially for lower-volume (productionish) use cases. I wonder how hard it would be to build Avro schema support into EventLogging. Just an idea! :) @Nuria? @madhuvishy? Thoughts?

When I say REST endpoint, I specifically mean for producing messages. I like the idea of building a (node based?) websockets endpoint for consuming from this system.

@Ottomata: Let's do some quick benchmarks & then make a more informed decision, taking into account the expected request volumes.

@ori, I'd love some thoughts from you too :)

Ja, sure @GWicke, I'm just writing down ideas and soliciting thoughts.

I like the idea of quickly prototyping and benchmarking things, as it could help inform us as to how difficult different approaches would be to implement. But I'm not sold on how the benchmarking itself will help us make this decision, as the end product is likely to be much more complicated than the prototype. Also, any solution we choose will need to be horizontally scalable, so small differences in performance can be solved by scale out (likely more cheaply than by spending engineer time on building new things).

@GWicke, I guess if we are going to prototype and benchmark over in Labs Services, I should stand up Confluent stuff there too, eh?

@GWicke, I guess if we are going to prototype and benchmark over in Labs Services, I should stand up Confluent stuff there too, eh?

Yeah, that would be great!

From Confluent: "Avro has a JSON like data model, but can be represented as either JSON or in a compact binary form. It comes with a very sophisticated schema description language that describes data." Seems to me that Avro is a subset of JSON (a good thing) that can be (optionally) represented in binary. I think it seems like a good choice; what problems do you see with it, @GWicke?

Confluent Schema Registry and Kafka REST Proxy are running.

Schema Registry is at:

confluent-event-bus.services.eqiad.wmflabs:8081

Kafka REST Proxy is at:

confluent-event-bus.services.eqiad.wmflabs:8082

@Ottomata, thanks!

I have created a simple node test service (see https://github.com/gwicke/restevent, main entry point). It validates a simple JSON schema, and serializes messages to snappy-compressed JSON in Kafka. Currently it only supports topic creation and message posting.

First throughput numbers, with schema checking:

$ cat test.post 
[{"url":"foo/bar","name":"somename"}]

$ ab -n10000 -c10 -T 'application/json' -p ./test.post http://10.68.17.136:6927/test/v1/topics/test/
...
Requests per second:    1108.07 [#/sec] (mean)

Percentage of the requests served within a certain time (ms)
  50%     17
  66%     19
  75%     21
  80%     22
  90%     25
  95%     30
  98%     35
  99%     38
 100%     65 (longest request)

Confluent REST proxy, without schema checking:

$ cat confluence.post 
{"records":[{"value":"S2Fma2E="}]}

$ ab -n10000 -c10 -T 'application/vnd.kafka.binary.v1+json' -p ./confluence.post http://localhost:8082/topics/test

Requests per second:    1285.89 [#/sec] (mean)

Percentage of the requests served within a certain time (ms)
  50%      7
  66%      8
  75%      9
  80%     10
  90%     12
  95%     14
  98%     17
  99%     20
 100%     38 (longest request)

All this is from /home/gwicke/restevent, on confluent-event-bus, a dual-core labs VM.

Ha, we are both working on this late for fUUuUn!

Just did a similar, but sorta different thing in the EventLogging codebase using python tornado:

https://gerrit.wikimedia.org/r/#/c/235671/1/server/bin/eventlogging-service

Using your ab test, I get:

$ cat /home/otto/event.json
{"event":{"isAPI":false,"isMobile":false,"revisionId":56719144,"userAgent":"Mozilla/5.0(X11;FreeBSDamd64;rv:40.0)Gecko/20100101Firefox/40.0"},"recvFrom":"mw1113","revision":5588433,"schema":"PageContentSaveComplete","seqId":33729021,"timestamp":1441251122,"userAgent":"Mozilla/5.0(X11;FreeBSDamd64;rv:40.0)Gecko/20100101Firefox/40.0","uuid":"b28f1482a1bb566da944f6d4780c970e","webHost":"ja.wikipedia.org","wiki":"jawiki"}

Raw, no JSON schema parsing or validation:

$ ab -n10000 -c10 -T 'application/json' -p ./event.json http://kafka-event-bus.services.eqiad.wmflabs:8083/raw

Requests per second:    839.24 [#/sec] (mean)

Percentage of the requests served within a certain time (ms)
  50%     11
  66%     13
  75%     14
  80%     15
  90%     16
  95%     17
  98%     19
  99%     20
 100%     32 (longest request)

JSON parsing and schema validation:

$ ab -n10000 -c10 -T 'application/json' -p ./event.json http://kafka-event-bus.services.eqiad.wmflabs:8083/event

Requests per second:    599.27 [#/sec] (mean)

Percentage of the requests served within a certain time (ms)
  50%     16
  66%     17
  75%     17
  80%     18
  90%     19
  95%     21
  98%     26
  99%     29
 100%    169 (longest request)

This is producing with the kafka-python library using request.required.acks=1 (won't matter much here since only one broker, setting this to 0 would make a difference) and a synchronous producer.

Ah, I see your Kafka producer is async. Will try that too, but aaah I must go to bed! Can you try yours with a sync producer to compare?

Ah, I see your Kafka producer is async.

It's sync in the sense that requiredAcks is 1. All production-worthy node code is async / event-driven, so testing writes in a way that would block the event loop wouldn't make much sense.

@Nuria, I don't have anything against avro as a storage format and a possible interface option, especially if it has tangible benefits (possibly: performance, ease of use for analytics). However, I think that we should provide solid JSON interface support. Ideally, this would include continued support for JSON schema.

The biggest tangible benefit of Avro over JSON Schema is schema evolution.

Just did your ab with confluent rest proxy and avro data:

I used the EditEvent schema I made here during some previous confluent experimenting:
https://github.com/ottomata/EventAvro/

otto@confluent-event-bus:~/EventAvro$ cat edit_event.json
{"value_schema_id": 21, "records":
        [
            {"value": {
    "action": "saveAttempt",
    "meta": {
        "wiki": "en.wikipedia.org",
        "webHost": "mediawiki-vagrant.dev",
        "recvFrom": "mediawiki-vagrant.dev",
        "timestamp": 1433974402,
        "clientIp": "127.0.0.1",
        "seqId": 1,
        "uuid": "05fcf26",
        "userAgent": "nonya"
    },

    "action_init_type": null,
    "action_init_mechanism": null,
    "action_ready_timing": null,
    "action_save_intent_timing": null,
    "action_save_attempt_timing": null,
    "action_save_success_timing": null,

    "action_save_failure_type": null,
    "action_save_failure_message": null,
    "action_save_failure_timing": null,

    "action_abort_type": null,
    "action_abort_mechanism": null,
    "action_abort_timing": null,

    "mediawiki_version": null,
    "page": null,

    "editing_session_id": null,

    "user": null
}}
        ]
    }

ab -n10000 -c10 -T 'application/vnd.kafka.avro.v1+json' -p ./edit_event.json http://confluent-event-bus.services.eqiad.wmflabs:8082/topics/EditEvent

Requests per second:    733.79 [#/sec] (mean)

Percentage of the requests served within a certain time (ms)
  50%     12
  66%     14
  75%     17
  80%     18
  90%     22
  95%     28
  98%     36
  99%     41
 100%     73 (longest request)

I just reran the EventLogging service test I did last night, but with the Kafka producer async=True and batch_send_every_n=1000. This seemed to slightly improve the raw production, but json schema validation was about the same:

Raw production:

Requests per second:    937.63 [#/sec] (mean)

JSON Event validation and production:

Requests per second:    597.18 [#/sec] (mean)

Snappy compression seems to use significant CPU time. After disabling message compression in the node service, I get 1334 req/s and a p99 of 20ms (no aggregation, with schema checking, JSON serialization).

Based on the numbers so far & assuming 10x more throughput on real hardware, we'd need about five boxes to sustain 50k req/s using Java or Node, but about double that number using Python. For the 'prod' use case, a single box should be able to handle the load.

I think we have established that schema validation and a simple producer / topic creation api are fairly easy to provide. For the full functionality, we'd also need to add:

  1. dynamic schema fetching (eventlogging) or lookup from the config ('production') (all but python?)
  2. possibly, avro encoding support (sounds relatively straightforward) (node, python)
  3. some form of authentication (all)

Of these, authentication sounds like potentially the most complex part. 1) and 2) should be doable in a day. I might look into Avro encoding & benchmark it later tonight.
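
As a rough sketch of what 1) could look like, a small caching schema fetcher; the URL pattern below is a placeholder, not EventLogging's actual schema endpoint:

# Sketch: fetch-and-cache JSON schemas by (name, revision). The URL template
# is hypothetical and stands in for the real schema store / registry.
import json
import urllib.request

SCHEMA_URL = 'https://meta.wikimedia.org/schema/%(name)s/%(revision)s'  # placeholder
_schema_cache = {}

def get_schema(name, revision):
    key = (name, revision)
    if key not in _schema_cache:
        url = SCHEMA_URL % {'name': name, 'revision': revision}
        with urllib.request.urlopen(url) as resp:
            _schema_cache[key] = json.loads(resp.read().decode('utf-8'))
    return _schema_cache[key]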

The biggest tangible benefit of Avro over JSON Schema is schema evolution.

How would we leverage this in practice? Would you wire up schema compatibility checking with the schema namespace?

I'm also wondering if the schema compatibility checking is really such a big deal from an implementation point of view. It seems to boil down to asserting that a new schema basically defines a superset of messages of all older ones. This in turn means that all fields that were legal in older schemas are still legal in the new schema, including types. To a first approximation, asserting this with a visitor doesn't sound soo hard, although I'm sure the devil is in the details.
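
A toy sketch of such a visitor, assuming schemas that only use properties, type, required and additionalProperties (none of the trickier keywords where the devil actually lives):

# Sketch: check that a new JSON schema accepts a superset of the messages the
# old one accepted, i.e. every property that was legal before is still legal
# with the same type. Enums, arrays, combinators etc. are deliberately ignored.
def is_backwards_compatible(old, new):
    old_props = old.get('properties', {})
    new_props = new.get('properties', {})
    for name, old_sub in old_props.items():
        new_sub = new_props.get(name)
        if new_sub is None:
            # A dropped property only rejects previously valid messages if
            # additionalProperties is false in the new schema.
            if new.get('additionalProperties') is False:
                return False
            continue
        if old_sub.get('type') != new_sub.get('type'):
            return False
        if old_sub.get('type') == 'object' and not is_backwards_compatible(old_sub, new_sub):
            return False
    # Newly required properties would also break previously valid messages.
    return set(new.get('required', [])) <= set(old.get('required', []))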

Based on the numbers so far & assuming 10x more throughput on real hardware, we'd need about five boxes to sustain 50k req/s using Java or Node, but about double that number using Python. For the 'prod' use case, a single box should be able to handle the load.

I don't think this is necessarily true. JSON validation and snappy compression are likely all CPU bound, and these boxes have many CPUs. We could parallelize the service, either by parallelizing the message production, or by running multiple services on one box.

Snappy compression seems to use significant CPU time. After disabling message compression in the node service, I get 1334 req/s and a p99 of 20ms (no aggregation, with schema checking, JSON serialization).

Snappy compression will probably be important, especially for the higher load non production use case. Messages are compressed in batches and even though the throughput for our test cases here seems to increase when we turn snappy off, I think there will be a threshold at which it will improve things.

@Ottomata, the 10x speed-up assumption is conservative for a 2x6 core box, assuming parallelization (which at least Node and probably Java have built-in anyway). We can likely squeeze out a little more by playing with the constants, but that doesn't change the order of magnitude.

There is a Java project that converts JSON schema to Avro schema & back: https://github.com/fge/json-schema-avro

It's quite verbose Java stuff, but the actual meat of the conversion looks rather straightforward.

This means that we have the option to continue using JSON schema externally, even if we use Avro internally.

@GWicke I think that restricting ourselves to Avro is not a bad thing. Do you have in mind a use case that cannot be satisfied with Avro? The less conversion needed, the better.

From what I have seen, there doesn't seem to be a clear winner in the Avro vs. JSON schema comparison.

Better in Avro:

  • built-in schema compatibility checking
  • attribute rename mapping
  • more consistently implemented default value support
  • binary value support

Better in JSON schema:

  • integration with swagger specs, already widely used
  • value validation: string formats, ranges, lengths etc
  • less complex, possibly more robust: No schema registry, no need to integrate code review with schema validation
  • no need for special libraries in clients

Performance is probably going to be a mixed bag, with binary avro possibly winning over compressed JSON at write time, but losing when converting avro to json (vs. just spitting out a blob) at the consumer end.

  • no need for special libraries in clients

This is the same in either case, if we were to produce Avro-JSON data.

  • less complex, possibly more robust: No schema registry, no need to integrate code review with schema validation

How so? How do you handle multiple clients needing the schema in order to fill in default values? I don't want to have to clone mediawiki and look at the history if I'm just writing an edit event stream consumer.

  • binary value support

This is a huge win, not just that it is binary and more efficient, but it is strictly typed and allows you to more safely reason about the way code will execute.

If we are able to use Avro-JSON format, it is then very easy for us to convert to binary in the Analytics world, which makes things immensely better. There is a lot to be gained from standardizing on a org wide schema format, and JSON Schema will only work well in low load use cases where consumers don't need to evolve schemas or deal with ancient data. Especially as more and more folks are asked to justify their projects with data analysis, it will be very handy to be able to use the same infrastructure they use for message passing in production as they do for analytics.

I do agree that support for schema-ed JSON data is needed. I think we should experiment with making this service produce and validate Avro-Binary and Avro-JSON, depending on client preference. This could be done in one of our prototypes, or possibly in Confluent REST proxy.

If it is impossible or impractical to produce Avro-JSON, then we should support JSON Schema.

Also

no need for special libraries in clients

This is not correct. Any client that is sending events in JSON and has a suite of unit tests would want to validate its events against the schema it is posting to (so events do not end up in /dev/null because they are invalid), and doing schema validation in JSON requires additional libraries even in languages like JS and PHP that support JSON natively.

It is the usage of schemas (whether JSON Schema or Avro-JSON), not the schema format, that introduces the need for an additional library.

  • no need for special libraries in clients

This is the same in either case, if we were to produce Avro-JSON data.

  • less complex, possibly more robust: No schema registry, no need to integrate code review with schema validation

How so? How do you handle multiple clients needing the schema in order to fill in default values? I don't want to have to clone mediawiki and look at the history if I'm just writing an edit event stream consumer.

So you are seeing everything relying on the schema registry for basic functioning? If so, we'll have to make that highly available / replicated as well to avoid making it a SPOF, further adding to the complexity. Also, clients would need to write custom caching and polling code to avoid fetching that schema on each request. That would be fun, especially in PHP.

  • binary value support

This is a huge win, not just that it is binary and more efficient, but it is strictly typed

I actually meant binary data, not a binary encoding. The binary encoding is nice, but I don't think it's a given that it's significantly more efficient than compressed JSON, especially when emitting JSON to consumers.

If we are able to use Avro-JSON format, it is then very easy for us to convert to binary in the Analytics world, which makes things immensely better.

I'm on board with restricting the schemas to something that easily maps to Avro too. We can probably find a way to automate that check, perhaps by running the converter before checking in a new schema. That way, we get both value validation (from JSON schema), and compatibility with Avro.

There is a lot to be gained from standardizing on a org wide schema format, and JSON Schema will only work well in low load use cases

I am sceptical that you can convince people to stop using JSON schema in favor of Avro across the organization. Basically all of our APIs are JSON-driven, and there are good reasons for that. Your performance claim should ideally be backed up by data. The data we have so far certainly doesn't support it for the producer side.

where consumers don't need to evolve schemas or deal with ancient data.

The main unique bit of evolution support Avro seems to add is simple attribute aliasing. The common task of superseding an attribute or sub-structure still requires code handling both cases.

Let's please not over-hype Avro as "it'll solve all our data compatibility problems". It handles one simple sub-problem, but still leaves the harder and perhaps more common changes to be handled manually.

I do agree that support for schema-ed JSON data is needed. I think we should experiment with making this service produce and validate Avro-Binary and Avro-JSON, depending on client preference. This could be done in one of our prototypes, or possibly in Confluent REST proxy.

If it is impossible or impractical to produce Avro-JSON, then we should support JSON Schema.

I'm on board with supporting both at the API layer if it can be done reasonably easily. Let's pick the serialization format based on actual performance data.

It is the usage of schemas (whether JSON Schema or Avro-JSON), not the schema format, that introduces the need for an additional library.

Yeah, as long as the serialization format is JSON. I'm on board with supporting schemas (and enforcing them on the way in), but I'm not on board with forcing all clients and consumers to deal with schemas.

I've implemented a very simple prototype that supports Avro encoding in node.js. For serialisation I've used avro-serializer, which is a simple convenience wrapper over node-avro-io. I've benchmarked that for a simple message on localhost (OS X, quad-core i7 with 16 GB RAM), and here are the results:

  1. JSON.stringify for serialisation, with schema check, no compression: 1090 req/s
  2. JSON.stringify for serialisation, with schema check, snappy compression: 1027 req/s
  3. Avro for serialisation, with schema check, no compression: 1013 req/s
  4. Avro for serialisation, with schema check, snappy compression: 974 req/s

I didn't find any good alternative libraries for node.js & avro serialization, and even node-avro-io has binary dependencies and doesn't build on my machine with node 0.12+ or io.js.

Overall, Avro serialisation is significantly slower than pure JSON.stringify. A simple benchmark shows the following results:

  1. JSON.stringify(message) - 806k calls per second (124 ms for 100000 calls)
  2. schema.verify(message) + JSON.stringify(message) - almost the same performance - 770k calls/sec (130 ms for 100000 calls)
  3. avro.serialize(message) with schema validation - almost 20 times slower - 42k calls/sec (2340 ms for 100000 calls)

@GWicke: While it is good to give perf a try early on, I think perf results should be evaluated on production hardware rather than labs; in my experience testing EL the lab results cannot be extrapolated in any way to what you see on a real prod machine with several cores / more memory. This is especially true if the results are driven by code being CPU bound, which schema validation is.

testing EL the lab results cannot be extrapolated

EL might not actually be CPU-bound, which would explain the difference. IO performance in labs is pretty poor, while CPU performance is typically within a couple % of the raw hardware.

The Avro spec defines two main message containers:

  1. Object Container Files
  2. an RPC protocol

The Node Avro writer uses 1) by default. Unfortunately, the spec requires avro.schema to be embedded in the header of avro files, which accounts for most of the message size and CPU overheads. In a quick test script, 1327 bytes out of 1504 bytes for the avro message are used for the uncompressed schema alone, with the remainder used for the compressed data.

In Kafka, I believe that only the data encoding would be used (omitting the schema), which will be a lot more efficient. However, as that is not actually following the avro spec strictly, it makes it somewhat awkward to interact with.

For heavily string-based messages with lots of optional fields (omitted / undefined in JSON, null in Avro), output sizes are fairly similar between json and avro:

  • json 221
  • json+snappy 198
  • json+deflate 160
  • avro+snappy 1504 (1327 schema, ~17 container overheads, ~160 data)

This example also shows the limited effectiveness of per-message compression, especially for the kind of small messages we expect to be common.
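
A minimal way to reproduce the comparison in Python, assuming the standard avro package (the parse function is spelled parse in the Python 2 package and Parse in avro-python3); the schema and record are stand-ins:

# Sketch: compare an Avro Object Container File (schema embedded in the
# header) with the bare datum encoding that a Kafka message would carry.
import io
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter, BinaryEncoder

schema_json = json.dumps({
    "type": "record", "name": "Example", "fields": [
        {"name": "url", "type": "string"},
        {"name": "name", "type": ["null", "string"], "default": None},
    ]})
schema = avro.schema.parse(schema_json)   # avro-python3 spells this Parse
record = {"url": "foo/bar", "name": "somename"}

# 1) Object Container File: the header carries the full schema.
container = io.BytesIO()
writer = DataFileWriter(container, DatumWriter(), schema, codec='deflate')
writer.append(record)
writer.flush()
print('container bytes:', len(container.getvalue()))

# 2) Bare datum encoding, as it would sit in a Kafka message, with the schema
# (or a schema id) tracked out of band, e.g. in a registry.
raw = io.BytesIO()
DatumWriter(schema).write(record, BinaryEncoder(raw))
print('datum bytes:', len(raw.getvalue()))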

Oh boy! Long weekend responses to make new responses to woowee!

So many things to respond to! I'm going to try to organize the subjects a bit.

Performance

I second @Nuria's comment about perf testing now, especially when considering the complexity and scale differences of a prototype test vs whatever this system will be in the real world.

CPU performance is typically within a couple % of the raw hardware.

However, it's not just labs vs. prod hardware that matters here. @GWicke is right, when testing something that is CPU bound, likely the performance will be similar in either case. Especially when we are doing the comparisons on the same hardware.

binary value support

This is a huge win, not just that it is binary and more efficient, but it is strictly typed

I actually meant binary data, not a binary encoding. The binary encoding is nice, but I don't think it's a given that it's significantly more efficient than compressed JSON, especially when emitting JSON to consumers.

Ah, sorry, I misunderstood you here. When considering performance of the binary storage format, I'm thinking of specifically the analytics use case, where huge amounts of data are loaded into memory around a distributed cluster at once. The fact that this data does not need to be parsed from JSON text makes this much more efficient. That this binary data is strictly typed means that the execution of distributed jobs is easier to reason about, as you know the type of data you will encounter at compile time. But blablabla, that is just a static vs. dynamic typing argument that everybody knows, and isn't relevant to performance :)

I'm on board with supporting both at the API layer if it can be done reasonably easily. Let's pick the serialization format based on actual performance data.

In any case here, the performance that we test now will vary from what we finally build because the scale in production will be larger, and the system will be more complex. It is interesting to note the performance differences now, so we are aware of them, but we should be making this choice based on feature set, not on raw performance. This system will need to be fully horizontally scalable. Unless the performance is prohibitive, performance differences can be solved by scale out.

Compression and size

  • JSON Schema vs Avro-JSON should be the same, they are both just JSON.
  • Batches of JSON records of the same schema compress well. I don't expect much of a difference in size between a file of compressed JSON messages and a file of compressed Avro-Binary.

This example also shows the limited effectiveness of per-message compression, especially for the kind of small messages we expect to be common.

I think this is a difference between the 'production' system and the 'analytics' system. Kafka producer clients usually compress batches of messages, not individual messages. The analytics system will have a high throughput, and gain a lot from batch compression, whether or not the data is JSON-text or binary.

In Kafka, I believe that only the data encoding would be used (omitting the schema), which will be a lot more efficient. However, as that is not actually following the avro spec strictly, it makes it somewhat awkward to interact with.

Thanks for pointing this out. Each Avro-Binary (and also Avro-JSON?) message in Kafka would still be associated via a specific schema (obtainable via a schema registry). This isn't following the Object Container File spec, because these are not files. At its core, Avro is just an object model, not a storage format, and an Avro Object Container File is just a storage format that Apache Avro specifies and uses. This is why we have Avro-JSON as a storage format option. (E.g. for certain analytics use cases, the data may be stored as Parquet (a binary columnar storage format) in HDFS, and read back by Hadoop jobs using an Avro schema.)

Client Dependencies

If you are consuming Avro-JSON, your dependencies are the same as if you are consuming JSON-Schema. Just JSON. If you are consuming binary, you will need Avro dependencies.

Adoption

There is a lot to be gained from standardizing on a org wide schema format, and JSON Schema will only work well in low load use cases

I am sceptical that you can convince people to stop using JSON schema in favor of Avro across the organization. Basically all of our APIs are JSON-driven, and there are good reasons for that.

I agree we shouldn't try to ask people to stop using JSON, especially for public facing APIs. Are all our APIs JSON Schema based, or just the more recent public facing Services built APIs (RESTbase, etc.)?

I'm on board with restricting the schemas to something that easily maps to Avro too. We can probably find a way to automate that check, perhaps by running the converter before checking in a new schema. That way, we get both value validation (from JSON schema), and compatibility with Avro.

Indeed. I'm skeptical of our ability to easily convert between JSON Schemas and Avro schemas. Yes there is this, but it even admits that it doesn't work well in some cases. I think we will avoid a lot of compatibility and conversion bugs if we stick with one schema choice, if we can. That's why I'm suggesting we support both Avro-JSON and Avro-Binary.

Schema Registry

So you are seeing everything relying on the schema registry for basic functioning? If so, we'll have to make that highly available / replicated as well to avoid making it a SPOF, further adding to the complexity.

In some binary cases, yes. But in a system with many producers and many consumers, I don't see how to avoid this. (BTW, the Confluent Schema Registry has replication built in.) This EventBus produce API we are building here will need to have all schemas in order to validate produce requests, and other clients will need access to schemas as well. A schema service is needed in order to share schemas with many clients.

Also, clients would need to write custom caching and polling code to avoid fetching that schema on each request. That would be fun, especially in PHP.

  • For producing messages, this is true for both JSON Schema and Avro, if the client does not yet have a version of the schema in code.
  • For the production system use case where clients would be consuming Avro-JSON, this is not an issue, as you can parse the JSON without a schema.
  • For the general analytics system use case, jobs would likely have classes generated from Avro schemas in code, so this usually won't be a problem.
    • For the analytics batch (Hadoop) use case, the data would be written using Object Container files, (or Parquet), so the schema would be available in the files.
    • For the analytics realtime streaming use case, where clients would (mostly) be consuming Avro-Binary directly from Kafka, and don't have generated classes or schema in code, this is indeed something that will have to be dealt with.

Schema Evolution

where consumers don't need to evolve schemas or deal with ancient data.

The main unique bit of evolution support Avro seems to add is simple attribute aliasing. The common task of superseding an attribute or sub-structure still requires code handling both cases.

JSON Schema will not fill in default values for you. Each client consuming this data will have to implement logic to deal with missing fields themselves. As far as I know, JSON Schema will "handle" addition of new fields ("handle" in that those fields will be allowed but not validated in any way) via the additionalProperties setting, but it will not handle the case of missing fields in the data. In a quick test using python jsonschema, data missing a field was successfully validated, but there is no way to fill in the missing field with a default value:

from jsonschema import validate

schema = {
    "type" : "object",
    "properties" : {
        "price" : {"type" : "number"},
        "name" : {"type" : "string", "default": "NONE" },
    },
}

# If no exception is raised by validate(), the instance is valid.
validate({"price" : 1}, schema)

In this case, all consumers of this data will have to implement logic to deal with the fact that the name field was not provided in this message. If Avro were used, the name field would have been given the value of "NONE" when the message was produced.

tl;dr

Woo, that's a lot of responding!

  • Everyone agrees EventBus will support JSON
  • client dependencies not relevant
  • features > performance for criteria
  • schema registry necessary
  • Avro schema evolution is a very good thing
  • Optional Avro binary (and typed) storage format is good

Are all our APIs JSON Schema based, or just the more recent public facing Services built APIs (RESTbase, etc.)?

The Action APIs generally don't have schemas defined, but we have been defining schemas for REST APIs and services, typically as part of the general effort to document the services with Swagger specs. We are now leveraging those specs to automatically monitor & test services.

JSON Schema will not fill in default values for you

JSON schema (the spec) doesn't, but implementations can and do, based on the 'default' value defined in the spec.
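
For illustration, the extension recipe from the python-jsonschema documentation, which fills in default values during validation (reusing the schema from the earlier example):

# Sketch: a jsonschema validator that also fills in defaults, following the
# extension recipe from the python-jsonschema documentation.
from jsonschema import Draft4Validator, validators

def extend_with_default(validator_class):
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        for prop, subschema in properties.items():
            if isinstance(instance, dict) and "default" in subschema:
                instance.setdefault(prop, subschema["default"])
        for error in validate_properties(validator, properties, instance, schema):
            yield error

    return validators.extend(validator_class, {"properties": set_defaults})

DefaultFillingValidator = extend_with_default(Draft4Validator)

schema = {
    "type": "object",
    "properties": {
        "price": {"type": "number"},
        "name": {"type": "string", "default": "NONE"},
    },
}
event = {"price": 1}
DefaultFillingValidator(schema).validate(event)
print(event)   # {'price': 1, 'name': 'NONE'}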

Attempt at a summary

Overall, I think we agree that we want to protect the integrity of the data by enforcing schemas on the way in. Where we diverge is how to best achieve that, and how much complexity we are willing to introduce in the process.

From an analytics perspective, you like Avro for its schema migration guarantees and Java integration. If possible, you'd prefer to use the binary Avro encoding in Kafka (so that you don't need to re-encode when loading into Hadoop). You directly control most systems the analytics system will interact with, which makes complexities around the schema registry, avro and documentation less pressing to you.

From a services perspective, we are more focused on the interaction with various systems and clients. It is important to us to provide a low-friction API, with low mandatory requirements for each individual client. Reliability and overall system complexity are also important, as our use cases will directly impact the functioning of important features.

It sounds like a solution that guarantees compatibility with JSON schema *and* Avro could work for both applications. To make progress, I think we should simply try converting some representative schemas to see whether that is an option or not. Do you have examples for schemas that are especially tricky?

It sounds like a solution that guarantees compatibility with JSON schema *and* Avro could work for both applications. To make progress, I think we should simply try converting some representative schemas to see whether that is an option or not. Do you have examples for schemas that are especially tricky?

This is a great summary and idea. Off the top of my head, no, as I haven't really tried. Maybe @Nuria and I can find some.

Many schemas are listed here: https://meta.wikimedia.org/wiki/Research:Schemas. Not all of them are in use, but for the purpose of Avro compatibility I do not think that really matters.

@Nuria, the couple of schemas I looked at all seem to be relatively straightforward. Most seem to be a combination of value attributes and string enums. Some have nested objects too. I haven't seen any that use JSON schema's value (string pattern / numeric range) validation support, which wouldn't be transferable to Avro.

While I haven't set up a commandline wrapper for the Java converter yet, its tests are passing for me, and seem to cover that functionality.

@Nuria, the couple of schemas I looked at all seem to be relatively straightforward. Most seem to be a combination of value attributes and string enums. Some have nested objects too. I haven't seen any that use JSON schema's value (string pattern / numeric range) validation support, which wouldn't be transferable to Avro.

While I haven't set up a commandline wrapper for the Java converter yet, its tests are passing for me, and seem to cover that functionality.

I posted a web service that does bidirectional conversions here: https://github.com/eevans/json-schema-converter. It is very much an MVP, but works-for-me. Usage should be straightforward, and the README should help.

OK, I extracted a copy of the 85 schemas marked active at https://meta.wikimedia.org/wiki/Research:Schemas, and played around with them a bit.

As-is, they all fail conversion because the converter defaults to draft 4 of the spec (ours must all be based on an earlier version), so to each schema I added 3 additional properties:

  • $schema: http://json-schema.org/draft-03/schema# (to declare them as being draft 3 compliant)
  • additionalProperties: false (cannot be unset)
  • type: object

With these changes applied, there are 3 schemas with remaining issues.

  1. https://meta.wikimedia.org/wiki/Schema:Edit
  2. https://meta.wikimedia.org/wiki/Schema:MobileAppCategorizationAttempts
  3. https://meta.wikimedia.org/wiki/Schema:Schema

Of these, the first two fail solely because they have dot and/or hyphen delimited property names (so are easily fixed). I'm not entirely sure why Schema:Schema fails. The two uses of properties: [] prevent it from validating as JSON schema, but even with that fixed, conversion fails.

Speaking of conversion failures: When conversion fails, the reason for the failure is very opaque (ProcessingException: fatal: no suitable processor found); hopefully there is some straightforward way to improve upon this (you're poking around blindly in the dark, otherwise).

Additionally, the conversions aren't perfect. The resulting Avro schema is given an auto-generated record name (I'd hoped it would use JSON schema's title property, but it does not). The description property is also dropped in the conversion.

FYI, while trying conversion is a good exercise, I do not think EventLogging JSON schemas will be a showstopper if they turn out not to be convertible to Avro, as most schemas can be set up in different ways to capture the data we want. That is, for every one of our EL schemas there is an Avro schema we could have that captures the same data.

Here I am at 1am NYC time on a flight to SF and my mind is wandering EventBus.

It sounds like a solution that guarantees compatibility with JSON schema *and* Avro could work for both applications.

What would an EventBus that supported both JSON Schema and Avro look like?

Would we maintain a mapping between the two versions of a schema revision?

Would the system just produce both formats into Kafka in different topics (this duplicates the data, but might not be a bad idea)? Or would we have some built-in auto conversion between formats, e.g. produce JSON Schema, consume Avro binary?

Or would we keep it simple, and just give users a choice?: if you want JSON Schema, use it. If you want Avro, use it. If our use cases are so different, maybe that is all we need to do.

Daww...are we arriving at a point where our use cases are so different that we need to break up?! I would be a little disappointed if we broke up, as we wouldn't have manifested the grand vision of a unified data format + message bus. I really do think there is a lot to be gained by unifying the foundation under the same schema format. Hm.

Hm. Anyway just some thoughts from me at 30-thou down to our earth bound phabricator. :)

What would an EventBus that supported both JSON Schema and Avro look like?

It would support JSON Schema and Avro specs, and store data as JSON (compatible with Avro *and* plain JSON) or Avro binary. The key is keeping the schemas compatible with Avro and JSON Schema, which based on what we have seen doesn't seem to be so difficult. Even if some topic is set up to internally use Avro binary serialization, we can expose a public consumer interface that's emitting the JSON serialization.

Let's not obsess too much about a minor syntactical difference in schema formats, which shouldn't really matter much in practice, especially when using JSON serialization. By offering the schema in either format, we remain compatible with the widest set of producers and consumers, and can continue to hook into API documentation tools like Swagger, while also leveraging Avro where that is more convenient.

Oh, hm. So you are saying that a message of a given schema would validate using the Avro and JSONSchema versions of that schema? e.g.

# JSON Schema
{
    "properties": {
        "a_field": {
            "type": "string"
        }
    }
}


# Avro Schema
{
  "type" : "record",
  "name" : "a_record",
  "namespace" : "org.wikimedia.mediawiki",
  "fields" : [ {
    "name" : "a_field",
    "type" : "string"
  } ]
}


# Datum / Record / Event / Message / Whatever
{ "a_field": "woowoo" }

# (P.S. I didn't actually test any of the above schemas for validity)

So, we [cw]ould enforce that both incoming records validated against a JSON schema and an Avro schema? HMmMm.

@Ottomata, since the schemas are basically different syntax for the same idea, we could use either Avro or JSON schema for validation. There is richer support for value validation in JSON schema, so if the JSON schema specifies that, then it would be more precise to validate using JSON schema. In the reverse direction (features exclusive to Avro) there is primarily ordered key-value collections, which we probably don't have a use case for. We would reject schemas that contain ordered collections, but would probably accept JSON schemas that contain value assertions not supported in Avro. The Avro schema would still validate any data that the JSON schema will, but the reverse might not always be true if numeric ranges or string patterns are specified.

Off the top of my head, I'm also remembering that enums are a little different between the two, right? I think Avro requires that enum symbols be strings, and JSON Schema lets you use anything? But, ja, this just might work!

How would this look to a user then? Let's say we do validation with JSON Schema. Would that mean that the user would only have to submit the JSON Schemas somewhere, and we would auto convert and store the Avro version of the schema in the registry? Then when producing, the user could specify what format the event should be produced as? Something like:

curl -X POST --data '
  {
    "schema_id": 123,
    "records": [ {"a_field": "woowoo"} ],
    "format": "avro-binary" # or "json"
   }
' http://eventbus/topics/a_record

Then EventBus could convert the JSON to Avro-Binary, or just send it along, based on user preference?

I think Avro requires that enum symbols be strings, and JSON Schema lets you use anything?

Yeah, I would hope that the conversion tool would reject such schemas. Based on @Eevans' findings in T88459#1664605, non-string enums don't seem to be an issue currently.

How would this look to a user then?

I'd say the user would produce a message to a topic, and the topic configuration would determine the schema to check against, and how the topic is serialized in Kafka.

Would that mean that the user would only have to submit the JSON Schemas somewhere, and we would auto convert and store the Avro version of the schema in the registry?

We'd definitely auto-convert, for example as part of the build / CI setup. We could store the Avro version in the registry as well, for example by syncing the avro versions from the schema git repo.

Given their importance, my personal preference is to apply code review to schema changes, which means that we'd move schemas from the schema namespace on-wiki to a git repository. Do you think this would be an issue for schema development, especially for eventlogging?

@Ottomata & myself just chatted about this in the office. It sounds like we are converging on a rough plan:

  • Have a config containing a list of topics, and for each topic
    • a JSON schema,
    • an Avro schema,
    • the serialization format (binary avro, json)
    • possibly, the number of Kafka partitions
  • Maintain schemas in a git repository, with CI ensuring JSON/schema conversions & Avro schema compatibility.
    • @ori indicated that he thinks this would be fine for EL as well. Might be worth double-checking with PMs using EL, though.

Notes:

  • No aliasing / property renames. These aren't supported in JSON schema.
  • We'll support only backwards-compatible schema changes. This guarantees that consumers can always use the latest version of a schema for reading, and we don't need to store the schema version with each message. In practice, this means that 1) we only add properties, and 2) types of properties remain stable.
  • Really major schema changes should be very rare; if absolutely necessary, we can create a new topic for these.

For the implementation, we could consider expanding on https://github.com/gwicke/restevent/blob/master/routes/v1.js, or do something similarly simple. The current implementation already performs rather well, and is only 120 lines. There is already topic creation support, so the main things needed would be:

  • adding a config (YAML?),
  • making sure all topics exist on startup,
  • checking produced messages against the schema corresponding to the topic,
  • adding support for binary Avro serialization, and finally
  • adding some basic access restrictions.
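
A hedged sketch of what such a per-topic config and the corresponding validation dispatch could look like; the field names and file layout here are invented for illustration:

# Sketch: per-topic configuration loaded from YAML, used to pick the schema
# and serialization for incoming messages. All names are illustrative.
import json
import yaml            # pyyaml
import jsonschema

TOPIC_CONFIG_YAML = """
topics:
  mediawiki.page_edit:
    json_schema: schemas/page_edit.json
    avro_schema: schemas/page_edit.avsc
    serialization: json          # or: avro-binary
    partitions: 12
"""

config = yaml.safe_load(TOPIC_CONFIG_YAML)['topics']
_schemas = {}   # topic -> parsed JSON schema, loaded lazily

def validate_for_topic(topic, message):
    topic_conf = config[topic]
    if topic not in _schemas:
        with open(topic_conf['json_schema']) as f:
            _schemas[topic] = json.load(f)
    jsonschema.validate(message, _schemas[topic])
    return topic_conf['serialization']   # caller serializes accordingly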

FYI, I am mostly sold on going forward this way, but I still want to think about it some, and make sure that it really makes sense to create something new here, rather than modify something else that already exists (confluent, or eventlogging?). But overall I think this approach could work and would make the most amount of people the most happy :)

@Neil_P._Quinn_WMF brought up some good points about the strengths of the current on-wiki EventLogging schemas:

  • Schema documentation is easily discoverable, and easy to update.
  • There is a provision for marking schemas as inactive, and build listings of active / inactive schemas.

Providing similar functionality in a git based system is possible, but potentially a bit more cumbersome:

  • We could have a SchemaName.md file alongside SchemaName.json and SchemaName.avro. There are tools to visualize JSON schema, so we could perhaps expose schemas per topic from the service.
  • Listing active schemas would probably involve some grepping, or some convention like /active/ and /inactive/ subdirs.

@Milimetric mentioned this to me today too, and had a similar idea. Maybe we can build the schema repo in such a way that it can be used by a Mediawiki extension or some other site to allow for browsing schemas and info about them.

Some more notes from yesterday's conversation in the office:

EventLogging

For EventLogging, varnishkafka currently writes request urls containing a query string with the JSON-encoded event straight into Kafka. Consumers then deal with the decoding.

For the event bus, it would be nicer to decode & validate before adding the events to Kafka. Since this requires more CPU, it might make sense to handle this by setting up a regular service & configuring it as a Varnish backend. This service would expose a GET entry point that extracts the event JSON from the URL, and then

  • validates the event based on the schema name / revision in the topic,
  • writes it to a corresponding topic.

An advantage of this setup would be the ability to provide error responses to a browser if schema validation fails, speeding up debugging. A downside is the need to set up another service cluster. Request rates are currently moderate (< 1000/s, mean ~100/s), but the expectation is for this to increase up to 50k/s eventually. We should consult with @BBlack on this topic.

Since this is custom for eventlogging anyway, we could actually consider continuing to support fetching / caching those schemas from the wiki. Cache invalidation is handled by the revision id supplied in events.
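
A minimal sketch of that GET entry point, using Tornado (as in the EventLogging service prototype); the query-parameter name, schema lookup, topic naming and Kafka wiring are assumptions:

# Sketch: GET endpoint that extracts a JSON-encoded event from the query
# string, validates it against the schema named in the event, and produces it
# to a corresponding Kafka topic. Parameter and topic names are made up.
import json
import jsonschema
import tornado.ioloop
import tornado.web
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def get_schema(name, revision):
    """Look the schema up from the registry / wiki; cached in practice."""
    ...

class EventHandler(tornado.web.RequestHandler):
    def get(self):
        event = json.loads(self.get_query_argument('event'))
        schema = get_schema(event['schema'], event['revision'])
        try:
            jsonschema.validate(event, schema)
        except jsonschema.ValidationError as e:
            # Return the validation error to the browser to speed up debugging.
            self.set_status(400)
            self.write({'error': str(e)})
            return
        producer.send('eventlogging_%s' % event['schema'],
                      json.dumps(event).encode('utf-8'))
        self.set_status(204)

app = tornado.web.Application([(r'/event', EventHandler)])
# app.listen(8085); tornado.ioloop.IOLoop.current().start()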

Avro

@Ottomata is investigating if going with JSON schema only could be an option. There are some unforeseen complexities in the way the schema id is encoded in Kafka (custom encoding), which would need to be replicated by all binary avro consumers.

This service would expose a GET entry point that extracts the event JSON from the URL

Ideally it would work with GET if it had to, but POST would work just like we'd do it for internal services. GET would be just for non JS clients.

@Ottomata is investigating if going with JSON schema only could be an option.

I'd still like to see if we can make the JSON Schemas we define compatible with Avro Schema. I think what I said yesterday is that it is starting to make more sense to me to not support producing binary Avro to Kafka directly, because of the difficulty this causes when consuming the data. I had thought it would be easier than it is. Still looking into it.

If we can still keep the Avro Schemas around for each JSON Schema, then we can much more easily import into the Analytics world into binary files. Streaming binary is just looking too complicated right now.

Just collecting a couple of morning thoughts.

If we adopt a convention of always storing the schema name and/or revision in the messages themselves, then we can do like EventLogging does and infer and validate the schema based on this value. This would especially be helpful in associating a message with an Avro Schema when serializing into binary.
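
A minimal sketch of that convention on the consumer side, assuming the EventLogging-style 'schema' and 'revision' fields in each event and a schema_store mapping backed by the schema repository or registry:

# Sketch: infer and validate the schema from fields carried in the message
# itself, rather than relying only on the topic -> schema mapping.
import json
import jsonschema

def validate_message(raw_bytes, schema_store):
    event = json.loads(raw_bytes)
    # schema_store maps (name, revision) -> parsed JSON schema.
    schema = schema_store[(event['schema'], event['revision'])]
    jsonschema.validate(event, schema)
    return event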

It sounds helpful to tie a topic to a schema, but I think we should be able to know the schema of a given message in Kafka by something other than having to look in the schema repository config.

I need to investigate further, but I think there will be an advantage in being able to easily get any version of a schema out of the schema repository, not just the latest. This is probably true for JSON Schema, but maybe even more so for Avro Schemas. Camus has the ability to 'cast' an older schema version to a newer version while serializing. Even though it should be possible to read any old data with later schemas if those schemas are evolved properly, I foresee running into situations where there'll need to be tooling around translating between the two.

Also, I think it would be nice for schema designers to be able to easily see the history in front of them, rather than relying on git history to browse the past. There will be occasions where multiple versions of a schema are being produced to the same topic.

Cool stuff!

@Neil_P._Quinn_WMF brought up some good points about the strengths of the current on-wiki EventLogging schemas:

  • Schema documentation is easily discoverable, and easy to update.

It gets ugly; people have tried to put bullets and markup in the JSON "description" field.

  • There is a provision for marking schemas as inactive, and build listings of active / inactive schemas.

That's only by convention of templates and categories on each Schema_talk: page. When a developer removes the last logEvent( 'MobileWebUploads', myData) call, the talk page (sample) remains unchanged until someone notices.

Providing similar functionality in a git based system is possible, but potentially a bit more cumbersome:

  • We could have a SchemaName.md file alongside SchemaName.json and SchemaName.avro. There are tools to visualize JSON schema, so we could perhaps expose schemas per topic from the service.

Sounds good.

We could still write wiki pages that explain schemas, with all the benefits (easy update, categorization, easy links to wiki pages and URLs, translation by volunteers) and problems (pages don't get updated when schema changes, pages don't get updated when circumstances change), using templates to link to schemas in this new system. As with MediaWiki code, my bias is to have high-level project docs on a wiki, and try keeping implementation details alongside the source code.

GWicke renamed this task from "Implementation sketch for reliable event bus using Kafka" to "Implementation option for reliable event bus: Kafka". Oct 5 2015, 5:08 PM
GWicke renamed this task from "Implementation option for reliable event bus: Kafka" to "Implementing the reliable event bus using Kafka". Oct 5 2015, 6:12 PM

It'd be interesting to have this interface used by MediaWiki for recent changes (e.g. replacement backend for RCStream).

We'd be emitting JSON packets from MediaWiki to an rc topic through some abstract interface that isn't too deeply tied to Kafka (it would go nowhere by default and go to Kafka for Wikimedia; others might configure it with plain UDP, files on disk, etc.). And we'd refactor RCStream to subscribe to the (public) Kafka topics.

We'll need to be careful about what we do and don't expose since some of these topics may not be intended for public consumption. This could also help resolve T114951: Support proper continuation for RCStream after connection drops.

What would the PHP producer/publisher/emit interface to Kafka look like?

Currently EventLogging's server-side EventLogging::logEvent() simply uses LegacyLogger::emit(), and we configure

$wgEventLoggingFile = 'udp://10.64.32.167:8421/EventLogging';  // eventlog1001.eqiad.wmnet

to magically send this over UDPTransport into EventLogging-land. Can we use a similar trick to abstract away logging to Kafka? @EBernhardson wrote a Monolog KafkaHandler (https://gerrit.wikimedia.org/r/#/c/229172/)...

It'd be interesting to have this interface used by MediaWiki for recent changes (e.g. replacement backend for RCStream).

We'd be emitting JSON packets from MediaWiki using an abstract interface to an rc topic (which would go nowhere by default, and goes to Kafka for Wikimedia;

Likewise, would we replace includes/rcfeed/RedisPubSubFeedEngine.php with KafkaFeedEngine.php, or just have a LoggerFeedEngine.php that logs the feed and WMF configures this to emit to Kafka?

Hm, a worry:

It would support JSON Schema and Avro specs, and store data as JSON (compatible with Avro *and* plain JSON) or Avro binary.

Optional fields are represented in a very annoying way in Avro JSON. In Avro Schema, optional fields are notated by using a union type, like:

{
  "name" : "my_field",
  "type" : [ "null", "string" ],
  "default" : null
},

If you just wanted to produce JSON that would validate against a corresponding JSON schema with this non-required field, you'd do:

{
...
"my_field": "my value",
...
}

However, to validate against the Avro schema, you have to specify the type of the value in the record, like:

{
...
"my_field": { "string": "my value" },
...
}

Also, apparently defaults for missing fields are not auto filled in during (most?) JSON -> Avro serializations. See: http://grisha.org/blog/2013/06/21/on-converting-json-to-avro/

Should we disallow optional fields altogether? Hm. I suppose not, because if we did, we'd run into many more schema evolution issues.
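
For illustration, a small helper that rewrites plain JSON into the Avro-JSON union notation before handing the message to an Avro encoder; it only handles the simple ["null", X] union case, and nested records, arrays and complex defaults are ignored:

# Sketch: wrap nullable fields into Avro's JSON encoding, e.g.
# {"my_field": "x"} -> {"my_field": {"string": "x"}} when the Avro schema
# declares the field as ["null", "string"]; null stays plain null.
def to_avro_json(record, avro_schema):
    out = {}
    for field in avro_schema['fields']:
        name, ftype = field['name'], field['type']
        value = record.get(name, field.get('default'))
        if isinstance(ftype, list) and 'null' in ftype:   # a ["null", X] union
            if value is None:
                out[name] = None
            else:
                branch = next(t for t in ftype if t != 'null')
                out[name] = {branch: value}
        else:
            out[name] = value
    return out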

I believe we intend to discuss this at E84: Dependency Injection for MediaWiki core (RFC Meeting 2015-10-28). Sorry for the late notice, but I hope you can be there.

I believe we can close this task. Any objections?

Ottomata claimed this task.