
Reliable publish / subscribe event bus
Closed, Resolved · Public

Description

We need a reliable way to distribute a variety of update events emitted from MediaWiki core (and other services) to various consumers. Currently we use the job queue for this (ex: Parsoid extension), but this is fairly complex, not very reliable and does not support multiple consumers without setting up separate job types.

We are looking for a solution that decouples producers from consumers, and gives us better reliability than the current job queue.

Benefits

  • Simplification: Avoid the need to write and maintain a separate MediaWiki extension for each event consumer. Reduce maintenance by focusing on one standard queuing solution.
  • Single points of failure: A failure of the job queue Redis instance or EventLogging database will cause instant failures of update jobs and the loss of events / jobs. A robust event queue eliminates these single points of failure.
  • Robust updates at scale: Updates like cache purges are currently propagated on a best-effort basis. If a node is down when the event is sent, there is no way to catch up. With more services and more aggressive caching we'll need more reliability at scale. Currently the only way to achieve this would be creating one job per consumer. However, this does not scale to many consumers.
  • Performance and scalability: Job queue overload has in the past slowed down edit latency significantly. Both the job queue and EventLogging are hitting scalability limits.
  • SOA integration: The job queue is a MediaWiki-specific solution that cannot be used by other services. The event queue should provide a clearly defined service interface, so that both MediaWiki and other services can produce and consume events using it.

Event type candidates

Moved to T116247: Define edit related events for change propagation.

Requirements for an implementation

  • persistent: state does not disappear on power failure & can support large delays (order of days) for individual consumers
  • no single point of failure
  • supports pub/sub consumers with varying speed
  • ideally, lets various producers enqueue new events (not just MW core)
    • example use case: restbase scheduling dependent updates for content variants after HTML was updated
  • can run publicly: a consumer may be anyone on the public Internet (think a random MediaWiki installation with instant Commons or instant Wikidata) instead of only selected consumers with special permissions

Option 1: Kafka

Kafka is a persistent and replicated queue with support for both pub/sub and job queue use cases. We already use it at high volume for request log queueing, so we have operational experience and a working puppetization. This makes it a promising candidate.

Rough tasks for an implementation:

Open questions

Event Timeline

GWicke raised the priority of this task to Needs Triage. Dec 18 2014, 9:24 PM
GWicke changed Security from none to None.
GWicke edited subscribers, added: aaron; removed: Aklapper.
GWicke subscribed.
GWicke triaged this task as Medium priority. Dec 18 2014, 9:30 PM
GWicke updated the task description.

The nature of these event type candidates is that they are changes for which a log already exists at the provider. The only persistent state each consuming service needs to keep is the revision/time up to which it has applied the changes. The historic (recent) changes are already kept on the providing service, so a solution would not necessarily have to retain them. With a bit of care it is probably possible to apply changes out of order and idempotently, so the consuming service could apply them restartably and in parallel.

The actual data can be pulled by the consumer from the provider; duplicating it in the queue probably has no benefit. I think the only thing pub/sub really adds here is a notification that new changes are available, which decreases update latency. That means there only ever needs to be one event in the queue for all consumers, as each new event subsumes all the information of the previous ones.
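
Roughly, such a minimal "changes available" notification could look like the sketch below. The topic and field names are purely illustrative, not an agreed schema:

```
# Purely illustrative: a notification carrying only a pointer into the
# provider's own change log, not the change data itself.
notification = {
    "topic": "wikidata.changes-available",   # hypothetical topic name
    "latest_change_id": 123456789,           # consumers pull everything up to here
    "emitted_at": "2015-01-05T12:00:00Z",
}
# Each consumer remembers the last change id it has applied and pulls the
# delta from the provider's API; the queue never duplicates the payload.
```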

How is the job queue currently not reliable? ( https://www.mediawiki.org/wiki/Job_queue_redesign and https://www.mediawiki.org/wiki/Manual:Job_queue mention other problems but nothing regarding reliability problems. )

The nature of these event type candidates is such that they are changes with a log existing at the provider.

Wikidata might be the exception here. Most other events are not available at the provider in a reliable manner.

The actual data can be pulled by the consumer from the provider;

Is there an API for this already?

I think the only thing here where pub/sub brings anything worth is the event that new changes are available to decrease latency of updates. Which means only ever one event in queue for all consumers, as new events contain all information of the previous ones.

Major benefits of an event stream are reliability, performance and simplicity. The same generic client (using WebSockets, for example) can be used to consume a variety of events.

Queues like Kafka are optimized for this use case and perform really well at high request volumes. As an example, we are currently processing about 50k messages/second (request logs) with two Kafka nodes. I know that most other events are lower volume than this, but it's good to have headroom in the system.
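
For illustration, producing an edit event into Kafka could look something like this minimal sketch using the kafka-python client; the broker address, topic name and event fields are made up, not the deployed configuration:

```
# Minimal sketch, assuming the kafka-python client; broker address, topic
# name and event fields are illustrative only.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka1001:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",  # wait for in-sync replicas, trading latency for durability
)

event = {"wiki": "enwiki", "title": "Main_Page", "rev_id": 123456}
producer.send("mediawiki.page-edit", event)
producer.flush()
```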

How is the job queue currently not reliable?

For one, it uses Redis for storage, which only supports async master/slave replication and has limited durability options. Async replication means that you are likely to lose messages in a fail-over; limited durability means that you lose messages on power loss. It also does not support multiple consumers per event (no pub/sub), which results in fairly static coupling of producer and consumer: you need to create a new job type per consumer. Finally, the job runners do all kinds of processing instead of pure event delivery. Since misbehaving jobs compete with others for resources, this makes the job queue less reliable than a pure event delivery system would be.

The actual data can be pulled by the consumer from the provider;

Is there an API for this already?

https://www.wikidata.org/w/api.php?action=help&modules=wbgetentities and recent changes, see also T85103 and T85100.

https://www.wikidata.org/w/api.php?action=help&modules=wbgetentities and recent changes, see also T85103 and T85100.

@JanZerebecki, none of these seem to provide the change information apart from 'something in this entity has changed'. Is there a way to efficiently get data for the actual changes?

As in a diff? Not that I am aware of. The granularity of actual changes in the DB is one entity; smaller granularity might only be present before the change is applied, after which a full new revision of the entity is written. Anything smaller would have to be computed from two revisions of an entity. There is also no API that combines which entities changed with their current data. Getting a list of changed entities and then, in a second request, getting their current data might be efficient enough CPU-wise.
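
A rough sketch of that two-request approach, using the public API modules mentioned in this thread; the parameters and limits are illustrative, not a recommendation:

```
# Sketch only: list recently changed entities, then batch-fetch their data.
import requests

API = "https://www.wikidata.org/w/api.php"

# 1) Which entities changed recently? (ns 0 titles on Wikidata are entity ids)
rc = requests.get(API, params={
    "action": "query", "list": "recentchanges",
    "rcnamespace": 0, "rclimit": 50, "format": "json",
}).json()
entity_ids = sorted({c["title"] for c in rc["query"]["recentchanges"]})

# 2) Fetch the current data for those entities in one batch request.
entities = requests.get(API, params={
    "action": "wbgetentities", "ids": "|".join(entity_ids[:50]),
    "format": "json",
}).json()
```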

While talking to @daniel, he noticed that I had missed that the actual serialized diff is saved in https://www.mediawiki.org/wiki/Wikibase/Schema/wb_changes; there is just no API for it.

can support large delays (order of days) for individual consumers

Do you have a strong use case to support this need? Kafka may very well be able to support this, but I'm wondering if there is a specific and strong reason for such a long duration for an event bus. This seems like a specification for an event-based storage system rather than a comm bus.

can support large delays (order of days) for individual consumers

Do you have a strong use case to support this need?

Yes. Hosts can go down for multiple days, and if the event stream is used to do something like reliable purges then it'll be necessary to replay those or throw away the entire cache. Really reliable purges will become more important once we cache for logged-in users as well.

There can also be bugs in consumers, which need to be fixed by re-starting the processing from a clean snapshot.

From what I hear, Analytics would love to get even longer event traces. @Halfak mentioned a back-of-the-envelope calculation that basically all the primary events he lists in his proposal since the beginning of Wikipedia might fit into 200 GB.

For comparison, I think we currently have several days' worth of buffer for our traffic logs in Kafka, which helps to avoid loss if the consumer has issues. That's much higher volume at up to 150k messages/s, while we are looking at low hundreds per second for edit-related events.
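
To make the catch-up case concrete: with Kafka, offsets are tracked per consumer group, so a host that was down for days can simply resume from its last committed offset, as long as the topic's retention window still covers the gap. A minimal sketch, again assuming kafka-python; the broker address, topic and group names are illustrative:

```
import json
from kafka import KafkaConsumer

def handle_purge(event):
    # Hypothetical handler; a real consumer would purge caches here.
    print("purging for", event)

consumer = KafkaConsumer(
    "mediawiki.page-edit",
    bootstrap_servers="kafka1001:9092",
    group_id="purge-service",
    auto_offset_reset="earliest",   # on first start, begin at the oldest retained event
    enable_auto_commit=False,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    handle_purge(message.value)
    consumer.commit()               # record progress only after the purge succeeded
```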

http://www.fedmsg.com might fit this need. It is used and developed by Fedora and Debian people, and is a federated, reliable message bus with a history of cryptographically authenticated JSON messages, built on 0mq with Python.

http://www.fedmsg.com might fit this need. It is used and developed by Fedora and Debian people, and is a federated, reliable message bus with a history of cryptographically authenticated JSON messages, built on 0mq with Python.

Since 0mq is not actually durable or replicated this does not cover the 'reliable' bit.

Re signatures: We can always send signed JWTs on top of whatever solution we end up using *if* there is a need for per-message authentication. I don't see a strong need for this though.
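
For illustration only, per-message signing on top of any transport could look like this sketch using the PyJWT library; the key handling and claim names are made up, and per-message signing is not part of this proposal:

```
import jwt  # PyJWT

SECRET = "shared-signing-key"  # hypothetical; a real setup would manage keys properly

event = {"wiki": "enwiki", "title": "Main_Page", "rev_id": 123456}
token = jwt.encode(event, SECRET, algorithm="HS256")

# A consumer that cares about authenticity verifies the signature before use.
verified = jwt.decode(token, SECRET, algorithms=["HS256"])
```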

Since 0mq is not actually durable or replicated this does not cover the 'reliable' bit.

That is done on top of 0mq. Every message is stored and numbered, and no number is ever skipped. Thus if you get message 4 but never received 3, you can request 3, knowing you missed it. So it emulates reliability via storage and sequential numbering. (This is done transparently if the endpoint is configured that way; see http://www.fedmsg.com/en/latest/config/#term-replay-endpoints and http://www.fedmsg.com/en/latest/replay/ .) Also, quoting en.wp on 0mq: [...] message transports include TCP [...], which means a reliable transport is usually used in addition.
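
A generic sketch of that sequence-number-and-replay idea (not fedmsg's actual API): the consumer tracks the last sequence number it has seen and, on a gap, asks a replay endpoint for the missing range. All names here are stand-ins:

```
def handle_stream(messages, fetch_replay, process):
    """`messages` yields (seq, payload); `fetch_replay(a, b)` stands in for a
    replay endpoint returning the missed range [a, b); `process` handles one
    payload. Purely illustrative."""
    expected = None
    for seq, payload in messages:
        if expected is not None and seq > expected:
            # Gap detected: recover the skipped messages before continuing.
            for missed_seq, missed_payload in fetch_replay(expected, seq):
                process(missed_payload)
        process(payload)
        expected = seq + 1
```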

Re: reliability, RELP might be of help on the application level.

See also: @aaron is working on a cache update service at https://github.com/AaronSchulz/python-memcached-relay

This could also be seen as a replacement for some use cases of MW hooks and hook listeners. Right now, if the hook listener throws an exception, that breaks the code that triggered the hook (e.g. T102874: Using Special:EnableFlow on a French Wikiproject page has broken the page completely).

If the hook doesn't have to alter or return any values (e.g. to tell the hook-running code to do something), it might be able to use an event bus instead. If the event bus listener has a bug, my understanding is that it wouldn't cause the code that emits the event to break.
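
A small sketch contrasting the two failure modes; the names are illustrative and this is not MediaWiki's actual hook or event interface:

```
def run_hook_style(handlers, page):
    # Hook style: a broken handler raises straight into the calling code,
    # so the operation that triggered the hook fails along with it (cf. T102874).
    for handler in handlers:
        handler(page)

def run_event_style(producer, page):
    # Event-bus style: the emitter only enqueues a message and returns.
    # A buggy consumer fails later, in its own process, without affecting
    # the code that emitted the event.
    producer.send("mediawiki.page-edit", {"title": page})
```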

BTW, T102082 is mainly about analytics EventLogging, but the Confluent stuff would be good for an event bus used for application purposes too.

GWicke claimed this task.

A basic event bus is now available in production, and is being populated with edit events from MediaWiki. Consumption is directly from Kafka at this point.

This means that the core proposal of this task is implemented. I'm closing this task to reflect this.