
Flink SQL queries should access Kafka topics from a Catalog
Closed, Resolved · Public

Description

In https://phabricator.wikimedia.org/T318859 and https://phabricator.wikimedia.org/T318856 we explored using the Python and SQL APIs to develop stateless services for content enrichment patterns.

SQL queries can select from, and insert into, Kafka topics provided they are registered as tables.
For example, the following DDL would register the eqiad.eventgate-main.test.event topic as testevent.

CREATE TABLE testevent (
    test STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'eqiad.eventgate-main.test.event',
    'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

The topic can then be queried as

SELECT test FROM testevent;
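A registered table can also be used as a sink. A minimal sketch (note that real Event Platform streams additionally expect $schema and meta fields, which are omitted here):

INSERT INTO testevent VALUES ('hello from flink sql');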

Declaring DDL for all tables/topics is cumbersome. It would lead to code duplication, overhead for managing schema changes, and ad-hoc data validation before producing messages.

We should leverage existing JSON schema registries and
allow for automatic table discovery. This already happens for Scala and Python jobs that depend on
eventutilities (e.g. EventTableDescriptorBuilder) to configure streams. SQL jobs should follow the same approach.

Success criteria

  • eventutilities is integrated with a Flink Catalog that automatically exposes Kafka topics
  • It is possible to consume events from Kafka
  • It is possible to produce events into Kafka

Options we considered

Some options we considered (not mutually exclusive):

  1. Implement a SQL client that executes statements in a job that initializes EventTableDescriptorBuilder. Basic functionality for this has already been implemented in Scala/Python; we should be mindful of how to manage a stream config. This approach would require providing (and managing) a config that declares all streams that need to be exposed.
  2. Implement a user-defined Flink Catalog atop eventutilities.
  3. Integrate with Datahub.

Other integrations

Things to consider:

  1. Enrichment services need to access external sources, for instance HTTP endpoints (e.g. Action API). Those too could be exposed in a Catalog. This aspect, however, is out of scope for the current task.
  2. How would a Catalog integrate with Hive / Iceberg?

Event Timeline


> Implement a user-defined Flink Catalog atop eventutilities.

I'm partial to this one if it is possible!

Some thoughts and trials of implementing a Flink Event Platform catalog here and in some comments below.

I was able to implement a Flink catalog that acts as an options passthrough to the built-in Kafka connector and uses eventutilities to dynamically create the tables, with schemas and sensible defaults. So something like

CREATE CATALOG wmfeventcatalog WITH (
	'type' = 'wmfeventcatalog',
	'properties.group.id' = 'catalog-test'
);

will let you use a table like eventgate-main.test.event as if you had created it like

CREATE TABLE `eventgate-main.test.event` (
    $schema STRING,
    meta ROW<...>,
    test STRING,
    test_map MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'eqiad.eventgate-main.test.event;codfw.eventgate-main.test.event',
    'properties.group.id' = 'catalog-test',
    'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
    'scan.startup.mode' = 'latest-offset',
    'json.timestamp-format.standard' = 'ISO-8601'
);

It works surprisingly well for select statements, and by extending the built-in in-memory catalog it can also use UDFs from the pyflink experiments.
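For example, a sketch of the intended usage, assuming the catalog has been registered as above:

USE CATALOG wmfeventcatalog;

SELECT test, test_map['test_key']
FROM `eventgate-main.test.event`;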

However, once you try to insert something, it gets a bit messy. The Kafka connector only allows you to sink to one topic, so for streams with both eqiad and codfw topics, there has to be a way to select between them. The catalog has no concept of a source or sink, so this would have to happen at the DynamicTableFactory level or lower. I still have to figure that one out.

That's not a massive problem right now, and you can actually bypass this issue by overriding the topic with SQL hints.

INSERT INTO `eventgate-main.test.event` /*+ OPTIONS('topic'='eqiad.eventgate-main.test.event') */
SELECT '/test/event/1.0.0', ROW(...), 'test_from_catalog', MAP['test_key', 'test_val'];

However, now there's a UX problem. When inserting you need the $schema and meta fields, which are annoying to fill out, to say the least. There is probably a way to perform a transformation to populate those fields before the data is sunk, but I haven't looked into it yet. (Seems like another future task)

gmodena renamed this task from [NEEDS GROOMING] Flink SQL queries should access Kafka topics from a Catalog to Flink SQL queries should access Kafka topics from a Catalog. Nov 23 2022, 11:20 AM

> However, once you try to insert something, it gets a bit messy. The Kafka connector only allows you to sink to one topic, so for streams with both eqiad and codfw topics, there has to be a way to select between them.

Kafka stretch could maybe help here.

> However, now there's a UX problem. When inserting you need the $schema and meta fields, which are annoying to fill out, to say the least. There is probably a way to perform a transformation to populate those fields before the data is sunk, but I haven't looked into it yet. (Seems like another future task)

+1 for a dedicated task. How are you handling these fields right now?

Very cool! Code? :)

> Kafka stretch could maybe help here.

It will help, but I don't think it will eliminate the need for being able to do this.

Here's the working code so far, sans the stuff I talk about below

I've been looking into two things: How to intercept config options before a sink is created so we can select a topic prefix, and how to intercept the row data before it's sunk so we can automatically insert a meta field.

When someone wants to insert into a topic, I envisioned they would be able to instantiate a catalog with a default DC, like

CREATE CATALOG wmfeventcatalog WITH (
    'type' = 'wmfeventcatalog',
    'properties.group.id' = 'catalog-test',
    'sink-server' = 'eqiad'
);

That way, selects can still read from eqiad and codfw, but inserts will go to the preferred server.
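As a sketch, assuming a 'sink-server' option like the one above, the earlier insert would no longer need a per-statement OPTIONS hint; the catalog would resolve the eqiad topic itself (meta payload elided as before):

INSERT INTO `eventgate-main.test.event`
SELECT '/test/event/1.0.0', ROW(...), 'test_from_catalog', MAP['test_key', 'test_val'];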

I just typed out what was basically a rant but for brevity I deleted it

This is doable, but not in a way that I like. Extend KafkaDynamicTableFactory and either override the topic passed to it when it instantiates a sink (which involves constructing an entirely new context, since the catalog options are read-only), or copy and paste its implementation of createDynamicTableSink, changing the single line where it picks the topic to sink to.

When it comes to intercepting row data, that seems difficult enough that it's probably wrong to do it from this side of Flink. We'd have to drill down to the runtime implementation level, and since we only have access to the sink in this case, it seems like the only way to modify the row data is by hijacking the serialization schema. This seems super wrong. It is definitely easier to just have some prebuilt UDF that can auto-generate the meta field.
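To illustrate that idea, a rough sketch where event_meta is a hypothetical scalar UDF (nothing with this name exists in eventutilities) that builds the meta row for a given stream:

INSERT INTO `eventgate-main.test.event` /*+ OPTIONS('topic'='eqiad.eventgate-main.test.event') */
SELECT
    '/test/event/1.0.0',                      -- $schema
    event_meta('eventgate-main.test.event'),  -- hypothetical UDF filling in the meta row
    'test_from_catalog_udf',
    MAP['test_key', 'test_val'];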

The catalog is now able to sink to a specific prefixed topic by overriding the ResolvedCatalogTable before passing it to the Kafka connector

Perhaps the end goal would have a user experience like:

CREATE CATALOG wmfeventcatalog WITH (
	'type' = 'wmfeventcatalog',
	'properties.group.id' = 'catalog-test'
);

that sets default table options like:

'scan.startup.mode' = 'latest-offset'
'topic-prefix' = 'eqiad'

If someone just wanted to read from a topic they can do:

SELECT `page_id` FROM `mediawiki.page-create`;

and the catalog will handle looking up the topic and filling in the table options and all that jazz.
INSERT will work using the default topic prefix option to pick a topic


If a user wanted to edit some option for a managed table, they can do something like:

ALTER TABLE `mediawiki.page-create` SET ('schema'='0.0.1');

ALTER TABLE would either alter the manually created table or, if it's a managed table, create a table with the default options merged with the custom ones.


If someone wants to explicitly set every option in a table they could do:

CREATE TABLE `test.event.example` ( _placeholder STRING ) WITH (
	'topic_prefix'='codfw',
	...
);

which would still generate the schema automatically but leave everything up to the user.
The difference between this and using ALTER TABLE to set every option is that there are no defaults, and you can use different connectors, which will have different options validation.


And if someone wanted to create a table with a topic schema but give it a different name so it's not overriding the built-in ones, maybe something like

CREATE TABLE `custom_table` ( _placeholder STRING ) WITH (
	'topic'='test.event.example',
	...
);

Yes I like it!

> CREATE TABLE test.event.example

Should we make sure that CREATED 'custom' tables don't use the same database.name ('object path' as Flink calls it) as the managed ones? Or hm, I suppose if we can 'alter' the managed ones, it would be okay to drop and create a table with the same name but totally different settings too?

> ALTER TABLE mediawiki.page-create SET ('schema'='0.0.1');

BTW, it technically should be fine to always use the latest schema when reading, so we might not need to support this just for reading (if not supporting it makes it easier to implement :) ). When writing we need to know the exact schema version a user wants though.

Some updates after some discussion

  • We decided to not implement a custom dynamic table factory. This means that we won't be able to distinguish between sources and sinks.
  • Tables created with the Kafka connector will default to our custom event-json format unless specified otherwise (actually, should Kafka just be the default for created tables?)

Here are some implications:

  • We won't officially support writes to dynamically generated tables, although writes might still work if the event stream just so happens to have a single Kafka topic of the same name
  • You cannot specify a sink topic anymore
CREATE TABLE `test.event.example` ( _placeholder STRING ) WITH (
	'topic_prefix'='codfw',
	...
);

This will not work, and users will have to specify the data center explicitly:

CREATE TABLE `codfw.test.event.example` ( _placeholder STRING ) WITH (
	...
);
  • Altering an event table to set a topic prefix to sink to will also not work. At this point, the ALTER TABLE command is just a shorthand for creating a custom table without needing to write a placeholder column.
  • Creating a table with a custom name (and thus needing a topic option) will pass through to the table factory instead of being intercepted by the catalog
CREATE TABLE `custom_table` ( _placeholder STRING ) WITH (
        'topic'='test.event.example', -- Catalog used to convert this to the Kafka topics, e.g. 'topic' = 'eqiad.test.event.example'
	...
);

This will not work, and you'd have to do

CREATE TABLE `custom_table` ( _placeholder STRING ) WITH (
	'topic'='eqiad.test.event.example',
	...
);

After testing more, it seems better to explicitly define options event-stream-name and event-stream-prefix instead of trying to derive the stream name and prefix from the table name. We need the unprefixed stream name to look up the schema, and the prefix is only needed if the user is trying to insert into Kafka.
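A sketch of what these options could look like on a created table, using the names from this comment (exact names and semantics were still under discussion, see below):

CREATE TABLE `page_create_sink` ( _placeholder STRING ) WITH (
    'event-stream-name' = 'mediawiki.page-create',
    'event-stream-prefix' = 'eqiad'
);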

I'm also tempted to restrict ALTER TABLE to only be used for modifying the dynamically created event tables. Having it work with created tables causes a headache of branching edge cases. (Did the schema version change? Change the schema, unless they added custom columns, in which case we have to account for conflicts. Did the connector change to/from Kafka? Add/remove the topic option, unless the new connector happens to also need a topic option. Did the format change to/from event-json? Remove the old format options and prefix/unprefix the new options needed.)

> After testing more, it seems better to explicitly define options event-stream-name and event-stream-prefix

Sounds good. And we'd still be able to SHOW/SELECT tables from the externally managed (default read-only stream config based) tables? event-stream-name is discovered in stream config, and table name is set from that?

> event-stream-prefix

How about event-stream-kafka-topic-prefix, since this is specific to the needs of the Kafka connector when producing?

> I'm also tempted to restrict ALTER TABLE to only be used for modifying the dynamically created event tables.

Meaning, ALTER TABLE -> CREATE TABLE in the in-memory cache with the same name?

What if...we didn't even support ALTER TABLE of external tables (stream config read-only), but instead only supported CREATE TABLE LIKE of external tables? ALTER TABLE still okay on in-memory cached tables?

But...I see your point about e.g. ALTER TABLE 'event-schema-version = <new version>'. Hm. I don't think this is something we really need to support. Once a table is in the cache, I think it'd be okay if you aren't allowed to change its actual table schema by just changing 'event-schema-version', right?
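A rough sketch of the CREATE TABLE LIKE idea above, using Flink's LIKE clause to derive a writable table from a read-only catalog table (table and topic names are illustrative; the default LIKE merge strategy keeps the inherited options and lets 'topic' override them):

CREATE TABLE `page_create_writable` WITH (
    'topic' = 'eqiad.mediawiki.page-create'
) LIKE `mediawiki.page-create`;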

This is so hard to describe through text that I just made a Miro board to try to logic out all the behavior. Take a look at it if you want and add comments as you see fit.

Change 868172 had a related patch set uploaded (by TChin; author: Ottomata):

[wikimedia-event-utilities@master] [WIP] - Add Flink catalog support for WMF Event Platform streams

https://gerrit.wikimedia.org/r/868172

Change 868172 merged by jenkins-bot:

[wikimedia-event-utilities@master] Add Flink catalog support for WMF Event Platform streams

https://gerrit.wikimedia.org/r/868172

Merged and released:

https://archiva.wikimedia.org/repository/releases/org/wikimedia/eventutilities-flink/1.2.5/

I just tried this with Flink 1.16 and Flink 1.16.1 though, and I got some errors! Flink 1.15 works fine? Will use that for the demo for now.