During AWS MSK patching or scheduled maintenance, the brokers are patched and rebooted one at a time. While a broker is down, the Kafka consumers encounter connection errors that bubble up to the gRPC channel the DAGs run on - for example the batches DAG:
```
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "sasl_ssl://xxxx.kafka.us-east-1.amazonaws.com:9096/bootstrap: Connect to ipv4#xx.xxx.xxx.xxx:9096 failed: Connection refused (after 1ms in state CONNECT)"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-06-12T10:16:26.064943775+00:00", grpc_status:2, grpc_message:"sasl_ssl://xxxx.kafka.us-east-1.amazonaws.com:9096/bootstrap: Connect to ipv4#xx.xxx.xx.xxx:9096 failed: Connection refused (after 1ms in state CONNECT)"}"
```
Whenever a new consumer pool is created via confluent-kafka-go, it loads the default settings from the librdkafka [[ https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_conf.c | config ]] in addition to the custom settings.
During broker failures, the Kafka client retries indefinitely within these backoff bounds (librdkafka defaults):
```
retry.backoff.ms 100 ms
reconnect.backoff.ms 100 ms
reconnect.backoff.max.ms 10000 ms
```
The `socket.keepalive.enable` option is `false` by default; enabling it helps detect dead TCP connections sooner.
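As a sketch, the overrides discussed above could be collected in a plain map before being merged into the consumer configuration. Plain Go types are used here to keep the example dependency-free; in real code this would be a `kafka.ConfigMap` passed to `kafka.NewConsumer` from confluent-kafka-go:

```go
package main

import "fmt"

// consumerOverrides returns the settings discussed above. The values mirror
// the librdkafka defaults plus the keepalive change; tune them per use case.
func consumerOverrides() map[string]interface{} {
	return map[string]interface{}{
		"socket.keepalive.enable":  true,  // detect dead TCP connections
		"retry.backoff.ms":         100,   // retry backoff (librdkafka default)
		"reconnect.backoff.ms":     100,   // initial reconnect backoff
		"reconnect.backoff.max.ms": 10000, // cap on the reconnect backoff
	}
}

func main() {
	for k, v := range consumerOverrides() {
		fmt.Printf("%s = %v\n", k, v)
	}
}
```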
The recommended options are as below:
1. Enable `"socket.keepalive.enable": true` in the Kafka configmap settings
2. In all Kafka client code:
a. The retry parameters mentioned above, and the deadline of the Kafka call (e.g. `consumer.ReadMessage(timeout)`), should be tuned to fit the use case.
b. The calling code should handle `kafka.Error`. The errors listed below should be ignored and can be logged at info/debug level, since librdkafka recovers from them automatically. More on the librdkafka error codes: [[ https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h | rdkafka.h ]]
```
kafka.ErrTransport RD_KAFKA_RESP_ERR__TRANSPORT -195
kafka.ErrAllBrokersDown RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN -187
```
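A hedged sketch of that handling logic: treat the two codes above as transient and only log them, surfacing everything else to the caller. The raw integers match rdkafka.h; in real confluent-kafka-go code you would compare `kafka.Error.Code()` against `kafka.ErrTransport` / `kafka.ErrAllBrokersDown` instead of numeric constants:

```go
package main

import "fmt"

// librdkafka error codes (from rdkafka.h) that the client recovers from on its own.
const (
	errTransport      = -195 // RD_KAFKA_RESP_ERR__TRANSPORT
	errAllBrokersDown = -187 // RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN
)

// isTransient reports whether an error code should be logged at info/debug
// level and otherwise ignored, letting librdkafka retry internally.
func isTransient(code int) bool {
	switch code {
	case errTransport, errAllBrokersDown:
		return true
	default:
		return false
	}
}

func main() {
	for _, code := range []int{errTransport, errAllBrokersDown, -1} {
		if isTransient(code) {
			fmt.Printf("code %d: transient, log and continue\n", code)
		} else {
			fmt.Printf("code %d: surface to caller\n", code)
		}
	}
}
```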
Reproducing the problem:
The problem can be reproduced by running the batches DAG or the export.go operation and manually rebooting one of the Kafka brokers.
Rule of thumb:
Let librdkafka do what it does best: the code should ignore the errors above and let librdkafka retry. However, upstream callers such as the Kafka producers/consumers should define their own SLAs/timeouts:
* Enforce their own definition of "progress"
* Fail tasks/workflows that stall
* Apply deadlines and circuit-breaking
Services:
[] snapshots (includes batches and structured-snapshots)