During AWS MSK patching or scheduled maintenance, the brokers are patched and rebooted one at a time. While a broker is down, the kafka consumers encounter errors like the one below, which bubble up to the grpc channel the DAGs run on - for example the batches DAG:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "sasl_ssl://xxxx.kafka.us-east-1.amazonaws.com:9096/bootstrap: Connect to ipv4#xx.xxx.xxx.xxx:9096 failed: Connection refused (after 1ms in state CONNECT)"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-06-12T10:16:26.064943775+00:00", grpc_status:2, grpc_message:"sasl_ssl://xxxx.kafka.us-east-1.amazonaws.com:9096/bootstrap: Connect to ipv4#xx.xxx.xx.xxx:9096 failed: Connection refused (after 1ms in state CONNECT)"}"

Whenever a new consumer pool is created via confluent-kafka-go, it loads the default settings from the librdkafka config in addition to the custom settings.
During broker failures, the kafka client retries indefinitely within these boundaries:
- retry.backoff.ms: 100 ms
- reconnect.backoff.ms: 100 ms
- reconnect.backoff.max.ms: 10000 ms
The socket.keepalive.enable option is set to false by default; enabling it helps detect dead TCP connections.
The recommended options are as below:
- Enable the "socket.keepalive.enable": true in the kafka configmap setting
- For all the kafka client code:
  a. The retry parameters mentioned above, and the deadline of the Kafka call (e.g. consumer.ReadMessage(timeout)), should be tuned to fit the use case.
  b. The calling code should handle kafka.Error. The errors listed below should be ignored and logged at info/debug level, as librdkafka will recover automatically. More on the librdkafka error codes: rdkafka.h
  - kafka.ErrTransport (RD_KAFKA_RESP_ERR__TRANSPORT, -195)
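The "ignore and log" recommendation can be sketched as a small classifier (a sketch: the -195 constant is the real RD_KAFKA_RESP_ERR__TRANSPORT value from rdkafka.h, but real code should compare against kafka.ErrTransport from confluent-kafka-go instead of a hand-copied number):

```go
package main

import "fmt"

// errTransport mirrors RD_KAFKA_RESP_ERR__TRANSPORT (-195) from rdkafka.h.
// In real code, use kafka.ErrTransport from confluent-kafka-go.
const errTransport = -195

// isRetriable reports whether a librdkafka error code should be swallowed
// (logged at info/debug) because the client will recover on its own.
func isRetriable(code int) bool {
	switch code {
	case errTransport:
		return true // broker transport failure: librdkafka reconnects
	default:
		return false
	}
}

func main() {
	fmt.Println(isRetriable(-195)) // true: log and continue consuming
	fmt.Println(isRetriable(0))    // false: surface to the caller
}
```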
Reproducing the problem:
The problem can be reproduced by running the batches DAG or the export.go operation and manually rebooting one of the kafka brokers.
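A broker reboot can be triggered from the AWS CLI (a sketch: the cluster ARN and broker ID are placeholders for your MSK cluster's values):

```shell
# List broker IDs for the cluster (ARN is a placeholder).
aws kafka list-nodes --cluster-arn "<cluster-arn>"

# Reboot a single broker; MSK reboots one broker at a time, mimicking
# the rolling patching behavior described above.
aws kafka reboot-broker --cluster-arn "<cluster-arn>" --broker-ids "1"
```

While the broker is restarting, the consumer side should surface the Connection refused transport errors shown above.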
Rule of thumb:
Let librdkafka do what it does best: the code should ignore the errors above and let librdkafka retry. However, upstream callers such as the kafka producers/consumers should define their own SLAs/timeouts:
- Enforce their own definition of "progress"
- Fail tasks/workflows that stall
- Apply deadlines and circuit-breaking
Services:
- snapshots (includes batches and structured-snapshots)