This task is about dealing with the fact that multiple instances of the Search Update Pipeline run in different DCs. As of now, each DC has its own kafka-main cluster and Elasticsearch cluster; however, Kafka messages are replicated between DCs by MirrorMaker. This leads to duplicate events:
- Topic codfw.page_change contains a message (codfw_page_change_0), which is processed by the codfw aggregator and results in another message (codfw_cirrus_update_0) on topic codfw.cirrus_update
- codfw_page_change_0 is replicated to eqiad as codfw_page_change_0' and processed by the eqiad aggregator, which produces eqiad_cirrus_update_0 on topic eqiad.cirrus_update
- codfw_cirrus_update_0 is replicated to eqiad as codfw_cirrus_update_0'
- eqiad_cirrus_update_0 is replicated to codfw as eqiad_cirrus_update_0'
- As a result, each indexer sees two messages for the same event (page 0 changed)
As discussed in T341625, we use the following approach (decision record):
- Primary events (anything consumed by the first step, a.k.a. the aggregator) are consumed only from local (non-replicated) topics.
- Update events (those produced by the aggregator) are consumed from both local and replicated topics.
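A regex over topic names is enough to express this decision, because MirrorMaker-replicated topics keep their DC prefix. The sketch below (class name and topic list are illustrative, not from the codebase) shows how an eqiad consumer would keep only local page_change topics but cirrus_update topics from all DCs:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicFilter {

    /** Keep only topics whose full name matches the configured regex. */
    public static List<String> filterTopics(List<String> topics, String regex) {
        Predicate<String> matches = Pattern.compile(regex).asMatchPredicate();
        return topics.stream().filter(matches).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "eqiad.page_change", "codfw.page_change",
                "eqiad.cirrus_update", "codfw.cirrus_update");

        // Primary events in eqiad: only the local, non-replicated topic.
        System.out.println(filterTopics(topics, "^eqiad\\.page_change$"));
        // → [eqiad.page_change]

        // Update events: local and replicated topics from every DC.
        System.out.println(filterTopics(topics, "^[a-z]+\\.cirrus_update$"));
        // → [eqiad.cirrus_update, codfw.cirrus_update]
    }
}
```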
See also: context diagram
Suggested implementation
In org.wikimedia.discovery.cirrus.updater.consumer.graph.ConsumerGraphFactory#createDataStreamSource:
    final List<String> topics =
        eventDataStreamFactory
            .getEventStreamFactory()
            .createEventStream(config.updateStream())
            .topics()
            .stream()
            .filter(predicate)
            .collect(Collectors.toList());
    KafkaSourceBuilder<Row> sourceBuilder =
        eventDataStreamFactory.kafkaSourceBuilder(
            config.updateStream(),
            config.kafkaSourceBootstrapServers(),
            config.kafkaSourceGroupId(),
            topics);
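The snippet leaves `predicate` undefined. One way to derive it from a configurable regex is sketched below; the config accessor `config.kafkaTopicFilter()` named in the comment is a hypothetical, not an existing option:

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class TopicPredicate {

    /**
     * Build the topic predicate from a regex string, e.g. taken from a
     * hypothetical config option such as config.kafkaTopicFilter().
     * An unset filter could default to ".*" to keep current behavior.
     */
    public static Predicate<String> fromRegex(String regex) {
        return Pattern.compile(regex).asMatchPredicate();
    }

    public static void main(String[] args) {
        Predicate<String> predicate = fromRegex("^(eqiad|codfw)\\.cirrus_update$");
        System.out.println(predicate.test("eqiad.cirrus_update")); // true
        System.out.println(predicate.test("eqiad.page_change"));   // false
    }
}
```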
AC
- Provide a configurable regex-based filter for Kafka topics
- Document the above on the project page
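As a sketch of what the documented configuration might look like, per consumer deployment (the option name `kafka-source-topic-filter` is a hypothetical placeholder, not the final flag):

```properties
# Hypothetical option: only consume Kafka topics matching this regex.
# eqiad deployment, update events from all DCs:
kafka-source-topic-filter: ^[a-z]+\.cirrus_update$
```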