
[Shared Event Platform] Ability to use Event Platform streams in Flink without boilerplate
Closed, Resolved (Public)

Description

User Story
As a data engineer, I want to be able to use the existing Event Platform based streams declared in EventStreamConfig without having to manually write POJOs or case classes for every stream and schema.
Why?
  • It will be a common need to consume existing streams so creating a utility removes the need for event service creators to do this themselves.
Timebox:
  • 1 week?
Done is:
  • Simple POC service can use event schema converter for Flink Table API
  • Stretch: Make it possible to use streams in the DataStream API too. (This may just be converting Table API DataType schemas to Tuple-based TypeInformation schemas.)

Event Timeline

Ottomata renamed this task from [Shared Event Platform] Utility to Convert Existing Streams to POJOs to [Shared Event Platform] Ability to use Event Platform streams in Flink without boilerplate. May 16 2022, 1:27 PM
Ottomata updated the task description.

Writing up findings here.

There are so many levels of typing in Flink it can be pretty hard to grok it all. Here's what I've learned so far:

Table API
  • DataType - Highest-level Table API type abstraction. Has a LogicalType as well as hints that are used by parts of Flink for sinks, sources, serde, etc. Usually constructed using the DataTypes factory class. You can use a DataType to construct a Table API Schema.
  • LogicalType - Like a SQL data type. Wrapped by DataType, so ideally we won't have to think about this too much. DataTypes.ROW(...) will construct a DataType with a logical RowType.
  • Schema - Describes a specific schema for a table (batch or streaming). Flink implementation specifics get filled in here, like watermarks. The easiest way to make a Schema is to do Schema.newBuilder().fromRowDataType(rowDataType)...build(). You use a Schema when instantiating a TableDescriptor.
  • TableDescriptor and CatalogTable - a table for a specific source or sink table. Abstraction for instantiating implementations of dynamic tables. These always take a Schema.
  • RowData - instances of implementations of RowData are the actual rows, that have type RowType (a LogicalType).
  • Table - Finally! The main Table API interface. This has a Schema and a DynamicTableSource, describing how to read data from the source as the described Schema, making it possible to query the source data with SQL.
DataStream API
  • TypeInformation - wraps lower-level Java types with Flink typing information. Used by Flink to do lots of fancy automatic stuff, like serialization and comparing instances. You can have a TypeInformation<MyPojo> or, more useful for us, a TypeInformation<Row>.
  • Row - a DataStream API version of RowType. The fields must be in a specific order here. There is an interface to refer to fields by name, but ultimately they are stored and accessed by position. (See the sketch below for how these pieces fit together.)
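
A minimal sketch (assuming Flink 1.14+; field names are illustrative) of how a DataType with a logical RowType becomes a Table API Schema. The DataStream equivalent would be a TypeInformation<Row>, e.g. built with Types.ROW_NAMED.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;

public class TableTypingSketch {
    public static Schema exampleSchema() {
        // A DataType whose LogicalType is a RowType (field names are made up).
        DataType rowDataType = DataTypes.ROW(
            DataTypes.FIELD("page_id", DataTypes.BIGINT()),
            DataTypes.FIELD("page_title", DataTypes.STRING())
        );

        // Wrap the DataType in a Table API Schema; watermarks, primary keys,
        // etc. would be declared on the builder here.
        return Schema.newBuilder()
            .fromRowDataType(rowDataType)
            .build();
    }
}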
Conversion

These are the best docs I've found for converting between Table API and DataStream API.

In my test, when I do

// Table testEventsTable = ...; a Table whose Schema is a RowType
DataStream<Row> testEventsDataStream = stEnv.toDataStream(testEventsTable);
// testEventsDataStream.getType() -> TypeInformation<Row>

So, there is internal support for converting from a Table with a Schema of RowType, to a DataStream with a TypeInformation of a Row!

It would be easy and possible to adapt this JsonRowSchemaConverter to work with Event Platform too. I think we'd also need to create our own version of JsonRowDeserializationSchema to be able to instantiate DataStream<Row> directly. (We need our own version because the built-in one is deprecated, and we need to handle some special JSONSchema conventions in Event Platform, like Map types.)

But, do we need to? I believe it is possible to start with the Table API and then immediately convert to DataStream if needed. We could even provide an abstraction via the Table API to instantiate a DataStream<Row>. TBD.

Change 791646 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] Add JsonSchemaConverter for the Flink Table API.

https://gerrit.wikimedia.org/r/791646

I believe it is possible to start with the Table API and then immediately convert to DataStream if needed

Actually, I think this is cleaner than implementing our own JsonSchemaConverter for the DataStream API directly. In the docs:

The table runtime will make sure to properly serialize the output records to the first operator of the DataStream API.
Afterward, the type information semantics of the DataStream API need to be considered.

After that, the docs have 'legacy' information about old ways to e.g. "Convert a Table into a DataStream".

@gmodena if you do need to use the DataStream API for T307959: [Event Platform] Design and Implement realtime enrichment pipeline for MW page change with content, then we should experiment with this to see if it is true. I have successfully done the conversion and printed the output DataStream<Row>, so that seems to work! In Java at least; I didn't spend enough time understanding how to actually do any transformations of the DataStream, seeing as in Java it is really nasty!

We should experiment with how to e.g. map Table -> DataStream<Row> -> Enriched DataStream<Row> (with different underlying schema) -> Table again.
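
A rough, untested sketch of that round trip, reusing the stEnv and testEventsTable from the snippet above; the enrichment step and field names are made up:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RoundTripSketch {
    public static Table enrich(StreamTableEnvironment stEnv, Table testEventsTable) {
        // Table -> DataStream<Row>
        DataStream<Row> events = stEnv.toDataStream(testEventsTable);

        // Enriched DataStream<Row> with a different underlying schema.
        DataStream<Row> enriched = events
            .map(row -> Row.of(row.getField("page_id"), "enriched value"))
            .returns(Types.ROW_NAMED(
                new String[] {"page_id", "enrichment"},
                Types.LONG, Types.STRING));

        // DataStream<Row> -> Table again, with the new schema derived from
        // the TypeInformation declared above.
        return stEnv.fromDataStream(enriched);
    }
}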

cc also @JAllemandou and @dcausse.

Some more thoughts.

I spent some time looking into a custom format and/or Table Source for WMF Event Platform streams in Kafka.

This will work! But, as I got close to making one (I copy/pasted the DebeziumJson format and removed irrelevant bits), I realized that what I was really trying to do was not so much make a custom format; what I really wanted was a custom 'Catalog' for streams declared in EventStreamConfig. Implementing a custom Catalog doesn't seem trivial. However, Flink does support persisting Dynamic Table metadata in Hive Metastore. I got to wondering...

what if we just maintained a 'kafka_eventstreams' Hive database, and synchronized EventStreamConfig streams to Flink Dynamic Tables in Hive. (Or... use Hive as the canonical place to declare EventStreams??)

We could even keep them 'evolved' in Hive the same way that we do for the regular Hive event tables. We'd have a job that:

  • Used eventutilities to get all streams and schemas.
  • Used this Flink JsonSchemaConverter to get the corresponding Flink Table Schemas.
  • Created the Dynamic Table in the Hive metastore with the schema and connection information persisted.

Then, when someone wanted to use Flink Table/SQL API to read from Kafka, they'd just have to register the Hive catalog. They'd have access to the Dynamic Table directly through the usual Table/SQL API.
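
If that sync job existed, the consumer side might look roughly like this (a sketch only; the catalog name, database name, table name, and Hive conf path are hypothetical):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the Hive Metastore catalog that the (hypothetical) sync job
        // would populate with one Dynamic Table per EventStreamConfig stream.
        HiveCatalog catalog =
            new HiveCatalog("wmf_events", "kafka_eventstreams", "/etc/hive/conf");
        tableEnv.registerCatalog("wmf_events", catalog);
        tableEnv.useCatalog("wmf_events");

        // After that, reading an Event Platform stream is a normal query.
        Table revisionCreate =
            tableEnv.sqlQuery("SELECT * FROM mediawiki_revision_create");
    }
}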

Complications to this idea:

  1. It seems properties like the consumer group.id and scan.startup.mode (offset reset) are part of the Kafka Table properties. So, I'm not sure this quite works for the generic use case... it is almost as if each declared table is per consumer?
  2. AFAICT, the Kafka Table connector only works with one topic at a time. I suppose this would be fine if we ever refactor the MW event streams to be entity based, but as is we have 2 topics (eqiad, codfw) for each change type, and then also many change-type 'streams' (create, delete, etc.).

Actually, that last point might be a problem for us and Flink no matter what. I suppose we'll need to join a bunch of the change type streams together anyway, but maybe that's more annoying with the Table API than the DataStream API?

... Perhaps given Complication 1., this Hive Catalog idea won't really work. We could do a custom catalog, or perhaps a custom Event Platform DynamicTableSource based on the Kafka source is the way to go.

I think what I want would be something similar to how I would expect Confluent Schema Registry integration to work. However, according to this, it looks like even the Flink confluent-avro Table format requires users to supply a table schema DDL!

I'm sure that by implementing our own event platform format, we could work around this and look up and convert the schema via EventStreamConfig, but the DDL would look weird; it'd be something like:

CREATE TABLE mediawiki_revision_create_stream (/* no fields */)
WITH (
  'connector' = 'kafka',
  'format' = 'wmf-event-stream-json',
  'stream-name' = 'mediawiki.revision-create',
  ...
)

I think this would work, with no fields provided meaning we should look up the schema rather than use the one provided in the DDL.

AFAICT, the Kafka Table connector only works with one topic at a time

This is incorrect. I assumed this based on the name of the option, but reading the code, the topic Kafka connector option can be a list.
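
For example, a hedged Table API sketch (topic and broker names are illustrative) that reads a stream's topics from both datacenter prefixes through one table, since the option accepts a semicolon-separated list:

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

public class TopicListSketch {
    public static TableDescriptor revisionCreateDescriptor(Schema schema) {
        return TableDescriptor.forConnector("kafka")
            .schema(schema)
            // Multiple topics, separated by semicolons, read by one table.
            .option("topic",
                "eqiad.mediawiki.revision-create;codfw.mediawiki.revision-create")
            .option("properties.bootstrap.servers", "localhost:9092")
            .format("json")
            .build();
    }
}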

Also, I think the way forward is to implement our own version of KafkaDynamicTableFactory that handles looking up the schema by stream name, and then just returning a KafkaDynamicSource using JsonRowDataDeserializationSchema for the value deserialization.

implement our own version of KafkaDynamicTableFactory

Everything is always harder than it looks! KafkaDynamicTableFactory makes heavy use of KafkaConnectorOptionsUtil, which is not accessible outside of the package.

However, I can do what Joseph and I have done elsewhere and declare a KafkaEventDynamicTableFactory extends KafkaDynamicTableFactory in the org.apache.flink.streaming.connectors.kafka.table package, giving me access to all the stuff I need. I have to copy and paste a few private methods from KafkaDynamicTableFactory, but it's not too bad.

Doing all this allows me to use event-utilities to look up the schema and JsonSchemaConverter to get the Flink RowType of the stream. I should also be able to automatically set format=json and topics too!
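
A rough sketch of that package trick (the override and comments are illustrative, not the actual WIP patch):

// Declared inside Flink's own package so package-private helpers like
// KafkaConnectorOptionsUtil are visible.
package org.apache.flink.streaming.connectors.kafka.table;

public class KafkaEventDynamicTableFactory extends KafkaDynamicTableFactory {

    // Hypothetical connector identifier, i.e. 'connector' = 'wmf-kafka-event'.
    @Override
    public String factoryIdentifier() {
        return "wmf-kafka-event";
    }

    // Schema lookup via EventStreamConfig and JsonSchemaConverter would happen
    // while building the DynamicTableSource, instead of relying on the DDL.
}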

WIP code
(So far it just compiles; I need to verify that this actually runs!)

Spent some time writing some tests and trying to make KafkaEventDynamicTableFactory (via 'connector' = 'wmf-kafka-event') work. I think I'm giving up. I had thought that since the built-in KafkaDynamicTableFactory gets the DataType from the catalog context (the schema provided in the CREATE TABLE statement), I could just override that and look up the JSONSchema and use it to provide the DataType automatically. I have a feeling that this should work, but I have not yet been successful. I got pretty deep. Maybe if I spend some time looking through this with @JAllemandou I could find a way?

The next best option is going to be to provide some wrapper tooling to look up schemas by stream name and return a Schema.Builder with the DataType. Unfortunately, this means the tooling won't work in pure SQL mode. At least I know this will work, since I was able to do a version of this during the hackathon last fall.

Change 799359 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] WIP - Create KafkaEventDynamicTableFactory to ease integration with Event Platform

https://gerrit.wikimedia.org/r/799359

Change 800754 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] Add Flink EventTableDescriptorBuilder

https://gerrit.wikimedia.org/r/800754

Okay, I got a really nice wrapper going over in event-utilities. It uses EventStreams to get most of the boilerplate needed to make a Flink TableDescriptor (which is used to instantiate a Flink Table), including schemas and Kafka topics.
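
Once the builder has produced a TableDescriptor, using it is plain Flink Table API (the builder's own method names aren't shown here; see the patch and the Draft MR below):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class DescriptorUsageSketch {
    public static Table fromDescriptor(TableEnvironment tableEnv, TableDescriptor descriptor) {
        // Register the descriptor under a name so it is also queryable from SQL...
        tableEnv.createTemporaryTable("revision_create", descriptor);
        // ...or instantiate a Table from it directly.
        return tableEnv.from(descriptor);
    }
}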

@gmodena I added an example to this Draft MR in the mediawiki-stream-enrichment repo. I haven't tested that change in prod yet, but I believe it should work!

Change 799359 abandoned by Ottomata:

[wikimedia-event-utilities@master] WIP - Create KafkaEventDynamicTableFactory to ease integration with Event Platform

Reason:

https://gerrit.wikimedia.org/r/799359

Change 791646 merged by jenkins-bot:

[wikimedia-event-utilities@master] Add JsonSchemaConverter for the Flink Table API.

https://gerrit.wikimedia.org/r/791646

Change 800754 merged by jenkins-bot:

[wikimedia-event-utilities@master] Add Flink EventTableDescriptorBuilder

https://gerrit.wikimedia.org/r/800754

Change 802628 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] Default properties.auto.offset.reset to latest when setting up kafka flink table

https://gerrit.wikimedia.org/r/802628

Change 802628 merged by jenkins-bot:

[wikimedia-event-utilities@master] Default properties.auto.offset.reset to latest when setting up kafka flink table

https://gerrit.wikimedia.org/r/802628

Ottomata moved this task from In Progress to Done on the Data-Engineering-Kanban board.

@lbowmaker I'm okay with calling this task done for now. There will certainly be more work to do, but the basics work!

Change 802776 had a related patch set uploaded (by Ottomata; author: DCausse):

[wikimedia-event-utilities@master] Add withKafkaTimestampAsWatermark as separate flink table builder method

https://gerrit.wikimedia.org/r/802776

Change 802776 merged by jenkins-bot:

[wikimedia-event-utilities@master] Add withKafkaTimestampAsWatermark as separate flink table builder method

https://gerrit.wikimedia.org/r/802776