
Investigate using Flink as an Event Service Platform
Closed, Resolved · Public · Spike

Description

User Story
As a platform engineer, I need to evaluate Flink so that the group can use this analysis to decide on a single platform to implement
Done is:
  • Wiki Page updated with analysis, including: Pros, Cons, and Recommendations against the criteria

Event Timeline

Restricted Application changed the subtype of this task from "Task" to "Spike". · Apr 25 2022, 1:40 PM

A crucial thing I'd like Event Platform libraries and tooling to do is automate the boilerplate needed to work with event streaming data. We use JSONSchemas to declare explicit schemas for our JSON data; it'd be really nice if we could use those JSONSchemas to automate any type inference Flink is going to have to do. I wrote a JsonSchemaConverter for the Flink Table API during the DSE hackathon last fall, but I think we need something a little lower level that will work with Flink typing and serialization.
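
For illustration, a converter like that might map (already $ref-resolved) JSONSchema types onto Flink Table API DataTypes roughly as below. This is a minimal sketch, not the actual JsonSchemaConverter from the hackathon; the schema is represented as a plain Map for simplicity:

```scala
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.types.DataType

// Hypothetical sketch: map a $ref-resolved JSONSchema node (here a
// simple Map) onto a Flink Table API DataType.
object JsonSchemaToDataType {
  def convert(schema: Map[String, Any]): DataType =
    schema("type") match {
      case "string"  => DataTypes.STRING()
      case "integer" => DataTypes.BIGINT()
      case "number"  => DataTypes.DOUBLE()
      case "boolean" => DataTypes.BOOLEAN()
      case "array" =>
        val items = schema("items").asInstanceOf[Map[String, Any]]
        DataTypes.ARRAY(convert(items))
      case "object" =>
        val props = schema("properties").asInstanceOf[Map[String, Map[String, Any]]]
        val fields = props.map { case (name, sub) => DataTypes.FIELD(name, convert(sub)) }
        DataTypes.ROW(fields.toSeq: _*)
      case other =>
        throw new IllegalArgumentException(s"Unsupported JSONSchema type: $other")
    }
}
```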

I've experimented with jsonschema2pojo to auto-generate Java POJOs from our JSONSchemas, but the way we do $refs and materialized schemas makes it a little strange (every schema that uses the meta fragment gets its own new Meta class). Maybe there's a way around this, but it would almost certainly require code changes in jsonschema2pojo.

I also just played around with this json-schema-to-case-class library...and it looks great. Most importantly, it is written in JavaScript and uses the same JSONSchema ref parser library that our jsonschema-tools uses to resolve $refs. That means it shouldn't be difficult to add Scala case class generation as part of the 'materialization' step that we run for all schemas! If we do that, we could even publish .jar artifacts of our event schemas and use them as maven dependencies!
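
To make the idea concrete, here is roughly what generated case classes could look like for a simplified revision-create style schema. The field set and formatting are hypothetical, not the library's actual output:

```scala
// Input (simplified): a materialized schema using the common meta fragment
//   { "title": "mediawiki/revision/create", "properties": {
//       "meta":     { "$ref": "/fragment/common/meta" },
//       "database": { "type": "string" },
//       "rev_id":   { "type": "integer" } } }
//
// Hypothetical generated output:
case class Meta(id: String, dt: String, stream: String)

case class MediawikiRevisionCreate(
  meta: Meta,
  database: String,
  rev_id: Long
)
```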

My question then is...does Flink handle case class serialization well now? From the docs it seems the answer is yes? I recall that in the past, case classes weren't actually well supported. @dcausse do you know?

> I've experimented with jsonschema2pojo to auto-generate Java POJOs from our JSONSchemas, but the way we do $refs and materialized schemas makes it a little strange (every schema that uses the meta fragment gets its own new Meta class). Maybe there's a way around this, but it would almost certainly require code changes in jsonschema2pojo.

> My question then is...does Flink handle case class serialization well now? From the docs it seems the answer is yes? I recall that in the past, case classes weren't actually well supported. @dcausse do you know?

Case classes are supported by default, and a generic serializer will be generated by Flink on the fly. The main drawback of this generic serializer is that it does not support schema upgrades, so you have to be careful not to use these in Flink operators that could store them in state (sadly, the AsyncIO operation is one of them). For our use case we ended up writing our own serializers for such case classes, but only for those. Plain POJOs support schema upgrades out of the box in Flink, but sadly they do not allow you to use handy Scala pattern matching.

Generally speaking, I think having some tooling like you describe that would generate data classes (or case classes) would be very nice. For Scala case classes that require support for schema upgrades due to Flink state, a solution has to be found (one could simply be forcing Flink to serialize such case classes as JSON again in its state).
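
A minimal sketch of that JSON-in-state workaround, assuming circe for the JSON codec and a hypothetical dedup operator (PageEvent, DedupFunction, and the field names are all made up). Schema evolution is then handled by the JSON codec's tolerance for added/missing fields rather than by Flink's serializer:

```scala
import io.circe.generic.auto._
import io.circe.parser.decode
import io.circe.syntax._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class PageEvent(pageId: Long, title: String)

// Store the case class as a JSON string in Flink state, so the state
// bytes survive changes to the case class definition.
class DedupFunction extends KeyedProcessFunction[Long, PageEvent, PageEvent] {
  @transient private var lastSeen: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("last-seen-json", classOf[String]))

  override def processElement(
      event: PageEvent,
      ctx: KeyedProcessFunction[Long, PageEvent, PageEvent]#Context,
      out: Collector[PageEvent]): Unit = {
    // Decode the previously stored JSON (if any) back into the case class.
    val previous = Option(lastSeen.value()).flatMap(decode[PageEvent](_).toOption)
    if (!previous.contains(event)) {
      lastSeen.update(event.asJson.noSpaces) // state holds a plain JSON string
      out.collect(event)
    }
  }
}
```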

Another option I have not explored: JSONSchema to Avro? We could convert the schemas and event stream data into Avro as it comes into Flink? Would that help? (I am mostly ignorant here, just poking around trying to understand.)
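
For the shape of such a conversion, a hand-rolled mapping onto Avro's SchemaBuilder might look like the sketch below. Existing JSONSchema-to-Avro converters may well be a better starting point; the namespace is hypothetical and the input is simplified to a flat map of string-typed properties:

```scala
import org.apache.avro.{Schema, SchemaBuilder}

// Hypothetical sketch: derive an Avro record schema from a resolved
// JSONSchema's (flattened) properties. Only shows the shape of the mapping.
object JsonSchemaToAvro {
  def toAvro(recordName: String, properties: Map[String, String]): Schema = {
    val assembler = properties.foldLeft(
      SchemaBuilder.record(recordName).namespace("org.wikimedia.event").fields()
    ) {
      case (acc, (name, "string"))  => acc.requiredString(name)
      case (acc, (name, "integer")) => acc.requiredLong(name)
      case (acc, (name, "number"))  => acc.requiredDouble(name)
      case (acc, (name, "boolean")) => acc.requiredBoolean(name)
      case (_, (name, other)) =>
        throw new IllegalArgumentException(s"Unsupported type for $name: $other")
    }
    assembler.endRecord()
  }
}
```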

@dcausse are the 'schema changes' you are talking about just for Flink managed state?

> @dcausse are the 'schema changes' you are talking about just for Flink managed state?

Yes, these migrations only happen when loading a savepoint/checkpoint. If the job never starts from an existing savepoint/checkpoint then I guess no migration could happen, but without any state I wonder if the AsyncIO operator is usable without losing in-flight events; this is something we might have to test/check.

> I wonder if the AsyncIO operator is usable without losing in-flight events

Ah, right, this is probably why Kafka Streams doesn't do async IO. Unless there is some batching of the requests that either all fail or all succeed, right? We can't commit the consumer offset unless all events up to that point have been successfully processed?
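
For reference, here is a sketch of Flink's async I/O API with a hypothetical lookup service (lookupTitle, Revision, and EnrichedRevision are made up). The capacity parameter bounds how many requests are in flight at once, and those in-flight records are exactly what Flink has to checkpoint into operator state:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

case class Revision(revId: Long)
case class EnrichedRevision(revId: Long, pageTitle: String)

// Sketch of Flink async I/O: in-flight requests are held in operator
// state across checkpoints, which is why that state's serializer matters.
class EnrichFunction extends AsyncFunction[Revision, EnrichedRevision] {

  // Hypothetical async lookup; stands in for an HTTP or database client.
  private def lookupTitle(revId: Long): Future[String] =
    Future(s"Title_of_$revId")

  override def asyncInvoke(rev: Revision, resultFuture: ResultFuture[EnrichedRevision]): Unit =
    lookupTitle(rev.revId).onComplete {
      case Success(title) => resultFuture.complete(Seq(EnrichedRevision(rev.revId, title)))
      case Failure(err)   => resultFuture.completeExceptionally(err)
    }
}

// Usage (at most 100 in-flight requests, 5 second timeout each):
//   val enriched = AsyncDataStream.unorderedWait(
//     revisions, new EnrichFunction, 5, java.util.concurrent.TimeUnit.SECONDS, 100)
```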

> For Scala case classes that require support for schema upgrades due to Flink state, a solution has to be found (one could simply be forcing Flink to serialize such case classes as JSON again in its state).

After poking around a bit with the Avro stuff, I think this might be the way to go.

Played a little bit more with json-schema-to-case-class. It works, but will need some modifications to handle some of our conventions, specifically map types. It also generates new case classes for the fragment schemas, but in hindsight I think that makes sense. As long as we provide proper package namespaces for the generated classes, I think it will be okay that e.g. org.wikimedia.mediawiki.revision_create.meta is not the exact same type as org.wikimedia.mediawiki.page_properties_change.meta.
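
In Scala terms, the two generated Meta classes would be structurally identical but distinct types, disambiguated only by their package (names hypothetical):

```scala
package org.wikimedia.mediawiki.revision_create {
  case class Meta(id: String, dt: String, stream: String)
}

package org.wikimedia.mediawiki.page_properties_change {
  case class Meta(id: String, dt: String, stream: String)
}

// A revision_create.Meta cannot be passed where a
// page_properties_change.Meta is expected; the compiler keeps them apart.
```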

Played around with the Python Table API; I was able to use the JsonSchemaConverter from Scala to get an Event Platform stream as a Flink Table in Python.

It does seem we should be able to convert the Flink Table into a DataStream; however, some type information might need to be provided. If so, I think we can easily write a JSONSchema -> TypeInformation (?) converter, and/or a Table.Schema -> TypeInformation converter.
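
A sketch of what that could look like in the Scala API, hand-building the TypeInformation that such a JSONSchema -> TypeInformation converter would emit (the field names and job are hypothetical):

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TableToStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Hand-built here; a converter would derive this from the JSONSchema.
    implicit val rowType: TypeInformation[Row] = Types.ROW_NAMED(
      Array("meta_id", "rev_id"), Types.STRING, Types.LONG)

    val rows = env.fromElements(
      Row.of("evt-1", java.lang.Long.valueOf(42L)))

    val table = tableEnv.fromDataStream(rows) // Table with named columns
    val back  = tableEnv.toDataStream(table)  // back to DataStream[Row]
    back.print()
    env.execute("jsonschema-typeinfo-sketch")
  }
}
```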

lbowmaker updated the task description.