Create a generic stream to populate CirrusSearch weighted_tags
Closed, Resolved · Public

Description

It would be useful to have a stream that supports the addition and deletion of CirrusSearch weighted_tags.
The stream would allow users who want to tag or un-tag pages in the search index to do so simply by emitting events to this stream.

There might be 2 different use-cases to support:

  • realtime processes bound to the lifecycle of the page
  • batch processes possibly sending a large number of modifications

We might consider exposing 2 different streams, which would give us the opportunity to route or throttle the events accordingly:

  • events bound to the lifecycle of the page might enter the merge window of the SUP producer so that they get a chance to be joined with other events related to the same edit
  • events produced in batch might skip that window and possibly be throttled (if deemed necessary) to limit the impact on latencies of the realtime events.
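The routing described in the two bullets above could be sketched roughly as follows. This is a minimal illustration in plain Java, not the SUP producer's actual code: the `TagEventRouter` class, the `TagEvent` shape and its `fromBatch` flag are all hypothetical names introduced for this example, and throttling is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: routes tag events either through a merge window or around it. */
public class TagEventRouter {

    /** Hypothetical event shape; the real stream schema is not assumed here. */
    static final class TagEvent {
        final String wikiId;
        final long pageId;
        final String tag;
        final boolean fromBatch;

        TagEvent(String wikiId, long pageId, String tag, boolean fromBatch) {
            this.wikiId = wikiId;
            this.pageId = pageId;
            this.tag = tag;
            this.fromBatch = fromBatch;
        }
    }

    // Events waiting in the merge window, grouped by page.
    private final Map<String, List<TagEvent>> window = new LinkedHashMap<>();
    // Groups of events handed downstream, in emission order.
    private final List<List<TagEvent>> emitted = new ArrayList<>();

    void accept(TagEvent event) {
        if (event.fromBatch) {
            // Batch events skip the window and are emitted on their own.
            emitted.add(Collections.singletonList(event));
        } else {
            // Lifecycle events wait so they can be joined per page.
            String key = event.wikiId + ":" + event.pageId;
            window.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        }
    }

    /** Closes the merge window: each page's buffered events leave as one group. */
    void closeWindow() {
        emitted.addAll(window.values());
        window.clear();
    }

    List<List<TagEvent>> emitted() {
        return emitted;
    }
}
```

With two streams, the `fromBatch` decision would presumably be made by the stream an event arrives on rather than by a field in the event itself.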

For now, we start with a single stream.

AC:

  • define a schema for this stream
  • define a stream config
  • create kafka topics (1 partition, 7 days retention):
    • eqiad.mediawiki.cirrussearch.page_weighted_tags_change.rc0
    • codfw.mediawiki.cirrussearch.page_weighted_tags_change.rc0
  • adapt the SUP producer to read these streams
    • possibly consider using watermark alignment and see if this helps the case where the batch stream might produce a lot of events at once
  • adapt the https://wikitech.wikimedia.org/wiki/Search/WeightedTags documentation on wikitech
  • adapt existing users of weighted_tags to use this stream:
    • Growth using \CirrusSearch\Updater::[update|reset]WeightedTags, see T372904
    • Image recommendation using hive partition, see T372912

Event Timeline

Restricted Application added a subscriber: Aklapper.

From a SUP perspective this would replace all sources of weighted tags (config option: stream name):

  • article-topic-stream: mediawiki.page_outlink_topic_prediction_change.v1
  • draft-topic-stream: mediawiki.revision_score_drafttopic
  • recommendation-create-stream: mediawiki.revision-recommendation-create

Regarding rate limiting: is there a way to rate-limit a Kafka source itself? The only option I see is to wrap a KafkaSource (flink-connector-kafka) in a custom RateLimitedKafkaSource (to be created) that forwards all calls and wraps only the result of createReader in a RateLimitedSourceReader (flink-core). I don't know how that affects back pressure, though. At the very least, it would be confusing if the back pressure metric went up because of the rate limiting.

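import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

// Wraps a KafkaSource and forwards every call to it; only createReader differs.
public class RateLimitedKafkaSource<O> implements Source<O, KafkaPartitionSplit, KafkaSourceEnumState> {
    private final KafkaSource<O> delegate;
    private final RateLimiterStrategy rateLimiterStrategy;

    RateLimitedKafkaSource(KafkaSource<O> delegate, RateLimiterStrategy rateLimiterStrategy) {
        this.delegate = delegate;
        this.rateLimiterStrategy = rateLimiterStrategy;
    }

    @Override
    public SourceReader<O, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        // Wrap the delegate's reader so that records are emitted at the configured rate.
        return new RateLimitedSourceReader<>(
                delegate.createReader(readerContext),
                rateLimiterStrategy.createRateLimiter(readerContext.currentParallelism()));
    }

    /* … remaining delegate methods … */
}

// Usage:
env.fromSource(new RateLimitedKafkaSource<>(source, RateLimiterStrategy.perSecond(10)), /* … */)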

From a SUP perspective this would replace all sources of weighted tags (config option: stream name):

  • article-topic-stream: mediawiki.page_outlink_topic_prediction_change.v1
  • draft-topic-stream: mediawiki.revision_score_drafttopic
  • recommendation-create-stream: mediawiki.revision-recommendation-create

Ideally yes but it is unclear if these producers will be willing to support our new stream, so we might need to keep those.

Regarding rate limiting: is there a way to rate-limit a Kafka source itself? The only option I see is to wrap a KafkaSource (flink-connector-kafka) in a custom RateLimitedKafkaSource (to be created) that forwards all calls and wraps only the result of createReader in a RateLimitedSourceReader (flink-core). I don't know how that affects back pressure, though. At the very least, it would be confusing if the back pressure metric went up because of the rate limiting.

It's not clear to me how rate limiting could be applied, but I believe you're right that it must happen at the source level; otherwise it will most certainly affect back pressure.
It is also unclear whether rate limiting will be required at all. We might have to test how the pipeline behaves under such conditions and see whether our SLO on update lag is affected during a bulk load. Flink used to have rate limiting built into its old Kafka connector, but it was dropped in the new Kafka source API, which seems to suggest that they no longer consider rate limiting a valid use case for Flink.
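To make the idea of limiting at the source level concrete, here is a rough sketch in plain Java, independent of Flink. It assumes a pull-based reader and that the global rate is split evenly across parallel subtasks. `PacedReader`, its injected clock and its sleeper are hypothetical names used only for this illustration. Any waiting happens inside `poll`, before a record is handed downstream, so later operators just see records arriving more slowly.

```java
import java.util.Iterator;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Illustrative only: paces a pull-based reader to a fixed per-subtask rate. */
public class PacedReader<T> {
    private final Iterator<T> delegate;
    private final long intervalNanos;
    private final LongSupplier clock;     // returns the current time in nanoseconds
    private final LongConsumer sleeper;   // blocks for the given number of nanoseconds
    private long nextAllowedNanos;

    PacedReader(Iterator<T> delegate, double recordsPerSecond, int parallelism,
                LongSupplier clock, LongConsumer sleeper) {
        this.delegate = delegate;
        // Assumption: each subtask gets an equal share of the global rate.
        this.intervalNanos = (long) (1_000_000_000L / (recordsPerSecond / parallelism));
        this.clock = clock;
        this.sleeper = sleeper;
        this.nextAllowedNanos = clock.getAsLong();
    }

    /** Returns the next record, or null when the delegate is exhausted. */
    T poll() {
        if (!delegate.hasNext()) {
            return null;
        }
        long now = clock.getAsLong();
        if (now < nextAllowedNanos) {
            // Wait here, at the source, rather than letting downstream push back.
            sleeper.accept(nextAllowedNanos - now);
        }
        nextAllowedNanos = Math.max(now, nextAllowedNanos) + intervalNanos;
        return delegate.next();
    }
}
```

In a real job the clock would be `System::nanoTime` and the sleeper would block the reading thread. Injecting both keeps the sketch testable without real waiting.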

Gehel triaged this task as High priority. Jun 10 2024, 3:39 PM
Gehel moved this task from needs triage to ML & Data Pipeline on the Discovery-Search board.
pfischer changed the task status from Open to In Progress. Jul 18 2024, 12:19 PM
pfischer claimed this task.

Change #1055226 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[schemas/event/primary@master] Introducing cirrussearch/weighted_tags

https://gerrit.wikimedia.org/r/1055226

Change #1055226 merged by Ottomata:

[schemas/event/primary@master] Introducing cirrussearch/weighted_tags

https://gerrit.wikimedia.org/r/1055226

Change #1056169 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[schemas/event/primary@master] Require wiki_id for development/cirrussearch/page_weighted_tags_change

https://gerrit.wikimedia.org/r/1056169

Change #1056169 merged by Ottomata:

[schemas/event/primary@master] Require wiki_id for development/cirrussearch/page_weighted_tags_change

https://gerrit.wikimedia.org/r/1056169

Change #1056944 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[operations/mediawiki-config@master] EventStreamConfig for mediawiki.cirrussearch.page_weighted_tags_change.rc0

https://gerrit.wikimedia.org/r/1056944

Change #1056944 merged by jenkins-bot:

[operations/mediawiki-config@master] EventStreamConfig for mediawiki.cirrussearch.page_weighted_tags_change.rc0

https://gerrit.wikimedia.org/r/1056944

Mentioned in SAL (#wikimedia-operations) [2024-08-12T20:10:58Z] <zabe@deploy1003> Started scap sync-world: Backport for [[gerrit:1056944|EventStreamConfig for mediawiki.cirrussearch.page_weighted_tags_change.rc0 (T366253)]]

Mentioned in SAL (#wikimedia-operations) [2024-08-12T20:12:58Z] <zabe@deploy1003> pfischer, zabe: Backport for [[gerrit:1056944|EventStreamConfig for mediawiki.cirrussearch.page_weighted_tags_change.rc0 (T366253)]] synced to the testservers (https://wikitech.wikimedia.org/wiki/Mwdebug)

Mentioned in SAL (#wikimedia-operations) [2024-08-12T20:18:53Z] <zabe@deploy1003> Finished scap: Backport for [[gerrit:1056944|EventStreamConfig for mediawiki.cirrussearch.page_weighted_tags_change.rc0 (T366253)]] (duration: 07m 55s)

Kafka topics have been created.

Change #1062726 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[operations/deployment-charts@master] Search update pipeline: consume consolidated page-weighted-tags-change-stream

https://gerrit.wikimedia.org/r/1062726

Change #1062726 merged by jenkins-bot:

[operations/deployment-charts@master] Search update pipeline: consume consolidated page-weighted-tags-change-stream

https://gerrit.wikimedia.org/r/1062726