Page MenuHomePhabricator

Asynchronous processing in production: one queue to rule them all
Closed, ResolvedPublic

Description

Type of activity: Pre-scheduled session
Main topic: https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/How_to_manage_our_technical_debt

The problem

Wikimedia Foundation has at least three systems processing in-wiki events:

  1. The multicast HTCP purge messages to keep caches in sync. This has several problems, including race conditions in PURGE propagation
  2. The current jobqueue implementation. It is based on a technology that cannot scale at our size (redis used as a queue with transactions) and can't work easily cross-datacenter.
  3. Change-propagation: a service that consumes events from Kafka and is able to make HTTP calls to other services.

to which we could probably add a bunch of cron scripts like the wikidata ones.

It would make sense in general to unify everything into one system, unless there is a strong reason to keep things like they are. In particular the jobqueue:

  1. Has constant scalability issues (given the way we use redis)
  2. Uses a ton of resources (we have 14 servers dedicated to redis alone!)
  3. Is very, very hard to debug whenever something goes wrong
  4. Is limited to work on mediawiki, can't propagate events to the other services

So we need a plan to transition from the old jobqueue to change-propagation, which uses a more solid transport mechanism.

Expected outcome

Consensus among participants about the need (or not!) to transition to change propagation. A general plan for the migration and owners of the various parts of it.

Current status of the discussion

There is not a lot of discussion going on about this, although the inner workings of the current jobqueue are known just to a small number of people (maybe one?). When the change-propagation service was introduced, it was discussed replacing progressively functionalities of the jobqueue, but no timeline nor a plan has ever being laid out. Also, this will need multiple teams to buy into the project.

Links

Note

In various comments to this task, we refer to "small wikis" as a shorthand for "small-scale MediaWiki installations".

Event Timeline

Joe created this task.Oct 28 2016, 8:57 AM
Restricted Application added a subscriber: Aklapper. · View Herald TranscriptOct 28 2016, 8:57 AM
hoo added a subscriber: hoo.Oct 28 2016, 11:47 AM
hashar added a subscriber: hashar.Oct 28 2016, 9:42 PM
bd808 added a subscriber: bd808.Oct 28 2016, 10:17 PM

$DAYJOB-1 operated at a much, much smaller scale than Wikimedia, but we got huge benefits from merging all of the little event queues that had grown up over the years into a single unified system.

The job runner use case is a bit different than the replacement of PURGE broadcasts. For the job queue you want to be able to ensure that a message is processed by one and only one of N consumers. Previously I used ActiveMQ for this, but from my past experience I'm pretty sure that tuning it for our scale would be a very time intensive task. I think this functionality can be approximated in Kafka by using a number of partitions greater than or equal to the number of job processors and limiting each processor to subset of the partitions. It seems like there would be a need for another coordinated data storage (etcd?) that the consumers used to discover un-serviced partitions and probably some watchdog system to make sure that consumers that were supposed to be working a particular partition were active. That sounds like a lot of pieces to build, but there may be something already in the FLOSS marketplace that does most of it. If there isn't a ready made solution on top of kafka, there certainly are quite a few "task queue" systems like beanstalk, celery, and gearman that might be a better fit.

I think this functionality can be approximated in Kafka by using a number of partitions greater than or equal to the number of job processors and limiting each processor to subset of the partitions. It seems like there would be a need for another coordinated data storage (etcd?) that the consumers used to discover un-serviced partitions and probably some watchdog system to make sure that consumers that were supposed to be working a particular partition were active. That sounds like a lot of pieces to build, but there may be something already in the FLOSS marketplace that does most of it. If there isn't a ready made solution on top of kafka, there certainly are quite a few "task queue" systems like beanstalk, celery, and gearman that might be a better fit.

Here's what we do right now in ChangeProp to ensure this: each unit of functionality is called a rule. And example of a rule would be 'Rerender RESTBase when the page was edited'. Each rule gets it's own consumer group name and kafka ensures that one and only one client in the given consumer group receives the message.

Load balancing between workers is done by assigning each of the consumers within the group a random consumer ID, and then Kafka assigns the job to the worker with the lowest consumer ID - since those are random we get a good distribution of work between the workers.

Double-processing is still possible, because during startup several rebalances (reassigning the rule from one worker to another) are happening, and during that process several workers might end up with the same unit of work, but that's a very brief period of time. Also, Kafka itself claims to have at least ones delivery semantics, but in practise, during normal operation, duplication doesn't happen.

@bd808 What are the use-cases you have in mind that are not idempotent and where exactly ones semantics is absolutely required for correctness?

@bd808 What are the use-cases you have in mind that are not idempotent and where exactly ones semantics is absolutely required for correctness?

Ideally every MediaWiki job class should be written in such a way that repeated execution is not harmful. I have no idea if that is actually the case however. I'm actually also not sure if the existing redis based queuing system is truly deliver once. I don't have a doomsday scenario in mind, but for example I know that there are jobs which in turn spawn other jobs. One reason this is done is to throttle job creation when a very large number of jobs need to be done. If several of these spawning jobs were duplicated it might cause issues.

I know that there are jobs which in turn spawn other jobs. One reason this is done is to throttle job creation when a very large number of jobs need to be done. If several of these spawning jobs were duplicated it might cause issues.

Ye, we have same stuff in ChangeProp too and we've got some deduplication logic for rules like this, so in practice even if it gets duplicated, it's noticed and deduplicated soon enough to not cause any harm.

Joe added a comment.Oct 29 2016, 10:08 AM

@bd808 What are the use-cases you have in mind that are not idempotent and where exactly ones semantics is absolutely required for correctness?

Ideally every MediaWiki job class should be written in such a way that repeated execution is not harmful. I have no idea if that is actually the case however. I'm actually also not sure if the existing redis based queuing system is truly deliver once. I don't have a doomsday scenario in mind, but for example I know that there are jobs which in turn spawn other jobs. One reason this is done is to throttle job creation when a very large number of jobs need to be done. If several of these spawning jobs were duplicated it might cause issues.

I am pretty sure the actual system does not ensure single delivery with the certainty you talk about. Also, a lot, and I mean a lot of jobs get repeated, fail halfway thorough, events get duplicated at the source too.

I would wait for a word from @aaron but I'm pretty sure from what I see happening live that "ensure execution only once" is not a requirement of the system.

Tgr added a subscriber: Tgr.Oct 31 2016, 3:52 AM

The job queue can discard duplicates. Not sure if there is any job that relies on that for correctness (as opposed to just avoiding wasted CPU). There is also the root job thing. It does not seem terribly hard to add that kind of functionality to any change propagation system, though.

There is also the usual question of what will happen to pure-PHP installations.

Joe added a comment.Oct 31 2016, 10:59 AM

@Tgr I hope we can implement this switch in a way that would just make it a new jobqueue driver

There is also the usual question of what will happen to pure-PHP installations.

That's surely one of the questions we should answer; do you think we will need to rehaul the job mechanism in MediaWiki completely?

greg updated the task description. (Show Details)Oct 31 2016, 5:41 PM
Anomie added a subscriber: Anomie.Nov 1 2016, 1:12 PM
Anomie added a comment.Nov 1 2016, 1:31 PM

My philosophical standpoint: It's good when MediaWiki has a simple default way to do things in a basic installation and allows for configuring it to use something much more complex and scalable. It's bad when the "something much more complex" is the only available option, where "bad" ranges from "not very" for something completely optional few outside of Wikipedia would care to have to "extremely" for something essential to having a workable wiki.

Even if we overhaul the job mechanism completely, as long as there's a default implementation for "small" wikis that doesn't require more than a webserver, PHP, and a supported SQL database it would be fine from this standpoint.

Joe added a comment.EditedNov 2 2016, 9:51 AM

My philosophical standpoint: It's good when MediaWiki has a simple default way to do things in a basic installation and allows for configuring it to use something much more complex and scalable. It's bad when the "something much more complex" is the only available option, where "bad" ranges from "not very" for something completely optional few outside of Wikipedia would care to have to "extremely" for something essential to having a workable wiki.
Even if we overhaul the job mechanism completely, as long as there's a default implementation for "small" wikis that doesn't require more than a webserver, PHP, and a supported SQL database it would be fine from this standpoint.

I agree fully; in fact what I hope would be possible is just to preserve the current implementation for small wikis, but this is one of the things that need discussion.

In fact we have two distinct things to take care of:

  • Jobs that are originated by edits/other events that already flow through Kafka could be transitioned to be managed by change-propagation (calling runJobs.php, maybe slightly modified), and we could just add a feature flag to disable those in WMF-config
  • Jobs that are originated by events that do not flow through Kafka/eventubs should be probably added to the events we send through eventbus. Are there any of these?

What looks challenging in this hypothesis is that I see the risk for the need of a double implementation (one for the traditional jobqueue, one for CP-generated events) given how the jobqueue works today (IIRC, the messages are serialized PHP, and they get fetched by runJobs.php).

A simpler approach would be to send the jobs to Kafka/eventbus, and have CP doing basically what the jobrunner does today.

I agree as well that we shouldn't complicate things for small wikis. But, correct me if I'm wrong, but small wikis run the jobs as part of normal requests, so as not to need Redis and the other pieces of infrastructure that are present at WMF. If that is the case, then there shouldn't be serious consequences for them in this scenario. That said, I think we also need to find a lightweight replacement for Kafka that would allow small wikis to run a queue if they want to (i.e. running Kafka on a VPS is certainly an overkill).

The current jobqueue provides features that may not be available out of the box with Kafka:

  • we can send a Job with Job::removeDuplicates = true, it seems mainly designed as a performance hint so that heavy jobs are deduplicated. I bet it's possible to just ignore this hint when running with kafka?
  • delayed jobs: we can send jobs that will be processed later: for cirrus it's used to :
    • retry failed writes with a random backoff delay
    • spread over time some sanity checks
    • delay some chained jobs: when updating an article we send another job with a delay (incoming links counting) so that we increase our chances to have the new data ready on the backend.

For cirrus we'll have to double check but I think we can live without deduplication but delayed delivery sounds harder to avoid. Do you think it'll be possible to implement such feature in our context with kafka?

The eventbus system already has retries built in: if a job fails, it is retried a predefined number of times with geometric progression (so as not to put pressure on a part of the infrastructure that might be having problems at that moment). The same logic can be applied to normal jobs and thus achieve the desired delay effect.

  • spread over time some sanity checks

Could you elaborate on this use case?

mobrovac updated the task description. (Show Details)Nov 2 2016, 10:57 AM
mobrovac added a project: User-mobrovac.
  • spread over time some sanity checks

Could you elaborate on this use case?

In cirrus we have a process that checks if the index is properly synced with the db, this process is a brute force check that will scan every possible article ids. It generally takes 2 weeks to complete a full check. The process is :

  • select min/max id article from mysql: e.g. 10 to 45000
  • a cron job creates N jobs that will check for chunks: job-1 checks for ids [10,20], job-2 will check for ids [20,30]
  • when all ids are checked it restarts another loop (with a min rate set at 2 weeks)

These jobs are initially created with a cron job that runs every 2 hours, the delay is interesting here to make sure that we spread the checks in these 2 hours.
The jobqueue is used here as a convenient backend to mimic a job scheduler.
The various options are dependent on wiki size and we select a profile according to the number of ids to check.
e.g. for enwiki we create every 2 hours a max of 2500 checker jobs where each of them will check 100 ids.
The process has various limit to prevent flooding normal updates:

  • do not spawn new checker jobs if more than X are pending (10 for small wikis, 2500 for very large).
  • do not process checker jobs if other cirrus jobqueues have more than Y pending udpates (50 for small wikis, 1000 for very large wikis).
  • a check cannot run for more than 60 secs.
  • wait for 2 weeks before restarting a loop.

I'd say that the features needed to run this process are:

  • be able to count the number of pending jobs on some queues
  • be able to set a deliveryTime

It's not a critical process and we can review/change it if needed.

Anomie added a comment.Nov 2 2016, 2:08 PM

But, correct me if I'm wrong, but small wikis run the jobs as part of normal requests, so as not to need Redis and the other pieces of infrastructure that are present at WMF.

The default configuration does run the jobs as part of normal requests so that there's no need to set up a cronjob to run maintenance/runJobs.php and uses the database-backed implementation so there's no need to set up the redis-backed implementation. But you're slightly wrong in thinking that either of those requires the other.

There are several ways MediaWiki provides for running jobs:

  • Synchronously at the end of a request (if $wgJobRunRate > 0 and $wgRunJobsAsync is false).
  • Asynchronously at the end of a request by making an internal request to Special:RunJobs (if $wgJobRunRate > 0 and $wgRunJobsAsync is true).
  • By something running maintenance/runJobs.php, often cron.

None of these care how the job queue is storing the job information. And none of the storage backends cares which method is used to actually do the running.

In WMF's setup, we use our own custom code instead of using any of the above methods. One part does directly interact with Redis, to determine the parameters to pass to what is basically a streamlined version of maintenance/runJobs.php that actually runs the jobs (and again doesn't care about the queue storage method).

hashar removed a subscriber: hashar.Nov 3 2016, 8:12 AM

There are several ways MediaWiki provides for running jobs:

  • Synchronously at the end of a request (if $wgJobRunRate > 0 and $wgRunJobsAsync is false).
  • Asynchronously at the end of a request by making an internal request to Special:RunJobs (if $wgJobRunRate > 0 and $wgRunJobsAsync is true).
  • By something running maintenance/runJobs.php, often cron.

I guess my real question here is what is the real meaning of small wiki. In my mind, these are the wikis that run jobs in one of the first two configurations described above. If somebody has cron or something similar set up for running the jobs, then we ought to think of them as intermediate or advanced users.

None of these care how the job queue is storing the job information. And none of the storage backends cares which method is used to actually do the running.
In WMF's setup, we use our own custom code instead of using any of the above methods. One part does directly interact with Redis, to determine the parameters to pass to what is basically a streamlined version of maintenance/runJobs.php that actually runs the jobs (and again doesn't care about the queue storage method).

This is exactly what is being proposed here, as far as I can see - find a way to improve and consolidate the WMF infrastructure.

Gilles added a subscriber: Gilles.Nov 3 2016, 11:29 AM
ori moved this task from Inbox to Radar on the Performance-Team board.Nov 3 2016, 9:58 PM
Tgr added a comment.Nov 4 2016, 10:41 PM

So I guess there are three options:

  • Try to integrate Kafka as a jobqueue backend (ie. write a JobQueue subclass that "stores" jobs by emitting dedicated Kafka events and fetches triggered jobs from the Kafka queue, and a JobQueueAggregator subclass that gives state information about Kafka queues; handle deduplication and delay at job fetching time somehow).
  • Deprecate the jobqueue/job runner system. Use EventBus for triggering jobs; that might change the semantics a bit, e.g. if something heavily relies on deduplication, that has to be performed within the job. Write a simple pure-PHP EventBus replacement for small wikis.
  • Try to find something that's scalable and a better match for jobqueue semantics (Gearman etc) and use that as a jobqueue backend.
Joe added a comment.Nov 9 2016, 11:12 AM

@Tgr to be very clear, I'm ok with both the first two options you cited; what I want, from an ops prepsective, is what follows:

  1. Have one, consistent way of propagating events across our cluster. This might not coincide with ALL the jobs we currently spawn, btw
  2. Get rid of our current, redis-based queue that is underperforming, obscure and difficult to properly inspect.

I think the response could be a mix of the two paradigms. See T150283 for the hurdles a practical transition will have to go through.

Volans added a subscriber: Volans.

It would be useful to clarify whether this task is only about WMF or whether someone is proposing to change MediaWiki core in some way. Some of the above comments seem to use "small wikis" for "non-WMF wikis".

It would be useful to clarify whether this task is only about WMF or whether someone is proposing to change MediaWiki core in some way.

This task targets mostly WMF wikis due to scale. However, it should be applicable to other big wikis outside of WMF production as well.

Some of the above comments seem to use "small wikis" for "non-WMF wikis".

Correct, we do use them as synonyms in this context. Note that by small wiki we refer to low-traffic MW installs.

I guess my real question here is what is the real meaning of small wiki. In my mind, these are the wikis that run jobs in one of the first two configurations described above. If somebody has cron or something similar set up for running the jobs, then we ought to think of them as intermediate or advanced users.

Size and sysadmin skill level seem like two totally separate things.

Joe added a comment.Nov 18 2016, 9:26 AM

It would be useful to clarify whether this task is only about WMF or whether someone is proposing to change MediaWiki core in some way. Some of the above comments seem to use "small wikis" for "non-WMF wikis".

I am absolutely neutral on what we do decide to do, I see a few paths we could go down, and which would mean different levels of changes to MediaWiki core. I am perfectly ok with just solving part of the WMF-specific problem (we have two systems replicating functionality, and one queue system that uses the wrong technology), and decide we will just write an appropriate driver for the jobqueue to use something more solid than redis.

Then there is a value for the WMF to consolidate whichever jobs are event-driven into a single system for all its services, and that could just mean discarding specific job types in our case.

Personally when I use the term "small wikis" in this context I mean small wiki installations, which doesn't mean "every non-WMF wiki". I thought the context made that pretty clear, but apologies if it caused some confusion.

@bd808 What are the use-cases you have in mind that are not idempotent and where exactly ones semantics is absolutely required for correctness?

MassMessage relies on this when using the job queue so that messages are only delivered once. Same with TranslationNotifications.

Some of the above comments seem to use "small wikis" for "non-WMF wikis".

Correct, we do use them as synonyms in this context.

Ok. Please stop. Wikimedia Foundation hosts hundreds of wikis which are usually considered "small wikis" and it's extremely confusing to read messages which seem to propose that Wikimedia Foundation wikis use different job queue systems depending on their size.

Nemo_bis updated the task description. (Show Details)Nov 18 2016, 9:41 AM

It would be useful to clarify whether this task is only about WMF or whether someone is proposing to change MediaWiki core in some way.

This task is about discussing what exactly should be done. It might be a WMF-specific JobQueue implementation, or it might replace JobQueue with something different.

Some of the above comments seem to use "small wikis" for "non-WMF wikis".

"Small", as I (and I think most others) have been using it here, means a wiki where someone gets an account on a shared host or VPS with a generic LAMP stack, installs MediaWiki and some extensions, maybe sets up memcached or APC, and has a wiki that works well enough for their purposes. Or something of similar complexity, even if the details differ. In this context, no WMF wikis are "small" because they're all run on the same infrastructure that handles huge, extremely-high-traffic wikis like the English Wikipedia. But something like Wikia or Gamepedia is also not "small".

T149408#2762957 seems to be the only comment that uses "small" to mean the less active WMF-hosted wikis, and that was in the context of a detailed description of how CirrusSearch uses the job queue.

demon awarded a token.Nov 18 2016, 4:55 PM

Using custom undefined jargon makes sure that no users (or non-WMF persons in general) can be involved in the discussion, so I urge again everyone involved to drop any undefined terminology or to add in the task description a glossary of the custom terminology used in this discussion.

Joe updated the task description. (Show Details)Nov 21 2016, 5:57 PM
bd808 awarded a token.Nov 27 2016, 3:56 PM

@bd808 What are the use-cases you have in mind that are not idempotent and where exactly ones semantics is absolutely required for correctness?

MassMessage relies on this when using the job queue so that messages are only delivered once. Same with TranslationNotifications.

I think @Joe is right about the job queue not guaranteeing exactly once semantics in a strict sense:

I am pretty sure the actual system does not ensure single delivery with the certainty you talk about. Also, a lot, and I mean a lot of jobs get repeated, fail halfway thorough, events get duplicated at the source too.
I would wait for a word from @aaron but I'm pretty sure from what I see happening live that "ensure execution only once" is not a requirement of the system.

Implementing strict "exactly once" semantics in a distributed system is fairly difficult. For something like MassMessage, you'd need some means of *atomically* sending a mail & recording that fact on durable, distributed storage. Without atomicity or durability, you run into FLP.

I also agree with @Joe that strict exactly-once guarantees are probably not needed. A very low chance of duplicate mails after issues with the mail infrastructure would likely be tolerable in exchange for availability, simplicity, and performance.

greg triaged this task as Normal priority.Nov 30 2016, 10:59 PM
cscott added a subscriber: cscott.Jan 4 2017, 10:47 PM

FWIW, the offline content generation service (OCG, generates PDFs, ZIM files, books, etc) also has a bespoke job queue system created to ensure that large-ish books (eg, wikiproject medicine w/ lots of articles) can be rendered asynchronously, with feedback provided to the requester about percentage completion, and then a download link to a cached 10s-of-megabytes file at the end. It would be fantastic if that system could be replaced by more standard infrastructure.

On the other hand, there are projects in place to remove or deprecate that function of OCG. Users would have to run software locally to create large books. So this shouldn't be a driver for the new job queue.

Restricted Application added a project: Analytics. · View Herald TranscriptJan 4 2017, 10:47 PM

To the owner of this session: Here is the link to the session guidelines page: https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Session_Guidelines. We encourage you to recruit Note-taker(s) 2(min) and 3(max), Remote Moderator, and Advocate (optional) on the spot before the beginning of your session. Instructions about each role player's task are outlined in the guidelines. The physical version of the role cards will be made available in all the session rooms. Good luck prepping, see you at the summit! :)

Joe added a comment.Jan 9 2017, 5:38 PM

Slides for the starting the discussion available here https://docs.google.com/presentation/d/1DCofLYbP1dWnTb1JWNNnsb0Zp_da8sBhDzlwjCXRoq8/edit?usp=sharing

I'll upload those to commons after the Developer Summit.

Note-taker(s) of this session: Follow the instructions here: https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Session_Guidelines#NOTE-TAKER.28S.29 After the session, DO NOT FORGET to copy the relevant notes and summary into a new wiki page following the template here: https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Your_Session and also link this from the All Session Notes page: https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/All_Session_Notes. The EtherPad links are also now linked from the Schedule page (https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Schedule) for you!

Nuria moved this task from Incoming to Radar on the Analytics board.Jan 23 2017, 4:46 PM

Slides for the starting the discussion available here https://docs.google.com/presentation/d/1DCofLYbP1dWnTb1JWNNnsb0Zp_da8sBhDzlwjCXRoq8/edit?usp=sharing
I'll upload those to commons after the Developer Summit.

Did this happen? If not, could you please do so? :)

I moved the notes to https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Asynchronous_processing

Joe added a comment.Jan 27 2017, 11:31 AM

Slides for the starting the discussion available here https://docs.google.com/presentation/d/1DCofLYbP1dWnTb1JWNNnsb0Zp_da8sBhDzlwjCXRoq8/edit?usp=sharing
I'll upload those to commons after the Developer Summit.

Did this happen? If not, could you please do so? :)
I moved the notes to https://www.mediawiki.org/wiki/Wikimedia_Developer_Summit/2017/Asynchronous_processing

Heh, guilty as charged! I'll add the license and upload it today.

Joe closed this task as Resolved.Jan 30 2017, 7:46 AM