
Create reliable change stream for specific wiki
Closed, Resolved · Public

Description

Requirements:

  • Real-time - I can get changes from the wiki within a short time (<30 seconds) of when they happened, with that time defined as the moment the changes were committed to the database and became visible to users on the wiki.
  • Reliable - if I consume every individual change message in the stream, in the sequence the stream provides, I will know about all changes to the wiki content.
  • Seekable - I can connect to the stream at a predictable point in wiki history (either by timestamp or by RC ID) and get all the messages from there. At least 14 days of messages back should be available, but longer availability is not a must.
  • Resumable - I can disconnect from the stream, reconnect later, and resume consumption from the same point where I left off. The service should not require a constant connection for getting updates, and the stream after a disconnect and resume should be the same as if the connection had never been interrupted (apart from the obvious difference in message delivery times, etc.).
  • Scalable - there's no hard limit on the number of clients connecting, within the reasonable limits of infrastructure, networking, etc.
  • Stateless - the server does not keep per-client state, and the client always has all the information needed to continue stream consumption from the point where it stopped (this may not be very important as long as scalability is preserved).

Current use case:

Supplying update stream for Wikidata Query Service.

Delta for existing services:

API:Recentchanges

  • Not reliable - messages can be backfilled into the stream with timestamps many seconds in the past, which means sequential reading of the stream by timestamp would miss them (T161342). Even if reading by RC ID were implemented, parallel commits could still lead to backfilling and thus an unreliable stream.
  • Does not have events for page props updates (T145712), which happen asynchronously from the main article update.
  • May miss some deletion events if deletion is combined with revision hiding.

EventStreams/Kafka, as currently implemented

  • Not seekable by timestamp
  • Does not have data back more than 7 days


Event Timeline

Restricted Application added a subscriber: Aklapper.
Smalyshev triaged this task as Medium priority. Mar 29 2017, 6:26 PM
Anomie subscribed.

The pony being requested here is not going to happen in the action API, so I'm removing the tag. This is probably an effective duplicate of T152731: Implement server side filtering for EventStreams (if we should), because EventStreams is the most likely place for something like this to be implemented.

I think EventStreams is closest to the goal too, but I want to have a complete description of the pony for the record, so that we know what we need and what is missing. If and when it's implemented (T152731 covers part of it, but not all - we still need seeking and a longer backlog), we can close this task; being an epic, it doesn't need any patches dedicated to it.

@Smalyshev let's jump in a hangout sometime to discuss this more.

Just a few quick points:

Does not have data back more than 7 days

We could probably bump this up to 14 days for specific topics like recentchange.

Scalable - there's no hard limit on the number of clients connecting

EventStreams does not have a 'hard limit', but it definitely isn't intended to be used at wiki-reader scale. Service-update scale should be fine, though.

Not consumable per-wiki due to the lack of filtering (T152731)

We haven't done this because we are waiting for a strong use case. Implementing it will involve a lot of bike shedding, which we've put off so far. If you really need this, say so, and we will start the bike shedding!

Qs:

  • We have more than just recentchange in other schemas, and we can add more. If there is a desire, we can expose these in EventStreams. Do you have desire? :)
  • Does WDQS run in production? I think it does, right? If so, you may want to consider consuming from Kafka rather than EventStreams. Kafka consumers support parallelization and scale much better than the EventStreams HTTP Kafka consumer proxy.

let's jump in a hangout sometime to discuss this more.

Would be glad to. I'll try to set up something next week.

If there is a desire, we can expose these in EventStreams. Do you have desire? :)

Yes, see T145712 - recentchanges ignores pageprops updates, and it would be nice to have those too.

Does WDQS run in production?

Yes.

If so, you may want to consider consuming from Kafka rather than EventStreams.

I am considering this too, but I assume it's more code for me to write (perhaps wrongly; I haven't looked at it closely).

Kafka consumers support parallelization and scale much better than the EventStreams HTTP Kafka consumer proxy.

I may have written the requirements to ask for more than I need; they give the impression it has to serve Wikipedia-scale consumer load. But what in fact I need is something like 10-20 parallel consumers, that order of magnitude. So I don't really foresee too much of a problem here, as long as it doesn't require doing unindexed DB queries. I expect both Kafka and ES to be fine with it.

If so, you may want to consider consuming from Kafka rather than EventStreams.

I am considering this too, but I assume it's more code for me to write (perhaps wrongly; I haven't looked at it closely).

It will be more, a lot more. What language are you working in?

But what in fact I need is something like 10-20 parallel consumers

Load balanced parallel consumers, or all distinct consumers doing different stuff?

The nice thing about using a Kafka client directly is that you can subscribe to multiple topics/partitions in a single consumer group and have the load automatically balanced between them (up to the total number of topic-partitions). If any of those processes goes down, a different consumer process will be auto-assigned its work.
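As an illustration, a minimal sketch of such a load-balanced consumer group, assuming the Java Kafka client; the broker and topic names are the ones mentioned in this thread, and the group.id is a hypothetical example:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BalancedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092");
        // Every process started with this same group.id joins one consumer
        // group; Kafka splits the topic's partitions among the members and
        // reassigns them automatically if a member dies.
        props.put("group.id", "wdqs-updater");  // hypothetical group name
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("eqiad.mediawiki.revision-create"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d%n",
                            record.partition(), record.offset());
                }
            }
        }
    }
}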

It will be more, a lot more. What language are you working in?

The end consumer will be Java, but I don't want to consume the raw Kafka stream from Java, I'd rather have some intermediary that cleans up, deduplicates, etc. the changes.

Load balanced parallel consumers, or all distinct consumers doing different stuff?

All distinct. They will be pretty close to each other, but nothing forces them to consume exactly the same things; they are completely independent.

If any of those processes goes down, a different consumer process will be auto-assigned its work.

That's kind of the opposite of what I need :) Each client is independent and should get all the updates.
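For that mode, a minimal sketch of one such independent client, again assuming the Java Kafka client and the broker/topic names from this thread: instead of joining a shared consumer group, each client assigns itself all partitions directly, so every client sees every update regardless of what the others do.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IndependentConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092");
        props.put("enable.auto.commit", "false"); // no group, so no Kafka-side offsets
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        String topic = "eqiad.mediawiki.revision-create";

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() takes partitions directly, with no group coordination,
            // so this client is unaffected by other clients.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions); // or seek to a stored position
            while (true) {
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(r -> System.out.println(r.value()));
            }
        }
    }
}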

I'd rather have some intermediary that cleans up, deduplicates, etc. the changes.

FYI, neither base Kafka Consumer clients nor EventStreams does this.

FYI, neither base Kafka Consumer clients nor EventStreams does this.

Yes, I know :) It's one of the decisions I still haven't figured out - how much I can/should do on the backend so I don't have to do it on the client, versus sending the client the raw firehose output and letting it do all the work. The minimal requirement is reliability (which I don't have now), but I am also interested in better performance, such as making clients download and process less irrelevant data.

Ping @Smalyshev - is this still needed? Maybe we should set up a short 30-minute sync-up.

@Nuria yes, still very much needed and unsolved. Please feel welcome to set up a meet.

From meeting:

  • @Smalyshev can consume from either Kafka or EventStreams once we add the ability to consume from a given point in time; this is what is meant by "seekable" (on the new Kafka cluster, next quarter, Q1).
  • Keeping data for longer than 7 days is not an issue for topics as small as these. The volume of events is less than 100 per second.
  • The ability to filter by wiki is not a blocker; given the volume of events, filtering is possible client-side.

Action Item: @Nuria and @Ottomata to ping once new time-based consumption is enabled in Kafka.

As a result of the discussion, we've arrived at the following conclusions:

  • Once we have a Kafka version installed that allows starting by timestamp, we can create a prototype that takes recent changes from either Kafka or EventStreams.
  • We need to evaluate whether an unfiltered stream will hurt performance; the current assumption is that it won't.
  • We may need to store the Kafka offsets of processed messages in the DB, and will probably need to add code to do this, since right now RdfRepository does not get any such data from the change source or the Change class (see the sketch below).
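A minimal sketch of that externally stored offset pattern, assuming the Java Kafka client; loadOffsetFromDb and saveOffsetToDb are hypothetical stand-ins for whatever persistence the updater would actually use:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DbOffsetConsumer {
    // Hypothetical persistence helpers; a real implementation would store the
    // offset in the same DB transaction as the processed change.
    static long loadOffsetFromDb(TopicPartition tp) { return 0L; }
    static void saveOffsetToDb(TopicPartition tp, long offset) { /* DB write */ }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092");
        props.put("enable.auto.commit", "false"); // offsets live in our DB, not Kafka
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("eqiad.mediawiki.revision-create", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, loadOffsetFromDb(tp)); // resume where we left off
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // process the change, then persist offset + 1 as the next
                    // position to resume from after a restart
                    saveOffsetToDb(tp, record.offset() + 1);
                }
            }
        }
    }
}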

@Ottomata, @Nuria what's the status on seekable Kafka streaming - do we have necessary infrastructure now?

Unfortunately not yet! We are very close...the cluster is up and running, but porting clients has been blocked on getting proper keys and certificates for SSL support for a long time now. SSL is finally moving now, so we should be able to start porting clients over soon. We have a goal of getting some of the misc varnish traffic ported to the new cluster this quarter, and will likely (not totally sure) make a goal of getting all remaining clients ported next quarter.

Sorry this is taking so long! Lots of moving parts...

@Ottomata Could @Smalyshev do a test consuming from the new cluster, with the understanding that it is not yet productionized, to make sure it fits the use cases?

Sure, I suppose! You can connect to it with a Kafka client now. The Kafka brokers are kafka-jumbo100[1-6].eqiad.wmnet:9092

I think you are most interested in the eqiad.mediawiki.revision-create topic. I haven't tried it yet at all, but these topics should have a broker-received timestamp index on them. I am not yet familiar with the Kafka client APIs that know how to consume from timestamps, but they should be out there.

@Ottomata thanks, I can connect to the hosts above, but still not sure how to control the starting point. I'll try to look around for clients that can do this.

@Ottomata thanks, I can connect to the hosts above, but still not sure how to control the starting point. I'll try to look around for clients that can do this.

The Java client has offsetsForTimes implemented and supports seeking to an offset. The same is supported by librdkafka, and most of the language-specific clients are built atop librdkafka, so I would expect a lot of them to already support this functionality.
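For the record, a minimal sketch of seeking by timestamp with the Java client's offsetsForTimes, using the broker and topic names from this thread; the seven-day lookback is an arbitrary example:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekByTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("eqiad.mediawiki.revision-create", 0);
        long startMs = System.currentTimeMillis() - Duration.ofDays(7).toMillis();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            // offsetsForTimes maps each partition to the earliest offset whose
            // timestamp is >= the requested time (null if no such message).
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, startMs));
            OffsetAndTimestamp oat = offsets.get(tp);
            if (oat != null) {
                consumer.seek(tp, oat.offset());
            }
            // consumer.poll(...) now returns records starting from that point.
        }
    }
}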

@Pchelolo thanks for the pointer, this is very helpful!

Indeed, kafkacat for example has supported it for about a year. However, it looks like we have this version of kafkacat:

Copyright (c) 2014-2015, Magnus Edenhill
Version KAFKACAT_VERSION (JSON) (librdkafka 0.9.3)

which doesn't seem to have it yet (checked on stat1005). Maybe we could upgrade to 1.3.1.

Indeed. Created T182163 for updating kafkacat, but it is a bit more complex than simply installing a new version - it depends on a newer librdkafka, so we first should upgrade and test that.

You can easily 'quickbuild' kafkacat with a statically linked librdkafka. I've just done this on a stretch labs host, and copied the kafkacat binary to stat1005 at /home/otto/kafkacat. Try it out!

/home/otto/kafkacat runs fine, but -Q seems to return this for everything:

eqiad.mediawiki.revision-create [0] offset -1

Maybe I'm doing something wrong?

Hm, actually if I just try to consume from that topic (any topic, actually) with -F "%T", which should give me message timestamps, it gives -1 as well.

Seems like the mirroring is done by 0.9 MirrorMaker and timestamp handling was added only in 0.10 MirrorMaker.

I got the same doing:

/home/otto/kafkacat -Q -b kafka-jumbo1003.eqiad.wmnet -t eqiad.mediawiki.revision-create:0:1512687299 -Xdebug=all

Seems like the mirroring is done by 0.9 MirrorMaker and timestamp handling was added only in 0.10 MirrorMaker.

Hm, ya but I had thought that if a timestamp was not set by the producer, it would be set to server receive time. Maybe I was wrong!

Change 396439 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Set default topic timestamp.type to LogAppendTime

https://gerrit.wikimedia.org/r/396439

Change 396439 merged by Ottomata:
[operations/puppet@production] Set default topic timestamp.type to LogAppendTime

https://gerrit.wikimedia.org/r/396439

Woot, that did it ^. We need topics to default to LogAppendTime.

[@stat1005:/home/otto] $ ./kafkacat -Q -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.revision-create:0:1512759190000
eqiad.mediawiki.revision-create [0] offset 3658631
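The same thing can be double-checked from the Java client; a minimal sketch, under the assumption of a hypothetical group name, where each consumed record should now report LOG_APPEND_TIME with a real broker timestamp:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimestampCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-jumbo1001.eqiad.wmnet:9092");
        props.put("group.id", "timestamp-check");  // hypothetical group name
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("eqiad.mediawiki.revision-create"));
            ConsumerRecords<String, String> records;
            do {
                records = consumer.poll(Duration.ofSeconds(5));
            } while (records.isEmpty()); // first polls may be empty during rebalance
            for (ConsumerRecord<String, String> record : records) {
                // Expect LOG_APPEND_TIME and the broker receive time in ms.
                System.out.println(record.timestampType() + " " + record.timestamp());
            }
        }
    }
}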

Nice. Can @Smalyshev check whether consuming from these topics, as they are now set up, would work for his purposes?

So, FYI, the timestamps as they are now are the times at which the jumbo-eqiad Kafka cluster received the messages. These are replicated from the main-eqiad cluster and might have a short delay (usually seconds, minutes at most).

Eventually (the work is not planned yet) we will upgrade the other Kafka clusters, and also configure producers to set timestamps based on content time, e.g. the revision create timestamp, instead.

@Smalyshev OK, we aim to have the cluster handling all prod traffic by the end of next quarter; until then it will be mirroring data, which I think should be sufficient for you to get started on the WDQS consumer? Correct me if I am wrong.

@Nuria yes mostly, though I do have some questions, maybe we should set up a short meeting to discuss them?

Ping @Smalyshev - now that you have a reliable stream on the new Kafka cluster (one that supports time-based consumption), are there any other blockers on your end?

@Nuria I don't see any immediate blockers so far.

Oh yes, @Smalyshev - and in case you didn't see, we also increased the retention of mediawiki topics to 31 days in the main Kafka clusters.