
Support multi DC statsv
Closed, Resolved · Public · 13 Estimated Story Points

Description

statsv should be available in eqiad as well as codfw. We are currently moving Kafka clients off of the cluster in favor of jumbo. statsv should go to main Kafka instead. We should then set up statsv instances in eqiad and codfw.

Event Timeline

Change 391703 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/statsv@master] Support consumption from multiple topics

https://gerrit.wikimedia.org/r/391703

Change 391705 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Move statsv varnishkafka and service to use main Kafka cluster(s)

https://gerrit.wikimedia.org/r/391705

@Krinkle, I may have forgotten the exact implementation we talked about. Was this it? Should statsv instances in each DC consume all statsv messages from both DCs? Or should a DC's statsv instance only receive messages from that DC?

In the current setup, statsd/graphite is in a master/replica setup, and for write purposes it is single-DC only (the secondary is a backup). There should be only one active instance of statsv.py consuming from Kafka and writing to statsd.

If the underlying production of messages for this topic is going to be changed in a way that you need to consume from both codfw and eqiad in order to get "all" actual beacon/statsv messages from end users, then it should indeed be updated to consume from two topics.

Is that the plan? I'm curious how those would be distributed in that case, given we also have varnishkafka instances in the edge clusters, right? (ulsfo, esams)

Wow ok, had so much discussion in IRC today with folks, especially with @BBlack

https://gerrit.wikimedia.org/r/#/c/391705/ is updated with the results of that discussion.

This is the first time we've needed to produce webrequest traffic from varnish in a multi-DC fashion. So, we needed a way to pick which Kafka cluster a given DC should produce to, e.g. varnishkafka in esams should produce to the main-eqiad Kafka cluster. I've added a kafka_datacenter_map hiera config that is used to figure that out.
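For illustration, the routing idea can be pictured as a simple site-to-cluster map plus a lookup, roughly like the sketch below. The real config lives in hiera in operations/puppet; the key names and assignments here are assumptions, not the actual data (the esams and ulsfo entries just follow the examples used in this discussion).

```python
# Hypothetical illustration of the routing idea behind kafka_datacenter_map.
# The real setting lives in hiera (operations/puppet); the key names and
# site-to-cluster assignments below are assumptions, not the actual config.
KAFKA_DATACENTER_MAP = {
    'eqiad': 'main-eqiad',
    'codfw': 'main-codfw',
    'esams': 'main-eqiad',   # European edge produces to main-eqiad (per the example above)
    'ulsfo': 'main-codfw',   # west-coast edge produces to main-codfw (per the diagrams later on)
}

def kafka_cluster_for(site):
    """Return the main Kafka cluster varnishkafka in `site` should produce to."""
    return KAFKA_DATACENTER_MAP[site]

assert kafka_cluster_for('esams') == 'main-eqiad'
```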

@Krinkle, I just realized that yall don't have webperf[12]001 set up yet, right? Since my change here requires that statsv run in codfw to get the statsv messages from the main-codfw cluster, we'll need to set that up first. I guess I can factor out the statsv related stuff from profile::webperf, and we can just include that on webperf[12]001 now. Thoughts?

I'm going to bring this discussion back here from gerrit, since it will be easier to keep track of and find later.

Reading further, it sounds like this is aiming not to do cross-dc replication of the Kafka messages, is that right?

Correct, for graphite metrics, replication is handled by graphite mirroring.

  • Proposed:
    • varnishkafka::statsv from main and edge DCs writes to a main DC Kafka instance's "statsv" topic.
    • statsv.py runs in each DC, consuming from that DC-local topic.
  • Alternate-1:
    • varnishkafka writes to a main DC Kafka "statsv" topic (as above).
    • This topic is somehow replicated.
    • In the DC where statsd is active, statsv.py also runs, and consumes from both topics. This means that if a main DC is down, messages will stop replicating, but nothing changes from statsv.py's perspective.
  • Alternate-2:
    • varnishkafka writes to a main DC Kafka "statsv" topic (as above).
    • In the DC where statsd is active, statsv.py also runs and consumes from all main DC brokers (eqiad+codfw). This means that if a main DC is down, the statsv.py client will need to handle re-connection and such. I don't know enough about python-kafka or Kafka to know what is best here.

I don't have a strong preference for any model, but I feel Alternate-1 matches other services we have. But maybe we want to do it differently intentionally? Or maybe I misunderstand the other services we have (quite likely!). I also don't think the alternate is better; I just want to make sure I understand the decisions.

I also considered Alternate-1 first. However we don't really have a concept of an 'active/inactive' DC. Individual services can be switched between DCs, but we don't have a way of switching an entire DC. I couldn't figure out (even with Brandon's help) a good way of deciding which statsv consumer instance should be active. At any given time, there will be a single active statsd instance, so we could use that. But, this is actually switched using discovery DNS, not a hostname change. The active statsd hostname is always statsd.eqiad.wmnet, even if the active statsd instance is actually running in codfw. statsd.eqiad.wmnet's IP will change based on some confd change (in etcd, I think? I don't actually know how this works).

With the Proposed solution, yes, if a DC's main Kafka cluster is 'down', statsv metrics will stop being produced and will be dropped. However, we've never had a Kafka cluster 'down' (except for a short period of time during a botched upgrade due to a compression bug in an older version of Kafka years ago). If we did need to take one down, we could theoretically use the kafka_datacenter_map to route producers to the online datacenter instead. (Perhaps this should be a more statsv producer specific route mapping, not sure.)

It may happen that the traffic team will want to take down a varnish text cluster in a datacenter. If that happens, they will route that DC's traffic to another DC. With the Proposed solution, everything will keep working, as the routed-to DC's varnishkafka instances will be configured to produce to their (most) local Kafka cluster.

BTW, before you said:

In the current setup, statsd/graphite is in a master/replica setup

I think this is not totally correct. @godog should correct me if I'm wrong, but I believe statsd is active/inactive, and graphite is mirrored both ways, eqiad <-> codfw.

For posterity, I've archived my IRC conversation with @BBlack about this here: https://phabricator.wikimedia.org/P6465

Current status quo upon graphite/statsd failover: we'd flip the CNAME for statsd.eqiad.wmnet to point to codfw like we did in T157022, and then restart statsd producers that don't pick up the change by themselves (list at T88997).
This is suboptimal in many ways, of course; moving to discovery DNS records would at least avoid manually changing the DNS, since discovery records follow etcd. Restarting the statsd producers would of course still be required, though the list of misbehaving producers in T88997 is getting shorter and shorter.

BTW, before you said:

In the current setup, statsd/graphite is in a master/replica setup

I think this is not totally correct. @godog should correct me if I'm wrong, but I believe statsd is active/inactive, and graphite is mirrored both ways, eqiad <-> codfw.

That's correct: for graphite, all metrics sent to eqiad or codfw will be sent to both eqiad and codfw. For statsd we're running active/inactive, because statsd aggregations need to happen all in the same place (i.e. statsd.eqiad.wmnet).

Thanks for the clarification. Two last questions. If I understand correctly, the outcome of Proposed is:

  1. Statsv in codfw will actively write to statsd in eqiad, across DCs.
  2. If codfw has issues, then in addition to codfw's own messages, messages from other DCs that route via codfw/kafka would be lost. New messages would work once re-configured to target eqiad.

Regarding 1 - Feel free to ignore, but it seems better for the large traffic cross-dc communication to go over verified protocols like TCP/Kafka, as opposed to UDP/Statsd itself. But maybe that is insignificant in our case? Also, does it differ in how/whether it would be encrypted? If so, should it?

Regarding 2 - In what scenario is this multi-dc setup beneficial? From a naive standpoint, it doesn't seem better to have non-codfw DCs send messages to statsd via codfw instead of directly to eqiad. I suspect there is a benefit, but I'm not seeing it. In which scenario? E.g. codfw outage, eqiad outage, routine switchover, other?

Regarding 1 [...] for the large traffic cross-dc communication to go over verified protocols like TCP/Kafka, as opposed to UDP/Statsd itself.

Indeed, but statsd has a problem, because it needs to receive all metrics of the same names in order to do its metric stats (mean, median, p99, etc.). Since the statsv metrics are not themselves prefixed with something like datacenter name, we can only have one active statsd instance at any given time.
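To make that constraint concrete, here is a small illustration with made-up numbers of why per-name aggregation has to happen in one statsd: a percentile computed separately in each DC cannot simply be merged back into the true global percentile.

```python
# Illustration with hypothetical numbers: percentiles over partial streams
# cannot be combined into the percentile over the full stream.
import math

def p99(samples):
    """Nearest-rank p99, good enough for illustration."""
    s = sorted(samples)
    rank = math.ceil(0.99 * len(s))
    return s[rank - 1]

eqiad_ms = [10, 12, 11, 13, 250]   # timing samples that reached a statsd in eqiad
codfw_ms = [9, 11, 10, 12, 14]     # samples for the SAME metric name in codfw

true_p99 = p99(eqiad_ms + codfw_ms)                # 250: the slow outlier is visible
merged_p99 = (p99(eqiad_ms) + p99(codfw_ms)) / 2   # 132.0: the outlier is diluted
print(true_p99, merged_p99)
```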

Also, does it differ in how/whether it would be encrypted? If so, should it?

Currently the cross DC (non webrequest) Kafka traffic is unencrypted. I doubt @Godog will spend time on figuring out how to encrypt statsd traffic, as I *think* he'd prefer if we moved everything to prometheus.

Regarding 2 - In what scenario is this multi-dc setup beneficial?

If there is a real outage, e.g. an eqiad outage, then we will route (manually) the statsd instance and all statsv varnishkafka producers to codfw. Aside from that, in this case, running active/active statsv varnishkafka producers & statsv consumers doesn't really offer much benefit. We could have all statsv kafka clients point at eqiad, and then only use codfw for manual failover.

Again, the weirdness of this setup here is due to the fact that statsd is active/passive, but your Kafka clients can work active/active. I think a more ideal setup would be as follows:

  • A 'main' local kafka cluster in every DC.
  • All Kafka messages mirrored from 'main' local DCs to 'aggregate' kafka clusters in each core-DC (eqiad & codfw).
  • Kafka consumers consuming from aggregate kafka clusters in each core DC, writing to (non-mirrored) prometheus instances in core DCs.

In this setup, replication/mirroring of messages is handled by Kafka (MirrorMaker).

Or! We could do like we do for all the other replicated topics, and use DC prefixes, and keep the edge DC kafka cluster routing map. This would be like the above suggestion, but without local kafka clusters in edge DCs (how we have Kafka clusters set up now):

  • statsv varnishkafka producers produce to DC-prefixed topics based on kafka_datacenter_map, e.g. esams varnishkafka writes to the main-eqiad Kafka cluster's eqiad.statsv topic.
  • MirrorMaker replicates eqiad prefixed topics -> main-codfw, and vice versa.
  • statsv consumers in each core DC consume only from their DC's prefixed topics, and then write to their DC's prometheus instances.

Both this and the above suggestion don't use statsd though, so they are just an idealized setup that we can't do right now.

Regarding 1 [...] for the large traffic cross-dc communication to go over verified protocols like TCP/Kafka, as opposed to UDP/Statsd itself.

Indeed, but statsd has a problem, because it needs to receive all metrics of the same names in order to do its metric stats (mean, median, p99, etc.). Since the statsv metrics are not themselves prefixed with something like datacenter name, we can only have one active statsd instance at any given time.

Hm.. I think we misunderstood. I meant that instead of:

Proposed (cross-dc write of statsd)

ulsfo/varnishkafka ---> codfw/kafka -> codfw/statsv ---> eqiad/statsd [writes statsd cross-dc]
esams/varnishkafka ---> eqiad/kafka -> eqiad/statsv -> eqiad/statsd

Compared to

(cross-dc write of kafka, only)

ulsfo/varnishkafka ---> codfw/kafka ---> eqiad/kafka [writes kafka cross-dc]
eqiad/kafka -> eqiad/statsv -> eqiad/statsd

The difference being that either we transfer non-eqiad data to eqiad over statsd, or we transfer it to eqiad over kafka, and only write from within the main dc from local kafka to statsd. The latter seems preferred, if we take for granted the traditional problems with UDP (higher packet loss and latency cross-dc, no retry), compared to Kafka, which uses TCP, right?

Anyhow, it sounds like your second comment is already proposing the same thing with MirrorMaker, which is what I had in mind too.

We could do like we do for all the other replicated topics, and use DC prefixes, [..] replicated.

I think this could work well for statsd, as well as Prometheus. This is similar to what we do for EventStreams, right?

We'd have one active statsv instance in the main DC, consuming from local Kafka (eg. topics eqiad.statsv and codfw.statsv, where one is actually a mirrored topic). In case of a switch-over, only the varnishkafka writer map needs to be adjusted, and statsv active flag switched from old to new dc. Not perfect, but a step in the right direction perhaps?
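The actual statsv patch for this (r/391703, plus the follow-up r/401750 about needing subscribe()) isn't reproduced in this task, but a minimal sketch of consuming both DC-prefixed topics with kafka-python might look like the following; the broker address, topic names, and group id are assumptions for illustration.

```python
# Minimal sketch (not the actual statsv code) of multi-topic consumption with
# kafka-python. Broker address, topics, and group id are illustrative only.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='kafka-main-eqiad.example.wmnet:9092',  # hypothetical
    group_id='statsv',
    auto_offset_reset='latest',
)
# With more than one topic, subscribe() is used rather than passing a single
# topic name to the constructor (cf. "Have to subscribe if using multiple
# kafka topics" later in this task).
consumer.subscribe(['eqiad.statsv', 'codfw.statsv'])

for message in consumer:
    line = message.value.decode('utf-8', errors='replace')
    # ... parse the beacon query string and forward metrics to statsd ...
```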

Compared to (cross-dc write of kafka, only)

Ah yeah, that is this:

Aside from that, in this case, running active/active statsv varnishkafka producers & statsv consumers doesn't really offer much benefit. We could have all statsv kafka clients point at eqiad, and then only use codfw for manual failover.

The latter seems preferred, if we take for granted the traditional problems with UDP (higher packet loss and latency cross-dc, no retry), compared to Kafka, which uses TCP, right?

This is a good point, although a potential problem is Kafka MirrorMaker lag. Kafka mirroring isn't instantaneous, and if the MirrorMaker service is ever offline, it will continue from where it left off when it comes back. Then all the statsv metrics would get emitted to statsd all at once.

statsv active flag switched from old to new dc. Not perfect, but a step in the right direction perhaps?

I don't really like this, because it requires that either statsv is aware if it is a 'master', OR requires starting the statsv service up in codfw during a switchover.

the varnishkafka writer map needs to be adjusted

I think you are saying this, but ya, we wouldn't need to change this map unless we need to shut down a main Kafka cluster, which in practice has never happened.

More thoughts about ^: in my conversation with @BBlack, I had originally wanted to more simply use cache traffic routing for DC failover stuff. We could tie varnishkafka producer Kafka cluster configuration to cache text routing. E.g. if the esams backend was switched from eqiad -> codfw, then the esams varnishkafka producers should also point at main-codfw. In the case of a full eqiad -> codfw DC switchover, all cache text backends would point ultimately at codfw, and all varnishkafka producers would as well.
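As a rough illustration of that coupling, the selection could conceptually be "follow the cache text backend route from a site until you reach a core DC, then use that DC's main Kafka cluster". The sketch below shows that walk; the route-table contents and helper names are hypothetical, not the real hiera data.

```python
# Hypothetical sketch of deriving the varnishkafka Kafka target from cache text
# routing: follow the backend route from a site until a core DC is reached, then
# use that DC's main Kafka cluster. The route table below is illustrative only.
CORE_DCS = {'eqiad', 'codfw'}

CACHE_TEXT_ROUTE = {   # site -> next backend hop (made-up values)
    'ulsfo': 'codfw',
    'esams': 'eqiad',
}

def main_kafka_for(site):
    """Walk the cache routing chain until a core DC is hit, then pick its main Kafka."""
    seen = set()
    while site not in CORE_DCS:
        if site in seen:
            raise ValueError('routing loop at %s' % site)
        seen.add(site)
        site = CACHE_TEXT_ROUTE[site]
    return 'main-' + site

assert main_kafka_for('esams') == 'main-eqiad'   # switch esams -> codfw and this follows
assert main_kafka_for('ulsfo') == 'main-codfw'
```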

I like this idea. I think it'd be best to keep the software agnostic of this internal matching of configurations, but from a maintenance perspective I don't see any reason not to make them behave the same by default. It certainly saves configuration overhead, and the fewer steps during a switchover, the better. I guess this would be expanded/substituted at Puppet provisioning time.

I don't really like this, because it requires that either statsv is aware if it is a 'master', OR requires starting the statsv service up in codfw during a switchover.

Right. The alternative is to have both running by default, but that means (by default) writing significant amounts of UDP/statsv data across data centres, which currently we avoid. I suppose there is one way to avoid this, which is somewhat similar to what we used to do with the job queue (although not anymore).

Namely, that both statsv instances are active by default, and consume only from the local kafka, but that the passive DC's kafka basically receives no messages under normal operation. But that in the event of a switchover, it's standing by and ready to produce. Regardless of the reason for the switchover (MediaWiki/Varnish-related, Kafka-related, or otherwise). In case of serious outage, there would still be a service restart needed in order for the Statsv instance to re-resolve the statsd hostname, though.
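That restart-to-re-resolve point usually comes down to the producer resolving statsd.eqiad.wmnet once at startup and then reusing the address for the lifetime of its UDP socket. The snippet below sketches that pattern under that assumption; it is not statsv's actual implementation, and the port is just the conventional statsd default.

```python
# Sketch (an assumption about the failure mode, not statsv's actual code) of
# why a long-running statsd producer needs a restart after a DNS failover:
# the hostname is resolved once at startup and that address is reused forever.
import socket

STATSD_HOST = 'statsd.eqiad.wmnet'  # discovery name; its IP may move between DCs
STATSD_PORT = 8125                  # conventional statsd UDP port (assumed here)

# Resolved once here; a later change to the DNS record is never noticed.
statsd_addr = (socket.gethostbyname(STATSD_HOST), STATSD_PORT)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def send_metric(line: str) -> None:
    """Fire-and-forget one statsd line, e.g. 'frontend.navtiming:123|ms'."""
    sock.sendto(line.encode('utf-8'), statsd_addr)
```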

Regarding 1 [...] for the large traffic cross-dc communication to go over verified protocols like TCP/Kafka, as opposed to UDP/Statsd itself.

Indeed, but statsd has a problem, because it needs to receive all metrics of the same names in order to do its metric stats (mean, median, p99, etc.). Since the statsv metrics are not themselves prefixed with something like datacenter name, we can only have one active statsd instance at any given time.

Also, does it differ in how/whether it would be encrypted? If so, should it?

Currently the cross DC (non webrequest) Kafka traffic is unencrypted. I doubt @Godog will spend time on figuring out how to encrypt statsd traffic, as I *think* he'd prefer if we moved everything to prometheus.

Indeed, ATM the cross-dc statsd/graphite is unencrypted and I've been focusing on moving to Prometheus instead.

More thoughts about ^: in my conversation with @BBlack, I had originally wanted to more simply use cache traffic routing for DC failover stuff. We could tie varnishkafka producer Kafka cluster configuration to cache text routing. E.g. if the esams backend was switched from eqiad -> codfw, then the esams varnishkafka producers should also point at main-codfw. In the case of a full eqiad -> codfw DC switchover, all cache text backends would point ultimately at codfw, and all varnishkafka producers would as well.

I like this idea. I think it'd be best to keep the software agnostic of this internal matching of configurations, but from a maintenance perspective I don't see any reason not to make them behave the same by default. It certainly saves configuration overhead, and the fewer steps during a switchover, the better. I guess this would be expanded/substituted at Puppet provisioning time.

I like this idea as well; it makes it easy to reason about the Kafka flow of messages following the same path/routing as HTTP requests. In this scenario statsv in eqiad/codfw would consume all messages in their local kafka and send to statsd. This of course has the issue @Krinkle mentioned of significant cross-dc udp traffic, though I think this strategy paves the way better for what we'd do with prometheus. Namely, change statsv to also aggregate metrics via the prometheus python client and expose them via http. We'd then collect the metrics and aggregate in prometheus.

I don't really like this, because it requires that either statsv is aware if it is a 'master', OR requires starting the statsv service up in codfw during a switchover.

Right. The alternative is to have both running by default, but that means (by default) writing significant amounts of UDP/statsv data across data centres, which currently we avoid. I suppose there is one way to avoid this, which is somewhat similar to what we used to do with the job queue (although not anymore).

Namely, that both statsv instances are active by default, and consume only from the local kafka, but that the passive DC's kafka basically receives no messages under normal operation. But that in the event of a switchover, it's standing by and ready to produce. Regardless of the reason for the switchover (MediaWiki/Varnish-related, Kafka-related, or otherwise). In case of serious outage, there would still be a service restart needed in order for the Statsv instance to re-resolve the statsd hostname, though.

Indeed, this is, I would say, the other contending strategy, i.e. a single producer to statsd at all times (I've added statsv to the list of services that don't refresh DNS in T88997, though of course it would be nice if it did!).

I like this idea.
I like this idea as well,

IIRC, @BBlack did not like this idea, as he didn't like coupling service routing with traffic routing. I don't think statsv routing is quite a 'service', since really it is just webrequest logs, so I think it's fine, but I think we need to hear from him.

I like this idea as well; it makes it easy to reason about the Kafka flow of messages following the same path/routing as HTTP requests. In this scenario statsv in eqiad/codfw would consume all messages in their local kafka and send to statsd. This of course has the issue @Krinkle mentioned of significant cross-dc udp traffic, [..]

Just to confirm, the significant cross-dc statsd write traffic would only happen if a varnishkafka edge instance is mapped to a secondary DC. It sounds, however, like we're proposing to map them all to the primary DC (e.g. eqiad). In that case we can leave both the primary DC and secondary DC's statsv instances running at all times, consuming from their local kafka only, and writing (possibly cross-dc) to the primary statsd. But if we always map varnishkafka/statsv to the primary DC, then in practice there won't be any cross-dc statsd write traffic.

That gives us an optimal most-common path (optimised in that we write varnishkafka/statsv to the same DC where we know the active statsd service is, avoiding e.g. ulsfo>codfw>eqiad in favour of just ulsfo>eqiad - purely as an optimisation), whilst also having friction-free multi-DC availability of the statsv service in a way that doesn't require manual switching of which one is active.

Interesting, I think your solution is the simplest. It allows us to support manual failover for multi-DC statsv, but won't let us do any active/active stuff. I was trying to avoid producing Kafka messages to remote DCs if possible, but in this case it doesn't really make much sense, because statsv will write to a remote statsd instance anyway, and Kafka is more reliable than UDP statsd.

This is also simpler in that we don't have to start maintaining a Kafka datacenter routing map. Instead, we just maintain a statsv_kafka_cluster_name (or something).

Ok, will submit a new patch... :)

@Ottomata Cool! Also thanks for raising the concern with regards to replication/MirrorMaker. The replication delay and message congestion after (even a short) downtime of replication would cause many problems with statsd, due to the assumption that aggregated metrics accurately reflect values per minute. Metric increments would be wrongly skewed upwards in such a case and likely cause alarms. Even a 1 or 2 minute downtime would naturally cause a 2-3X increase (3 minutes being replayed as 1 minute), which by all means should trigger alarms for metrics with a threshold of less than 100%.
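A quick back-of-the-envelope check of that skew (the numbers below are made up; only the ratio matters):

```python
# Hypothetical numbers illustrating the replay skew described above: if
# MirrorMaker is down for 2 minutes and replays the backlog within 1 minute,
# statsd attributes roughly 3 minutes of increments to a single minute.
normal_rate = 1000        # increments per minute under normal operation
outage_minutes = 2        # MirrorMaker downtime
catch_up_minutes = 1      # time taken to replay the backlog after recovery

replayed = normal_rate * (outage_minutes + catch_up_minutes)
apparent_rate = replayed / catch_up_minutes
print(apparent_rate / normal_rate)   # 3.0 -> a ~3X spike that can trip alarms
```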

Change 391703 merged by Ottomata:
[analytics/statsv@master] Support consumption from multiple topics

https://gerrit.wikimedia.org/r/391703

Change 401750 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/statsv@master] Have to subscribe if using multiple kafka topics.

https://gerrit.wikimedia.org/r/401750

Change 401750 merged by Ottomata:
[analytics/statsv@master] Have to subscribe if using multiple kafka topics.

https://gerrit.wikimedia.org/r/401750

Change 391705 merged by Ottomata:
[operations/puppet@production] Move statsv varnishkafka and service to use main Kafka cluster(s)

https://gerrit.wikimedia.org/r/391705

Alright! varnishkafka-statsv is producing to main-eqiad Kafka, and the hafnium statsv instance is consuming from main-eqiad.

You should be able to include webperf::statsv with the same params that are currently in profile::webperf somewhere in codfw, and the statsv instance there will consume from main-codfw Kafka.

MirrorMaker is consuming all topics into analytics-eqiad Kafka, including 'statsv' from both main-eqiad and main-codfw Kafka. The statsv topic in analytics-eqiad won't really be used, but I don't see the harm in having it there.

@Krinkle, I'll let you deploy the passive statsv instance to codfw whenever yall are ready.

Ottomata changed the point value for this task from 8 to 13.

Change 401778 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Force more exact protocol version for varnishkafka statsv

https://gerrit.wikimedia.org/r/401778