HTML Enrichment - Tuning & Backfilling configuration
Open, Needs Triage, Public

Assigned To
Authored By
JMonton-WMF
Mar 25 2026, 10:06 AM

Description

We have seen that our HTML enrichment pipeline works well with a regular streaming data flow, but if it stops for a few days, or we need to reprocess old data, it doesn't catch up as expected. It processes only slightly faster than the incoming traffic, so backfilling a few days of data might itself take days.

We have tried different configurations and found different issues:

The regular deployment works with a single TaskManager, using 6GB of Memory and 2 CPUs.

Tests done so far:

  • 20 Task Managers - 6GB - 2 CPUs each:
    • Result: Works a bit faster, but not 20 times faster.
  • 75 Task Managers - 1.5GB - 1 CPU each:
    • Result: Works a few times faster than with 20 Task Managers, but eventually fails, probably due to OOM.
  • 55 Task Managers - 2GB - 1 CPU each:
    • Result: Works faster than with 20 Task Managers and a bit slower than with 75, but eventually it also fails, probably due to OOM (not confirmed).
  • Fake HTTP API enrichment 'sync' mode by setting batch_size=1 and max_workers=1, while increasing parallelism and memory:
    • Result: This works better, but eventually fails too; not yet sure why.

We know that many HTML API requests might take 10 or 15 seconds, so there is a lot of I/O waiting happening here. We also know that some messages are big: more than 10MB, and some near 20MB.

We should try other configurations to find the best one for backfilling data when needed.

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title | Reference | Author | Source Branch | Dest Branch
Add event process function batch_size_total metric | repos/data-engineering/eventutilities-python!103 | otto | process_function_batch_size_metric | main
page_html_content_change - don't use Content-Length response header | repos/data-engineering/mediawiki-event-enrichment!133 | otto | html_content_length | main

Event Timeline

Change #1267917 merged by Ottomata:

[operations/deployment-charts@master] mw-page-html-content-change-enrich - temporarily always consume from latest offsets

https://gerrit.wikimedia.org/r/1267917

Looks like prod died, and is now backfilling! Ah, but if there are no checkpointed offsets, the Flink source will fall back to the earliest offset by default!

Merged mw-page-html-content-change-enrich - temporarily always consume from latest offsets (1267917) and applying in prod:

helmfile apply -e dse-k8s-eqiad  --set app.taskManager.replicas=10

Prod now back to normal throughput with no lag, consuming from latest in stream.

Staging lagging and having issues as expected.

BTW, strangely enough: both prod and staging started having issues at about exactly the same time, around 2026-04-03T17:20:00.

Well! Staging is failing with message too large in kafka sink again:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 23048294 bytes when serialized which is larger than 20971520, which is the value of the max.request.size configuration.

And it looks like it is happening for the same message about every 10 minutes, give or take. The message size in the error is always the same.

So, staging is now caught in a failure loop. It consumes, hard fails when it reaches this message, and then stops. It looks like it is not able to proceed past this message, so each time it restarts from its last checkpointed offsets.

This explains why both staging and prod failed at the same time!

It looks like we've currently got our max latest html + diff size at 18MB. That means that the rest of the event (which should be entirely from the source?) is almost 4MB! We need to find this nasty mediawiki.page_change.v1 event!

But also...let's just increase the kafka max message size some more. Maybe to 30MB max with a 5MB margin?

For now, in staging, I'm going to reduce enrich.max_content_size to 15MB, giving us a 5MB margin.

helmfile apply -e dse-k8s-eqiad \
  --set app.taskManager.replicas=20 \
  --set app.flinkConfiguration.numberOfTaskSlots=2 \
  --set app.config_files.app\\.config\\.yaml.enrich.max_content_size=15728640
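The sizes in the error and the setting above give a rough budget. This is a back-of-the-envelope sketch, assuming the "18MB" cap is MiB-based (15728640 is exactly 15 MiB) and that the size check really bounds the serialized html + diff. The variable names are illustrative and do not come from the pipeline code.

```python
MIB = 1024 * 1024

kafka_max_request_size = 20_971_520  # max.request.size from the error (20 MiB)
failing_message_size = 23_048_294    # serialized size reported in the error

old_max_content_size = 18 * MIB      # assumed: the "18MB" cap on latest html + diff
new_max_content_size = 15_728_640    # enrich.max_content_size set above (15 MiB)

# How far the failing message overshoots the producer limit.
overshoot = failing_message_size - kafka_max_request_size

# If html + diff were capped at 18 MiB, the rest of the event is at least this big.
non_html_bytes = failing_message_size - old_max_content_size

# Headroom left for the rest of the event under each cap.
old_margin = kafka_max_request_size - old_max_content_size
new_margin = kafka_max_request_size - new_max_content_size

print(f"overshoot:      {overshoot / MIB:.2f} MiB")
print(f"non-html part:  {non_html_bytes / MIB:.2f} MiB")
print(f"margin @ 18MiB: {old_margin / MIB:.2f} MiB")
print(f"margin @ 15MiB: {new_margin / MIB:.2f} MiB")
print(f"fits with 15MiB cap: {non_html_bytes <= new_margin}")
```

Under these assumptions the non-html part is a little under 4 MiB, which would fit inside a 5 MiB margin. If the same size still fails with the lower cap, the cap is probably not being applied to this message.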

Well, whatever I changed didn't work. Staging is still dying due to the same message too large error in the kafka sink.

The production job has started getting stuck.
https://grafana.wikimedia.org/goto/bfi0ytdvi6y2od?orgId=1

It hasn't failed, no exceptions. It is now lagging behind.

TM 1-3 had a CPU usage spike, and then subsequently checkpoint duration started climbing. I see errors in logs like

{"@timestamp":"2026-04-04T01:06:07.920Z","log.level": "INFO","message":"Checkpoint 1003 has been notified as aborted, would not trigger any checkpoint.", "ecs.version": "1.2.0","process.thread.name":"process_enrich_with_html -> (process_error_enrich_with_html, convert_python_to_pyflink__$null__stream.error -> null__stream.error: Writer -> null__stream.error: Committer, _stream_key_by_map_operator) (2/10)#0","log.logger":"org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl","flink-job-id":"7bcd09b4585f511f8537b85fba48b8f0"}

And in kubectl get events:

11m         Warning   SavepointError   flinkstatesnapshot/flink-app-production-savepoint-periodic-1775263730792   {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.","additionalMetadata":{},"throwableList":[]}

I don't see anything particularly remarkable in TM memory usage at that time. But shortly after that, other TMs started seeing CPU spikes too.

Huh! And production caught back up! It did not crash or restart.

I do see that about 10:00 UTC today TM 1-1's YoungGen GC started spiking and has been doing so for the past 2 hours.


Staging is caught in the message too large restart loop, so it is replaying the same messages over and over again. I'm going to just destroy the staging deployment for now until we have time to help it.

helmfile -i -e dse-k8s-eqiad destroy --selector name=staging

Increase mediawiki.page_change.v1 kafka topic partitions and increase kafka source parallelism

@JMonton-WMF this is still worth a try, but I am less and less thinking that source throughput is the problem. Flink should shuffle the source messages to downstream tasks, and the source messages are not the big ones. We have successfully backfilled page_content_change with a lot of TMs before, with the same number of kafka topic partitions.

If we do try: we should try by creating a new temporary kafka topic with more partitions, and using kafkacat to pipe messages to it, and read from that. That way we don't prematurely increase the partitions on a widely used topic.

I think the issue is process function latency and resulting backpressure. Perhaps there are OOM issues related to this too (e.g. network buffers / memory too full) that we need to investigate.

Average and max per process function latency:
https://grafana.wikimedia.org/goto/efi32eledfrwge?orgId=1

This is using 'fake sync':
During normal operation, the average is about 400ms (for both parent and latest html).
During the backfill hiccup that happened (and then recovered) last night, the average latency was around 1 second, and for a moment was up as high as 2 seconds.

The max latency doesn't look that much higher, so I don't think there is too much latency variation skewing the average.

Questions then:

  • Can we do better than 400ms in the normal case?
    • This is fake sync, so each process function invocation is processing 1 message. That means: I don't think we can really improve per message latency much more than that. But, we should check the rest api / parser latency averages too. Is it possible we are causing the API to be slower?
  • Can we improve throughput by parallelizing?
    • I'd expect that increasing async I/O would affect throughput in about the same way as increasing TM parallelism. Increasing TM parallelism is easier to reason about (memory and cpu are more isolated).

So: can we increase parallelism without increasing latency? To find out!

FYI, we see log messages like

Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.

Looks like a Flink bug in our current version.

Change #1268071 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app - default to setting metrics.internal.query-service.port

https://gerrit.wikimedia.org/r/1268071

Ah, I figured out why the Flink UI wasn't showing metrics. metrics.internal.query-service.port was not configured. It is a random port by default, but in k8s we need a networkpolicy for it. This was already supported by the flink chart, but it wasn't set by default.

Patch here.

I just restarted html in prod

helmfile apply -e dse-k8s-eqiad \
  --set app.taskManager.replicas=10 \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124

and I see Flink UI metrics now.

Can we do better than 400ms in the normal case?

So, according to MW REST API latencies, the p50 should be about 40ms. We are 10x slower?
https://grafana.wikimedia.org/goto/cfi3ld57x1y4gf?orgId=1

I suppose we can assume that most (all?) of our requests are causing a parse. The parent rev request almost certainly is, and it is not impossible that we are requesting latest rev html before it has been parsed and cached too.

I wonder if anything would change if we introduce a slight delay into the pipeline? At least for latest rev html? OTOH, each event needs to request both parent and latest, so improving the latency of just one of the requests is not going to help with overall latency.

It'd be nice if we could find some latency stats for HTML requests that cause a parse. How good can we expect this latency to ever be?

Looks like something similar to T421216#11787170 happened again last night.
https://grafana.wikimedia.org/goto/efi675t7jq96ob?orgId=1

At about 23:30, there was a spike in CPU usage on one TM. Shortly after that, checkpoint duration started increasing, and then around 02:30, the job restarted. It is now slowly catching back up.

In the current prod configuration, this very slow catch up sort of makes sense? At about 20 messages per second in, each event has to flow through 2 process functions at ~400ms each, so each event takes almost a second. But the process functions run async of each other: even in 'fake' sync mode, each individual process function executes independently of the other. So 400ms * 2 process functions / 2 events in flight == an event pops out in the sink about once every 400ms per TM. With 10 TMs in parallel, we should be able to handle just over 20 events per second.
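The estimate above can be written out as a small calculation. This is only a sketch of that reasoning, assuming one task slot per TM and that the two process functions overlap perfectly; the function name is made up for illustration.

```python
def sink_events_per_second(
    process_latency_s: float,
    process_functions: int,
    task_managers: int,
) -> float:
    """Estimate steady-state output rate when the process functions are pipelined."""
    # Time one event spends going through all process functions back to back.
    per_event_latency_s = process_latency_s * process_functions
    # The stages overlap, so up to `process_functions` events are in flight per TM.
    seconds_per_output = per_event_latency_s / process_functions
    # Each TM emits one event every `seconds_per_output` seconds.
    return task_managers / seconds_per_output


rate = sink_events_per_second(
    process_latency_s=0.4, process_functions=2, task_managers=10
)
print(f"estimated capacity: {rate:.1f} events/s against ~20 events/s incoming")
```

With 400ms per process function and 10 TMs this comes out only slightly above the incoming rate, which leaves very little spare capacity for catching up on a backlog.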

So once again, the pressing questions to answer are:

  • Why does it take 400ms? I think it should be faster than this.
    • This measure is a custom time metric of our process function duration, not per HTTP call. The envoy latency stats look funny so I'm not sure I trust them. Is the HTTP call actually taking 400ms, or is our fake sync EventProcessFunction adding a lot to that time somehow?
  • What's up with this CPU spike + job restart?

I think if we answer those we will be onto something...

the job restarted. It is now slowly catching back up.

Well it isn't catching back up now.

But wait! I tried to configure production to not try to catch up! Either the settings I applied aren't working, or they aren't actually applied.

I've (tried to) set:

upgradeMode: stateless and offsets_initializer: latest. ¯\_(ツ)_/¯ grrrrrr

If we do try: we should try by creating a new temporary kafka topic with more partitions, and using kafkacat to pipe messages to it, and read from that. That way we don't prematurely increase the partitions on a widely used topic.

We can do that, increase the partitions on another topic and try. But in any case, I truly believe that having only 1 partition for this topic is a bit of a mistake. It feels like we are tuning K8s and Flink a lot just to be able to maintain a single partition, when having more partitions would allow us to split the work better. The number of partitions per broker is hard to calculate and there isn't a rule to follow, but we can compare with some recommendations.

We have 9 brokers in Jumbo, and they have around 700 partitions each (including replicas). More than half of them are completely empty, due to having both codfw and eqiad data and what I believe are old topics no longer in use. I think that having 3 partitions for topics like this one could be a good default. Having more partitions also distributes the work across brokers; with a single partition, only one broker does the work for all pipelines consuming "page_change.v1".

I mean, I don't think we will ever need to reduce a topic like this one from 3 partitions to 1. We have multiple pipelines reading from it, and it has around 20 messages/second. I only see benefits in increasing from 1 to 3 partitions on this topic, and if we are concerned about the total number of partitions, I think we have other things to do first, like removing old topics that are not being used.

I am less and less thinking that source throughput is the problem. Flink should shuffle the source messages to downstream tasks, and the source messages are not the big ones. We have successfully backfilled page_content_change with a lot of TMs before, with the same number of kafka topic partitions

I think this is a problem due to a combination of many things (slow HTTP calls, latency per event, limited parallelism, memory...). It's true that a single partition can easily handle 10k messages/second, which is really far from our 20 msg/second or the throughput we achieve when backfilling. As you said, page_content_change has a lower latency, which allows for better input throughput, and better throughput overall, but the latency in the HTML pipeline is worse. We have calls that take 10, 20 or 40 seconds to complete. We can create a lot of tasks, but the more tasks we have, the more coordination needs to happen: more Flink watermarks, barriers and so on, and probably more waiting time to synchronize all of that. I'm also sure we can find better performance with TaskManagers alone, but maybe we need more async and more memory in each pod? We are limited to 8GB per container right now, and we don't control how Flink manages resources between "source" and "process". And maybe having 3 partitions will give us 3 times the parallelism we have right now? It is also a good test: if 3 partitions don't give us almost 3 times the throughput, then we have a big issue with something else.

As a rule of thumb, we need to do 40 HTTP calls per second. If many of them take 10 seconds, we need to have 400 HTTP calls in parallel or we won't be able to reach 20 msg/second. If we need to backfill, we need to increase that x2, x4, or ideally x10 or more.
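This rule of thumb is Little's law (calls in flight = call rate * time per call). A minimal sketch, assuming 2 HTTP calls per event (parent and latest) and treating 10 seconds as the latency of every call, which is the pessimistic case described above; the function and parameter names are illustrative.

```python
import math


def required_concurrency(
    events_per_second: float,
    calls_per_event: int,
    call_latency_s: float,
    backfill_speedup: int = 1,
) -> int:
    """Little's law: calls in flight = arrival rate * time each call takes."""
    calls_per_second = events_per_second * calls_per_event * backfill_speedup
    return math.ceil(calls_per_second * call_latency_s)


for speedup in (1, 2, 4, 10):
    in_flight = required_concurrency(20, 2, 10.0, speedup)
    print(f"x{speedup:<2} real-time rate -> {in_flight} HTTP calls in flight")
```

The same function can be fed an average latency instead of the worst case to see how sensitive the required parallelism is to the latency assumption.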

Before testing the increase in partitions, I'm testing something else.

Looking at the Flink UI, I believe the biggest issue right now is "checkpointing". The UI shows failed checkpoints, checkpoints taking almost 10 minutes to finish, and parallel checkpoints. Also, something is wrong overall: although the application is working, it isn't reducing the offset lag in Kafka. It looks like it isn't able to save checkpoints properly, or is lagging a lot on them.

Looking a bit at docs, I'm trying this:

helmfile apply -i -e dse-k8s-eqiad \
  --set app.taskManager.replicas=3 \
  --set app.taskManager.resource.memory=7500m \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=16 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=16 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.mode=EXACTLY_ONCE \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.max-concurrent-checkpoints=1 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.interval=120000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.min-pause=90000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.interval-during-backlog=300000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.timeout=15m

The checkpoint was already exactly_once, the Sink delivery guarantee was at_least_once, but the checkpointing mode itself is exactly_once. I thought about changing it to at_least_once too, but it looks like there is another approach: making it "unaligned". So, we can allow only 1 concurrent checkpoint, making it "unaligned" which seems to improve in scenarios with big latencies, and we can also reduce the number of checkpoints being created.

I also reduced the number of Tasks and increased the number of async operations.

Now I think we could try these 2 things in 2 tests, first the checkpoint change, and later the async.
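The checkpoint flags in the command above pass dotted Flink keys through Helm's `--set`. Helm treats an unescaped dot as a nesting separator and an escaped dot as part of the key name, so which dots are escaped decides where the value lands. The helper below is a hypothetical sketch, assuming the chart expects `app.flinkConfiguration` to be a flat map keyed by the full Flink option name (the form used by later commands in this task).

```python
def helm_set_flag(path_prefix: str, flink_key: str, value: str) -> str:
    """Build a `--set` flag for a dotted Flink key nested under `path_prefix`.

    Dots inside the Flink key are escaped so Helm keeps the key in one piece.
    The backslash is doubled because the flag is pasted unquoted into a shell,
    which consumes one level of escaping before Helm sees it.
    """
    escaped_key = flink_key.replace(".", "\\\\.")
    # The dot between the prefix and the key stays unescaped: it is a real
    # nesting level in the values tree.
    return f"--set {path_prefix}.{escaped_key}={value}"


checkpoint_settings = {
    "execution.checkpointing.unaligned.enabled": "true",
    "execution.checkpointing.max-concurrent-checkpoints": "1",
    "execution.checkpointing.timeout": "15m",
}

for key, value in checkpoint_settings.items():
    print(helm_set_flag("app.flinkConfiguration", key, value))
```

If a setting does not seem to take effect, comparing the flag against this shape (prefix dot unescaped, key dots escaped) is a cheap first check.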

The previous config started failing after 1 hour. I wasn't able to check the exact reason, although I assume it was OOM due to the 16 workers per task.

I changed the config to:

helmfile apply -i -e dse-k8s-eqiad \
  --set app.taskManager.replicas=10 \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.mode=EXACTLY_ONCE \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.max-concurrent-checkpoints=1 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.interval=120000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.min-pause=90000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.interval-during-backlog=300000 \
  --set app.flinkConfiguration\\.execution\\.checkpointing\\.timeout=15m

Which is the same as what was running previously, but with the checkpoint changes. It has been running for 19h now without issues. It isn't backfilling, and I don't expect it to be able to backfill properly, as 10 tasks without async don't look like they can achieve enough parallelism. Maybe we could increase threads to 2 or 3 if everything works fine.

The checkpoint was already exactly_once, the Sink delivery guarantee was at_least_once,

Oh! I did not know this! Very interesting.

I assume it was OOM due to the 16 workers per task.

Yeah, I'm more and more thinking sync (or at least sync per event; we should probably do async for the pair of API calls) is the way to go here. It is simply easier to tune and reason about. A single high latency event enrichment won't block e.g. 16 events at once. It also means that a single task doesn't have to keep multiple copies of latest and parent revision html in memory for each 16 events all at once.
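A minimal sketch of "sync per event, async for the pair of API calls", using only the standard library. `fetch_html` and the event field names are hypothetical stand-ins, not the real enrichment code; the point is only that one event is handled at a time while its two requests overlap.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_html(rev_id: int) -> str:
    """Hypothetical stand-in for the HTTP call that returns a revision's HTML."""
    return f"<html>revision {rev_id}</html>"


def enrich_event(event: dict, fetch=fetch_html) -> dict:
    """Process a single event, fetching its latest and parent HTML concurrently."""
    # A pool per call keeps the sketch short; real code would reuse one pool.
    with ThreadPoolExecutor(max_workers=2) as pool:
        latest_future = pool.submit(fetch, event["rev_id"])
        parent_future = pool.submit(fetch, event["parent_rev_id"])
        # Event latency is roughly max(latest, parent) instead of their sum.
        latest_html = latest_future.result()
        parent_html = parent_future.result()
    # Only this one event's two documents are held in memory at a time.
    return {**event, "latest_html": latest_html, "parent_html": parent_html}


enriched = enrich_event({"rev_id": 101, "parent_rev_id": 100})
print(sorted(enriched))
```

Compared with batching 16 events per task, a slow request here delays only its own event, and throughput is scaled with task parallelism instead.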

checkpointing..

making it "unaligned" which seems to improve in scenarios with big latencies, and we can also reduce the number of checkpoints being created.

Doing some reading, I think you are right: this is a better choice for our pipeline. But I am not totally sure...

So:

alignment is needed for all operators with multiple inputs

Okay great, we don't have multiple inputs.

and for operators after a shuffle when they consume output streams of multiple upstream subtasks

But, does our use of keyBy mean we do need aligned checkpoints?

Also:

Checkpointing can also be performed unaligned. The basic idea is that checkpoints can overtake all in-flight data as long as the in-flight data becomes part of the operator state.

Does this mean that we may end up storing source events, possibly events + latest html (after first process function) and also maybe even final output events with latest html + diff in the snapshot state? Is it possible this will add a lot of snapshot save latency because a bunch of data needs to be saved? Perhaps this is not so bad if we don't do any async stuff? Or maybe it doesn't matter because multiple messages will be in-flight in various buffers in between the operators?

We should ask some people who have more practical Flink expertise. @dcausse maybe?

..Actually, I'll schedule a meeting with some folks for a more general consultation on our woes...

As a rule of thumb, we need to do 40 HTTP calls per second. If many of them take 10 seconds, we need to have 400 HTTP calls in parallel or we won't be able to reach 20 msg/second. If we need to backfill, we need to increase that x2, x4, or ideally x10 or more.

Was chatting with Fabian, and realized that when trying to estimate this kind of thing, I have not been factoring in timeouts and retries. We default max_retries to 3 with a retry backoff factor of 0.1. Retries are handled by python Requests via urllib3 Retry. A backoff_factor of 0.1 adds minimal time. But, if the server will 504 timeout after 60 seconds, and we retry 3 times (and the same request continues to time out, which I think is likely...) then we add 180 seconds of latency for the HTTP call. Assuming this happens on some large pages for both parent and latest html, we can double that to 360 seconds: 6 minutes!
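The worst case can be spelled out as follows. This is a rough sketch that ignores the backoff sleeps (described above as minimal) and assumes that max_retries counts retries after the first attempt and that the parent and latest requests run one after the other; the function names are illustrative.

```python
def retry_overhead_s(server_timeout_s: float, max_retries: int) -> float:
    """Extra latency added when every retry also runs into the server timeout."""
    return max_retries * server_timeout_s


def worst_case_total_s(server_timeout_s: float, max_retries: int) -> float:
    """Total time for one call: the first attempt plus every retry timing out."""
    return (1 + max_retries) * server_timeout_s


calls_per_event = 2  # parent and latest html

added_per_call = retry_overhead_s(60, 3)
added_per_event = calls_per_event * added_per_call
total_per_event = calls_per_event * worst_case_total_s(60, 3)

print(f"added by retries, per call:  {added_per_call:.0f}s")
print(f"added by retries, per event: {added_per_event:.0f}s")
print(f"total including first tries: {total_per_event:.0f}s")
```

Counting the first attempts as well, a single pathological event could occupy a worker for even longer than the 6 minutes of retry overhead.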

This comment was removed by JMonton-WMF.

I'm trying this setup on the -next deployment:

helmfile apply -i -e dse-k8s-eqiad \
  --set app.taskManager.replicas=25 \
  --set app.taskManager.resource.memory=5500m \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1

It started working well, but checkpoints started to fail. There were 2 concurrent checkpoints, and out of 18 checkpoints, 17 failed due to timeout. It seems to run and process records, but it dies from time to time, and it doesn't commit offsets.

This config:

helmfile apply -i -e dse-k8s-eqiad \
  --set app.taskManager.replicas=25 \
  --set app.taskManager.resource.memory=5500m \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration.execution\\.checkpointing\\.max-concurrent-checkpoints=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=120000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=90000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=300000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=15m

It changes to unaligned checkpoints, and it is running ok right now.

It is backfilling too, although not very fast:

image.png (297×1 px, 34 KB)

Hm! Prod failed with a message too large error, but in the error sink!

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka eqiad.mw_page_html_content_change_enrich.error--1@-1 with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false} 
...
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 23117387 bytes when serialized which is larger than 20971520, which is the value of the max.request.size configuration.

It looks like it is getting stuck in the same restart loop that staging was as noted in T421216#11786474.

If this is in the error sink, it means that this was produced in the second process function step. The message is large because the latest html is large. Then the parent rev html enrichment failed, and it tried to produce this error message.

Oh! We are still using the Content-Length header, yes? Maybe that is why this is getting by our size check! We realized that we needed to not rely on this, since this is the compressed content size as returned by the HTTP API.
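A small illustration of why the header can under-report the size. It assumes the response body was gzip-compressed in transit, so Content-Length describes the compressed bytes; the sample payload and the `exceeds_limit` helper are made up for the example.

```python
import gzip

# Repetitive markup compresses very well, much like large HTML pages.
html = ("<p>" + "wikitext " * 1000 + "</p>").encode("utf-8")
compressed = gzip.compress(html)

# What a server using gzip Content-Encoding would advertise in Content-Length.
content_length_header = len(compressed)
# What actually ends up in the enriched event after the client decompresses.
decoded_size = len(gzip.decompress(compressed))


def exceeds_limit(body: bytes, max_content_size: int) -> bool:
    """Size check on the decoded body bytes rather than the Content-Length header."""
    return len(body) > max_content_size


print(f"Content-Length (compressed): {content_length_header} bytes")
print(f"decoded body:                {decoded_size} bytes")
```

A check based on the header would let through any page whose compressed size is under the limit, even if the decoded HTML is several times larger.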

Will submit a patch...

I'm testing a new approach on the -next deployment. I changed the code so there are 2 topologies:

  • Read from Kafka (1 partition) -> Write in a Temp topic with 32 partitions
  • HTML pipeline reading from the Temp 32 partitions topic.

image.png (1×2 px, 237 KB)

It is still running and I'd like to keep it running a bit longer, to see if it performs better or not. It shows good numbers on Flink so far:

image.png (404×898 px, 34 KB)

It is using the aligned checkpoints as always.

helmfile apply -i -e dse-k8s-eqiad \
  --set app.version=v1.49.0.dev31 \
  --set app.taskManager.replicas=32 \
  --set app.taskManager.resource.memory=4000m \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1

I'd like to increase the async part to 2 or 3; if it fits in memory and checkpoints are fine, that should improve it.

One interesting behavior is that the first part of the pipeline, which only reads and writes, started as expected, producing a million records in a few seconds, but as soon as the second pipeline started lagging, the first one slowed down a lot. It did almost 2 million in less than a minute, and it hasn't reached 3 million in 15 minutes.

Deploying content length fix in prod, and also avoiding retries on 504 server timeouts.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.49.0 \
  --set app.taskManager.replicas=10 \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.http_session.retry_status_forcelist='{408,500,502,503}'

I am trying process_async_enabled_default=False in staging (T421965).

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0.dev30 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=2 \
  --set app.flinkConfiguration.metrics\\.internal\\.query-service\\.port=6124 \
  --set app.config_files.app\\.config\\.yaml.http_session.retry_status_forcelist='{408,500,502,503}' \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=false \
  --set app.config_files.app\\.config\\.yaml.stream_manager.source.options.offsets_initializer=earliest \
  --set app.job.upgradeMode=stateless

I'm starting the 'backfill' from the earliest offset to avoid a checkpoint/on_timer restore issue (should be fixed in latest commit on MR).

If this works (doesn't fail due to sync mode), I think we can merge the MR and have this as an option.

It seems to work fine, but we got this again:

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka eqiad.mw_page_html_content_change_enrich.error--1@-1 with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false} 
	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.decorateException(KafkaWriter.java:476)
	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.onCompletion(KafkaWriter.java:448)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1077)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:80)
	at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:195)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:166)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
	at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.emitResult(AbstractExternalOneInputPythonFunctionOperator.java:133)
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:100)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:308)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1838)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$27(StreamTask.java:1829)
	... 13 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 23137028 bytes when serialized which is larger than 20971520, which is the value of the max.request.size configuration.

helmfile apply -i -e dse-k8s-eqiad \
  --set app.version=v1.49.0 \
  --set app.taskManager.replicas=6 \
  --set app.taskManager.resource.memory=7500m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=2 \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.flinkConfiguration.\\python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=70 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=70 \
  --set app.config_files.app\\.config\\.yaml.http_session.pool_maxsize=50 \
  --set app.config_files.app\\.config\\.yaml.http_session.retry_status_forcelist='{408,500,502,503}'

I think this might be a good throughput metric:

image.png (1×2 px, 240 KB)

But it's around 18 messages/second.

This is the latest configuration I'm using. I tried to reduce buffers, use unaligned checkpoints, async=1, and more task managers and slots. It went up to 500 requests/second. I haven't tried the "sync" mode you enabled @Ottomata; I got an error and haven't looked into why.

Some params might not be needed, to be honest.

helmfile apply -i -e dse-k8s-eqiad \
  --set app.version=v1.50.0.dev40 \
  --set app.taskManager.replicas=15 \
  --set app.taskManager.resource.memory=7400m \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffers-per-channel=2 \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.floating-buffers-per-gate=8 \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.enabled=true \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.period=100ms \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.target=50ms \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=7 \
  --set app.flinkConfiguration.\\metrics\\.internal\\.query-service\\.port=6124 \
  --set app.flinkConfiguration.\\python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.http_session.retry_status_forcelist='{408,500,502,503}' \
  --set app.flinkConfiguration.execution\\.checkpointing\\.max-concurrent-checkpoints=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1 \
  --set app.config_files.app\\.config\\.yaml.http_session.pool_maxsize=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration.execution\\.checkpointing\\.max-concurrent-checkpoints=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=10000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=600000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=5m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.aligned-checkpoint-timeout=1s

Checkpoints take a few seconds with this config.

But all attempts to use more of the "async" mode have failed, with a very low number of requests to Envoy. Even using 500 or 1000 workers didn't help (maybe they were waiting for a CPU to become available, so using more workers than CPUs doesn't do anything?)
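
One way to sanity-check these numbers is Little's law: sustained throughput is roughly the number of in-flight requests divided by mean latency. The sketch below assumes that in this "fake sync" setup each task slot holds exactly one request at a time (15 TaskManagers × 7 slots from the command above), which is a simplification. The function name is illustrative.

```python
def max_throughput(in_flight: int, mean_latency_s: float) -> float:
    """Little's law: requests/second sustained by `in_flight` concurrent requests."""
    return in_flight / mean_latency_s


# Assumption: one in-flight request per slot (replicas * numberOfTaskSlots).
slots = 15 * 7

# Mean latency implied by the observed ~500 requests/second.
implied_latency_s = slots / 500

# Throughput if every request took 10 s (the slow case from the task description).
slow_case_rps = max_throughput(slots, 10.0)

print(implied_latency_s, slow_case_rps)
```

Under these assumptions, 500 requests/second only works out if the typical fetch is much faster than the 10 to 15 second slow cases; a backlog dominated by slow fetches would cap throughput far lower at the same parallelism.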

Change #1270548 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] mw-page-html-content-change-enrich-next - try sync mode

https://gerrit.wikimedia.org/r/1270548

Change #1270548 merged by Ottomata:

[operations/deployment-charts@master] mw-page-html-content-change-enrich-next - try sync mode

https://gerrit.wikimedia.org/r/1270548

Alright, I merged the patch for T421965: eventutilties-python - support synchronous Flink process function mode, and bumped MWEE to use it. I merged your patch to use a single process function, and released MWEE.

I also merged mw-page-html-content-change-enrich-next - try sync mode (1270548) to have some of that as default.

I then redeployed staging with some (but not all) of your trial settings from above:

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0 \
  --set app.taskManager.replicas=15 \
  --set app.taskManager.resource.memory=7400m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=7 \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.enabled=true \
  --set app.flinkConfiguration.\\python\\.fn-execution\\.bundle\\.size=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=5m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.aligned-checkpoint-timeout=1s

I removed the settings that (I think) are Flink defaults, like buffers-per-channel and floating-buffers-per-gate. I also removed the modifications to some of the debloat settings; reading the docs the defaults seemed more appropriate, but please let me know if that was way off and you found good reasons to set those.


One good thing: it looks like, at least in normal operation (not backfilling), staging is no longer backpressured at all! Checkpoint duration is around 20 seconds. It does look like the unaligned checkpoint is increasing the checkpoint size (from ~50kb to ~300kb). I think unaligned checkpoints help us here. With our buffer tuning, hopefully the # of inflight records is small, and with our single process function thing, the number of buffers with large messages should be small too.

Going to quit for the day, we should try this while backfilling next...

Hm, async_enabled=False EventProcessFunction metrics look busted though: https://grafana.wikimedia.org/goto/bfizxkx7mumm8d?orgId=1

Hm, it looks like my Monday deployment now container OOMs regularly after about 11 hours. It restarts and recovers fine.

Reverted back to your dev tag + fake sync mode, but with the same flink configs from my Monday try.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0.dev40 \
  --set app.taskManager.replicas=15 \
  --set app.taskManager.resource.memory=7400m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=7 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1 \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.enabled=true \
  --set app.flinkConfiguration.\\python\\.fn-execution\\.bundle\\.size=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=5m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.aligned-checkpoint-timeout=1s

Change #1271761 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html enrich staging - remove process_async_enabled_default config until it settles

https://gerrit.wikimedia.org/r/1271761

Change #1271761 merged by Ottomata:

[operations/deployment-charts@master] html enrich staging - remove process_async_enabled_default config until it settles

https://gerrit.wikimedia.org/r/1271761

Ah! It also slowly container OOMs and restarts in the same way on v1.50.0.dev40 (without the new async_enabled param). So it seems more likely to be due to our latest Flink settings? Or was it OOMing before (in the non-backfill case) too?

Since the OOMs don't seem to be related to async_enabled=false or the v1.50.0 release, I'm going to deploy staging back at v1.50.0. I want to find out if the enrich function metrics are broken just for sync mode, or if they are also broken in async. I also don't think we need so much parallelism (if we are not backfilling, and maybe even if we are). Deploying v1.50.0 with 'fake sync' mode, with a slight reduction in parallelism and TM memory.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0 \
  --set app.taskManager.replicas=15 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=1 \
  --set app.flinkConfiguration.taskmanager\\.network\\.memory\\.buffer-debloat\\.enabled=true \
  --set app.flinkConfiguration.\\python\\.fn-execution\\.bundle\\.size=1 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.unaligned\\.enabled=true \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=5m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.aligned-checkpoint-timeout=1s

And, the enrich function metrics disappeared. So metrics broke in EUP MR101. On it...

Change #1272843 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - bump to v1.51.0 and apply some flink tuning

https://gerrit.wikimedia.org/r/1272843

Change #1272843 merged by Ottomata:

[operations/deployment-charts@master] html-enrich - bump to v1.51.0 and apply some flink tuning

https://gerrit.wikimedia.org/r/1272843

Status report!

  • I fixed the process function metrics. I accidentally renamed them.
  • I think sync mode is working fine. Also, the buffer debloating and unaligned checkpointing seem to be helping too. I've merged some changes to both prod and staging helmfiles and deployed them. (commands run below).
  • I'm not 100% sure if we still have OOMs. That might have been related to too many task slots for the amount of allocated memory. Or, I've just increased the time-to-OOM by reducing number of slots. TBD...
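
To put rough numbers on the "too many task slots for the amount of allocated memory" idea, here is a small sketch comparing TaskManager memory per slot for the two staging setups tried above. It assumes the `m` suffix means megabytes and that memory is split evenly per slot. That is a simplification: Flink divides managed memory evenly among slots, while heap and network memory are shared across the whole TaskManager.

```python
def memory_per_slot_mb(tm_memory_mb: int, task_slots: int) -> float:
    """Naive even split of TaskManager memory across its task slots."""
    return tm_memory_mb / task_slots


# (TaskManager memory in MB, numberOfTaskSlots) from the commands in this thread.
configs = {
    "7400m with 7 slots": (7400, 7),
    "6000m with 4 slots": (6000, 4),
}

for name, (memory_mb, slots) in configs.items():
    print(name, round(memory_per_slot_mb(memory_mb, slots)), "MB per slot")
```

By this rough measure, the 4-slot setup gives each slot about 1500 MB compared with about 1057 MB before, which fits either explanation: OOMs fixed, or just delayed.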

In prod:

  taskManager:
    replicas: 5
    resource:
      cpu: 2
      memory: 6000m

    "taskmanager.numberOfTaskSlots": 4


helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0

In staging:

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.50.0 \
  --set app.taskManager.replicas=15 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  meout=1s \

I'll check back in on this tomorrow. I'd like to try backfilling again to see how this does.

I fixed the process function metrics. I accidentally renamed them.

Errr, I thought I fixed them, but I don't see them. They were fixed locally. Hm. Will follow up tomorrow.

Interesting data point:

Last night at about 00:00 both staging and prod had a spike in checkpoint times. These were running with the default checkpoint timeout of 10 minutes. Staging has a few more resources allocated to it.

It looks like:

  • Staging's checkpoint finished at about 8 minutes. Checkpoint size never increased, so I assume unaligned checkpoints didn't kick in. I guess they only work for the subsequent checkpoint if it times out or takes too long?
  • I can't tell for sure, but prod's checkpoint took almost 10 minutes, so perhaps one timed out? After that, checkpoint size temporarily increased. I assume this means that unaligned checkpoints turned on for a little while.

Both staging and prod eventually recovered back to short (6-10 second) max checkpoint times. Prod also looks like it reset to aligned checkpoints, as the checkpoint size went back down.

During this time, it looks like both staging and prod experienced backpressure and lag. I'm guessing that there was at least one message in the source that caused a very high latency fetch (with envoy retries? we haven't disabled that yet). Since the checkpoint timeout was 10 minutes, they all waited until the checkpoint finished or timed out before allowing the pipeline to proceed.

I think Javier was right to have reduced the checkpoint timeout. In the case of high latency fetches, we can just time out the in-flight checkpoint and wait for the next one.

Let's reduce the timeout to 4 minutes.

Also, a TM pod in production container OOMed and the job was auto restarted. Staging has parallelism=60, and prod has 20. I can't quite tell if a container OOM will eventually happen to staging too, or if memory usage is stable with these resources.
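
For reference, the parallelism figures here follow from TaskManager replicas times slots per TaskManager. A minimal sketch, assuming the job is configured to use every available slot:

```python
def job_parallelism(replicas: int, task_slots: int) -> int:
    """Total task slots in the cluster, assuming the job uses all of them."""
    return replicas * task_slots


staging = job_parallelism(replicas=15, task_slots=4)  # 60
prod = job_parallelism(replicas=5, task_slots=4)      # 20

print(staging, prod)
```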

I'd like to experiment with python thread worker mode, and maybe even JEMALLOC again.

Change #1273779 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - reduce checkpoint timeout

https://gerrit.wikimedia.org/r/1273779

I fixed the process function metrics. I accidentally renamed them.

Errr, I thought I fixed them, but I don't see them. They were fixed locally. Hm. Will follow up tomorrow.

Ah, I deployed the wrong MWEE version! I needed v1.51.0!

Change #1273779 merged by jenkins-bot:

[operations/deployment-charts@master] html-enrich - reduce checkpoint timeout

https://gerrit.wikimedia.org/r/1273779

I'd like to experiment with python thread worker mode

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.51.0 \
  --set app.flinkConfiguration.python\\.execution-mode=thread

But I get this error: java.lang.RuntimeException: java.lang.NoSuchFieldException: loadedLibraryNames, thrown at pemja.utils.CommonUtils.loadLibrary (full error pasted below).

I am able to reproduce this locally. I couldn't find anything online related to this, but I did find these sort of similar issues:

In FLINK-38500, they suggested

one workaround is that moving flink-python jar from opt/ to lib/

I tried locally in my dev image by building it via blubber with

  docker-dev:
    includes: [production]
    copies:
# ...
      - from: production
        source: /opt/flink/opt/flink-python-1.20.3.jar
        destination: /opt/flink/lib/flink-python-1.20.3.jar
    builders:
      - custom:
          command: "rm -v /opt/flink/opt/flink-python-1.20.3.jar"

I verified that flink-python-1.20.3.jar was moved into lib/ successfully. I still got the same error.

FLINK-38161 suggests that the problem is python dependencies in different system and venv environments. Welp! Luckily T418525 is happening now, and I merged M136.

I tried locally and still got the same error.

I'm giving up on trying python thread mode for now.


Full error:

{
  "@timestamp": "2026-04-17T13:53:38.797Z",
  "log.level": "INFO",
  "message": "kafka__mediawiki.page_change.v1__convert_flink_to_python, filter_remove_canary_events, filter_filter_for_wikitext_content_model, _stream_key_by_map_operator (2/20) (9e0e9e2866af5066e37892b612c092a9_0a448493b4782967b150582570326227_1_4) switched from INITIALIZING to FAILED on flink-app-production-taskmanager-1-4 @ 10.67.24.104 (dataPort=6121).",
  "ecs.version": "1.2.0",
  "process.thread.name": "flink-pekko.actor.default-dispatcher-4",
  "log.logger": "org.apache.flink.runtime.executiongraph.ExecutionGraph",
  "flink-job-id": "0453635ef1de8e113845f999cbb2f717",
  "error.type": "java.lang.RuntimeException",
  "error.message": "java.lang.NoSuchFieldException: loadedLibraryNames",
  "error.stack_trace": "java.lang.RuntimeException: java.lang.NoSuchFieldException: loadedLibraryNames\n\tat pemja.utils.CommonUtils.loadLibrary(CommonUtils.java:103)\n\tat pemja.utils.CommonUtils.loadPython(CommonUtils.java:45)\n\tat pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:365)\n\tat pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:144)\n\tat pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45)\n\tat org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)\n\tat org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)\n\tat org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)\n\tat org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)\n\tat org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)\n\tat org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)\n\tat org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)\n\tat 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: java.lang.NoSuchFieldException: loadedLibraryNames\n\tat java.base/java.lang.Class.getDeclaredField(Class.java:2612)\n\tat pemja.utils.CommonUtils.loadLibrary(CommonUtils.java:87)\n\t... 19 more\n"
}

Trying JEMALLOC in prod:

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.51.0 \
  --args "--set-string app.env[0].value=false"

Note on this command. Here is the value we need to override.

env:
  # Disable use of jemalloc, we've mostly seen it just cause more memory use
  # https://phabricator.wikimedia.org/T332948#8792955
  - name: DISABLE_JEMALLOC
    value: "true"
  • I needed to use --set-string in order to force the boolean to be a string.
  • I relied on the fact that app.env[0] just happened to be the element in the array to override.
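
Relying on `app.env[0]` is fragile if the env list ever gets reordered. Here is a sketch of how the index could be looked up by name instead. The `env` list below just mirrors the snippet above; in practice it would need to be loaded from the chart values, and the helper name is made up.

```python
def set_string_flag_for_env(env: list[dict], name: str, value: str) -> str:
    """Build a --set-string flag targeting the env entry with the given name."""
    for index, entry in enumerate(env):
        if entry.get("name") == name:
            return f"--set-string app.env[{index}].value={value}"
    raise KeyError(f"{name} not found in app.env")


# Mirrors the values snippet above (hypothetical in-memory copy).
env = [{"name": "DISABLE_JEMALLOC", "value": "true"}]

jemalloc_flag = set_string_flag_for_env(env, "DISABLE_JEMALLOC", "false")
print(jemalloc_flag)
```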

I'll let this run as is for a while. Now to test some backfilling in staging...

Attempting a backfill on staging with

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.source.options.offsets_initializer=earliest \
  --set app.job.upgradeMode=stateless

(v1.52.0 is the version with T418525. Should be a noop for the app.)

Checkpoint times are up to about 2 minutes, and checkpoint size is around 115MB.

Checkpoints are failing. e.g.

{
  "@timestamp": "2026-04-17T15:35:49.151Z",
  "log.level": "WARN",
  "message": "Failed to trigger or complete checkpoint 4 for job e3c769a1613187f51abafee3a4ea8ecb. (0 consecutive failed attempts so far)",
  "ecs.version": "1.2.0",
  "process.thread.name": "Checkpoint Timer",
  "log.logger": "org.apache.flink.runtime.checkpoint.CheckpointFailureManager",
  "flink-job-id": "e3c769a1613187f51abafee3a4ea8ecb",
  "error.type": "org.apache.flink.runtime.checkpoint.CheckpointException",
  "error.message": "Checkpoint expired before completing.",
  "error.stack_trace": "org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2346)\n\tat org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n"
}
...
{
  "@timestamp": "2026-04-17T15:35:49.153Z",
  "log.level": "INFO",
  "message": "Trying to recover from a global failure.",
  "ecs.version": "1.2.0",
  "process.thread.name": "flink-pekko.actor.default-dispatcher-13",
  "log.logger": "org.apache.flink.runtime.jobmaster.JobMaster",
  "flink-job-id": "e3c769a1613187f51abafee3a4ea8ecb",
  "error.type": "org.apache.flink.util.FlinkRuntimeException",
  "error.message": "Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.",
  "error.stack_trace": "org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.\n\tat org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212)\n\tat org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)\n\tat org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2281)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2260)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$1200(CheckpointCoordinator.java:102)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2346)\n\tat org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\n"
}

It looks like this is causing the job to restart. It does seem to be making progress in the topic..., but I'm not sure how Flink holds on to its offset state if it can't complete checkpoints? Maybe it is falling back to the committed offsets in Kafka and skipping the uncheckpointed stuff each time?


Why are checkpoints taking so long though!

I noticed that managed memory is 100% used. Perhaps increasing that, or changing the fraction allocated for python, may help?

Screenshot 2026-04-17 at 13.04.07.png (132×2 px, 25 KB)

Or, maybe not: it is 100% used during normal operation too. Hm.

I can see in Flink JM UI that in one of the failed checkpoints, the barrier didn't even fully make it past the first filter operator? They didn't make it into the process function operator?

Screenshot 2026-04-17 at 13.13.04.png (1×2 px, 322 KB)

Then again, here is one where most did make it into the process function operator:

Screenshot 2026-04-17 at 13.15.44.png (1×2 px, 336 KB)

I think I'm misunderstanding something about unaligned checkpoints. I see that the checkpoint has to flow through each operator and each subtask. Some of the checkpoint barriers(?) are 'aligned' and some are 'unaligned'. For this one failed checkpoint, one of the aligned barriers times out and aborts, and then that causes the whole checkpoint to fail. I thought unaligned checkpoints would complete once one checkpoint barrier made it through, and everything else in-flight would just be saved in the checkpoint state?

reading...

Hm:

https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/state/checkpointing_under_backpressure/#interplay-with-long-running-record-processing

Despite that unaligned checkpoints barriers are able to overtake all other records in the queue. The handling of this barrier still can be delayed if the current record takes a lot of time to be processed.
[...]
In such scenarios back pressure can block unaligned checkpoints until all the network buffers required to process the single input record are available. It also can happen in any other situation when the processing of the single record takes a while.

And also from https://aws.amazon.com/blogs/big-data/optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints-part-2/ which is the best explainer I've read yet:

Although checkpoint barriers can surpass records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.

I'm not entirely sure what to do other than increase checkpoint timeout. Even if we do the unordered async option, I think a single high latency HTML fetch will cause the entire checkpoint to take a long time, and ultimately cause the whole pipeline to block? I must be misunderstanding something...
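
A toy model of what the quoted docs describe, as I understand them: each subtask has to finish the record it is currently processing before it can handle the barrier, so a checkpoint cannot complete faster than the slowest in-flight record. The numbers below are hypothetical, not measurements.

```python
def barrier_delay_s(remaining_s_per_subtask: list[float]) -> float:
    """Lower bound on checkpoint duration: the slowest in-flight record wins."""
    return max(remaining_s_per_subtask, default=0.0)


# Hypothetical: 79 subtasks finish their current fetch within 2 s,
# while one subtask is stuck on a 240 s fetch (e.g. repeated retries).
remaining = [2.0] * 79 + [240.0]

delay = barrier_delay_s(remaining)
print(delay)
```

In this model, adding parallelism does not shorten the checkpoint at all; only bounding the slowest fetch (or tolerating the timeout) does.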

I'm going to try to adjust a few checkpoint settings. Perhaps a larger timeout as well as fewer checkpoint attempts during backpressure will let the pipeline mostly progress, and only occasionally stall...

# in staging
helmfile diff -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=60000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0
  • 10 minute checkpoint timeout
  • 60 second checkpoint interval with 30 second min pause between checkpoint attempts
  • 10 minute checkpoint interval during backlog
  • tolerate 10 failed checkpoints before failing job
  • disable auto-savepoints while we attempt the backfill (savepoints are never unaligned).
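
A rough estimate of how long these settings let the job limp along if every checkpoint times out. This is a sketch under stated assumptions: one checkpoint at a time, every failure is a full timeout, the next attempt starts right after min-pause, and the job fails once consecutive failures exceed the tolerable count.

```python
def seconds_until_job_fails(
    timeout_s: int, min_pause_s: int, tolerable_failures: int
) -> int:
    """Worst-case time before the job fails if every checkpoint times out."""
    # The job fails on the failure that exceeds the tolerable count.
    failures_needed = tolerable_failures + 1
    # A pause sits between consecutive attempts, so one fewer pause than attempts.
    return failures_needed * timeout_s + tolerable_failures * min_pause_s


total_s = seconds_until_job_fails(
    timeout_s=600, min_pause_s=30, tolerable_failures=10
)
print(total_s, "seconds, about", total_s // 60, "minutes")
```

With the default of 0 tolerable failures, the same formula gives a single timeout, which matches the restart seen right after the first expired checkpoint in the logs above.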

I removed the offset reset stuff, allowing it to pick up from its last successful checkpoint. There is still a lot in the backlog since I reset it.

I usually don't see skew: why would we? Unless somehow all very busy pages get mapped to the same subtask?

But after the most recent restart, the enrich task has 33% skew in its subtasks?

Screenshot 2026-04-17 at 15.50.33.png (676×2 px, 168 KB)

Checkpoints are being triggered about every 60 seconds, even though we are under backpressure. Going to try just setting a main 10 minute checkpoint interval:

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0

Well, things are...better? Checkpoints are completing and the app is not getting bogged down. Now it runs long enough to encounter a TM container OOM (I think), and the app is restarted from its most recent checkpoint. That's good! That means it is able to make progress even in the case of restarts (even if it does have to re-backfill a max of 10 minutes since the previous checkpoint).

But why the OOM? I haven't been able to locate logs or kubectl events for this. It looks like all TMs slowly creep up in memory usage. It looks like eventually a TM pod is killed (by k8s?) due to OOM, but I don't have that evidence for sure.

I also can't tell which kind of memory is the culprit. It doesn't look like python worker memory usage, those are pretty stable.

All I can see is container RSS steadily increasing over time.

I have been accidentally posting my recent updates on T422928: HTML Pipeline - Performance improvements instead of this ticket. I will repost them here with the times I posted them. The times are pasted from my browser so are in US east coast timezone.


Sun, Apr 19, 10:00

Now with less frequent checkpointing, checkpoint times seem to be a bit under control.

I wanted to try a little bit of async again, trying:

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=2 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=true \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=4
So batch_size=4, and I upped python bundle size to 4 too (assuming that passing 4 at a time to python makes sense given that we'd block the batch on all 4 succeeding?)

Sun, Apr 19, 12:03

Results after a few hours: throughput is down. Strangely, HTTP idle connection timeouts are higher. Somehow the async stuff really causes the pipeline to stall now and then.

I did reduce parallelism from 80 to 40 (by reducing task slots), expecting the async batch to make up for it (and allow for more memory headroom in the TM).

I don't see much of a difference in memory usage. I'm going to try the async batch of 4 with the same 80 parallelism, just to rule that out as the issue.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=true \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=4

If this doesn't help, I'll try keeping the async batch of 4, but reduce python bundle size back to 1. ¯\_(ツ)_/¯


Sun, Apr 19, 14:21

I'm going to try the async batch of 4 with the same 80 parallelism

Throughput not better, and now OOMs happen much more quickly (expected).

Now trying an async batch of 4 with a python bundle size of 1, and number of task slots = 3 to increase parallelism while maybe avoiding quick OOMs?

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=3 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=true \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=4 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=4

Sun, Apr 19, 20:24
Hm, well, it isn't really better, but it isn't worse? I'm going to just reduce batch size to 2 just to try it. I'll go back up to task slots = 4 so parallelism = 80.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=true \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_max_workers_default=2 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_batch_size_default=2

It is hard to tell, but I don't think async with batch_size=2 is much of an improvement over sync.
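
For reference, the parallelism figures quoted in these runs (80 with 4 task slots) are just TaskManager replicas times slots per TaskManager. A minimal sketch of that arithmetic, assuming the job uses every available slot; `tm_parallelism` is a hypothetical helper name, not something from the deployment charts:

```shell
# Total slots = TaskManager replicas x taskmanager.numberOfTaskSlots.
# Assumes the job's parallelism is set to use all of them.
tm_parallelism() {
  replicas=$1
  slots=$2
  echo $(( replicas * slots ))
}

tm_parallelism 20 4   # prints 80 (the task slots = 4 runs)
tm_parallelism 20 3   # prints 60 (the task slots = 3 run)
```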

For the sake of normalcy and discussions tomorrow I'm going to reset staging back to sync mode.

# staging
helmfile diff -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=false

I haven't been paying attention to production since I tried JEMALLOC, but either JEMALLOC or its reduced parallelism of 20 is causing pretty frequent restarts. I'm going to reset it to something similar to staging, but with a little less parallelism, savepoints enabled, more frequent checkpointing, and no JEMALLOC.

# production
helmfile diff -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=10 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=1m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=4 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=5m \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=false

Status update 2026-04-20
  • Reducing the checkpoint interval makes the pipeline stable(ish).
  • Async mini batches do not seem better than sync. Why? In either case, async mini batches plus high latency increase checkpoint durations, and the larger the batch, the longer the checkpoint takes.
  • Sync mode seems acceptable and more stable. I can get a stable(ish) backfill going at about 50-60 msgs per second on average over days. This is enough to catch up after outages.
  • However, we should be able to get more throughput than 60/sec. That 60/sec is an average. We sometimes see spikes of 200+/sec before the pipeline stalls processing them. (I’m basing these numbers on actual Kafka messages produced. If we can produce 200 messages/sec to Kafka, we should be able to tune Flink to do that regularly.)
  • In all configurations I tried, TM memory usage slowly increases. AFAICT, k8s eventually OOM-kills one of the TM containers, and then the JM restarts the job from the latest checkpoint and continues without intervention.
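
Whether 50-60 msgs/sec is "enough to catch up" depends on how far the backfill rate exceeds the live arrival rate, since new messages keep arriving while the backlog drains. A rough sketch of that estimate follows. The live rate is not stated anywhere in this task, so the 20 msgs/sec used below is purely hypothetical, as is the helper name `catch_up_minutes`:

```shell
# Minutes needed to drain a backlog while new messages keep arriving.
# Integer arithmetic; the result is rounded down to whole minutes.
catch_up_minutes() {
  outage_minutes=$1   # how long the pipeline was down
  live_rate=$2        # msgs/sec of regular traffic (hypothetical, not measured here)
  backfill_rate=$3    # msgs/sec sustained while backfilling
  if [ "$backfill_rate" -le "$live_rate" ]; then
    echo "backfill rate must exceed live rate, or the backlog never drains" >&2
    return 1
  fi
  # backlog = outage_minutes * 60 * live_rate
  # net drain per minute = (backfill_rate - live_rate) * 60
  echo $(( outage_minutes * live_rate / (backfill_rate - live_rate) ))
}

# 2-day outage, hypothetical 20 msgs/sec live traffic, 55 msgs/sec backfill
catch_up_minutes 2880 20 55   # prints 1645 (about 27 hours)
```

With a different live rate the answer changes a lot, so plug in the real number from the Kafka topic metrics before relying on it.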

So: I am pretty sure we can run this thing in prod now.  We can continue to experiment with efficiency improvements in staging.

Change #1275441 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - try mw-api-int to get earlier envoy timeout

https://gerrit.wikimedia.org/r/1275441

Change #1275441 merged by jenkins-bot:

[operations/deployment-charts@master] html-enrich - try mw-api-int to get earlier envoy timeout

https://gerrit.wikimedia.org/r/1275441

@dcausse pointed out that we are using mw-api-int-async, which has a timeout of 120s, not 60s! Using mw-api-int will give us a timeout of 60s.

html-enrich - try mw-api-int to get earlier envoy timeout (1275441)

We should probably do as the Search team has done, but that will require app code changes. For now, this will let us actually test a 60s client timeout!

# staging
helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.taskManager.resource.memory=6000m \
  --set app.flinkConfiguration.taskmanager\\.numberOfTaskSlots=4 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.timeout=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval-during-backlog=10m \
  --set app.flinkConfiguration.execution\\.checkpointing\\.min-pause=30000 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.tolerable-failed-checkpoints=10 \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0 \
  --set app.flinkConfiguration.python\\.fn-execution\\.bundle\\.size=1 \
  --set app.config_files.app\\.config\\.yaml.stream_manager.process_async_enabled_default=false

Change #1275445 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - use mw-api-int for stream config too

https://gerrit.wikimedia.org/r/1275445

Change #1275445 merged by Ottomata:

[operations/deployment-charts@master] html-enrich - use mw-api-int for stream config too

https://gerrit.wikimedia.org/r/1275445

Change #1275453 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - update values with latest settings from T421216

https://gerrit.wikimedia.org/r/1275453

Change #1275453 merged by jenkins-bot:

[operations/deployment-charts@master] html-enrich - update values with latest settings from T421216

https://gerrit.wikimedia.org/r/1275453

Change #1275458 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] html-enrich - set tolerable-failed-checkpoints

https://gerrit.wikimedia.org/r/1275458

Change #1275458 merged by Ottomata:

[operations/deployment-charts@master] html-enrich - set tolerable-failed-checkpoints

https://gerrit.wikimedia.org/r/1275458

Just applied our current settings to prod and deployed without any overrides:

# production
helmfile apply -e dse-k8s-eqiad

And for staging, here are the current --set overrides in case we need to redeploy it before we reset it to normal staging life.

helmfile apply -e dse-k8s-eqiad \
  --set app.version=v1.52.0 \
  --set app.taskManager.replicas=20 \
  --set app.flinkConfiguration.execution\\.checkpointing\\.interval=10m \
  --set app.flinkConfiguration.kubernetes\\.operator\\.periodic\\.savepoint\\.interval=0

I think we can close this task. If we ever need to backfill data with the current configuration, we only need to increase the number of replicas. I'll write that down at https://wikitech.wikimedia.org/wiki/MediaWiki_Event_Enrichment/HTML_Enrichment
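
As a sketch of what that looks like in practice, assuming the `app.taskManager.replicas` override keeps working the way it did in the commands above: the helper below only prints the helmfile command so it can be reviewed before running. `backfill_cmd` is a hypothetical name, and the replica count of 20 is just the value used during these tests, not a recommendation.

```shell
# Print (do not run) the helmfile command for a backfill with N TaskManager replicas.
backfill_cmd() {
  replicas=$1
  action=${2:-diff}   # "diff" to preview the change, "apply" to deploy it
  printf 'helmfile %s -e dse-k8s-eqiad --set app.taskManager.replicas=%s\n' "$action" "$replicas"
}

backfill_cmd 20         # preview first
backfill_cmd 20 apply   # then deploy
```

Once the backlog is drained, a plain `helmfile apply -e dse-k8s-eqiad` without overrides returns the deployment to the values in the chart.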