
[Search Update Pipeline] avoid duplicate updates (multi DC)
Closed, Resolved · Public · 3 Estimated Story Points

Description

This task deals with the fact that multiple instances of the Search Update Pipeline run in different DCs. As of now, each DC has its own kafka-main cluster and its own Elasticsearch cluster. However, Kafka messages are replicated between DCs by MirrorMaker, which leads to duplicate events:

  1. Topic codfw.page_change contains a message (codfw_page_change_0) which is processed by the codfw aggregator and results in another message (codfw_cirrus_update_0) on topic codfw.cirrus_update
    1. codfw_page_change_0 is replicated to eqiad as codfw_page_change_0' and processed by the eqiad aggregator, which produces eqiad_cirrus_update_0 on topic eqiad.cirrus_update
    2. codfw_cirrus_update_0 is replicated to eqiad as codfw_cirrus_update_0'
    3. eqiad_cirrus_update_0 is replicated to codfw as eqiad_cirrus_update_0'
  2. As a result, each indexer sees two messages for the same event (page 0 changed)
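
The fan-out above can be sketched as a small model. This is only an illustration under simplifying assumptions: the class and method names are hypothetical, MirrorMaker is modelled as "every indexer sees every update message produced in any DC", and the flag switches between aggregators that also read replicated page_change topics and aggregators that read only their local topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the scenario above; not part of the pipeline code.
class DuplicateUpdateSketch {

    /**
     * Lists the update messages an indexer in any DC ends up seeing for a
     * single page change that originated in originDc. Update topics are
     * replicated everywhere, so every produced update reaches every indexer.
     */
    static List<String> updatesSeenByIndexer(
            String originDc, List<String> dcs, boolean aggregatorsReadReplicas) {
        List<String> seen = new ArrayList<>();
        for (String aggregatorDc : dcs) {
            boolean isLocalEvent = aggregatorDc.equals(originDc);
            // An aggregator emits an update if the event is local to it, or
            // if it also consumes the replicated copy of the primary event.
            if (isLocalEvent || aggregatorsReadReplicas) {
                seen.add(aggregatorDc + "_cirrus_update_0");
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> dcs = List.of("codfw", "eqiad");
        System.out.println(updatesSeenByIndexer("codfw", dcs, true));
        System.out.println(updatesSeenByIndexer("codfw", dcs, false));
    }
}
```

With the flag set to true the indexer sees one update per DC for the same page change; with it set to false only the update from the originating DC remains.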

As discussed in T341625, we use the following approach (decision record):

  • Primary events (anything consumed by the first step, i.e. the aggregator) are only consumed from local (non-replicated) topics.
  • Update events (those produced by the aggregator) are consumed from both local and replicated topics.
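
As a rough sketch of these two rules, assuming topics follow the `<dc>.<stream>` naming used in the example above (codfw.page_change, eqiad.cirrus_update). The enum and method names are made up for illustration and do not exist in the updater code base.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the consumption rules; names are assumptions.
class ConsumptionPolicySketch {

    enum EventKind { PRIMARY, UPDATE }

    /**
     * Selects the topics a consumer running in localDc should subscribe to.
     * Primary events: only topics prefixed with the local DC.
     * Update events: all topics, local and replicated.
     */
    static List<String> topicsToConsume(EventKind kind, String localDc, List<String> topics) {
        return topics.stream()
                .filter(topic -> kind == EventKind.UPDATE || topic.startsWith(localDc + "."))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> pageChange = List.of("codfw.page_change", "eqiad.page_change");
        List<String> cirrusUpdate = List.of("codfw.cirrus_update", "eqiad.cirrus_update");
        System.out.println(topicsToConsume(EventKind.PRIMARY, "codfw", pageChange));
        System.out.println(topicsToConsume(EventKind.UPDATE, "codfw", cirrusUpdate));
    }
}
```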

See also: context diagram

Suggested implementation

In org.wikimedia.discovery.cirrus.updater.consumer.graph.ConsumerGraphFactory#createDataStreamSource:

// Resolve the stream's topics and keep only those accepted by the filter.
final List<String> topics = eventDataStreamFactory.getEventStreamFactory()
        .createEventStream(config.updateStream())
        .topics()
        .stream()
        .filter(predicate)
        .collect(Collectors.toList());

// Build the Kafka source from the filtered topic list.
KafkaSourceBuilder<Row> sourceBuilder =
        eventDataStreamFactory.kafkaSourceBuilder(
                config.updateStream(),
                config.kafkaSourceBootstrapServers(),
                config.kafkaSourceGroupId(),
                topics);
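
The snippet leaves `predicate` undefined. One possible way to derive it from a configured regex is sketched below, assuming Java 11+ (for `Pattern.asMatchPredicate`) and assuming that an absent or empty pattern means "accept every topic". The class and method names are hypothetical, not taken from the updater.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper; shows one way the topic predicate could be built.
class TopicPatternFilterSketch {

    /** Builds a predicate that accepts topics fully matching the regex. */
    static Predicate<String> topicPredicate(String regex) {
        if (regex == null || regex.isEmpty()) {
            // Assumption: no pattern configured means no filtering.
            return topic -> true;
        }
        // asMatchPredicate() requires the whole topic name to match.
        return Pattern.compile(regex).asMatchPredicate();
    }

    /** Applies the predicate to a list of topic names. */
    static List<String> filterTopics(List<String> topics, String regex) {
        return topics.stream()
                .filter(topicPredicate(regex))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("codfw.page_change", "eqiad.page_change");
        System.out.println(filterTopics(topics, "codfw\\..*"));
        System.out.println(filterTopics(topics, null));
    }
}
```

Full-match semantics are chosen here so that a pattern such as `codfw\..*` cannot accidentally match a topic that merely contains the DC name somewhere in the middle.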

AC

  • provide a configurable regex-based filter for kafka topics
  • documentation of the above on the project page

Details

Title | Reference | Author | Source Branch | Dest Branch
Provide filters for input topics | repos/search-platform/cirrus-streaming-updater!22 | ebernhardson | work/ebernhardson/stream-topic-filter | main

Event Timeline

Gehel triaged this task as High priority. Aug 21 2023, 3:10 PM
pfischer updated the task description.
Gehel set the point value for this task to 3.

Not sure this ticket description matches our current understanding. My understanding after the last meeting is that:

  • Topic codfw.page_change contains a message (codfw_page_change_0) which is processed by the codfw aggregator and results in another message (codfw_cirrus_update_0) on topic codfw.cirrus_update
  • Topic codfw.page_change is only read by the codfw aggregator. The aggregators only subscribe to events generated in their own DC
  • Topic codfw.cirrus_update is replicated to eqiad, and codfw_cirrus_update_0 is processed by the search-eqiad and cloudelastic-eqiad indexers

Reviewing our current implementation, what needs to change is adding a generic ability to filter the topics that are subscribed to for each stream. We talked about it in our Wednesday meeting; the general implementation plan is to have paired arguments, for example --page-change-stream and --page-change-stream-filter (or maybe --page-change-stream.filter?).

@EBernhardson, I updated the description and would opt for --page-change-stream-topic-pattern.
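
A minimal sketch of how such an option pair could be read, assuming plain `--key value` arguments and the `-topic-pattern` suffix proposed above. The hand-rolled parser, the class name, and the stream value `page_change` are placeholders for illustration; the actual updater may parse its configuration differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical argument handling; not the updater's real configuration code.
class StreamArgsSketch {

    /** Parses "--key value" pairs into a map keyed by option name. */
    static Map<String, String> parse(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Options must come as --key value pairs");
        }
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected an option name, got: " + args[i]);
            }
            options.put(args[i].substring(2), args[i + 1]);
        }
        return options;
    }

    /** Looks up the optional "<streamOption>-topic-pattern" companion option. */
    static Optional<Pattern> topicPattern(Map<String, String> options, String streamOption) {
        return Optional.ofNullable(options.get(streamOption + "-topic-pattern"))
                .map(Pattern::compile);
    }

    public static void main(String[] args) {
        Map<String, String> options = parse(new String[] {
            "--page-change-stream", "page_change",
            "--page-change-stream-topic-pattern", "codfw\\..*"
        });
        Optional<Pattern> pattern = topicPattern(options, "page-change-stream");
        System.out.println(pattern.map(Pattern::pattern).orElse("<no filter>"));
    }
}
```

Deriving the pattern option name from the stream option name keeps the two coupled, so every stream argument can get a filter without additional wiring.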