
Job queue should not overload the DB servers when there is replication lag
Closed, Resolved (Public)

Description

Special:RunSingleJob waits for replication using LoadBalancer::waitForAll() after every job. @jcrespo reports that when there is replication lag, this causes so many connections waiting in SELECT MASTER_GTID_WAIT() that the connection limit is reached and the site goes down.

The point of waitForAll() is to throttle a loop so that the loop runs at the speed of the slowest slave. There's no point in calling it if the rate at which jobs are executed is not affected by the latency of each job.

My proposal is that the LoadBalancer::waitForAll() call be removed from JobExecutor. Instead, ChangeProp should monitor replication lag itself, and stop popping jobs from Kafka for a given section/partition if the lag is too high.

Also, the concurrency limits should be reviewed.

The exact algorithm that ChangeProp should use to monitor lag is a tricky subject. If it keeps executing jobs until 3s of lag is reached and then stops completely, job execution may leave the lag permanently stuck at 3s, which is too high. But if it backs off at, say, 0.2s, it may never get anything done. The difficulty is in assigning a cause to an increase in lag. Any lag that is caused by job execution should be a signal to reduce the rate. Lag that is caused by other things should be a signal too, but a weaker one. Executing jobs at a high rate may exacerbate pre-existing lag, but executing jobs at a near-zero rate should have no effect.

One possibility would be to have a PID controller which controls a rate limiter. The rate limiter could add a sleep time between jobs derived by combining the lag (P), the sum of the lag times tallied over some time interval (I), and the rate at which lag is increasing (D).

Incorporating the integral (I) term means that it can allow temporary lag, but take action if the lag continues. The controller is linear, so there are no surprising thresholds; the rate will ramp up and down continuously.

Tuning a PID controller can be difficult, since it can oscillate if the I or D factors are too high. The reason I like it is that it's smart while still being easy to understand and debug. You can expose separate P, I and D metrics, as well as the resulting added latency.
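A minimal sketch of the idea, to make the P/I/D roles concrete (the class name, gains and clamping here are hypothetical and would need tuning against real traffic; lag is assumed to be sampled in seconds):

```
import time

class PidThrottle:
    """Hypothetical PID throttle: turns observed replication lag (seconds)
    into an extra sleep to insert between job executions."""

    def __init__(self, kp=0.5, ki=0.05, kd=0.2, max_sleep=5.0):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.max_sleep = max_sleep
        self.integral = 0.0
        self.prev_lag = 0.0
        self.prev_time = time.monotonic()

    def update(self, lag):
        now = time.monotonic()
        dt = max(now - self.prev_time, 1e-3)
        self.integral += lag * dt                    # I: lag-seconds tallied over time
        derivative = (lag - self.prev_lag) / dt      # D: how fast lag is growing
        self.prev_lag, self.prev_time = lag, now
        sleep = self.kp * lag + self.ki * self.integral + self.kd * derivative
        return min(max(sleep, 0.0), self.max_sleep)  # clamp: never negative, never unbounded

# Between jobs: time.sleep(throttle.update(current_lag_seconds))
```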

Lag can be measured using the MW API, e.g. https://en.wikipedia.org/w/api.php?action=query&meta=siteinfo&siprop=dbrepllag&sishowalldb=
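For illustration, a sketch of polling that endpoint with the Python requests library (the response shape is the API's standard JSON output; taking the max over all reported DBs is just one way to reduce it to a single number):

```
import requests

def max_replication_lag(api="https://en.wikipedia.org/w/api.php"):
    """Return the highest replication lag (in seconds) reported by siprop=dbrepllag."""
    resp = requests.get(api, params={
        "action": "query",
        "meta": "siteinfo",
        "siprop": "dbrepllag",
        "sishowalldb": "",   # boolean flag: presence means "show all DBs"
        "format": "json",
    }, timeout=5)
    resp.raise_for_status()
    return max(db["lag"] for db in resp.json()["query"]["dbrepllag"])
```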

Event Timeline

tstarling created this task.
tstarling created this object with visibility "Custom Policy".

Just one preliminary thought:

The point of waitForAll() is to throttle a loop so that the loop runs at the speed of the slowest slave. There's no point in calling it if the rate at which jobs are executed is not affected by the latency of each job.

Change-Prop uses a concurrency limit instead of a rate limit. The idea is that we have X slots per job type for jobs being executed, and when the slots are filled up no more jobs are dequeued. As soon as a job finishes, a slot gets freed up and a new job is dequeued. So when the average job latency goes up (which incorporates waiting for slaves), the average rate goes down. You can see that, for example, for refreshLinks.
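A rough asyncio sketch of that slot model (not Change-Prop's actual code, which is Node.js; all names here are illustrative):

```
import asyncio

async def run_rule(jobs, execute, concurrency=30):
    """Illustrative slot model: at most `concurrency` jobs in flight; a new job
    is dequeued only when a slot frees up, so higher per-job latency (including
    waiting for replicas) automatically lowers the dequeue rate."""
    slots = asyncio.Semaphore(concurrency)

    async def run_one(job):
        try:
            await execute(job)        # actual job execution, lag waits included
        finally:
            slots.release()           # freeing the slot lets the next job be dequeued

    tasks = []
    for job in jobs:                  # in reality, `jobs` is the Kafka topic stream
        await slots.acquire()         # blocks while all slots are busy
        tasks.append(asyncio.create_task(run_one(job)))
    await asyncio.gather(*tasks)
```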

But I guess either this has a bug and is not working as designed, or it's not enough by design.

My first guess is that it's the latter, and that the problem with fixed concurrency is that it's too simple. The average rate is basically X/avg(latency), where latency is the actual execution time plus the current lag, so in times of lag the concurrency is too high and we need to slow down more, while the constant concurrency cannot really be decreased permanently, as we would not catch up during the good times.
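With made-up numbers to illustrate (X = 30 slots, 0.5s of real execution time per job):

```
def effective_rate(slots, exec_time, lag_wait):
    # avg rate ~ X / avg(latency), where latency = execution time + time spent waiting on lag
    return slots / (exec_time + lag_wait)

print(effective_rate(30, 0.5, 0.0))   # no lag:  60 jobs/s
print(effective_rate(30, 0.5, 3.0))   # 3s lag: ~8.6 jobs/s -- lower, but perhaps still too high
```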

We did come across the idea of dynamic concurrency for better deduplication, but we haven't gotten close to it yet: T188667

tstarling changed the visibility from "Custom Policy" to "Public (No Login Required)". Aug 17 2018, 1:56 AM

I think I've found the correct configuration file now, at mediawiki/services/change-propagation/jobqueue-deploy/scap/vars.yaml. I couldn't tell if the concurrency limits are normally reached, and I couldn't figure out how they add up to a global connection count. Looking at current connection counts from scb* to jobrunner.svc with netstat, I see counts of 113, 318, 52, 107. MediaWiki has 60 job types; is it correct to multiply that by 30, which is the top-level concurrency in vars.yaml, and then adjust for the overridden queue types? 55 classes with 30 connections each plus the 5 overrides would make 1970 connections. Then I multiply this by 4 scb servers, for a total of 7880 maximum connections. Is this correct?

What I'm saying is that there are shared resources between these job types so it may make sense to have a global concurrency limit. You could use PoolCounter for this. poolcounter1001 is handling ~6k connections per second with 6% CPU usage, so it could easily keep track of jobrunner state at a rate of 1k req/s. You can do a nonblocking lock in poolcounter with timeout=0. PoolCounter does not store the concurrency limit, it only stores the population, so concurrency limits can be adjusted with each lock acquisition request.
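A rough sketch of what a non-blocking acquisition could look like over PoolCounter's line-based TCP protocol. The command and response strings (ACQ4ME, LOCKED) and the port are written from memory and should be verified against the PoolCounter protocol documentation before relying on them:

```
import socket

def try_acquire_slot(key, limit, host="poolcounter.example.org", port=7531):
    """Hedged sketch of a non-blocking PoolCounter acquisition: the limit travels
    with each request (PoolCounter only stores the population), and timeout=0
    means 'fail immediately instead of queueing'."""
    s = socket.create_connection((host, port), timeout=2)
    # ACQ4ME <key> <workers> <maxqueue> <timeout>  -- workers is the concurrency limit
    s.sendall(f"ACQ4ME {key} {limit} 0 0\n".encode())
    reply = s.recv(1024).decode().strip()
    if reply.startswith("LOCKED"):
        return s      # hold the connection; the slot is released when it closes
    s.close()
    return None       # e.g. QUEUE_FULL or TIMEOUT: skip dequeuing a job right now
```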

To write the task description, I pondered for a while about what exactly the PID controller should control. If it controls the concurrency limit, then its control is imprecise once the concurrency limit gets down to small numbers like zero or one. This would be improved by pooling the concurrency limits across job types. Maybe this is the best thing to do if you already have plans for that.

If you control the rate limit, then it has no effect unless the rate limit is less than the normal rate. So maybe you have to scale it by the insertion rate or something. If you control the latency but keep the concurrency fixed, then you have a similar situation to what we have now, except that the waiting will be done without so many resources tied up. It has no effect until the latency rises enough to cause the concurrency limit to be reached, but then it continues working in a natural way as the latency rises, even if the concurrency limit is 1.

I think I've found the correct configuration file now, at mediawiki/services/change-propagation/jobqueue-deploy/scap/vars.yaml.

Yes, this is the correct one.

I couldn't tell if the concurrency limits are normally reached

For low-traffic jobs, which are the vast majority of the jobs, the normal rate is less than 1 per second and the latency is within one second, so the limit is never reached. For high-traffic jobs like RecordLint or refreshLinks - those that have the overrides - we've tuned the limit to be just a little bit more than required for normal operation, so I would say the count is very close to the limit.

is it correct to multiply that by 30, which is the top-level concurrency in vars.yaml, and then to adjust for the overridden queue types? 55 classes with 30 connections each plus the 5 overrides would make 1970 connections. Then I multiply this by 4 scb servers, for a total of 7880 maximum connections. Is this correct?

There's one "hidden" override: the refreshLinks job has 8 partitions that all work in parallel, and the limit is 20 per partition, so the overall override for refreshLinks is 160. So the math would be 54 * 30 + 10 + 100 + 10 + 50 + 150 + 160 = 2100. However, multiplying it by 4 is not required, since only a single process handles one topic-partition - that means a single topic-partition will only be running on a single node at a time (when that node fails, it will fail over to the other one, but at any given moment only one is running).

I see counts of 113, 318, 52, 107

This is closer to the truth, because for the 54 classes the limit is almost never reached - we can probably estimate the average for each of the 54 classes to be around 1. For the 6 exceptions, the limit is almost reached most of the time. So your observed total is 590, and if my estimates are correct the number should be around 534; given that my estimated average for the 54 low-traffic classes is back-of-the-napkin math, the numbers are close enough to seem correct.

I've looked into the historical data for the rates of the 54 low-traffic jobs and they rarely spike simultaneously, so I don't believe the overall limit is ever reached during normal operation. There is one case where it might be reached: during deploys, when the service is stopped for a bit, some backlog accumulates, and when the service is started again the backlog is cleared quite quickly. If @jcrespo has examples of graphs where the problem is seen, I can correlate them with the deploys and see if that contributes to the problem.

As a start, I propose to vastly decrease the default limit, fine-tune the jobs that don't fit, and see if anything changes - that's much easier than implementing the global limit, can be done within a day, and will give us additional data points.

To be continued. I need more time to reflect on the last 3 paragraphs; I just wanted to get the easy answers out quickly.

If @jcrespo has examples of the graphs where the problem is seen

I can stop replication or partially bring one server down and show it, but @tstarling won't let me for now. You have examples on the instances I mention at T180918; whether it is the job queue or other maintenance jobs, that I don't know (I don't disagree either, I just haven't checked), but obviously I trust Tim to be on the right track.

during the normal operation

I think the issue is with non-normal operation, e.g. one DB server being down or considerably lagged. And maybe some kind of cache stampede? Honestly, I don't know the details; I just reported what I saw (please take my doubts as: I am confident about the effects I see during outages, but I am unsure about the causes).

I can stop replication or partially bring one server down and show it, but @tstarling won't let me for now.

We can test it once https://gerrit.wikimedia.org/r/c/mediawiki/core/+/453091 is deployed.

As a start, I propose to vastly decrease the default limit, fine-tune the jobs that don't fit, and see if anything changes - that's much easier than implementing the global limit, can be done within a day, and will give us additional data points.

To avoid poking around in the dark with this immediate action, I've made a PR to introduce a new metric into change-prop that shows the actual number of concurrently running jobs per job type. After that's deployed on Monday, we can decrease the individual job concurrencies in an informed way.
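Not the actual change-prop patch, just a sketch of the idea: keep an in-flight counter per job type and report it as a gauge (the wrapper and metric names are made up):

```
from collections import defaultdict

in_flight = defaultdict(int)   # job type -> number of currently executing jobs

def execute_with_gauge(job_type, run, report_gauge):
    """Wrap job execution so the per-type concurrency becomes observable."""
    in_flight[job_type] += 1
    report_gauge(f"jobqueue.concurrency.{job_type}", in_flight[job_type])
    try:
        return run()
    finally:
        in_flight[job_type] -= 1
        report_gauge(f"jobqueue.concurrency.{job_type}", in_flight[job_type])
```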

One thing that's probably happening (and the new metric will allow us to prove it) is that when the DB becomes slow and jobs start executing slower, all the high-traffic jobs that run close to the concurrency limit hit the maximum concurrency, and lag starts to accumulate in Kafka.

However, all the low-traffic jobs normally run well under the concurrency limit, so they start piling up in the executing state, using more and more connections until they reach the absolute maximum of 2100 connections. Given the high variety of rates for different jobs, it's hard to estimate how many connections we end up accumulating in reality, but the situation is definitely wrong regardless of the actual numbers.

Lowering and fine-tuning the individual concurrencies will make the situation much better, but I don't think that's a solution in the long run.

Change 453472 had a related patch set uploaded (by Ppchelko; owner: Ppchelko):
[mediawiki/services/change-propagation/jobqueue-deploy@master] WIP: Fine-tune the concurrencies for jobs.

https://gerrit.wikimedia.org/r/453472

Mentioned in SAL (#wikimedia-operations) [2018-08-20T16:58:57Z] <ppchelko@deploy1001> Started deploy [cpjobqueue/deploy@4989231]: Create metrics to track the actual concurrent job executions T202107

Mentioned in SAL (#wikimedia-operations) [2018-08-20T16:59:53Z] <ppchelko@deploy1001> Finished deploy [cpjobqueue/deploy@4989231]: Create metrics to track the actual concurrent job executions T202107 (duration: 00m 55s)

Mentioned in SAL (#wikimedia-operations) [2018-08-20T22:26:43Z] <ppchelko@deploy1001> Started deploy [cpjobqueue/deploy@541932a]: Make the queue sized report stats once a second T202107

Mentioned in SAL (#wikimedia-operations) [2018-08-20T22:27:30Z] <ppchelko@deploy1001> Finished deploy [cpjobqueue/deploy@541932a]: Make the queue sized report stats once a second T202107 (duration: 00m 46s)

Finally, after several failed attempts to create it, we now have a graph that shows the number of concurrently executing jobs per job type. We need some time to accumulate more data, and then we'll be able to fine-tune the concurrencies in the job queue.

NOTE: refreshLinks is separated into 8 partitions, so the real number for refreshLinks is the per-partition number multiplied by 8. Not sure how to approach this, since we don't really want to include the partition number in the metric (for now).

Here are some numbers I've got from our new graphs:

| Job Name | Actual Concurrency | Concurrency Limit |
| --- | --- | --- |
| cirrusSearchLinksUpdatePrioritized | 52.3199 | 150 |
| webVideoTranscode | 25.0272 | 30 |
| ThumbnailRender | 11.2951 | 30 |
| refreshLinks | 9.8258 | 20 |
| webVideoTranscodePrioritized | 6.5079 | 30 |
| ORESFetchScoreJob | 6.4685 | 30 |
| cirrusSearchIncomingLinkCount | 5.1872 | 50 |
| cirrusSearchCheckerJob | 3.9309 | 10 |
| categoryMembershipChange | 3.0370 | 30 |
| wikibase-addUsagesForPage | 2.7751 | 30 |
| refreshLinksPrioritized | 2.3145 | 30 |
| RecordLintJob | 2.1314 | 100 |
| htmlCacheUpdate | 1.5809 | 10 |
| wikibase-InjectRCRecords | 0.4286 | 30 |
| CentralAuthCreateLocalAccountJob | 0.4224 | 30 |
| ChangeNotification | 0.2840 | 30 |
| recentChangesUpdate | 0.1606 | 30 |
| flaggedrevs-CacheUpdate | 0.1277 | 30 |
| cirrusSearchDeletePages | 0.1087 | 30 |
| enotifNotify | 0.1006 | 30 |
| EchoNotificationDeleteJob | 0.0798 | 30 |
| cdnPurge | 0.0725 | 30 |
| LocalGlobalUserPageCacheUpdateJob | 0.0553 | 30 |
| LoginNotifyChecks | 0.0525 | 30 |
| AssembleUploadChunks | 0.0490 | 30 |
| PublishStashedFile | 0.0486 | 30 |
| CognateCacheUpdateJob | 0.0384 | 30 |
| cirrusSearchOtherIndex | 0.0343 | 30 |
| CognateLocalJobSubmitJob | 0.0270 | 30 |
| UpdateRepoOnMove | 0.0231 | 30 |
| globalUsageCachePurge | 0.0230 | 30 |
| cirrusSearchLinksUpdate | 0.0174 | 30 |
| updateBetaFeaturesUserCounts | 0.0106 | 30 |
| cirrusSearchElasticaWrite | 0.0073 | 30 |
| TTMServerMessageUpdateJob | 0.0067 | 30 |
| UpdateRepoOnDelete | 0.0067 | 30 |
| GlobalUserPageLocalJobSubmitJob | 0.0053 | 30 |
| LocalRenameUserJob | 0.0052 | 30 |
| EchoNotificationJob | 0.0031 | 30 |
| LocalPageMoveJob | 0.0014 | 30 |
| TranslationsUpdateJob | 0.0010 | 30 |
| MessageGroupStatesUpdaterJob | 0.0010 | 30 |
| TranslateRenderJob | 0.0005 | 30 |
| BounceHandlerJob | 0.0004 | 30 |
| cirrusSearchDeleteArchive | 0.0002 | 30 |
| renameUser | 0.0001 | 30 |
| userGroupExpiry | 0.0001 | 30 |
NOTE: refreshLinks is split into 8 partitions; the numbers here are per-partition. The cirrusSearchCheckerJob comes in batches, and its concurrency limit was carefully selected.

Also, my previous math was incorrect - most of the lowest-traffic jobs are part of one rule, so they are executed within the same process and share the concurrency. The jobs that are NOT part of the catch-all rule are marked in italics in the table.

CirrusSearch jobs have their own rules because they need special treatment regarding retries and delays, some low-traffic jobs have their own rules for historical reasons.

I will try to restructure the config in order to reflect reality better.

Very nice work, @Pchelolo! I would second your suggestion with a modification: perhaps we should move some jobs out of the catch-all rule, as there is a clear disparity in processing. Case in point: ThumbnailRender. This job should definitely have its own rule.

Alternatively, we could group the rules a bit differently: put the jobs that usually execute in the (1, 20) concurrency range in one rule and those that are usually in the (0, 1) range in another. That way we could fine-tune the concurrency (and number of connections) slightly better than we can now, without needing to resort to special rules for exceptions (like the Thumbnail job above).

If we do that, the first and easy part of this task will be done. Making concurrencies automatically adjustable would be the next step.

I would say that after tuning the concurrency settings, we need to sit and wait to see what happens. Having auto-adjustments for this setting means employing heuristics which may do more harm than good, so we have to be careful with that. Slashing the maximum number of connections by a factor of three is already a pretty big deal, IMHO.

Change 453472 merged by Mobrovac:
[mediawiki/services/change-propagation/jobqueue-deploy@master] Fine-tune the concurrencies for jobs.

https://gerrit.wikimedia.org/r/453472

You could measure concurrency by summing a timing metric, like I did for the "load" metrics in the API dashboard: https://grafana.wikimedia.org/dashboard/db/api-backend-summary?refresh=5m&orgId=1. I used:

aliasByNode(highestAverage(scaleToSeconds(MediaWiki.api.*.executeTiming.sum,0.001), 10), 2)

This is based on the idea that the concurrency is equal to the number of seconds of request latency consumed per second of actual time. scaleToSeconds() is the only function in Graphite that gives you access to the sampling interval; its second argument is a scaling factor, here 0.001 because the execution time is measured in milliseconds.

This would solve your partitioning problem, since partitions would add together in the expected way.
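For reference, the underlying identity is essentially Little's law: average concurrency over a window equals the total execution time recorded in that window divided by the window length. A toy check with hypothetical numbers:

```
# Execution times (ms) of jobs observed during a 60-second window, as a
# timing metric would record them.
durations_ms = [500, 1200, 800, 2500, 300, 900, 1500, 600]
window_s = 60

# Seconds of work performed per second of wall-clock time = average concurrency.
# scaleToSeconds(..., 0.001) in the Graphite expression does the same scaling.
avg_concurrency = sum(durations_ms) * 0.001 / window_s
print(round(avg_concurrency, 3))   # ~0.138 jobs running on average in this window

# Partitions simply add: summing the refreshLinks partitions' timing metrics
# before dividing yields the concurrency of the whole job type.
```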

Change 454573 had a related patch set uploaded (by Ppchelko; owner: Ppchelko):
[mediawiki/services/change-propagation/jobqueue-deploy@master] Revisit concurrencies for some high-traffic jobs.

https://gerrit.wikimedia.org/r/454573

Change 454573 merged by Ppchelko:
[mediawiki/services/change-propagation/jobqueue-deploy@master] Revisit concurrencies for some high-traffic jobs.

https://gerrit.wikimedia.org/r/454573

Mentioned in SAL (#wikimedia-operations) [2018-08-22T17:20:41Z] <ppchelko@deploy1001> Started deploy [cpjobqueue/deploy@32a81be]: Revisit jobs concurrencies T202107

Mentioned in SAL (#wikimedia-operations) [2018-08-22T17:21:30Z] <ppchelko@deploy1001> Finished deploy [cpjobqueue/deploy@32a81be]: Revisit jobs concurrencies T202107 (duration: 00m 49s)

mobrovac claimed this task.

Things have been stable for a while now, so closing. We also have the effective execution rate on the graphs, so we can alert on it.