
[Event Platform] mw-page-content-change-enrich should (re)produce kafka keys
Closed, Resolved · Public

Description

Default Kafka partitioning ensures that messages with the same key will always go to the same Kafka partition. Consumers should be able to consume messages in order for a given page.

We should produce page content change events to Kafka with the wiki_id and page_id in the message key. Producing events with this key will allow us to one day maintain a backfilled compacted Kafka topic with current page state (T324108).
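For illustration, here is a minimal sketch of producing such a keyed record with a plain Kafka producer, assuming the key is serialized as a small JSON object (the broker, topic name and field layout here are illustrative, not a final spec):

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KeyedProduceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // Hypothetical key: the same wiki_id + page_id always hashes to the same partition,
        // so consumers see events for a given page in order.
        byte[] key = "{\"wiki_id\":\"enwiki\",\"page_id\":123}".getBytes(StandardCharsets.UTF_8);
        byte[] value = "{\"comment\":\"enriched page content change event goes here\"}".getBytes(StandardCharsets.UTF_8);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mediawiki.page_content_change.v1", key, value));
        }
    }
}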

For enrichment jobs, we could consider automating this: if a Kafka message has a key, we could default to Flink partitioning and Kafka sink keying using the incoming source key. To do this, we'd need to be able to use a key-and-value Kafka message deserializer via the Java EventDataStreamFactory, rather than a value-only deserializer. This could be done as part of this task, or perhaps as another one. Either way:
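As a rough sketch of what that could look like with the plain Flink Kafka connector (not the eventutilities API; the class name below is an assumption for illustration), a record deserializer that surfaces both key and value bytes:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch: emit (keyBytes, valueBytes) pairs so downstream code can re-use the incoming
// key when keying the Flink stream and when producing to the sink topic.
class KeyAndValueDeserializationSchema
        implements KafkaRecordDeserializationSchema<Tuple2<byte[], byte[]>> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record,
                            Collector<Tuple2<byte[], byte[]>> out) throws IOException {
        out.collect(Tuple2.of(record.key(), record.value()));
    }

    @Override
    public TypeInformation<Tuple2<byte[], byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>() {});
    }
}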

Done is
  • mw-page-content-change-enrich produces Kafka messages with wiki_id, page_id in Kafka message key.
  • Kafka producer uses key hash partitioning (this should be the default when a key is present, but we should double check; see the sketch below).
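For reference, when a record has a non-null key, Kafka's built-in partitioner chooses the partition by hashing the serialized key bytes, so no extra producer configuration should be needed. The computation is effectively:

import org.apache.kafka.common.utils.Utils;

class DefaultKeyPartitioning {
    // For a record with a non-null key, Kafka's default partitioning is effectively:
    // partition = toPositive(murmur2(serializedKeyBytes)) % numPartitions
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}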

Event Timeline

Ahoelzl renamed this task from mw-page-content-change-enrich should (re)produce kafka keys to [Event Platform] mw-page-content-change-enrich should (re)produce kafka keys.Oct 20 2023, 4:47 PM

Here is a summary of a couple of threads started on the attached MR. Moving to phab for visibility and to document design choices.

Where should a message key be stored?

It seems that the preference is in EventRowTypeInfo.

My initial implementation proposed to couple a Kafka record key composer to KafkaRecordSerializationSchema, and make the Kafka record key configurable only via EventStreamConfig. This is simple and satisfies my understanding of the key semantics expressed in EventStreamConfig.

@dcausse would prefer a more generic approach: when a stream config declares a key, it should be stored in EventRowTypeInfo. This would allow us to use the stream key both for Kafka records and to key Flink DataStreams.

I’m supportive of this approach if it simplifies use cases, provided we document and are explicit about the semantics and meaning of “key”. Right now EventStreamConfig suggests it’s bound to Kafka topics / messages, but here we’d need to document things in more general terms, e.g. DataStreams not backed by Kafka will also carry key semantics in EventRowTypeInfo.

I do like the API that @dcausse proposes. I'm quoting from a gerrit comment:

  1. gives clients a KeySelector and its corresponding TypeInfo such that you could easily have a KeyedStream using the key definition from the EventStreamConfig:

// The DataStream coming out of e.g. EventDataStreamFactory.kafkaSourceBuilder,
// added to the StreamExecutionEnvironment (env)
DataStream<Row> ds = env.fromSource(
        datastreamFactory.kafkaSourceBuilder(...).build(),
        WatermarkStrategy.noWatermarks(),
        "event-source");

// The EventRowTypeInfo representing our event
EventRowTypeInfo datastreamTypeInfo = (EventRowTypeInfo) ds.getProducedType();

// EventRowTypeInfo can be used to extract the key and make the stream keyed with:
KeyedStream<Row, Row> keyedStream =
        ds.keyBy(datastreamTypeInfo::extractKey, datastreamTypeInfo.keyTypeInfo());
  2. gives KafkaEventSerializationSchema a way to serialize the key as JSON:

// The EventRowTypeInfo representing our event
EventRowTypeInfo datastreamTypeInfo = ...;

// The JsonRowSerializationSchema built on top of the key definition of our event schema
JsonRowSerializationSchema jsonKeySerializer = JsonRowSerializationSchema.builder()
        .withTypeInfo(datastreamTypeInfo.keyTypeInfo())
        .withoutNormalization() // it's not an event, so we do no normalization here
        .build();

/**
 * Serialize the key of the event, the event being the one we want to ship to a Kafka
 * stream that is configured to be keyed from its EventStreamConfig.
 */
private byte[] getKeyBytes(Row event) {
    Row key = datastreamTypeInfo.extractKey(event);
    return jsonKeySerializer.serialize(key);
}

We should document / spec out the key semantics

Right now key fields are generated only by eventgate producers. To the best of my knowledge we don’t have a spec for what constitutes a valid key (my search-foo might have failed me). We should document:

  • What field value types are valid. IMHO: String or Long - or allow string values only. I’d be wary of using complex types that don’t have an obvious natural order.
  • How the fields are ordered. IMHO: we should sort keys by natural order; that’s the least ambiguous option (see the sketch after this list). AFAIK that’s not what EventGate does though; it follows insertion order in the message_key_fields attribute (which is non-trivial to guarantee across platforms / languages). In practice, maybe we could leave this as a platform-specific choice. My assumption here is that, for a given stream, eventgate and eventutilities producers are mutually exclusive.
  • Do we need to express the record key as a JSON string? Why not use a CSV format (key1:value1,key2:value2)? That would make the key not interoperable with the eventutilities-flink JsonSchema serializers, but I wonder if that's a level of complexity we need to begin with.
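To make the ordering question concrete, here is a small sketch (purely illustrative, not eventutilities code) of serializing a key with its fields sorted in natural, ascending order before writing JSON:

import java.nio.charset.StandardCharsets;
import java.util.TreeMap;

import com.fasterxml.jackson.databind.ObjectMapper;

class SortedJsonKey {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // TreeMap iterates its keys in natural (alphanumeric, ascending) order, so the
    // serialized JSON is deterministic regardless of insertion order, e.g.
    // {"page_id":123,"wiki_id":"enwiki"}.
    static byte[] serialize(TreeMap<String, Object> keyFields) throws Exception {
        return MAPPER.writeValueAsString(keyFields).getBytes(StandardCharsets.UTF_8);
    }
}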

my search-foo might have failed me

It did not; we have not really used keys before.

What field value types are valid. IMHO: String or Long

This is a fine starting point, but I don't think we should ultimately limit ourselves. So, I wouldn't make this a hard rule, but we should recommend this in the guidelines and say that we only support String and Long for now. Or...perhaps we can just support any primitive value? E.g. no Map, Array or Row, but any other primitive is fine?

How the fields are ordered

Indeed, we just need to pick a consistent way and do it. 'Natural' means alphanumeric sorting on keys? I think that makes the most sense to me.

Do we need to express the record key as a Json string?

I believe there will be times that having the key values will be useful, and needing different deserializers for the key and value would be cumbersome, no?

It seems that the preference is in EventRowTypeInfo.

Agree, and when we use EventDataStreamFactory to build the EventRowTypeInfo (and DataStream, or Source, or whatever), that is where we should use EventStreamConfig message_key_fields to set the key, when the EventRowTypeInfo is instantiated. Just saying that EventRowTypeInfo shouldn't know anything about EventStreamConfig, and it should be possible to instantiate EventRowTypeInfo with any TypeInfo for a key. I.e., they should be decoupled, ya?

This is a fine starting point, but I don't think we should ultimately limit ourselves.

IMHO we should put some safeguards in place to avoid accidentally abusing keys. For example, I don't think we want to allow users to set floating point values in a message key.

So, I wouldn't make this a hard rule, but we should recommend this in the guidelines and say that we only support String and Long for now. Or...perhaps we can just support any primitive value? E.g. no Map, Array or Row, but any other primitive is fine?

Do you maybe have any use case in mind where we might want to serialize a Map, Array or Row? IMHO relying on collections to ensure ordering can be an anti-pattern. Thinking out loud: what if a Row/Map changes under the hood? How do we handle optional k/v pairs?

Indeed, we just need to pick a consistent way and do it. 'Natural' means alphanumeric sorting on keys? I think that makes the most sense to me.

Yep. In this case it's alphanumeric sorting (ascending).

Agree, and when we use EventDataStreamFactory to build the EventRowTypeInfo (and DataStream, or Source, or whatever), that is where we should use EventStreamConfig message_key_fields to set the key, when the EventRowTypeInfo is instantiated.

Ack. It sounds like we reached consensus on this. Should not be too big a change.

Do you maybe have any use case in mind

No, but I think the Flink library code should avoid caring (I'm sure we violate this in places, but it seems like a good principle) about Event Platform conventions. I'd rather make this restriction at a higher level, perhaps an assertion in EventDataStreamFactory when instantiating the EventRowTypeInfo?

I could be wrong, but I think the code to create the keyTypeInfo should just create a RowTypeInfo subschema from the EventRowTypeInfo's schema using message_key_fields, right? In that case, EventRowTypeInfo's keyTypeInfo can just be a generic TypeInfo, without any restrictions.
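A rough sketch of that projection using plain Flink RowTypeInfo (the helper name and where it would live are assumptions):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

class KeyTypeInfoSketch {
    // Build a key TypeInfo by projecting the named message_key_fields out of the full
    // event RowTypeInfo. No type restrictions are applied here; any Event Platform
    // convention (e.g. primitives only) would be enforced by the caller, e.g.
    // EventDataStreamFactory. Note: this only handles top-level fields; nested fields
    // (e.g. revision.rev_id) would need extra handling.
    static RowTypeInfo keyTypeInfo(RowTypeInfo eventTypeInfo, String... keyFieldNames) {
        TypeInformation<?>[] keyTypes = new TypeInformation<?>[keyFieldNames.length];
        for (int i = 0; i < keyFieldNames.length; i++) {
            int idx = eventTypeInfo.getFieldIndex(keyFieldNames[i]);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown key field: " + keyFieldNames[i]);
            }
            keyTypes[i] = eventTypeInfo.getTypeAt(idx);
        }
        return new RowTypeInfo(keyTypes, keyFieldNames);
    }
}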

Hm, what if the needed message_key_field is nested in the event? E.g. what if we had to key by revision.rev_id? Should extractKey flatten the nested value, e.g. revision__rev_id or something? Could be simple, but also possibly could result in collisions in rare cases. Perhaps the better thing to do would be just to extract keys and values as is out of the event and set them as the key, even if those keys are nested?

No, but I think the Flink library code should avoid caring (I'm sure we violate this in places, but it seems like a good principle) about Event Platform conventions.

Ack. Maybe to me the boundary is a bit less clear cut (I'm still new on this corner of the code base :)).

I'd rather make this restriction at a higher level, perhaps an assertion in EventDataStreamFactory when instantiating the EventRowTypeInfo?

Sounds like a good place.
FWIW re DataStream key types: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/

A type cannot be a key if:

  • it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
  • it is an array of any type.

Hm, what if the needed message_key_field is nested in the event? E.g. what if we had to key by revision.rev_id? Should extractKey flatten the nested value, e.g. revision__rev_id or something? Could be simple, but also possibly could result in collisions in rare cases. Perhaps the better thing to do would be just to extract keys and values as is out of the event and set them as the key, even if those keys are nested?

mmm... not sure I follow. Right now message_key_field defines a record <-> event mapping, for example message_key_field = { "rev_id": "revision.rev_id" }. Are you suggesting that we should allow nested key definitions along the lines of message_key_field = { "revision.rev_id": "revision.rev_id" }?

Change 968621 had a related patch set uploaded (by Gmodena; author: Gmodena):

[wikimedia-event-utilities@master] eventutilities-flink: add partition key support.

https://gerrit.wikimedia.org/r/968621

message_key_field defines a record <-> event mapping

Oh! I forgot about that. Sorry babytime brain :)

Okay!

Change 968621 merged by jenkins-bot:

[wikimedia-event-utilities@master] eventutilities-flink: add partition key support.

https://gerrit.wikimedia.org/r/968621

Change 983939 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/mediawiki-config@master] wgEventStreams - Add message_key_fields to page_content_change stream

https://gerrit.wikimedia.org/r/983939

Change 983939 merged by jenkins-bot:

[operations/mediawiki-config@master] wgEventStreams - Add message_key_fields to page_content_change stream

https://gerrit.wikimedia.org/r/983939

Mentioned in SAL (#wikimedia-operations) [2023-12-18T20:48:26Z] <otto@deploy2002> Synchronized wmf-config/ext-EventStreamConfig.php: Config: [[gerrit:983939|Add message_key_fields to page_content_change stream (T338231)]] (duration: 06m 32s)

@gmodena and I deployed this and T345806: [Event Platform] mediawiki.page_content_change.v1 topic should be partitioned today.

We did not add any Kafka partitions to mediawiki.page_change.v1.