The reassignment described at https://phabricator.wikimedia.org/T336044#9206891 seems to have caused issues for mw-page-content-change-enrich (source). The reassignment started on September 28th at 13:55 UTC.
@JAllemandou mentioned:
We start seeing a drop in events in/out of our app just before 14:00 UTC - this seems to match
The logs they were seeing were of the form:
{"@timestamp":"2023-09-28T15:26:47.761Z","log.level": "INFO","message":"Triggering checkpoint 6 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1695914807740 for job 641561a8f514fa3193aa7ddcd82b46f4.", "ecs.version": "1.2.0","process.thread.name":"Checkpoint Timer","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}
{"@timestamp":"2023-09-28T15:27:05.199Z","log.level": "INFO","message":"[AdminClient clientId=mw-page-content-change-enrich.wikikube-codfw.000-enumerator-admin-client] Node 2004 disconnected.", "ecs.version": "1.2.0","process.thread.name":"kafka-admin-client-thread | mw-page-content-change-enrich.wikikube-codfw.000-enumerator-admin-client","log.logger":"org.apache.kafka.clients.NetworkClient"}
{"@timestamp":"2023-09-28T15:32:05.339Z","log.level": "INFO","message":"[AdminClient clientId=mw-page-content-change-enrich.wikikube-codfw.000-enumerator-admin-client] Node 2003 disconnected.", "ecs.version": "1.2.0","process.thread.name":"kafka-admin-client-thread | mw-page-content-change-enrich.wikikube-codfw.000-enumerator-admin-client","log.logger":"org.apache.kafka.clients.NetworkClient"}
{"@timestamp":"2023-09-28T15:32:54.619Z","log.level": "INFO","message":"[Producer clientId=producer-7] Closing the Kafka producer with timeoutMillis = 3600000 ms.", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.clients.producer.KafkaProducer"}
{"@timestamp":"2023-09-28T15:32:54.621Z","log.level": "INFO","message":"Metrics scheduler closed", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.622Z","log.level": "INFO","message":"Closing reporter org.apache.kafka.common.metrics.JmxReporter", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.622Z","log.level": "INFO","message":"Metrics reporters closed", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.622Z","log.level": "INFO","message":"App info kafka.producer for producer-7 unregistered", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.utils.AppInfoParser"}
{"@timestamp":"2023-09-28T15:32:54.622Z","log.level": "INFO","message":"[Producer clientId=producer-8] Closing the Kafka producer with timeoutMillis = 3600000 ms.", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.clients.producer.KafkaProducer"}
{"@timestamp":"2023-09-28T15:32:54.705Z","log.level": "INFO","message":"Metrics scheduler closed", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.705Z","log.level": "INFO","message":"Closing reporter org.apache.kafka.common.metrics.JmxReporter", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.705Z","log.level": "INFO","message":"Metrics reporters closed", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.metrics.Metrics"}
{"@timestamp":"2023-09-28T15:32:54.705Z","log.level": "INFO","message":"App info kafka.producer for producer-8 unregistered", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1)#3","log.logger":"org.apache.kafka.common.utils.AppInfoParser"}These show a failure to produce messages to kafka, topic codfw.mediawiki.page_content_change.v1 : Closing the Kafka producer with timeoutMillis = 3600000 ms..
When the reassignment started, the reassignment throttle was set to 50 MB/s. It was then raised incrementally, in steps of a few tens of MB/s each time. However, the commit failures started as soon as the reassignment did, i.e. while the throttle was still at 50 MB/s.
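For reference, a reassignment throttle of this kind is typically applied and adjusted with the stock kafka-reassign-partitions.sh tool. The exact invocation used for this reassignment is not captured in this ticket, so the following is an illustrative sketch only; the bootstrap address and JSON file name are placeholders.

```shell
# Illustrative only: kafka-main:9092 and reassignment.json are
# placeholders, not the actual endpoints/files used here.
# Start (or adjust) the reassignment with a 50 MB/s throttle;
# re-running --execute with a higher --throttle raises the limit in place.
kafka-reassign-partitions.sh --bootstrap-server kafka-main:9092 \
  --reassignment-json-file reassignment.json \
  --execute --throttle 50000000

# --verify reports progress and clears the throttle once all
# partitions in the plan have completed.
kafka-reassign-partitions.sh --bootstrap-server kafka-main:9092 \
  --reassignment-json-file reassignment.json --verify
```

Note that the throttle applies to replication traffic between brokers, not to client produce traffic directly, which is why a too-low value starves the new replicas rather than the producer itself.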
Since the reassignment started, the topic (which has a single partition) has been in the following state:
Topic: codfw.mediawiki.page_content_change.v1 Partition: 0 Leader: 1002 Replicas: 1013,1002,1011,1003,1008,1004 Isr: 1002,1003,1004
The original replicas 1002, 1003, 1004 are still in sync and are streaming data out to the new replicas 1013, 1011, 1008, as all three original brokers need to be evacuated.
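The set of replicas still catching up can be read directly off the describe output above as the difference between Replicas and Isr. A minimal illustrative sketch:

```python
# Parse the `kafka-topics.sh --describe` line above and compute which
# target replicas are not yet in the ISR (i.e. still catching up).
line = ("Topic: codfw.mediawiki.page_content_change.v1 Partition: 0 "
        "Leader: 1002 Replicas: 1013,1002,1011,1003,1008,1004 Isr: 1002,1003,1004")

def broker_set(field_name: str) -> set:
    """Extract the comma-separated broker list following `field_name:`."""
    raw = line.split(f"{field_name}: ")[1].split()[0]
    return {int(broker) for broker in raw.split(",")}

replicas = broker_set("Replicas")
isr = broker_set("Isr")
catching_up = sorted(replicas - isr)
print(catching_up)  # → [1008, 1011, 1013]
```

This matches the state described above: 1002, 1003, 1004 in sync, with 1008, 1011, 1013 still being populated.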
We initially investigated a possible network policy issue, but a helmfile diff produced no output, meaning egress to the new brokers should already be allowed:
brouberol@deploy2002:/srv/deployment-charts/helmfile.d/services/mw-page-content-change-enrich$ helmfile -e codfw diff
helmfile.yaml: basePath=.
Comparing release=main, chart=wmf-stable/flink-app
helmfile.yaml: basePath=.
brouberol@deploy2002:/srv/deployment-charts/helmfile.d/services/mw-page-content-change-enrich$ helmfile -e eqiad diff
helmfile.yaml: basePath=.
Comparing release=main, chart=wmf-stable/flink-app
helmfile.yaml: basePath=.
brouberol@deploy2002:/srv/deployment-charts/helmfile.d/services/mw-page-content-change-enrich$
After further investigation, @JAllemandou found the following log emitted by mw-page-content-change-enrich:
{"@timestamp":"2023-09-28T17:53:05.864Z","log.level": "INFO","message":"process_enrich_with_page_content -> (process_error_enrich_with_page_content, dict_to_row__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, dict_to_row__$kafka__mediawiki.page_content_change.v1 -> kafka__mediawiki.page_content_change.v1: Writer -> kafka__mediawiki.page_content_change.v1: Committer) (1/1) (94e62905d349a5968ab3bf1ccb1c9e02_e9b82703c83ea7e9b9e5df8d59eb3c36_0_2) switched from RUNNING to FAILED on flink-app-main-taskmanager-1-1 @ 10.194.164.194 (dataPort=6121).", "ecs.version": "1.2.0","process.thread.name":"flink-akka.actor.default-dispatcher-4","log.logger":"org.apache.flink.runtime.executiongraph.ExecutionGraph","error.type":"org.apache.flink.util.FlinkRuntimeException","error.message":"Failed to send data to Kafka codfw.mediawiki.page_content_change.v1-0@-1 with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false} ","error.stack_trace":"org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka codfw.mediawiki.page_content_change.v1-0@-1 with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false} \n\tat org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:438)\n\tat org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:419)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)\n\tat 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for codfw.mediawiki.page_content_change.v1-0:120000 ms has passed since batch creation\n"}
Snipped version:
Failed to send data to Kafka codfw.mediawiki.page_content_change.v1-0 [...] Expiring 2 record(s) for codfw.mediawiki.page_content_change.v1-0:120000 ms has passed since batch creation
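The 120000 ms in the error matches the Kafka producer's default delivery.timeout.ms: records wait in the producer's batch accumulator, and if they cannot be delivered to the partition leader within that window (for example because replication throttling is starving the broker), the producer expires the batch with exactly this TimeoutException. The values below are stock Kafka client defaults for the relevant knobs, shown for reference; they are not a dump of the app's actual configuration.

```python
# Kafka producer timeout defaults relevant to the error above.
# These are the upstream client defaults, NOT this app's config.
producer_timeouts_ms = {
    "delivery.timeout.ms": 120_000,  # total time a record may spend from send() to ack;
                                     # exceeded -> "Expiring N record(s) ... 120000 ms
                                     # has passed since batch creation"
    "request.timeout.ms": 30_000,    # per-request timeout to the broker
    "linger.ms": 0,                  # how long to wait to fill a batch before sending
}
print(producer_timeouts_ms["delivery.timeout.ms"])  # → 120000
```

Raising delivery.timeout.ms would make the producer tolerate the slow broker for longer, but the root cause here is the broker side (the throttled reassignment), not the client timeout itself.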


