
Determine proper encoding for structured log data sent to Kafka by MediaWiki
Closed, Resolved (Public)

Description

@bd808, I believe @EBernhardson is producing binary Avro to Kafka. At this time, I advise you to stick to JSON. Analytics is working on figuring out how to import the binary data into Hadoop properly, but it is not as easy as we thought it would be. EventBus will, in the near term, only support producing JSON.

Using an Avro schema is fine, as is using a JSON schema. If you do go with an Avro schema (which you are!), you should be able to push the Avro JSON representation of your data instead of binary. I'm not sure how that works with the existing Avro+Monolog implementation, but we should make sure that it works.

The Avro encoder in PHP doesn't look to directly support JSON encoding, but I don't think it needs to either. We would just need to re-use the schema validation, then do a standard JSON encoding. It seems we might not need binary Avro on the PHP side at all then?
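
To make that concrete, here is a minimal sketch of the "validate, then json_encode" idea, assuming the avro-php library's global AvroSchema class (exact method names may vary by library version; the schema path and record fields are made up):

// Sketch only: re-use the Avro schema validation, then emit plain JSON
// instead of binary Avro.
require_once 'avro.php'; // entry point of the avro-php library; path may differ

$schema = AvroSchema::parse( file_get_contents( 'path/to/schema.avsc' ) );
$record = array( 'channel' => 'api', 'timestamp' => 1447100000 );

if ( AvroSchema::is_valid_datum( $schema, $record ) ) {
    $payload = json_encode( $record );
    // hand $payload to the Kafka/Monolog handler from here
} else {
    // validation failed; log or drop the record
}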

Related to:

Related Objects

Event Timeline

bd808 raised the priority of this task from to Needs Triage.
bd808 updated the task description.
bd808 added subscribers: bd808, EBernhardson, Ottomata.

@madhuvishy let me know earlier today on IRC that the issues with binary encoding in the Kafka -> Hadoop pipeline (Camus) have been worked out.

The Avro encoder in PHP doesn't look to directly support JSON encoding, but I don't think it needs to either. We would just need to re-use the schema validation, then do a standard JSON encoding. It seems we might not need binary Avro on the PHP side at all then?

That's right. The Avro JSON representation is JSON that can be serialized to binary Avro by passing validation against an Avro schema. There is an avro-tools .jar that helps you verify this from the CLI. Something like:

java -jar avro-tools.jar fromjson --schema-file path/to/schema.avsc record.json

If that works, then your JSON is valid Avro.

@EBernhardson you ended up using binary Avro for T103505 right? Is that working well enough to say that it is the "right way" to handle new things like T108618: Publish detailed Action API request information to Hadoop?

Avro is pretty awesome for some things (especially data size), but we are still working out issues related to schema evolution. We are looking into doing our first schema change (adding two new fields) after trying out the existing schema for a few weeks. This is proving to be troublesome as camus currently only knows about a single version of the schema, and with binary encoding you cannot just add fields.

Avro's JSON format might be a better choice for writing to Kafka, but annoyingly the Avro PHP library does not support the necessary transformations to output it (the Avro JSON is not just the same data that was encoded to binary but run through json_encode(); there are a couple of transformations that need to be applied). I don't expect this would be particularly hard, I just haven't had time to dig through the spec and figure out what all needs to happen.

I was just talking to @Ottomata this morning about ways forward. We are currently considering prepending a binary value (1 or 2 bytes, haven't decided) to all messages to use as a schema version identifier. Actually solving this problem won't happen this week though; the current plan is to do some exploration next week.

Yeah, I'm unsure of what we should do at this point. I won't have a lot of time to work on Avro support in Kafka right now. Other analytics devs might, but I'm not sure.

If we want to use Avro in Kafka, the right thing to do would be to prepend a 4-byte integer schema id, just as @EBernhardson says. But then all consumers would have to know how to decode this special message, including unpacking that integer and then mapping it somehow to the writer schema. This is how the Kafka+Avro world is doing things, but it is pretty annoying when we have so many different clients and languages.
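
As a rough illustration of that framing (not something we have actually deployed), prepending and unpacking a 4-byte big-endian schema id in PHP could look something like:

// Sketch only: prefix each Kafka message with a 4-byte schema id; the
// consumer strips it and maps the id back to the writer schema before decoding.
function frameMessage( $schemaId, $binaryAvro ) {
    return pack( 'N', $schemaId ) . $binaryAvro;
}

function unframeMessage( $message ) {
    $unpacked = unpack( 'N', substr( $message, 0, 4 ) );
    return array( $unpacked[1], substr( $message, 4 ) );
}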

Alternatively, we could just avoid using Avro in Kafka altogether and produce (jsonschema-ed) JSON. At the moment, we'd just import this JSON into Hadoop and you'd use it there. In the future, we plan to convert (possibly at import time) jsonschema-ed data into Avro in Hadoop. Once the Avro data is in Hadoop, it is fine and easy (since the schemas are stored in the header of the binary Avro container files).

Avro's JSON format might be a better choice for writing to Kafka, but annoyingly the Avro PHP library does not support the necessary transformations to output it (the Avro JSON is not just the same data that was encoded to binary but run through json_encode(); there are a couple of transformations that need to be applied). I don't expect this would be particularly hard, I just haven't had time to dig through the spec and figure out what all needs to happen.

It looks to me like the Avro JSON encoding rules are relatively simple. I'd be willing to take a stab at adding JSON encoding support to the upstream library we are using if that seems like a useful feature for us in general.

Could be worth it. We had a meeting yesterday and discussed at least trying this. We want to see if we can easily use camus to write Avro binary data when receiving Avro JSON. Hold off for now and ask @Nuria and @dcausse how it goes.

The only thing I noted that is especially different between regular JSON and Avro JSON is how union types are specified. Aside from that, everything looks the same.
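
For the record, per the Avro spec's JSON encoding, a non-null value of a union like ["null", "string"] gets wrapped in a single-key object naming the branch type, while null stays bare. A tiny PHP illustration (the field name here is made up):

// Plain JSON:  {"userAgent": "MediaWiki-Bot/1.0"}
// Avro JSON:   {"userAgent": {"string": "MediaWiki-Bot/1.0"}}
function toAvroJsonUnion( $value, $branchType ) {
    return $value === null ? null : array( $branchType => $value );
}

echo json_encode( array( 'userAgent' => toAvroJsonUnion( 'MediaWiki-Bot/1.0', 'string' ) ) );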

Unfortunately, Avro JSON won't help resolve the issues we have with binary Avro: if the schema used by the producer is different, decoding will still fail.
The requirements are the same for Avro JSON and binary Avro: the consumer needs to know the schema used to generate the data inside the Kafka message.
I've pushed a patch to refinery-camus that includes very basic support for a schema rev_id in the Kafka message and uses the classpath as a schema repository: https://gerrit.wikimedia.org/r/#/c/251267/

I talked with @Ottomata briefly a couple of weeks ago (in the lobby of Club Quarters in SF) and he mentioned that for some use cases it might be simpler to just produce JSON data into Kafka rather than deal with the complexity of using binary Avro encoding.

I think that @dcausse and @EBernhardson got a handle on how to deal with Avro schema evolution, but I'm wondering specifically whether @Ottomata's suggestion is right for the use case I have in T108618: Publish detailed Action API request information to Hadoop today. This data pipeline is only intended to feed a single Hive/Hadoop table that will be processed using additional Hive jobs to create specific dimensional tables for reporting on MediaWiki Action API usage. Is backwards-compatible schema evolution needed for the seemingly simple case of "gather data in MediaWiki and make it available to Hive"? If it makes any difference, this will be a fairly high volume data set. The Action API gets ~450M daily hits as counted in the wmf.webrequests data measured at Varnish, and we assume the request count measured at MediaWiki will be larger, since the Varnish route does not (or at least is assumed not to) count internal cluster requests from the Parsoid servers.

The pipeline is working fine for us with binary Avro now.
Erik wrote documentation about it: https://wikitech.wikimedia.org/wiki/Analytics/Cluster/MediaWiki_Avro_Logging

Schema upgrades are still a bit painful since schemas are not deployed to an HTTP-accessible repo (we still need to build and deploy refinery-camus).
With an HTTP-enabled schema repo we should be able to remove steps 2 & 3 from https://wikitech.wikimedia.org/wiki/Analytics/Cluster/MediaWiki_Avro_Logging#Schema_Upgrade_Checklist

Whoa, I had not seen that page. Nice job Erik, that is really great!

This data pipeline is only intended to feed a single Hive/Hadoop table that will be processed using additional Hive jobs to create specific dimensional tables for reporting on MediaWiki Action API usage. Is backwards-compatible schema evolution needed for the seemingly simple case of "gather data in MediaWiki and make it available to Hive"?

Yes, it is, because your data is not likely to look the same forever (fields might be dropped or added).

If it makes any difference, this will be a fairly high volume data set. The Action API gets ~450M daily hits as counted in the wmf.webrequests data measured at Varnish, and we assume the request count measured at MediaWiki will be larger, since the Varnish route does not (or at least is assumed not to) count internal cluster requests from the Parsoid servers.

Scale makes no difference in this case: Avro compresses well, but even if we were to use JSON we would need to do compression.

Hm, just a thought. 450M per day is about 5,000 per second, ja? If we blacklist this schema from going into MySQL, you could emit these via EventLogging and automatically get JSON data in Hadoop.

https://wikitech.wikimedia.org/wiki/Analytics/EventLogging#Access_data_in_Hadoop
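
Emitting the events from MediaWiki would then be something like the following sketch, assuming the EventLogging extension's EventLogging::logEvent() helper (the 'ApiAction' schema name, revision id, and fields here are placeholders, not a real schema):

// Sketch only: log one event per API request via the EventLogging extension.
EventLogging::logEvent( 'ApiAction', 12345678, array(
    'action' => 'query',
    'format' => 'json',
    'backendTimeMs' => 42,
) );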

@Nuria and I talked this out on IRC a bit and had these thoughts:

  • Schema migration (adding/removing fields) using Avro still requires some manual steps.
  • Querying data with Hive from an EventLogging-supplied data set requires an intermediate step of parsing the JSON blobs.

Based on these tradeoffs and the desire to get T108618: Publish detailed Action API request information to Hadoop working sooner rather than later, I'm going to attempt to implement an Avro-based solution. If we reach a point where using Avro is too difficult, we can always back up and try the EventLogging path instead.

Querying data with Hive from an EventLogging-supplied data set requires an intermediate step of parsing the JSON blobs.

FYI, T162610 is coming soon (next quarter?) and should make this much easier.