
[Event Platform] [NEEDS GROOMING] Store Flink HA metadata in Zookeeper
Open, High, Public

Description

As discussed in T328563#8638389 and elsewhere, using Zookeeper to store Flink HA metadata would avoid manual steps from Service Ops during kubernetes maintenance, like T331126: Update wikikube eqiad to k8s 1.23.

Properly start rdf-streaming-updater flink job (@dcausse)

To do this, we'll first need T331282: Upgrade flink-kubernetes-operator to 1.4.0, as well as HA enabled for mediawiki-page-content-change-enrichment. We will also need to update Zookeeper to a version >= 3.7. This might also require either updating the operating system or backporting the Debian package; right now only Debian Bookworm ships a version of Zookeeper >= 3.7.

  • mediawiki-page-content-change-enrichment configured and deployed to store HA metadata in Zookeeper.

Needs discussion

We need to decide whether to pause this task till the target version of zookeeper can be installed on the hosts that support Kafka. Alternatively we could deploy a dedicated zookeeper cluster on Ganeti VMs with either Bookworm or a backported zookeeper package.

Related Objects

Event Timeline

Ottomata renamed this task from Store Flink HA state in Zookeeper for mediawiki-page-content-change-enrichment to Store Flink HA metadata in Zookeeper for mediawiki-page-content-change-enrichment. Mar 6 2023, 12:30 PM
Ottomata updated the task description.

IIRC there was an objection against using Zookeeper because it is really only used by Kafka, and Kafka no longer requires it in newer(?) versions. So we could potentially get rid of it, provided we don't make other services depend on it until then. (I might be totally misremembering this)

That is indeed an objection! :) But we have two choices it seems:

  • k8s config maps
  • zookeeper

Using k8s config maps means that there will be manual steps when restarting k8s clusters. Using zookeeper (external to k8s)...there shouldn't be!
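
For reference, a minimal sketch of the two variants in flink-conf.yaml terms (key names are from the upstream Flink docs; the host and cluster id are placeholders, not our actual config):

# Option 1: HA metadata in Kubernetes ConfigMaps (what we do today)
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
kubernetes.cluster-id: mediawiki-page-content-change-enrichment

# Option 2: HA metadata in an external Zookeeper ensemble
# high-availability: zookeeper
# high-availability.zookeeper.quorum: conf1007.eqiad.wmnet:2181   # placeholder host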

I think given the options, I (and I'd guess SRE too?) would prefer Zookeeper.

I am guessing Flink natively supports writing its state to Zookeeper? And that this isn't functionality we are developing ourselves?

Overall, yay for not storing the state in k8s config maps, but whatever data store we do use should be well supported. If Kafka ends up not using Zookeeper and Flink ends up being the only user of Zookeeper, this will effectively make whoever runs Flink the de facto owners of Zookeeper, which might very well not be what some teams planned for.

I am guessing Flink natively supports writing its state to Zookeeper?

Yes, but more specifically, what is stored is just a URI pointer to the latest checkpoint location (in swift, or wherever) that should be used when restarting the job.
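
In config terms (key names from the Flink docs; the paths are placeholders): the bulky checkpoint data stays in the object store, and the HA backend (ConfigMaps or Zookeeper) only keeps the pointer and leader information.

state.checkpoints.dir: s3://example-bucket/checkpoints        # placeholder path
high-availability.storageDir: s3://example-bucket/flink-ha    # placeholder path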

this will effectively make whoever runs flink the de facto owners of zookeeper

Makes sense. If/when this happens, zookeeper ownership should probably move to Data Engineering and/or Search Engineering and/or ML Engineering (AKA DataScienceEngineering DSE).

Ottomata renamed this task from Store Flink HA metadata in Zookeeper for mediawiki-page-content-change-enrichment to Store Flink HA metadata in Zookeeper for. Apr 3 2023, 5:45 PM
Ottomata renamed this task from Store Flink HA metadata in Zookeeper for to Store Flink HA metadata in Zookeeper.

@dcausse @bking we aren't quite ready to test this for mediawiki-page-content-change-enrichment (need T330693 before we can even do checkpointing). Are you using checkpointing in any of your DSE experiments yet? If so, maybe you could try out using Zookeeper for the checkpoint pointers?

I can help with any configuration / access to zookeeper.

@Ottomata yes, the job running with the flink-operator on the dse cluster is using checkpoints, so it can be used to experiment with zookeeper. We might not have the bandwidth to lead this work, but we are happy to help merge/deploy/verify any patches related to this.

Zookeeper is probably going to be supported for a long time at the WMF. It is mostly Kafka-related, but migrating away from it means:

  • Upgrade all clusters to Kafka 3.x
  • Find a different way to store ACLs for Kafka Jumbo topics (for example, we restrict producing to the webrequest topics to Varnishkafka clients presenting a TLS client certificate, etc.). Or maybe drop ACLs, if we don't want them anymore, but that needs to be discussed.
  • Wait for KRaft to be production ready and stable; afaics from the Kafka docs they are not completely there yet. This includes having a way to migrate brokers to KRaft in a rolling fashion (otherwise Kafka Jumbo would need to shut down for a while, losing data for example).

If it is only HA metadata it should be fine to add this use case in my opinion :)

After chatting with @elukey about this, moving away from kubernetes configmaps to zookeeper for Flink state seems like a good option. As Zookeeper will be around for quite a while still, there is even a chance that another storage option arises during that time.

So please test/verify Zookeeper so we can maybe start with that directly on wikikube and won't have to migrate afterwards. Speaking of migrations: it would be nice to know how easy it is to switch between the two HA Service implementations.

It would be nice to know how easy it is to switch between the two HA Service implementations.

IIUC, it should be easy. We'd change the HA service config, and when restarting the job, provide a manual pointer to the latest checkpoint. After that, the job will save new checkpoint paths in the new HA system. @dcausse does this sound right?

It would be nice to know how easy it is to switch between the two HA Service implementations.

IIUC, it should be easy. We'd change the HA service config, and when restarting the job, provide a manual pointer to the latest checkpoint. After that, the job will save new checkpoint paths in the new HA system. @dcausse does this sound right?

Yes, exactly. We've already done this kind of migration with the session cluster approach in T302494; with the operator approach it should be very similar: stop with a savepoint, undeploy the job, manually clean up the resources (configmaps and the H/A resources in the object store), update the chart with the new savepoint path, redeploy.
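
A rough sketch of what that switch could look like on the operator side (FlinkDeployment field names are from the flink-kubernetes-operator API; paths and hosts are placeholders, and the flink-app chart may expose these under different value names):

spec:
  job:
    upgradeMode: savepoint
    initialSavepointPath: s3://example-bucket/savepoints/savepoint-xyz   # from the final "stop with savepoint"
  flinkConfiguration:
    high-availability: zookeeper
    high-availability.zookeeper.quorum: conf1007.eqiad.wmnet:2181        # placeholder host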

@elukey @JMeybohm since it seems we reached consensus, I'd like to enable Zookeeper HA in our app. Who's currently responsible for Zookeeper? Is there a request process I should follow, or can I just go ahead and configure the application helmfile?

The Flink docs recommend setting a Zookeeper root path: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#high-availability-zookeeper-path-root. I need some help to brainstorm / bikeshed a naming convention.

I was thinking we could do something along the lines of what is done for Kafka (assuming I looked at the right doc/configs) and use:

/flink/<cluster-name>/<flink-application-name>

This would translate to something like:
/flink/dse-k8s/mediawiki-page-content-change-enrichment

I don't _think_ the application name is strictly necessary, but I'd lean towards being explicit. I also don't think we need to include additional metadata (release name, versioning), at least at this stage. My assumption is that we'll care about HA only for production, which in our case should always be a main release. We might want to deploy canary releases, but I don't think HA is a requirement for those.

Any thoughts?

+1. FYI this would end up being

/flink/dse-k8s-eqiad/mediawiki-page-content-change-enrichment
/flink/wikikube-eqiad/mediawiki-page-content-change-enrichment (or maybe main-eqiad? for k8s cluster name?)
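
For reference, one way this could map onto the Flink config keys (whether the application name belongs in path.root or in high-availability.cluster-id is a judgement call; both keys exist upstream):

high-availability.zookeeper.path.root: /flink/dse-k8s-eqiad
high-availability.cluster-id: mediawiki-page-content-change-enrichment
# resulting znodes would live under
# /flink/dse-k8s-eqiad/mediawiki-page-content-change-enrichment/...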

Change 920268 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] mediawiki-page-content-change-enrichment: enable HA

https://gerrit.wikimedia.org/r/920268

As we have one zookeeper cluster per DC, I think it's not required to include the DC name in the Zookeeper path. Apart from that the path looks good to me.
It would be nice if that path could be baked into the flink-app chart somehow to prevent mistakes.
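
A hypothetical chart fragment, just to illustrate (the value name kubernetesClusterName is made up and not part of the real flink-app chart; how the chart learns the cluster name is exactly the open question discussed below):

# rendered inside a chart template, not hand-written per deployment
high-availability.zookeeper.path.root: /flink/{{ .Values.kubernetesClusterName }}
high-availability.cluster-id: {{ .Release.Namespace }}   # assuming the namespace matches the application name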

Who's currently responsible for Zookeeper? Is there a request process I should follow, or can I just go ahead and configure the application helmfile?

I think this is "us" as in Service-Ops because we inherited kafka-main somehow. As far as I know there is no request process for this (nothing apart from Kafka uses Zookeeper anyways) - let's treat this as approval. ;)

Who's currently responsible for Zookeeper? Is there a request process I should follow, or can I just go ahead and configure the application helmfile?

I think this is "us" as in Service-Ops because we inherited kafka-main somehow. As far as I know there is no request process for this (nothing apart from Kafka uses Zookeeper anyways) - let's treat this as approval. ;)

Hi @JMeybohm. Thanks for the ack and approval. I'll move forward with this task. Code changes are in place (tested locally) but not deployed yet (we are bikeshedding some naming). I expect we'll start hitting zookeeper either this week or the next one (Event Platform sprint 14A).

As we have one zookeeper cluster per DC, I think it's not required to include the DC name in the Zookeeper path.

Hm, true, but we probably shouldn't try to un-suffix the k8s cluster names. It is redundant, but it is probably better if we just refer to clusters as their full names. This will be much easier for referencing things in configs and templates. E.g. the fact that wikikube-eqiad and wikikube-codfw (I think maybe these are actually called just 'eqiad' and 'codfw'?) are counterpart clusters is not something that any of the code really knows; they are distinct clusters.

It would be nice if that path could be baked into the flink-app chart somehow to prevent mistakes.

Agree, and maybe we can do the same for the s3 buckets too.

Let's do that as a separate task.

As we have one zookeeper cluster per DC, I think it's not required to include the DC name in the Zookeeper path.

Hm, true, but we probably shouldn't try to un-suffix the k8s cluster names. It is redundant, but it is probably better if we just refer to clusters as their full names.

Ack. My understanding was that eqiad would signify k8s eqiad, not ZK. If it breaks naming conventions, let's make sure we document things properly.

It would be nice if that path could be baked into the flink-app chart somehow to prevent mistakes.

Agree, and maybe we can do the same for the s3 buckets too.

Let's do that as a separate task.

Sounds good.

Hm, I mean let's keep your proposed naming schema of:

/flink/<k8s-cluster-name>/<flink-application-name>

We do this for the kafka cluster namespacing in zookeeper too:

12:47:48 [@conf1007:/home/otto] $ sudo /usr/share/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /kafka
[main-codfw, jumbo-eqiad, logging-eqiad, eqiad, main-eqiad]

('eqiad' there is a holdover and could probably be deleted. And uh, I don't know why 'main-codfw' is in the eqiad ZK cluster.)

As we have one zookeeper cluster per DC, I think it's not required to include the DC name in the Zookeeper path.

Hm, true, but we probably shouldn't try to un-suffix the k8s cluster names. It is redundant, but it is probably better if we just refer to clusters as their full names. This will be much easier for referencing things in configs and templates. E.g. the fact that wikikube-eqiad and wikikube-codfw (I think maybe these are actually called just 'eqiad' and 'codfw'?) are counterpart clusters is not something that any of the code really knows; they are distinct clusters.

Unfortunately naming of the main/wikikube clusters is a mess. The actual clusters are called eqiad and codfw, the "cluster group" is called "main" currently, but should really be called "wikikube". The helmfile environments are again called eqiad and codfw and I think there is no concept of the cluster group in helmfiles for services (there is for admin_ng, though). So ultimately you don't have access to that particular name from within a helm chart currently. (Oh and I created {T336861: Fix naming confusion around main/wikikube kubernetes clusters} to look into this naming problem again)

Aye :)

It would be nice if that path could be baked into the flink-app chart somehow to prevent mistakes.

Q: is .Environment.Name (AKA k8s cluster name?) available in chart templates? I only see it used in helmfile.yaml.

Aye :)

It would be nice if that path could be baked into the flink-app chart somehow to prevent mistakes.

Q: is .Environment.Name (AKA k8s cluster name?) available in chart templates? I only see it used in helmfile.yaml.

.Environment.Name can be made available as a .Values variable using helmfile, but it is currently not defined as "the cluster name" (although it probably matches sometimes). So I would be careful with that and rather define that manually for now. It would be good to choose a generic .Values structure for that which can be reused by other charts as well.
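
A hypothetical helmfile.yaml fragment along the lines of "define it manually for now" (the value name kubernetesClusterName and the release layout are made up, not the actual WMF helmfile structure):

releases:
  - name: main
    chart: wmf-stable/flink-app
    values:
      - kubernetesClusterName: dse-k8s-eqiad   # set explicitly per environment, per the suggestion above
      - values.yaml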

Change 922497 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/puppet@production] Allow access to conf cluster zookeeper from wikikube and dse-k8s

https://gerrit.wikimedia.org/r/922497

Change 922497 merged by Ottomata:

[operations/puppet@production] Allow access to conf cluster zookeeper from wikikube and dse-k8s

https://gerrit.wikimedia.org/r/922497

Change 920268 merged by jenkins-bot:

[operations/deployment-charts@master] mw-page-content-change-enrich: enable HA and checkpointing

https://gerrit.wikimedia.org/r/920268

Welp, we finally got all the configs and egress rules right, only to discover:

https://curator.apache.org/zk-compatibility-34.html
https://curator.apache.org/breaking-changes.html

ZooKeeper 3.4.x is no longer supported (the associated Compatibility classes/methods have been removed). If you still need to use Curator with ZooKeeper 3.4.x you will need to use a previous version. Click here for details.

Flink uses Curator 5.2.0 and Zookeeper 3.7.1 clients. Our Zookeeper servers run 3.4.13.

In order to use Zookeeper for Flink HA, we need to upgrade our Zookeeper clusters.

Change 922568 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] mw-page-content-change-enrich - disable ZK based HA

https://gerrit.wikimedia.org/r/922568

Change 922568 merged by jenkins-bot:

[operations/deployment-charts@master] mw-page-content-change-enrich - disable ZK based HA

https://gerrit.wikimedia.org/r/922568

Change 922874 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-operator - deploy in wikikube eqiad and codfw

https://gerrit.wikimedia.org/r/922874

Change 922874 merged by jenkins-bot:

[operations/deployment-charts@master] flink-operator - deploy in wikikube eqiad and codfw

https://gerrit.wikimedia.org/r/922874

gmodena renamed this task from Store Flink HA metadata in Zookeeper to [NEEDS GROOMING] Store Flink HA metadata in Zookeeper. Jun 28 2023, 10:35 AM
gmodena updated the task description.
Ahoelzl renamed this task from [NEEDS GROOMING] Store Flink HA metadata in Zookeeper to [Event Platform] [NEEDS GROOMING] Store Flink HA metadata in Zookeeper. Oct 20 2023, 4:59 PM