Starting today at ~9:30, Logstash has been getting spammed with messages such as the following:
{"log":"2024-06-04 13:13:14,839 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860662077Z"} {"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202 4-06-04T13:13:14.860664034Z"} {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860666204Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860668127Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860670284Z"} {"log":"\u0009at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860672466Z"} {"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860675202Z"} {"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860677397Z"} {"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860679315Z"} {"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860681186Z"} {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 
8398. If needed, please seek past the record to continue consumption.\n","stream":"std out","time":"2024-06-04T13:13:14.860682971Z"} {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1\n","stream":"stdout","time":"2024-06-04T13:13:14.860684996Z"} {"log":"Caused by: java.lang.ArrayIndexOutOfBoundsException: null\n","stream":"stdout","time":"2024-06-04T13:13:14.860687043Z"} {"log":"2024-06-04 13:13:14,839 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860689132Z"} {"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202 4-06-04T13:13:14.860691352Z"} {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860695442Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860698688Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860700815Z"} {"log":"\u0009at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860702735Z"} {"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860704589Z"} {"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860706556Z"} 
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860708471Z"} {"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860710379Z"} {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 8398. If needed, please seek past the record to continue consumption.\n","stream":"std out","time":"2024-06-04T13:13:14.860712236Z"} {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1\n","stream":"stdout","time":"2024-06-04T13:13:14.860714475Z"} {"log":"Caused by: java.lang.ArrayIndexOutOfBoundsException: null\n","stream":"stdout","time":"2024-06-04T13:13:14.860716339Z"} {"log":"2024-06-04 13:13:14,840 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860718279Z"} {"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202 4-06-04T13:13:14.860720342Z"} {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860722787Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860725144Z"} {"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860727191Z"} {"log":"\u0009at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860729181Z"} {"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860731022Z"} {"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860732897Z"} {"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860734943Z"} {"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860736949Z"} {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 8398. If needed, please seek past the record to continue consumption.\n","stream":"std out","time":"2024-06-04T13:13:14.860739722Z"}
And here is an individual message as seen from Logstash (the log lines above were taken directly from the host):
{ "_index": "logstash-k8s-1-7.0.0-1-2024.06.04", "_id": "gySe4o8BWp6-Qp2u1boX", "_version": 1, "_score": null, "_source": { "kubernetes": { "pod_name": "datahub-mce-consumer-production-7b46487d75-k9hrz", "labels": { "pod-template-hash": "7b46487d75", "app": "datahub-mce-consumer", "release": "production", "routed_via": "production" }, "host": "dse-k8s-worker1005.eqiad.wmnet", "namespace_id": "766f4ee5-f4af-4d0c-82da-01fd9b01ecce", "container_name": "datahub-mce-consumer-production", "namespace_labels": { "chart": "raw-0.3.0", "heritage": "Helm", "app_kubernetes_io/managed-by": "Helm", "pod-security_kubernetes_io/audit": "restricted", "app": "raw", "release": "namespaces", "kubernetes_io/metadata_name": "datahub" }, "pod_id": "a2c73e80-d7e0-40c6-a5bc-2abae9974878", "master_url": "https://dse-k8s-ctrl.svc.eqiad.wmnet:6443", "creation_timestamp": "2024-04-30T16:18:44Z", "namespace_name": "datahub" }, "stream": "stdout", "log": "2024-06-04 09:38:17,179 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n", "tags": [ "input-kafka-rsyslog-shipper", "rsyslog-shipper", "kafka", "es", "syslog", "es", "normalized_message_untrimmed" ], "normalized_message": "%{message}", "level": "NOTICE", "metadata": { "fileoffset": "53615741", "filename": "/var/log/containers/datahub-mce-consumer-production-7b46487d75-k9hrz_datahub_datahub-mce-consumer-production-5f06ac5976f2c46347eb3bbbd697c213e3644e25f22d799c7135c23ca098dba5.log" }, "program": "input-file-kubernetes", "host": "dse-k8s-worker1005", "logsource": "dse-k8s-worker1005", "time": "2024-06-04T09:38:17.20885223Z", "@timestamp": "2024-06-04T09:40:10.861Z", "docker": { "container_id": "5f06ac5976f2c46347eb3bbbd697c213e3644e25f22d799c7135c23ca098dba5" }, "type": "syslog", "timestamp": "2024-06-04T09:38:23.998801+00:00", "facility": "local0", "@version": "1" }, "fields": { "@timestamp": [ "2024-06-04T09:40:10.861Z" ] }, "highlight": { "host": [ 
"@opensearch-dashboards-highlighted-field@dse-k8s-worker1005@/opensearch-dashboards-highlighted-field@" ] }, "sort": [ 1717494010861 ] }
cc @BTullis