
[Shared Event Platform] - Research Flink Changelog semantics to inform POC MW schema design
Closed, Resolved · Public

Description

T307959: [Event Platform] Design and Implement realtime enrichment pipeline for MW page change with content intends to use Flink to create a new 'mediawiki page content change' stream that can be used to propagate MW page content state changes, AKA edits.

T308017: Design Schema for page state and page state with content (enriched) streams is the task for designing the schema for this (and other) mediawiki state change streams.

Flink's Dynamic Tables have the concept of changelog streams, which are a codified way of representing changes to an entity's (e.g. page) state as a stream. These changelog streams can then be used to automatically maintain a materialized view of the entity, e.g. its current state.
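
For illustration, here is a minimal sketch (adapted from the example in Flink's fromChangelogStream documentation; the page fields are made up) of interpreting a changelog stream as a table that materializes current state:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Each Row carries a RowKind describing the kind of change it represents.
DataStream<Row> pageChanges = env.fromElements(
    Row.ofKind(RowKind.INSERT, 1L, "Main_Page"),
    Row.ofKind(RowKind.UPDATE_BEFORE, 1L, "Main_Page"),
    Row.ofKind(RowKind.UPDATE_AFTER, 1L, "Main_Page_Renamed"));

// Flink interprets the changelog; the resulting Table reflects current state.
Table currentPageState = tableEnv.fromChangelogStream(pageChanges);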

This task will be used to research how we might represent state change data with our event schemas so that they are automatically usable as changelog streams in Flink.

Hopefully, the outcome will be a convention for all future state change event schemas we might design, including the one described in T308017.

Event Timeline

Specifically, we should see if we can make our state change streams work with the Upsert Kafka Connector.

From what I can tell, to do this the underlying DataStream needs to be of type Row, with the RowKind on each Row set to INSERT, UPDATE_AFTER, or DELETE, which lets Flink know what kind of upsert the record is. E.g. for page delete events, the stream should have a Row with RowKind.DELETE set.

Since we will be creating the stream for T307959 in Flink, I think it will be easy to write the events into an upsert-kafka sink, and this may be mostly taken care of for us. However, it'd be nice if we had a consistent data model that would make sense outside of Flink too. (We should probably emit a mediawiki.page_change stream directly from MW somehow.) I'm sure we can accomplish this somehow by extending some Flink APIs, but I had a really hard time doing that in T308356. Or, we might be able to just use one of the existing changelog formats, like Debezium. This would mean we'd emit state change events that are compatible with Debezium's state change model.
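
For reference, a sketch of what declaring such an upsert-kafka sink could look like (table, topic, field, and server names here are made up), given a TableEnvironment tableEnv. The connector requires a PRIMARY KEY, which becomes the Kafka message key; DELETE rows are written as Kafka tombstones and INSERT/UPDATE_AFTER rows as upserts:

tableEnv.executeSql(
    "CREATE TABLE page_state (\n"
    + "  page_id BIGINT,\n"
    + "  page_title STRING,\n"
    + "  PRIMARY KEY (page_id) NOT ENFORCED\n"
    + ") WITH (\n"
    + "  'connector' = 'upsert-kafka',\n"
    + "  'topic' = 'mediawiki.page_state',\n"
    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
    + "  'key.format' = 'json',\n"
    + "  'value.format' = 'json'\n"
    + ")");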

Okay, today's findings:

  • The upsert-kafka connector also needs a primary key set, and I believe the Kafka key needs to be set properly as well. I'm not sure how this works yet.
  • All that is needed to use a changelog Table is to make sure that the RowKind is set when the RowData is deserialized from the JSON.
  • Debezium does this via its op field.
  • I believe we could implement very slim custom format serdes that wrap the JsonRowData ones, but end up setting the RowKind on the RowData record. Basically, wrap JsonRowDataDeserializationSchema.deserialize with something that knows how to inspect the RowData and set the RowKind. Something like:
@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
    RowData rowData = jsonRowDataDeserializationSchema.deserialize(message);
    // We implement getRowKind to inspect rowData and infer the kind it is.
    rowData.setRowKind(getRowKind(rowData));
    return rowData;
}

If we make a nice repeatable convention for the kind of change an event is (upsert, delete) for all of our schemas, then getRowKind should be relatively easy to implement.

Just discussed a bit with @dcausse. I think we know enough now to make a decision on a new Event Platform schema convention. We need a field that can be used to determine the type of change a state change event represents. I believe we can add a new enum field that any state change event schema should have. This field should map easily to Flink's RowKind.

There are 4 RowKind change types in Flink:

  • DELETE
  • INSERT
  • UPDATE_BEFORE - Update operation with the previous content of the updated row
  • UPDATE_AFTER - Update operation with new content of the updated row

When dealing with updates, Flink's Debezium format actually emits two Flink Rows from a single message: one with UPDATE_BEFORE and one with UPDATE_AFTER. I think we can also accomplish this using our current Event Platform state change model.
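
A sketch of how our wrapper deserializer could do the same, using the Collector variant of DeserializationSchema.deserialize. Here isUpdate and extractBefore are hypothetical helpers, and this assumes the update event carries the entity's prior state:

@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
    RowData after = jsonRowDataDeserializationSchema.deserialize(message);
    if (isUpdate(after)) {
        // Build a row representing the entity's prior state from the event.
        RowData before = extractBefore(after);
        before.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(before);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(after);
    } else {
        out.collect(after);
    }
}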

Debezium really only has 3 state change types: INSERT, UPDATE, and DELETE. I think we should probably do the same, and let UPDATE_BEFORE and UPDATE_AFTER be a Flink deserializer specific detail. So, how about this for the new field:

change_kind:
  description: State change kind in a changelog.  One of insert, update or delete. 
  type: string
  enum: [insert, update, delete]
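
With that field in place, the getRowKind method from the earlier sketch becomes a simple mapping. A sketch only; how the field is read out of the RowData (CHANGE_KIND_POS here) is hypothetical and depends on the schema's field positions:

private RowKind getRowKind(RowData rowData) {
    String changeKind = rowData.getString(CHANGE_KIND_POS).toString();
    switch (changeKind) {
        case "insert":
            return RowKind.INSERT;
        case "update":
            // The deserializer can emit a matching UPDATE_BEFORE row separately.
            return RowKind.UPDATE_AFTER;
        case "delete":
            return RowKind.DELETE;
        default:
            throw new IllegalArgumentException("Unknown change_kind: " + changeKind);
    }
}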

We'd document this convention at https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Modeling_state_changes, and also perhaps add a reusable fragment for it.

Thoughts, opinions? Naming bikesheds? :)

Additionally, we could adopt some kind of convention for designating a 'primary key' in the event schema, but since that will be more about annotating the schema (or stream config) rather than having to add new fields to a schema, we don't need to decide on how to do this now.

Q: does this field belong in meta? Probably yes, but if I could go back now I would totally get rid of this meta object and rely on top level field conventions. $schema is not in meta. domain is, but it probably shouldn't be? meta.dt should not be in meta? (In case you can't tell, I don't like meta.)

Note to self: document a process for making official proposals to event platform schema guidelines and conventions.

Change 807565 had a related patch set uploaded (by Ottomata; author: Luke Bowmaker):

[schemas/event/primary@master] WIP - Add new mediawiki entity fragments, and use them in new mediawiki page change schema

https://gerrit.wikimedia.org/r/807565

Change 807565 merged by jenkins-bot:

[schemas/event/primary@master] Add new mediawiki state entity and change fragments, and use them in new mediawiki page change schema

https://gerrit.wikimedia.org/r/807565