
Add better support for using Event Platform streams with the Flink DataStream API
Closed, ResolvedPublic

Description

In T308356: [Shared Event Platform] Ability to use Event Platform streams in Flink without boilerplate, we made it possible to get Tables and DataStream[Row] of WMF event streams. However, to do transformations at the DataStream API level, we need a TypeInformation[Row] describing the output of any transformation. We really only need this when converting back up to the Table API. To do this, we can again rely on the event JSONSchemas, but we need to convert the JSONSchema to this TypeInformation[Row].

Flink actually has a built-in class to do this: JsonRowSchemaConverter. However, it does not support our map types convention. Also, the JsonSchemaConverter we wrote in T308356 does not convert to some types that JsonRowSchemaConverter does, e.g. Timestamps.
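For reference, the map types convention represents a map as a JSONSchema object whose values are described only by additionalProperties. A minimal illustration (the tags field name is made up):

```json
{
  "type": "object",
  "properties": {
    "tags": {
      "type": "object",
      "additionalProperties": { "type": "string" }
    }
  }
}
```

Here tags should convert to MAP&lt;STRING, STRING&gt;, whereas a converter that ignores the convention would just see an object with no declared properties.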

We need compatible converter implementations for both Table API DataTypes/Schemas as well as DataStream API TypeInformation[Row]. This should be pretty easy to do.
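To make the compatibility requirement concrete, here is a toy sketch of the parallel mapping the two converters must agree on: each JSONSchema primitive must land on equivalent types in both the Table API's DataTypes and the DataStream API's TypeInformation. Plain Java strings are used here so the sketch has no Flink dependency; the class and map names are hypothetical, and real code would return DataTypes.STRING(), Types.STRING, etc.

```java
import java.util.Map;

// Illustrative only: the parallel type mapping that compatible Table API and
// DataStream API converters must agree on for each JSONSchema primitive.
public class TypeMappingSketch {
    // JSONSchema type -> Table API DataType (as it would render)
    static final Map<String, String> TO_DATA_TYPE = Map.of(
        "string",  "STRING",
        "integer", "BIGINT",
        "number",  "DOUBLE",
        "boolean", "BOOLEAN"
    );

    // JSONSchema type -> DataStream API TypeInformation (as it would render)
    static final Map<String, String> TO_TYPE_INFO = Map.of(
        "string",  "String",
        "integer", "Long",
        "number",  "Double",
        "boolean", "Boolean"
    );

    public static void main(String[] args) {
        for (String jsonType : TO_DATA_TYPE.keySet()) {
            System.out.println(jsonType
                + " -> DataType " + TO_DATA_TYPE.get(jsonType)
                + ", TypeInformation " + TO_TYPE_INFO.get(jsonType));
        }
    }
}
```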

Once we have this, we should be able to do DataStream level transforms and then convert back to Table API like this:

val outputJsonSchema: ObjectNode = // ...
val outputTypeInfo: TypeInformation[Row] = JsonSchemaConverter.toTypeInformationRow(outputJsonSchema)

val outputStream: DataStream[Row] = inputDataStream.map(row => {
    // Copy the input Row's named fields into a new Row of the same RowKind
    val newRow = Row.withNames(row.getKind)
    row.getFieldNames(true).forEach(fieldName => {
        newRow.setField(fieldName, row.getField(fieldName))
    })
    // Add a field that exists in the output schema but not the input
    newRow.setField("new_text_field", "new value")
    newRow
})(outputTypeInfo)

val outputTable: Table = tableEnv.fromDataStream(outputStream)

Event Timeline

Ottomata renamed this task from Add better support for using Event Platform streams with the DataStream API to Add better support for using Event Platform streams with the Flink DataStream API.Jun 9 2022, 5:19 PM
Ottomata claimed this task.
Ottomata created this task.
Ottomata updated the task description. (Show Details)
Ottomata edited projects, added Data-Engineering, Data-Engineering-Kanban; removed Epic.
Ottomata added a project: Event-Platform.

Change 804614 had a related patch set uploaded (by Ottomata; author: Ottomata):

[wikimedia-event-utilities@master] WIP - Refactor JsonSchemaConverter to allow for pluggable type converters

https://gerrit.wikimedia.org/r/804614

I think this is coming along nicely! Many thanks to @Gehel for the help on this one.
https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/804614

Still WIP, but basic functionality works. Example in the description works just fine, and is tested in TestJsonSchemaFlinkConverter.testFlinkTableIntegration.

I'd like to add some abstraction on top of this to make it as easy to get the TypeInformation of an event stream as it is to get a DataType now. Something like EventTableDescriptorBuilder, but for DataStream API. That'll be a different patch tho :)

A cool thing about 804614 is that it can also be used for Spark StructType JsonSchema conversion! We'll be able to easily implement a Spark DataTypeConverters that does what this code does, but by simply implementing the individual type conversions rather than iterating over the JSONSchema yet again.
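A rough sketch of the pluggable idea (illustrative names only, not the actual wikimedia-event-utilities API): one schema traversal parameterized over a type-factory interface, with a toy string-rendering implementation standing in for the Flink or Spark targets.

```java
import java.util.List;

// Hypothetical sketch: each conversion target implements one interface, and a
// single JSONSchema traversal calls into it, so Flink TypeInformation, Flink
// DataTypes, and Spark StructTypes can all be produced from the same walk.
interface SchemaConversions<T> {
    T typeString();
    T typeLong();
    T typeRow(List<String> fieldNames, List<T> fieldTypes);
    T typeMap(T keyType, T valueType);
}

// Toy target that renders types as SQL-ish strings, standing in for a real
// Flink or Spark implementation.
class StringTypeConversions implements SchemaConversions<String> {
    public String typeString() { return "STRING"; }
    public String typeLong()   { return "BIGINT"; }
    public String typeRow(List<String> names, List<String> types) {
        StringBuilder sb = new StringBuilder("ROW<");
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(names.get(i)).append(" ").append(types.get(i));
        }
        return sb.append(">").toString();
    }
    public String typeMap(String k, String v) { return "MAP<" + k + ", " + v + ">"; }
}

public class PluggableConverterSketch {
    public static void main(String[] args) {
        SchemaConversions<String> c = new StringTypeConversions();
        // e.g. a row with a nested meta row and a map field
        String t = c.typeRow(
            List.of("meta", "tags"),
            List.of(
                c.typeRow(List.of("dt"), List.of(c.typeString())),
                c.typeMap(c.typeString(), c.typeString())
            )
        );
        System.out.println(t); // ROW<meta ROW<dt STRING>, tags MAP<STRING, STRING>>
    }
}
```

A Spark implementation would return StructType / MapType instances from the same methods, with no changes to the traversal code.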

Looking into how to use the TypeInformation&lt;Row&gt; we can now get, directly with the DataStream API rather than starting from a Table.

JsonRowDeserializationSchema works, but is deprecated. Example:

EventStreamFactory eventStreamFactory = builder.getEventStreamFactory();
EventStream eventStream = eventStreamFactory.createEventStream(streamName);

TypeInformation<Row> typeInfo = (TypeInformation<Row>)JsonSchemaFlinkConverter.toTypeInformation(
    (ObjectNode)eventStream.schema()
);

DeserializationSchema<Row> deserializationSchema =
    new JsonRowDeserializationSchema.Builder(typeInfo)
        .build();

String exampleEventString = eventStream.exampleEvent().toString();

Row exampleEventRow = deserializationSchema.deserialize(exampleEventString.getBytes(StandardCharsets.UTF_8));
System.out.println("EXAMPLE ROW\n" + exampleEventRow);

prints

EXAMPLE ROW
+I[/test/event/1.1.0, +I[null, null, null, 2019-01-01T00:00:00Z, null, test.event.example], specific test value, 2, {key1=val1, key2=val2}, null]

(The +I prefix is the Row's RowKind, indicating an INSERT changelog entry; nested Rows print with their own +I.)

Once we have a JsonRowDeserializationSchema for an event JSONSchema, we should be able to use it with any Flink DataStream Source.

So, we should probably copy JsonRowDeserializationSchema into wikimedia-event-utilities and use it, un-deprecated, from there. We'll need to make a modified copy of JsonRowSerializationSchema anyway as part of T310218: Flink output support for Event Platform events, to be able to serialize using JsonEventGenerator.

Change 804614 merged by jenkins-bot:

[wikimedia-event-utilities@master] Refactor JsonSchemaConverter to allow for pluggable schema conversions.

https://gerrit.wikimedia.org/r/804614

Alright, @gmodena @dcausse https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/806319 is ready for initial review. Functionality works; please give me any tips you have on code structure or anything else. Some usage examples can be found in TestEventDataStreamFactory#testKafkaSourceBuilder and TestEventDataStreamFactory#testJsonStringToRow (a MapFunction used for testing).