Migrate the WDQS streaming updater from FlinkKafkaConsumer/Producer to KafkaSource/Sink
Closed, Resolved (Public)

Description

FlinkKafkaConsumer and FlinkKafkaProducer are now deprecated; KafkaSource and KafkaSink should be used instead.
Unfortunately, because these operators store state, some migration steps must be followed:

  • the job must be stopped with a savepoint and its offsets committed to Kafka
  • the job must be started from this savepoint, most probably with --allowNonRestoredState enabled
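
The two steps above can be sketched as a pair of Flink CLI invocations. This is only a sketch, not the actual flink-job.py implementation: the helper names, job id, jar name and savepoint paths are hypothetical. The only parts taken from the Flink CLI itself are `flink stop --savepointPath`, `flink run --fromSavepoint`, `--detached` and `--allowNonRestoredState`. The last flag is likely needed because the state stored by the old FlinkKafkaConsumer/FlinkKafkaProducer operators has no matching operator in the new job graph.

```python
import shlex
from typing import List, Optional


def stop_with_savepoint_cmd(job_id: str, savepoint_dir: str) -> List[str]:
    """Build the `flink stop` command: take a savepoint, then stop the job."""
    return ["flink", "stop", "--savepointPath", savepoint_dir, job_id]


def run_from_savepoint_cmd(
    jar: str,
    savepoint_path: str,
    allow_non_restored_state: bool = False,
    job_args: Optional[List[str]] = None,
) -> List[str]:
    """Build the `flink run` command that restores the job from a savepoint."""
    cmd = ["flink", "run", "--detached", "--fromSavepoint", savepoint_path]
    if allow_non_restored_state:
        # Lets the job start even if some savepoint state (here, the state of
        # the old Kafka operators) cannot be mapped to an operator.
        cmd.append("--allowNonRestoredState")
    cmd.append(jar)
    cmd.extend(job_args or [])
    return cmd


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    stop = stop_with_savepoint_cmd("<job-id>", "hdfs:///savepoints")
    start = run_from_savepoint_cmd(
        "streaming-updater.jar",
        "hdfs:///savepoints/<savepoint-dir>",
        allow_non_restored_state=True,
    )
    print(shlex.join(stop))
    print(shlex.join(start))
```

The sketch only builds the command lines. It does not check that the offsets were actually committed to Kafka before the restart, which still has to be verified separately.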

Given the risk of failures it might be wise to do this in two steps:

  • add support for KafkaSink/KafkaSource in the codebase with an option to flip between the two implementations
  • drop usage of FlinkKafkaConsumer and FlinkKafkaProducer once the job is proven to run properly with KafkaSink/KafkaSource

AC:

  • FlinkKafkaConsumer and FlinkKafkaProducer usage is dropped from the codebase

Event Timeline

Gehel triaged this task as Medium priority. Jan 16 2023, 4:09 PM
Gehel moved this task from Incoming to Tech Debt on the Wikidata-Query-Service board.

Change 958535 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Add an option to use newer KafkaSource/Sink flink APIs

https://gerrit.wikimedia.org/r/958535

Change 958947 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Drop deprecated FlinkKafka Consumer/Producer

https://gerrit.wikimedia.org/r/958947

Change 959217 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/deploy@master] flink-job: add support for a save and cancel action

https://gerrit.wikimedia.org/r/959217

Change 959218 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/deploy@master] flink: start using KafkaSource/Sink

https://gerrit.wikimedia.org/r/959218

Change 959700 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Drop kafka offsets support in bootstrap and state extraction jobs

https://gerrit.wikimedia.org/r/959700

Change 959700 merged by jenkins-bot:

[wikidata/query/rdf@master] Drop kafka offsets support in bootstrap and state extraction jobs

https://gerrit.wikimedia.org/r/959700

Change 958535 merged by jenkins-bot:

[wikidata/query/rdf@master] Add an option to use newer KafkaSource/Sink flink APIs

https://gerrit.wikimedia.org/r/958535

Change 961101 had a related patch set uploaded (by Bking; author: DCausse):

[wikidata/query/deploy@master] deploy version 0.3.130

https://gerrit.wikimedia.org/r/961101

Change 959217 merged by DCausse:

[wikidata/query/deploy@master] flink-job: add support for a save and cancel action

https://gerrit.wikimedia.org/r/959217

Change 961101 merged by DCausse:

[wikidata/query/deploy@master] deploy version 0.3.130

https://gerrit.wikimedia.org/r/961101

Change 959218 merged by DCausse:

[wikidata/query/deploy@master] flink: start using KafkaSource/Sink

https://gerrit.wikimedia.org/r/959218

Change 961137 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/deploy@master] flink-job.py: add support for allowNonRestoredState

https://gerrit.wikimedia.org/r/961137

Change 961144 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Kafka sources should rely on existing group offsets

https://gerrit.wikimedia.org/r/961144

Change 961144 merged by jenkins-bot:

[wikidata/query/rdf@master] Kafka sources should rely on existing group offsets

https://gerrit.wikimedia.org/r/961144

Change 962050 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Use different names for KafkaSink and FlinkKafkaProducer

https://gerrit.wikimedia.org/r/962050

Change 962050 merged by jenkins-bot:

[wikidata/query/rdf@master] Use different names for KafkaSink and FlinkKafkaProducer

https://gerrit.wikimedia.org/r/962050

Change 963057 had a related patch set uploaded (by DCausse; author: DCausse):

[operations/deployment-charts@master] rdf-streaming-updater: take a savepoint manually

https://gerrit.wikimedia.org/r/963057

Change 963057 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: take a savepoint manually

https://gerrit.wikimedia.org/r/963057

Change 963105 had a related patch set uploaded (by DCausse; author: DCausse):

[operations/deployment-charts@master] rdf-streaming-updater: simplify parallelism and use newer kafka APIs

https://gerrit.wikimedia.org/r/963105

Change 963105 merged by Bking:

[operations/deployment-charts@master] rdf-streaming-updater: simplify parallelism and use newer kafka APIs

https://gerrit.wikimedia.org/r/963105

After upgrading the test job to the new Kafka connector APIs, I now hit this problem:

  • org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.

This happens consistently on the second checkpoint; the first checkpoint always succeeds.

The new Kafka connectors seem to interact with kafka-clients at a low level (using Java reflection to manipulate some internal state). Looking further, it appears that we force the use of kafka-clients 2.4.1 while the Flink connector expects 3.2.3. Upgrading to this version and testing with YARN appears to fix the issue mentioned above.

Change 963322 had a related patch set uploaded (by DCausse; author: DCausse):

[wikidata/query/rdf@master] Use kafka-clients 3.2.3 and set client.id.prefix

https://gerrit.wikimedia.org/r/963322

Change 963322 merged by jenkins-bot:

[wikidata/query/rdf@master] Use kafka-clients 3.2.3 and set client.id.prefix

https://gerrit.wikimedia.org/r/963322

Change 963383 had a related patch set uploaded (by DCausse; author: DCausse):

[operations/deployment-charts@master] rdf-streaming-updater: bump to 0.3.135

https://gerrit.wikimedia.org/r/963383

Change 963383 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: bump to 0.3.135

https://gerrit.wikimedia.org/r/963383

Change 961137 merged by DCausse:

[wikidata/query/deploy@master] flink-job.py: add support for allowNonRestoredState

https://gerrit.wikimedia.org/r/961137

All jobs have been restarted to use the newer Kafka APIs.