
Verify duplicate entry warnings logged by the m4 mysql consumer
Closed, Resolved · Public · 13 Estimated Story Points

Description

Sometimes the eventlogging_consumer-mysql-m4-master-00.log shows warnings like the following:

/usr/lib/python2.7/dist-packages/sqlalchemy/engine/default.py:324: Warning: Duplicate entry '9c635730293d562d9c6b483355007919' for key 'ix_Print_17630514_uuid'
  cursor.execute(statement, parameters)

In theory we would tolerate this warning only when Kafka broker changes disrupt the regular flow of data (broker restarts, etc.), but judging from the timings it seems to happen in other cases as well. Let's investigate what is happening and verify that we are not rejecting data for some unknown reason.

Event Timeline

elukey triaged this task as Medium priority.Jan 19 2018, 8:08 AM
elukey created this task.

One thing that I noticed is that when a burst of warnings happens, the following is registered around the same time in syslog:

Jan 19 08:08:39 eventlog1001 kernel: [325759.935997] init: eventlogging/processor (client-side-06) main process (29447) terminated with status 1
Jan 19 08:08:39 eventlog1001 kernel: [325759.936015] init: eventlogging/processor (client-side-06) main process ended, respawning

It happens for multiple processors, and not just sporadically during the day:

root@eventlog1001:/srv/log/upstart# grep eventlogging/processor /var/log/syslog.1 | grep -c terminated
276

In this particular case, the following happened:

2018-01-19 08:08:40,288 [15009] (MainThread) kafka 'topic' argument is deprecated, use 'topics'
2018-01-19 08:08:40,362 [15009] (MainThread) old ssl module detected. ssl error handling may not operate cleanly. Consider upgrading to python 3.5 or 2.7
2018-01-19 08:08:40,423 [15009] (MainThread) Consuming topics [u'eventlogging-client-side'] from Kafka in group eventlogging_processor_client_side_00 as eventlogging_processor_client_side_00-eventlog1001.eqiad.wmnet.15009
2018-01-19 08:08:40,424 [15009] (MainThread) Updating subscribed topics to: [u'eventlogging-client-side']
2018-01-19 08:08:40,425 [15009] (MainThread) Error sending GroupCoordinatorRequest_v0 to node 18 [NodeNotReadyError: 18]
2018-01-19 08:08:40,527 [15009] (MainThread) Group coordinator for eventlogging_processor_client_side_00 is BrokerMetadata(nodeId=22, host=u'kafka1022.eqiad.wmnet', port=9092, rack=None)
2018-01-19 08:08:40,528 [15009] (MainThread) Discovered coordinator 22 for group eventlogging_processor_client_side_00
2018-01-19 08:08:40,529 [15009] (MainThread) Revoking previously assigned partitions set([]) for group eventlogging_processor_client_side_00
2018-01-19 08:08:40,530 [15009] (MainThread) (Re-)joining group eventlogging_processor_client_side_00
2018-01-19 08:09:09,102 [15009] (MainThread) Joined group 'eventlogging_processor_client_side_00' (generation 859) with member_id eventlogging_processor_client_side_00-eventlog1001.eqiad.wmnet.15009-ce074758-ec4c-4f16-b69f-0da30f61fb97
2018-01-19 08:09:09,153 [15009] (MainThread) Successfully joined group eventlogging_processor_client_side_00 with generation 859
2018-01-19 08:09:09,154 [15009] (MainThread) Updated partition assignment: [TopicPartition(topic=u'eventlogging-client-side', partition=4)]
2018-01-19 08:09:09,154 [15009] (MainThread) Setting newly assigned partitions set([TopicPartition(topic=u'eventlogging-client-side', partition=4)]) for group eventlogging_processor_client_side_00

root@eventlog1001:/srv/log/upstart# grep eventlogging/processor /var/log/syslog.1 | grep -c terminated
276

There are 12 processor instances, so 276 restarts works out to roughly 23 restarts per processor per day, or about one restart per processor per hour.

So the processor does event['uuid'] = capsule_uuid(event), which is defined like this:

def capsule_uuid(capsule):
    """Generate a UUID for a capsule object.
    Gets a unique URI for the capsule using `EVENTLOGGING_URL_FORMAT`
    and uses it to generate a UUID5 in the URL namespace.
    ..seealso:: `RFC 4122 <https://www.ietf.org/rfc/rfc4122.txt>`_.
    :param capsule: A capsule object (or any dictionary that defines
      `recvFrom`, `seqId`, and `dt`).
    """
    uuid_fields = {
        'recvFrom': capsule.get('recvFrom'),
        'seqId': capsule.get('seqId'),
        # TODO: remove this timestamp default as part of T179625
        'dt': capsule.get('dt', iso8601_from_timestamp(
            capsule.get('timestamp', time.time())
        ))
    }

    id = uuid5(uuid.NAMESPACE_URL, EVENTLOGGING_URL_FORMAT % uuid_fields)
    return '%032x' % id.int

So maybe, for some reason, the above disconnect events cause the processor's Kafka consumer to read the same event twice, ending up down the line with these warnings?
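
To illustrate why a re-read message ends up as a duplicate row: the uuid is a deterministic UUID5 over (recvFrom, seqId, dt), so processing the same raw event twice yields the same uuid, and the second INSERT trips the unique index. A minimal sketch follows; the URL format string and the sample event values are made up for illustration and are not the real EVENTLOGGING_URL_FORMAT.

import uuid

# Illustrative stand-in for the real EVENTLOGGING_URL_FORMAT constant.
EVENTLOGGING_URL_FORMAT = (
    'event://eventlogging?recvFrom=%(recvFrom)s&seqId=%(seqId)s&dt=%(dt)s'
)


def capsule_uuid(capsule):
    """Deterministic UUID5 over (recvFrom, seqId, dt), as in the snippet above."""
    uuid_fields = {
        'recvFrom': capsule.get('recvFrom'),
        'seqId': capsule.get('seqId'),
        'dt': capsule.get('dt'),
    }
    return '%032x' % uuid.uuid5(uuid.NAMESPACE_URL, EVENTLOGGING_URL_FORMAT % uuid_fields).int


# The same raw event consumed twice (e.g. after a consumer group rebalance that
# rewinds to the last committed offset) maps to the same uuid, so the second
# INSERT hits the unique index (e.g. ix_Print_17630514_uuid) and MySQL warns.
event = {'recvFrom': 'eventlog1001.eqiad.wmnet', 'seqId': 12345, 'dt': '2018-01-19T08:08:39'}
assert capsule_uuid(event) == capsule_uuid(dict(event))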

Just tested the use case in the description on stat1004 with:

kafkacat -C -b kafka1023.eqiad.wmnet -t eventlogging_Print -p0 -o beginning | grep --color 9c635730293d562d9c6b483355007919

It returns two identical events (same dt, same recvFrom, same seqId). So we are not losing data, only duplicating it.

elukey edited projects, added Analytics-Kanban; removed Analytics.
elukey moved this task from Next Up to In Progress on the Analytics-Kanban board.

Hm, this sounds right to me. The question now is: why is the processor process restarting?

So from https://github.com/dpkp/kafka-python/blob/master/kafka/errors.py it seems that Error sending GroupCoordinatorRequest_v0 to node 18 [NodeNotReadyError: 18] should be related to this:

class RecordListTooLargeError(BrokerResponseError):
    errno = 18
    message = 'RECORD_LIST_TOO_LARGE'
    description = ('If a message batch in a produce request exceeds the maximum'
                   ' configured segment size.')

I may not have followed the right trail last time, so today I tried to retrace an error again. I started from eventlogging_consumer-mysql-m4-master-00.log and took a look at the start of a warning sequence:

2018-01-22 08:18:21,445 [1428] (MainThread) Inserted 205 MultimediaViewerDimensions_10014238 events in 0.122139 seconds
/usr/lib/python2.7/dist-packages/sqlalchemy/engine/default.py:324: Warning: Duplicate entry '037e8695e553539b89dd284b5b70a154' for key 'ix_Print_17630514_uuid'
  cursor.execute(statement, parameters)
/usr/lib/python2.7/dist-packages/sqlalchemy/engine/default.py:324: Warning: Duplicate entry '8b7f9b498c145d728fe6f194b2254c6d' for key 'ix_Print_17630514_uuid'
  cursor.execute(statement, parameters)
/usr/lib/python2.7/dist-packages/sqlalchemy/engine/default.py:324: Warning: Duplicate entry '1bde0b51d4d2546b8a0fbe650b510d0a' for key 'ix_Print_17630514_uuid'
  cursor.execute(statement, parameters)
/usr/lib/python2.7/dist-packages/sqlalchemy/
[..]

In /var/log/syslog I can see:

Jan 22 08:16:50 eventlog1001 kernel: [585650.277565] init: eventlogging/processor (client-side-08) main process (9735) terminated with status 1
Jan 22 08:16:50 eventlog1001 kernel: [585650.277593] init: eventlogging/processor (client-side-08) main process ended, respawning
Jan 22 08:17:01 eventlog1001 CRON[25933]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)
Jan 22 08:17:01 eventlog1001 CRON[25935]: (prometheus) CMD (/usr/local/bin/prometheus-puppet-agent-stats --outfile /var/lib/prometheus/node.d/puppet_agent.prom)
Jan 22 08:17:20 eventlog1001 kernel: [585680.574404] init: eventlogging/processor (client-side-11) main process (10053) terminated with status 1
Jan 22 08:17:20 eventlog1001 kernel: [585680.574420] init: eventlogging/processor (client-side-11) main process ended, respawning
Jan 22 08:17:50 eventlog1001 kernel: [585710.508824] init: eventlogging/processor (client-side-07) main process (9907) terminated with status 1
Jan 22 08:17:50 eventlog1001 kernel: [585710.508840] init: eventlogging/processor (client-side-07) main process ended, respawning
Jan 22 08:17:51 eventlog1001 kernel: [585711.146030] init: eventlogging/processor (client-side-04) main process (10047) terminated with status 1
Jan 22 08:17:51 eventlog1001 kernel: [585711.146044] init: eventlogging/processor (client-side-04) main process ended, respawning
Jan 22 08:17:53 eventlog1001 kernel: [585713.804672] init: eventlogging/processor (client-side-10) main process (10240) terminated with status 1
Jan 22 08:17:53 eventlog1001 kernel: [585713.804699] init: eventlogging/processor (client-side-10) main process ended, respawning
Jan 22 08:18:01 eventlog1001 CRON[26286]: (prometheus) CMD (/usr/local/bin/prometheus-puppet-agent-stats --outfile /var/lib/prometheus/node.d/puppet_agent.prom)
Jan 22 08:18:24 eventlog1001 kernel: [585744.308362] init: eventlogging/processor (client-side-03) main process (10476) terminated with status 1
Jan 22 08:18:24 eventlog1001 kernel: [585744.308374] init: eventlogging/processor (client-side-03) main process ended, respawning
Jan 22 08:18:55 eventlog1001 kernel: [585775.632579] init: eventlogging/processor (client-side-00) main process (10595) terminated with status 1
Jan 22 08:18:55 eventlog1001 kernel: [585775.632594] init: eventlogging/processor (client-side-00) main process ended, respawning
Jan 22 08:19:01 eventlog1001 CRON[26603]: (prometheus) CMD (/usr/local/bin/prometheus-puppet-agent-stats --outfile /var/lib/prometheus/node.d/puppet_agent.prom)
Jan 22 08:19:30 eventlog1001 kernel: [585810.172887] init: eventlogging/processor (client-side-06) main process (11137) terminated with status 1
Jan 22 08:19:30 eventlog1001 kernel: [585810.172902] init: eventlogging/processor (client-side-06) main process ended, respawning

Processor 08, 8:16:

2018-01-22 08:16:26,970 [9735] (MainThread) Unable to validate: [...]

Traceback (most recent call last):
  File "/srv/deployment/eventlogging/analytics/bin/eventlogging-processor", line 151, in <module>
    w.send(event)
  File "/srv/deployment/eventlogging/analytics/eventlogging/handlers.py", line 426, in kafka_confluent_writer
    kafka_producer.produce(message_topic, message_value, message_key)
BufferError: Local: Queue full

2018-01-22 08:16:50,742 [25850] (MainThread) Publishing valid JSON events to kafka-confluent:///kafka1012.eqiad.wmnet:9092,kafka1013.eqiad.wmnet:9092,kafka1014.eqiad.wmnet:9092,kafka1020.eqiad.wmnet:9092,kafka1022.eqiad.wmnet:9092,kafka1023.eqiad.wmnet:9092?topic=eventlogging_{schema}&message.send.max.retries=6,retry.backoff.ms=200.
2018-01-22 08:16:50,750 [25850] (MainThread) Publishing valid JSON events to kafka-confluent:///kafka1012.eqiad.wmnet:9092,kafka1013.eqiad.wmnet:9092,kafka1014.eqiad.wmnet:9092,kafka1020.eqiad.wmnet:9092,kafka1022.eqiad.wmnet:9092,kafka1023.eqiad.wmnet:9092?topic=eventlogging-valid-mixed&blacklist=^(Analytics|CentralNoticeBannerHistory|ImageMetricsLoadingTime|ImageMetricsCorsSupport|Popups|SearchSatisfaction)$&message.send.max.retries=6,retry.backoff.ms=200.
2018-01-22 08:16:50,750 [25850] (MainThread) Publishing invalid raw events to kafka-confluent:///kafka1012.eqiad.wmnet:9092,kafka1013.eqiad.wmnet:9092,kafka1014.eqiad.wmnet:9092,kafka1020.eqiad.wmnet:9092,kafka1022.eqiad.wmnet:9092,kafka1023.eqiad.wmnet:9092?topic=eventlogging_{schema}&message.send.max.retries=6,retry.backoff.ms=200.

2018-01-22 08:16:50,751 [25850] (MainThread) kafka 'topic' argument is deprecated, use 'topics'
2018-01-22 08:16:50,838 [25850] (MainThread) old ssl module detected. ssl error handling may not operate cleanly. Consider upgrading to python 3.5 or 2.7

2018-01-22 08:16:50,919 [25850] (MainThread) Consuming topics [u'eventlogging-client-side'] from Kafka in group eventlogging_processor_client_side_00 as eventlogging_processor_client_side_00-eventlog1001.eqiad.wmnet.25850
2018-01-22 08:16:50,919 [25850] (MainThread) Updating subscribed topics to: [u'eventlogging-client-side']
2018-01-22 08:16:50,920 [25850] (MainThread) Error sending GroupCoordinatorRequest_v0 to node 20 [NodeNotReadyError: 20]
2018-01-22 08:16:51,021 [25850] (MainThread) Group coordinator for eventlogging_processor_client_side_00 is BrokerMetadata(nodeId=22, host=u'kafka1022.eqiad.wmnet', port=9092, rack=None)
2018-01-22 08:16:51,021 [25850] (MainThread) Discovered coordinator 22 for group eventlogging_processor_client_side_00
2018-01-22 08:16:51,022 [25850] (MainThread) Revoking previously assigned partitions set([]) for group eventlogging_processor_client_side_00
2018-01-22 08:16:51,022 [25850] (MainThread) (Re-)joining group eventlogging_processor_client_side_00
2018-01-22 08:16:51,022 [25850] (MainThread) Error sending JoinGroupRequest_v0 to node 22 [NodeNotReadyError: 22]
2018-01-22 08:16:51,123 [25850] (MainThread) (Re-)joining group eventlogging_processor_client_side_00
2018-01-22 08:17:20,008 [25850] (MainThread) Joined group 'eventlogging_processor_client_side_00' (generation 1698) with member_id eventlogging_processor_client_side_00-eventlog1001.eqiad.wmnet.25850-def040a4-d241-4008-b2ac-f5f1890bfdb2
2018-01-22 08:17:20,019 [25850] (MainThread) Successfully joined group eventlogging_processor_client_side_00 with generation 1698
2018-01-22 08:17:20,020 [25850] (MainThread) Updated partition assignment: [TopicPartition(topic=u'eventlogging-client-side', partition=10)]
2018-01-22 08:17:20,020 [25850] (MainThread) Setting newly assigned partitions set([TopicPartition(topic=u'eventlogging-client-side', partition=10)]) for group eventlogging_processor_client_side_00
2018-01-22 08:17:23,870 [25850] (MainThread) Heartbeat failed for group eventlogging_processor_client_side_00 because it is rebalancing
2018-01-22 08:17:23,871 [25850] (MainThread) Heartbeat failed ([Error 27] RebalanceInProgressError: ); retrying
2018-01-22 08:17:23,875 [25850] (MainThread) Revoking previously assigned partitions set([TopicPartition(topic=u'eventlogging-client-side', partition=10)]) for group eventlogging_processor_client_side_00
2018-01-22 08:17:23,875 [25850] (MainThread) (Re-)joining group eventlogging_processor_client_side_00
2018-01-22 08:17:23,971 [25850] (MainThread) Skipping heartbeat: no auto-assignment or waiting on rebalance
2018-01-22 08:17:50,027 [25850] (MainThread) Joined group 'eventlogging_processor_client_side_00' (generation 1699) with member_id eventlogging_processor_client_side_00-eventlog1001.eqiad.wmnet.25850-def040a4-d241-4008-b2ac-f5f1890bfdb2

2018-01-22 08:17:50,030 [25850] (MainThread) Auto offset commit failed for group eventlogging_processor_client_side_00: CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2018-01-22 08:17:50,031 [25850] (MainThread) Successfully joined group eventlogging_processor_client_side_00 with generation 1699
2018-01-22 08:17:50,031 [25850] (MainThread) Updated partition assignment: [TopicPartition(topic=u'eventlogging-client-side', partition=9)]

Checked processor 11; the error looks the same. So the BufferError: Local: Queue full seems to be causing the processor to die as far as I can see (the PIDs change in the logs), and then the rebalance happens when it reconnects.
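
To make the failure mode concrete, here is a minimal sketch of the produce path suggested by the traceback above (an illustration, not the actual handlers.py code; the broker address is a placeholder): produce() is asynchronous and only appends to librdkafka's bounded local queue, and when that queue is full it raises BufferError, which nothing catches.

from confluent_kafka import Producer

# Placeholder broker; the real list comes from the kafka-confluent writer URI.
producer = Producer({'bootstrap.servers': 'kafka1012.eqiad.wmnet:9092'})


def send(message_topic, message_value, message_key=None):
    # Asynchronous produce: the message is only appended to librdkafka's local
    # queue (bounded by queue.buffering.max.messages, 100000 by default). If
    # the queue is full, produce() raises BufferError('Local: Queue full').
    # Since the processor does not catch it, the exception propagates out of
    # w.send(event), the process exits with status 1, upstart respawns it, and
    # the consumer group rebalances -- re-delivering events whose offsets were
    # not yet committed, which later show up as duplicate uuids in MySQL.
    producer.produce(message_topic, message_value, message_key)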

Change 405687 had a related patch set uploaded (by Elukey; owner: Elukey):
[eventlogging@master] [WIP] kafka_confluent_writer: handle BufferError exception when producing

https://gerrit.wikimedia.org/r/405687

Interesting! Could be worth a try for sure.

Super interesting that deployment-eventlog02 shows the same duplicate behavior in the logs, and also:

Traceback (most recent call last):
  File "/srv/deployment/eventlogging/analytics/bin/eventlogging-processor", line 151, in <module>
    w.send(event)
  File "/srv/deployment/eventlogging/analytics/eventlogging/handlers.py", line 426, in kafka_confluent_writer
    kafka_producer.produce(message_topic, message_value, message_key)
BufferError: Local: Queue full

After reading a bit about BufferError issues on GitHub (it happens to a lot of people), one thing popped up: batch.num.messages is 10000 by default and queue.buffering.max.messages is 100000, so I am wondering if we should lower the former to something like 100. Thoughts?
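
For reference, both of these are librdkafka producer settings, so with confluent-kafka-python they could be tuned through the configuration dict passed to the Producer. A hypothetical sketch of the tuning being floated (not a tested recommendation; the broker list is a placeholder):

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka1012.eqiad.wmnet:9092,kafka1013.eqiad.wmnet:9092',
    'batch.num.messages': 100,               # default 10000: messages per batch
    'queue.buffering.max.messages': 100000,  # default: size of the local produce queue
    'message.send.max.retries': 6,           # matches the writer URI in the logs above
    'retry.backoff.ms': 200,
})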

Other thought: Magnus seems to suggest a call to poll(0) every time an async produce is done, so I am wondering if it would be helpful even if we don't set any callback at all (say, for some reason, librdkafka's internals need it regardless).

Following up on my latest comment: https://github.com/confluentinc/confluent-kafka-go/issues/109#issuecomment-344072558 is interesting; it explains very well what is contained in the local buffer. I am wondering whether not calling poll() leaves the callback queue unfreed (even if we don't set any callback), ending up in the exception that we are seeing. Maybe we could test adding a simple poll(0) after produce and see how it goes?
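
A minimal sketch of the test being proposed (an assumption about the shape of the change, not the actual patch): call the non-blocking poll(0) right after every produce so librdkafka can serve pending delivery reports, even when no callback is registered, and free slots in the local queue.

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka1012.eqiad.wmnet:9092'})  # placeholder broker


def send(message_topic, message_value, message_key=None):
    # Asynchronous produce into librdkafka's bounded local queue.
    producer.produce(message_topic, message_value, message_key)
    # Non-blocking poll: serves queued delivery report events (callbacks or
    # not), which is what should let librdkafka release those messages from
    # the local queue instead of letting it fill up.
    producer.poll(0)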

> batch.num.messages is 10000 by default, and queue.buffering.max.messages is 100000

Hm, I thought there was a setting for batch sizes in time as well? I can't seem to find it. I had thought that a batch would either be 10000 messages, or be made every N seconds or so.

Mentioned in SAL (#wikimedia-analytics) [2018-02-05T15:51:39Z] <elukey> live hacked deployment-eventlog02's /srv/deployment/eventlogging/analytics/eventlogging/handlers.py to add poll(0) to the confluent kafka producer - T185291

On deployment-eventlog02 I tested https://gerrit.wikimedia.org/r/405687 and, to my surprise, none of the new warning logs were displayed. There were two possibilities:

  1. The code entirely fixed the occurrence of the BufferError exception.
  2. The logging was not configured/behaving correctly.

After a bit of digging with Andrew I confirmed that the logging was configured and behaving correctly (ruling out 2), so, assuming 1), I tried commenting out the following line of code:

             # Non blocking poll
-            kafka_producer.poll(0)
+            #kafka_producer.poll(0)

After a while I saw the following logs:

root@deployment-eventlog02:/var/log/upstart# grep -i "Local produce queue full" eventlogging_processor-client-side-0*
eventlogging_processor-client-side-00.log.1:2018-02-11 18:09:56,319 [30582] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-00.log.1:2018-02-11 18:10:10,496 [30582] (MainThread) Local produce queue full, waiting for events delivered.

eventlogging_processor-client-side-00.log.1:2018-02-11 23:52:12,281 [30582] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-00.log.1:2018-02-11 23:52:37,604 [30582] (MainThread) Local produce queue full, waiting for events delivered.

eventlogging_processor-client-side-00.log.1:2018-02-12 06:07:59,014 [30582] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-00.log.1:2018-02-12 06:08:58,219 [30582] (MainThread) Local produce queue full, waiting for events delivered.

eventlogging_processor-client-side-01.log.1:2018-02-11 18:10:58,275 [30586] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-01.log.1:2018-02-11 18:11:15,248 [30586] (MainThread) Local produce queue full, waiting for events delivered.

eventlogging_processor-client-side-01.log.1:2018-02-11 23:52:04,838 [30586] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-01.log.1:2018-02-11 23:52:38,059 [30586] (MainThread) Local produce queue full, waiting for events delivered.

eventlogging_processor-client-side-01.log.1:2018-02-12 06:09:46,276 [30586] (MainThread) Local produce queue full, waiting for events delivered.
eventlogging_processor-client-side-01.log.1:2018-02-12 06:10:52,190 [30586] (MainThread) Local produce queue full, waiting for events delivered.

So most of the time, two retries with poll(0.5) are sufficient to clear the queue. My theory is that the poll() function needs to be called regardless of the presence of a callback function, following what's suggested in https://github.com/confluentinc/confluent-kafka-go/issues/109#issuecomment-344072558.

The commented-out line, kafka_producer.poll(0), performs a non-blocking poll call after every produced message. As far as I can read, Magnus suggests it is lightweight enough to be called frequently without hurting performance.
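
Putting the two pieces together, here is a sketch of the overall approach described in this task (a reconstruction consistent with the log lines above, not the merged change itself): poll(0) after every produce to keep the queue drained, and on BufferError log a warning, poll with a short timeout, and retry the produce.

import logging

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka1012.eqiad.wmnet:9092'})  # placeholder broker


def produce_with_backpressure(topic, value, key=None, retries=3):
    """Produce a message, waiting for the local queue to drain if it is full."""
    for _ in range(retries):
        try:
            producer.produce(topic, value, key)
            # Prevention: serve delivery reports after each produce so the
            # bounded local queue gets freed continuously.
            producer.poll(0)
            return
        except BufferError:
            # Handling: the local queue is full, so wait a bit for in-flight
            # messages to be delivered (and their reports served), then retry.
            logging.warning('Local produce queue full, waiting for events delivered.')
            producer.poll(0.5)
    raise BufferError('Local produce queue still full after %d retries' % retries)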

Change 405687 merged by Elukey:
[eventlogging@master] kafka_confluent_writer: prevent and handle BufferError exception

https://gerrit.wikimedia.org/r/405687

elukey set the point value for this task to 13.Feb 20 2018, 10:53 AM

After the deployment I can't see any more duplicate-entry errors in the m4-consumer log, and there is no trace of processors dying and respawning periodically in syslog. All the metrics look good!

Sorry it took me soooo looong to look at (and understand) this code.