
datahub-mce-consumer spamming logstash with messages
Closed, Resolved (Public)

Description

Starting today at ~09:30, logstash is getting spammed with messages such as the following:

{"log":"2024-06-04 13:13:14,839 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860662077Z"}
{"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202
4-06-04T13:13:14.860664034Z"}                                                                                                                                                                                                                  {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860666204Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860668127Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860670284Z"}
{"log":"\u0009at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860672466Z"}
{"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860675202Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860677397Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860679315Z"}
{"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860681186Z"}
{"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 8398. If needed, please seek past the record to continue consumption.\n","stream":"std
out","time":"2024-06-04T13:13:14.860682971Z"}                                                                                                                                                                                                  {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1\n","stream":"stdout","time":"2024-06-04T13:13:14.860684996Z"}
{"log":"Caused by: java.lang.ArrayIndexOutOfBoundsException: null\n","stream":"stdout","time":"2024-06-04T13:13:14.860687043Z"}
{"log":"2024-06-04 13:13:14,839 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860689132Z"}
{"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202
4-06-04T13:13:14.860691352Z"}                                                                                                                                                                                                                  {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860695442Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860698688Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860700815Z"}
{"log":"\u0009at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860702735Z"}
{"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860704589Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860706556Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860708471Z"}
{"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860710379Z"}
{"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 8398. If needed, please seek past the record to continue consumption.\n","stream":"std
out","time":"2024-06-04T13:13:14.860712236Z"}                                                                                                                                                                                                  {"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1\n","stream":"stdout","time":"2024-06-04T13:13:14.860714475Z"}
{"log":"Caused by: java.lang.ArrayIndexOutOfBoundsException: null\n","stream":"stdout","time":"2024-06-04T13:13:14.860716339Z"}
{"log":"2024-06-04 13:13:14,840 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n","stream":"stdout","time":"2024-06-04T13:13:14.860718279Z"}
{"log":"java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer\n","stream":"stdout","time":"202
4-06-04T13:13:14.860720342Z"}                                                                                                                                                                                                                  {"log":"\u0009at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:198)\n","stream":"stdout","time":"2024-06-04T13:13:14.860722787Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1888)\n","stream":"stdout","time":"2024-06-04T13:13:14.860725144Z"}
{"log":"\u0009at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1337)\n","stream":"stdout","time":"2024-06-04T13:13:14.860727191Z"}
{"log":"\u0009at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n","stream":"stdout","time":"2024-06-04T13:13:14.860729181Z"}
{"log":"\u0009at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n","stream":"stdout","time":"2024-06-04T13:13:14.860731022Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n","stream":"stdout","time":"2024-06-04T13:13:14.860732897Z"}
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n","stream":"stdout","time":"2024-06-04T13:13:14.860734943Z"}
{"log":"\u0009at java.base/java.lang.Thread.run(Thread.java:829)\n","stream":"stdout","time":"2024-06-04T13:13:14.860736949Z"}
{"log":"Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-1 at offset 8398. If needed, please seek past the record to continue consumption.\n","stream":"std
out","time":"2024-06-04T13:13:14.860739722Z"}

Here is an individual message as seen in logstash (the log lines above were taken directly from the host):

{
  "_index": "logstash-k8s-1-7.0.0-1-2024.06.04",
  "_id": "gySe4o8BWp6-Qp2u1boX",
  "_version": 1,
  "_score": null,
  "_source": {
    "kubernetes": {
      "pod_name": "datahub-mce-consumer-production-7b46487d75-k9hrz",
      "labels": {
        "pod-template-hash": "7b46487d75",
        "app": "datahub-mce-consumer",
        "release": "production",
        "routed_via": "production"
      },
      "host": "dse-k8s-worker1005.eqiad.wmnet",
      "namespace_id": "766f4ee5-f4af-4d0c-82da-01fd9b01ecce",
      "container_name": "datahub-mce-consumer-production",
      "namespace_labels": {
        "chart": "raw-0.3.0",
        "heritage": "Helm",
        "app_kubernetes_io/managed-by": "Helm",
        "pod-security_kubernetes_io/audit": "restricted",
        "app": "raw",
        "release": "namespaces",
        "kubernetes_io/metadata_name": "datahub"
      },
      "pod_id": "a2c73e80-d7e0-40c6-a5bc-2abae9974878",
      "master_url": "https://dse-k8s-ctrl.svc.eqiad.wmnet:6443",
      "creation_timestamp": "2024-04-30T16:18:44Z",
      "namespace_name": "datahub"
    },
    "stream": "stdout",
    "log": "2024-06-04 09:38:17,179 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer - Consumer exception\n",
    "tags": [
      "input-kafka-rsyslog-shipper",
      "rsyslog-shipper",
      "kafka",
      "es",
      "syslog",
      "es",
      "normalized_message_untrimmed"
    ],
    "normalized_message": "%{message}",
    "level": "NOTICE",
    "metadata": {
      "fileoffset": "53615741",
      "filename": "/var/log/containers/datahub-mce-consumer-production-7b46487d75-k9hrz_datahub_datahub-mce-consumer-production-5f06ac5976f2c46347eb3bbbd697c213e3644e25f22d799c7135c23ca098dba5.log"
    },
    "program": "input-file-kubernetes",
    "host": "dse-k8s-worker1005",
    "logsource": "dse-k8s-worker1005",
    "time": "2024-06-04T09:38:17.20885223Z",
    "@timestamp": "2024-06-04T09:40:10.861Z",
    "docker": {
      "container_id": "5f06ac5976f2c46347eb3bbbd697c213e3644e25f22d799c7135c23ca098dba5"
    },
    "type": "syslog",
    "timestamp": "2024-06-04T09:38:23.998801+00:00",
    "facility": "local0",
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2024-06-04T09:40:10.861Z"
    ]
  },
  "highlight": {
    "host": [
      "@opensearch-dashboards-highlighted-field@dse-k8s-worker1005@/opensearch-dashboards-highlighted-field@"
    ]
  },
  "sort": [
    1717494010861
  ]
}
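
For what it's worth, the exception message itself points at the consumer-side fix: wrapping the real deserializers in Spring Kafka's ErrorHandlingDeserializer, so that a poison record is handed to the container's error handler instead of making the poll loop throw (and log) on every iteration. Below is a minimal sketch only, assuming a Confluent-compatible Avro value deserializer talking to karapace; it is not datahub's actual consumer configuration.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

public class MceConsumerFactorySketch {
    // Wrap the real deserializers in ErrorHandlingDeserializer so that a record which
    // fails to deserialize reaches the error handler as a DeserializationException,
    // rather than blowing up the poll loop before the listener is ever invoked.
    public static ConsumerFactory<String, Object> consumerFactory(Map<String, Object> baseConfig) {
        Map<String, Object> config = new HashMap<>(baseConfig);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // The delegate classes below are assumptions: the stack trace shows an Avro value
        // failing to deserialize, so the value delegate would be the Avro deserializer.
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,
                "org.apache.kafka.common.serialization.StringDeserializer");
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

With something like this in place, the DefaultErrorHandler treats the deserialization failure as non-retryable and can move past a record it cannot decode (such as the one at MetadataChangeEvent_v4-1, offset 8398) instead of retrying it in a tight loop.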

cc @BTullis

Event Timeline

BTullis triaged this task as High priority.

I am roll-restarting the deployment of datahub on the dse-k8s cluster:

btullis@deploy1002:/srv/deployment-charts/helmfile.d/dse-k8s-services/datahub$ helmfile -e dse-k8s-eqiad --state-values-set roll_restart=1 sync

Currently it is deployed three times: twice on wikikube (eqiad, codfw) and once on dse-k8s, all talking to the same backend database, opensearch cluster, karapace, and kafka cluster.
This should be cleaner once T366338: Delete datahub WikiKube release references is completed.

Change #1038786 had a related patch set uploaded (by Cwhite; author: Cwhite):

[operations/puppet@production] logstash: drop messages from datahub-mce-consumer

https://gerrit.wikimedia.org/r/1038786

Change #1038786 merged by Cwhite:

[operations/puppet@production] logstash: drop messages from datahub-mce-consumer

https://gerrit.wikimedia.org/r/1038786

Interestingly, it was the *mce* consumer this time, whereas it was the *mae* consumer last time (T363843: Datahub mae consumer spamming logstash in wikikube staging).
The failure mode is similar, since the components themselves are quite similar, though not identical.

Change #1038783 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Update the logstash filters for datahub mae/mce consumer pods

https://gerrit.wikimedia.org/r/1038783

Change #1038783 abandoned by Btullis:

[operations/puppet@production] Update the logstash filters for datahub mae/mce consumer pods

Reason:

Duplicate of https://gerrit.wikimedia.org/r/c/operations/puppet/+/1038787

https://gerrit.wikimedia.org/r/1038783

I believe that we have an idea why this happened: there was a discrepancy between the OpenSearch indices and the database that were used for datahub-next.
As it has stopped spamming and the messages have also been filtered out of the logstash indices, I think we can close this for now.

We will try to schedule T363856: Configure datahub to produce structured logs by configuring slf4j or similar for the near future, so that we get better logging, and then perhaps re-enable it in logstash once we have more confidence in it.
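
As a rough illustration of what structured logging would buy us here, a sketch only (it assumes a logback backend with logstash-logback-encoder and a JSON encoder configured, which may not be what datahub ends up using): the consumer could attach discrete fields that logstash can index directly, instead of multi-line plain-text stack traces on stdout.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static net.logstash.logback.argument.StructuredArguments.kv;

public class StructuredLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(StructuredLoggingSketch.class);

    public static void main(String[] args) {
        // With a JSON encoder configured, this is emitted as a single JSON document with
        // separate "topic", "partition" and "offset" fields (values here are illustrative,
        // taken from the log excerpt above), rather than a stack trace split across lines.
        log.error("Consumer exception while deserializing record {} {} {}",
                kv("topic", "MetadataChangeEvent_v4"),
                kv("partition", 1),
                kv("offset", 8398L),
                new IllegalStateException("example"));
    }
}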