In https://phabricator.wikimedia.org/T318859 and https://phabricator.wikimedia.org/T318856 we explored using the Python and SQL APIs to develop stateless services for content enrichment patterns.
SQL queries can select from, and insert into, Kafka topics provided they are registered as tables.
For example, the following DDL would register the eqiad.eventgate-main.test.event topic as testevent.
CREATE TABLE testevent (
  test string
) WITH (
  'connector' = 'kafka',
  'topic' = 'eqiad.eventgate-main.test.event',
  'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
The topic can then be queried with:
select test from testevent;
Declaring DDL for all tables/topics is cumbersome. It would lead to code duplication, overhead for managing schema changes, and ad-hoc data validation before producing messages.
We should leverage the existing JSON schema registries and
allow for automatic table discovery. This already happens for Scala and Python jobs that depend on
eventutilities (e.g. EventTableDescriptorBuilder) to configure streams. SQL jobs should follow the same approach.
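To illustrate what automatic table discovery would have to do, here is a minimal Python sketch that derives a CREATE TABLE statement from a JSON Schema instead of hand-written DDL. It is not the eventutilities API: the function names, the type mapping, and the TEST_SCHEMA for the test topic are all assumptions made for illustration, and only scalar types and nested objects are handled.

```python
# Simplified JSON Schema -> Flink SQL type mapping (assumption: scalar types
# and nested objects only; arrays, maps and string formats are not handled).
JSON_TO_FLINK = {
    "string": "STRING",
    "integer": "BIGINT",
    "number": "DOUBLE",
    "boolean": "BOOLEAN",
}


def flink_type(schema: dict) -> str:
    """Translate one JSON Schema node into a Flink SQL type string."""
    if schema.get("type") == "object":
        fields = ", ".join(
            f"`{name}` {flink_type(sub)}"
            for name, sub in schema.get("properties", {}).items()
        )
        return f"ROW<{fields}>"
    return JSON_TO_FLINK[schema["type"]]


def create_table_ddl(table: str, topic: str, schema: dict, brokers: str) -> str:
    """Render a CREATE TABLE statement for a Kafka topic from its JSON Schema."""
    columns = ",\n".join(
        f"  `{name}` {flink_type(sub)}"
        for name, sub in schema["properties"].items()
    )
    return (
        f"CREATE TABLE {table} (\n{columns}\n) WITH (\n"
        f"  'connector' = 'kafka',\n"
        f"  'topic' = '{topic}',\n"
        f"  'properties.bootstrap.servers' = '{brokers}',\n"
        f"  'format' = 'json'\n"
        f")"
    )


# Hypothetical schema for the test topic; the real one lives in the registry.
TEST_SCHEMA = {"type": "object", "properties": {"test": {"type": "string"}}}

if __name__ == "__main__":
    print(
        create_table_ddl(
            "testevent",
            "eqiad.eventgate-main.test.event",
            TEST_SCHEMA,
            "kafka-jumbo1001.eqiad.wmnet:9092",
        )
    )
```

Because the columns come from the schema, a schema change would only need to be made in the registry, not in every job that declares the table.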
Success criteria
- eventutilities is integrated with a Flink Catalog that automatically exposes Kafka topics
- It is possible to consume events from Kafka
- It is possible to produce events into Kafka
Options we considered
Some options we considered (not mutually exclusive):
- Implement a SQL client that executes statements in a job that initializes EventTableDescriptorBuilder. Basic functionality for this has already been implemented in Scala/Python, but we should be mindful of how the stream config is managed: this approach would require providing (and maintaining) a config that declares all streams that need to be exposed.
- Implement a user-defined Flink Catalog atop eventutilities.
- Integrate with Datahub.
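The catalog option can be pictured with a toy, in-memory Python stand-in. This is only a sketch of the lookup logic, not a Flink Catalog implementation (a real one would implement Flink's Java Catalog interface). The StreamCatalog class, the shape of the stream config, the table naming rule, and the codfw topic are assumptions made for illustration.

```python
from typing import Dict, List


class StreamCatalog:
    """Toy stand-in for a catalog that resolves tables from a stream config."""

    def __init__(self, stream_config: Dict[str, dict], brokers: str):
        # Assumed config shape: {stream name: {"topics": [topic, ...]}}
        self._streams = stream_config
        self._brokers = brokers

    @staticmethod
    def table_name(stream: str) -> str:
        # Stream names contain '.' and '-'; map them to '_' so the table
        # can be referenced in SQL without quoting (naming rule is assumed).
        return stream.replace(".", "_").replace("-", "_")

    def list_tables(self) -> List[str]:
        """Return every table the catalog can expose, without any DDL."""
        return sorted(self.table_name(s) for s in self._streams)

    def get_table_options(self, table: str) -> Dict[str, str]:
        """Resolve Kafka connector options for a table on demand."""
        for stream, settings in self._streams.items():
            if self.table_name(stream) == table:
                return {
                    "connector": "kafka",
                    # The Kafka connector accepts a ';'-separated topic list
                    # for sources.
                    "topic": ";".join(settings["topics"]),
                    "properties.bootstrap.servers": self._brokers,
                    "format": "json",
                }
        raise KeyError(f"no stream registered for table {table!r}")


if __name__ == "__main__":
    # Hypothetical stream config entry, for illustration only.
    catalog = StreamCatalog(
        {
            "eventgate-main.test.event": {
                "topics": [
                    "eqiad.eventgate-main.test.event",
                    "codfw.eventgate-main.test.event",
                ]
            }
        },
        "kafka-jumbo1001.eqiad.wmnet:9092",
    )
    print(catalog.list_tables())
    print(catalog.get_table_options("eventgate_main_test_event"))
```

The point of the design is that tables are resolved when a query names them, so users never write CREATE TABLE for a stream that is already declared in the config.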
Other integrations
Things to consider:
- Enrichment services need to access external sources, for instance HTTP endpoints (e.g. Action API). Those too could be exposed in a Catalog. This aspect, however, is out of scope for the current task.
- How would a Catalog integrate with Hive / Iceberg?