
Flink output support for Event Platform events
Closed, ResolvedPublic

Description

Flink pipelines will need a way to output JSON events that we are sure are compatible with Event Platform conventions. We need Flink support for doing what EventGate does when it receives events. Namely:

  • ensuring the schema is allowed in the stream
  • making sure some fields are set, e.g. meta.dt, dt, etc.
  • validating the event against its schema
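The three checks above can be sketched in plain Java, without Flink or wikimedia-event-utilities on the classpath. This is only an illustration of the order of operations, not how EventGate or JsonEventGenerator are implemented. The class name, the stream name, the schema title, the `test` field, and the "required fields" stand-in for real JSON Schema validation are all assumptions made up for the example.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class EventPrepSketch {
    // Hypothetical stream config: stream name -> schema title allowed in that stream.
    static final Map<String, String> STREAM_SCHEMA_TITLE =
        Map.of("test.event.example", "test/event");

    // Hypothetical stand-in for a JSON Schema: required top-level fields per schema title.
    static final Map<String, Set<String>> REQUIRED_FIELDS =
        Map.of("test/event", Set.of("$schema", "meta", "dt", "test"));

    static Map<String, Object> prepare(Map<String, Object> input, String stream, Instant now) {
        Map<String, Object> event = new HashMap<>(input);

        // 1. Ensure the event's schema is allowed in the target stream.
        String title = STREAM_SCHEMA_TITLE.get(stream);
        String schemaUri = (String) event.get("$schema");
        if (title == null || schemaUri == null || !schemaUri.startsWith("/" + title + "/")) {
            throw new IllegalArgumentException(
                "Schema " + schemaUri + " is not allowed in stream " + stream);
        }

        // 2. Make sure some fields are set: meta.stream, meta.dt and dt.
        Map<String, Object> meta = new HashMap<>();
        Object existingMeta = event.get("meta");
        if (existingMeta instanceof Map) {
            ((Map<?, ?>) existingMeta).forEach((k, v) -> meta.put(String.valueOf(k), v));
        }
        meta.put("stream", stream);
        meta.putIfAbsent("dt", now.toString());
        event.put("meta", meta);
        event.putIfAbsent("dt", now.toString());

        // 3. Validate the event (here: only a required-field check, not real JSON Schema).
        for (String field : REQUIRED_FIELDS.getOrDefault(title, Set.of())) {
            if (!event.containsKey(field)) {
                throw new IllegalArgumentException("Missing required field: " + field);
            }
        }
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("$schema", "/test/event/1.0.0");
        input.put("test", "value");
        System.out.println(prepare(input, "test.event.example", Instant.now()));
    }
}
```

In a real pipeline the stream config lookup and the validation would come from the Event Platform stream configuration and schema repositories rather than from hardcoded maps.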

There is already code to do this in wikimedia-event-utilities in the JsonEventGenerator. This is used in the wdqs-rdf-updater job, but its usage is pretty manual.

We should be able to plug JsonEventGenerator somewhere at the end of a streaming pipeline before producing the events to Kafka. There are a few ways to do this, and I'm not yet sure which is best:

  1. Pipeline step: an explicit map step in the pipeline, converting from a case class or a Row -> ObjectNode -> JsonEventGenerator.generateEvent -> Kafka
  2. Custom Flink serialization format: this could wrap JsonRowDataSerializationSchema somehow and call JsonEventGenerator.generateEvent with the ObjectNode converted from the Row before serializing it to a byte[]

We chose solution 2.
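The shape of solution 2 is a decorator: a serializer that runs an event-preparation step on each element and then hands the result to a delegate that produces the byte[]. The sketch below shows only that shape in plain Java. `Serializer<T>` is a hypothetical stand-in that mirrors the `byte[] serialize(T element)` method of Flink's SerializationSchema, and `preparing` is an illustrative helper, not part of wikimedia-event-utilities.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.UnaryOperator;

class EventSerializerSketch {
    /** Hypothetical stand-in for Flink's SerializationSchema<T>, reduced to serialize(). */
    interface Serializer<T> {
        byte[] serialize(T element);
    }

    /**
     * Wraps a delegate serializer so that every element first goes through
     * `prepare` (where the Event Platform checks and defaults would happen)
     * and is only then serialized to bytes.
     */
    static <T> Serializer<T> preparing(UnaryOperator<T> prepare, Serializer<T> delegate) {
        return element -> delegate.serialize(prepare.apply(element));
    }

    public static void main(String[] args) {
        // Toy delegate: encode a String as UTF-8 bytes.
        Serializer<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        // Toy preparation step: trim whitespace (a real one would set meta.dt, validate, etc.).
        Serializer<String> wrapped = preparing(s -> s.trim(), utf8);
        byte[] bytes = wrapped.serialize("  {\"test\":\"value\"}  ");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

With this arrangement the pipeline itself needs no extra map step: whatever sink accepts the serializer gets the preparation for free.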

Event Timeline

I had originally leaned toward option 2 (custom serialization format), but the more I think about it, this is not about serialization. This is about ensuring that an event has all the required Event Platform properties before it is produced. I think a pipeline step probably makes the most sense. It will be easier to automate this for the general case if we expect the input to this pipeline step to be a Row, so that we can dynamically add new fields without having to make new hardcoded case classes.

Doing it at the pipeline step level will be a little hard though. We'd likely want to work at the Table API level, so that both DataStream and Table API pipelines can work. Doing it at the Table API level would mean a bunch of addOrReplaceColumn steps, and I'm not sure how I could re-use the logic in JsonEventGenerator with those.

Or, perhaps a custom UDF that would transform the input Row into the output Row using JsonEventGenerator. I think that this would work, but I'd still have to work with Rows instead of ObjectNodes. (Perhaps we need a FlinkRowDataEventGenerator?) Hm, it might not work; the UDF may need to have a Row of a specific hardcoded schema defined.

Change 804299 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] WIP - use JsonEventGenerator in a Flink Deserializer

https://gerrit.wikimedia.org/r/804299

Did a little more research on this today. I think we should write both

  • EventJsonRowSerializationSchema implements SerializationSchema<Row>
  • EventJsonRowDataSerializationSchema implements SerializationSchema<RowData> and all the associated Flink Table API factories.

When using Table API, one could do format('wmf-event-json') or something. When using DataStream API, one could do new EventJsonRowSerializationSchema(rowTypeInfo) to get the SerializationSchema needed for a Sink.

David, assigning this to you! Am here to help but don't have a lot of time over the next few weeks.

Change 810387 had a related patch set uploaded (by DCausse; author: DCausse):

[wikimedia-event-utilities@master] Add JsonRowSerializationSchema and helper methods to create a KafkaSink

https://gerrit.wikimedia.org/r/810387

Change 804299 abandoned by Ottomata:

[wikimedia-event-utilities@master] WIP - use JsonEventGenerator in a Flink Deserializer

Reason:

https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/810387 is better

https://gerrit.wikimedia.org/r/804299

Change 810387 merged by jenkins-bot:

[wikimedia-event-utilities@master] Add JsonRowSerializationSchema and helper methods to create a KafkaSink

https://gerrit.wikimedia.org/r/810387

Oo, I +2ed, and it merged. @dcausse hope that's okay.

Shall I make a release?

Moving to done, as I believe wikimedia-event-utilities now has the basic functionality required to read and write events in streams managed by the Event Platform without having to write too much boilerplate. Doc: https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/README.md