Test common operations in the flink operator/k8s/Flink ZK environment
Open, In Progress, High, Public

Description

Per T328561, we need to consider and document the different aspects of operating a Flink application (for example: how to handle restarts, upgrades, etc.) in the new flink operator/k8s/Flink ZK environment we are currently building.

AC:

Anything that's already done today:

  • Initial deployment of the Flink job
  • Version upgrade of the Flink job
  • Restart the jobs without upgrading: we ran kubectl rollout restart deployment flink-app-wdqs on dse-k8s and the Flink jobmanager recovered without human intervention (see the sketch after this list).
  • Recovery from Flink failure (restarts from the same place). We have successfully restored from both checkpoints and savepoints. Documented here.
  • Test running multiple flink apps controlled by the same flink operator, using the same Zookeeper instance for Flink HA. Crossing out as this doesn't seem to be a blocker; we have confirmation from the Flink mailing list and personal observation that the Flink Kubernetes Operator uses the name of the helm deployment as the cluster id, so namespace collision shouldn't be a problem.
  • Stop the operator and see what happens to the application.
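
For reference, a minimal sketch of the restart test from the list above (the namespace is an assumption; the deployment name comes from the item itself):

  # restart the Flink pods in place, without upgrading anything
  kubectl -n rdf-streaming-updater rollout restart deployment flink-app-wdqs
  # watch the pods come back and confirm the jobmanager recovers on its own
  kubectl -n rdf-streaming-updater get pods -w
  kubectl -n rdf-streaming-updater logs -l component=jobmanager -c flink-main-container -f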

Event Timeline

Restricted Application added a subscriber: Aklapper.
Gehel triaged this task as High priority. Jul 24 2023, 3:30 PM
Gehel moved this task from needs triage to Current work on the Discovery-Search board.
Gehel edited projects, added Discovery-Search (Current work); removed Discovery-Search.
Gehel set Final Story Points to 8.
bking added a subscriber: tchin.

Some observed behavior from T344614:

Flink-app will start even when HA is misconfigured, which could lead operators to believe HA is working when it actually isn't. Is there a way to test/replicate this?
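
One hedged way to verify that HA is genuinely active (the ZooKeeper host below is a placeholder; the path root and label selector come from elsewhere in this task) is to check that the job actually registered metadata in ZooKeeper and that the jobmanager went through leader election:

  # does the job's HA metadata exist under the configured ZooKeeper root?
  zkCli.sh -server FLINK_ZK_HOST:2181 ls /flink
  # did the jobmanager actually perform ZooKeeper leader election?
  kubectl logs -l component=jobmanager -c flink-main-container | grep -iE 'zookeeper|leader'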

Change 957967 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] rdf-streaming-updater: start adding per-env ZK path root

https://gerrit.wikimedia.org/r/957967

Once the patch above is merged, I think we'll need to do a little operations work:

  • Stop the app (helmfile destroy)
  • Copy the /flink znode data to our new path /dse-k8s-eqiad/rdf-streaming-updater/wdqs*
  • Redeploy the app

*I'm not sure how to do this. Some ZK CLI/telnet interface commands that work on earlier ZooKeeper versions don't seem to work on our flink-zk Bookworm installation (3.8.0, as opposed to 3.4.13 on Bullseye).
I haven't had time to look closely, but I think it's related to the Four Letter Word restrictions listed here. That said, get didn't seem to work either at first. Corrected: this was a matter of understanding what the get command actually does. I'm still unsure how to dump/restore znode data, though (see the sketch below).
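
For what it's worth, a minimal sketch of what the copy might look like with the stock zkCli.sh client (the host is a placeholder, this only moves a single znode's payload, a real migration would need to walk the whole /flink subtree, and parent znodes have to be created first since zkCli's create is not recursive):

  # dump the payload stored at the old root
  zkCli.sh -server FLINK_ZK_HOST:2181 get /flink
  # recreate it under the new per-environment path
  zkCli.sh -server FLINK_ZK_HOST:2181 create /dse-k8s-eqiad/rdf-streaming-updater/wdqs "<payload>"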

Change 957967 merged by Bking:

[operations/deployment-charts@master] rdf-streaming-updater: start adding per-env ZK path root

https://gerrit.wikimedia.org/r/957967

Change 959790 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

https://gerrit.wikimedia.org/r/959790

Change 959790 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

https://gerrit.wikimedia.org/r/959790

Change 960080 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s: Manually restore flink-app

https://gerrit.wikimedia.org/r/960080

Change 960080 merged by Bking:

[operations/deployment-charts@master] dse-k8s: Manually restore flink-app

https://gerrit.wikimedia.org/r/960080

Change 960090 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

https://gerrit.wikimedia.org/r/960090

Change 960087 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

https://gerrit.wikimedia.org/r/960087

Change 960090 abandoned by Bking:

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

Reason:

patch already covered by 960087

https://gerrit.wikimedia.org/r/960090

Change 960087 merged by Bking:

[operations/deployment-charts@master] dse-k8s: Trigger flink-app savepoint

https://gerrit.wikimedia.org/r/960087

Change 963057 had a related patch set uploaded (by DCausse; author: DCausse):

[operations/deployment-charts@master] rdf-streaming-updater: take a savepoint manually

https://gerrit.wikimedia.org/r/963057

Change 963057 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: take a savepoint manually

https://gerrit.wikimedia.org/r/963057

Moving to blocked, as we cannot start the test service until T326914 is resolved.

bking changed the task status from Open to In Progress. Oct 5 2023, 2:59 PM

We're unblocked now, and we were able to test some flink operations.

  • Operation: Temporarily roll back breaking chart changes without rolling back in SCM.
    • Result: Success.
  • Operation: Stop the service with a savepoint
    • Result: Success, but due to recent chart changes the s3 bucket path changed. We had to manually create the bucket using swiftly head ${bucket} before the application would restart cleanly. The error manifested as a 404.
  • Operation: Delete flink-operator deployment.
    • Result: The flink application continued to work.
  • Operation: Delete flink taskmanager pod while flink-operator was not running.
    • Result: Kubernetes created a new taskmanager pod and the application continued to work.
  • Operation: Redeploy flink-operator
    • Result: We had some trouble getting this to work. Destroying the deployment leaves some resources behind. To redeploy correctly, it's necessary to run the following from the admin_ng directory (full sequence shown below):
      • helmfile -e dse-k8s-eqiad -l name=flink-operator -i destroy, followed by
      • helmfile -e dse-k8s-eqiad -l name=flink-operator -i apply
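
Put together, the flink-operator redeploy sequence from the last item above is:

  # run from the admin_ng directory of operations/deployment-charts
  helmfile -e dse-k8s-eqiad -l name=flink-operator -i destroy
  helmfile -e dse-k8s-eqiad -l name=flink-operator -i apply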

You can verify the health of the application by looking at the logs: kubectl logs -l component=jobmanager -c flink-main-container -f. If the logs show checkpoints being triggered and completed, the application is healthy:

{"@timestamp":"2023-10-05T14:55:45.723Z","log.level": "INFO","message":"Triggering checkpoint 35130 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1696517745715 for job 57136eb7d596a1739171fe7b360efd02.", "ecs.version": "1.2.0","process.thread.name":"Checkpoint Timer","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator"}

{"@timestamp":"2023-10-05T14:58:19.441Z","log.level": "INFO","message":"Marking checkpoint 35130 as completed for source Source: KafkaSource:eqiad.mediawiki.page-suppress.", "ecs.version": "1.2.0","process.thread.name":"SourceCoordinator-Source: KafkaSource:eqiad.mediawiki.page-suppress","log.logger":"org.apache.flink.runtime.source.coordinator.SourceCoordinator"}
bking moved this task from Blocked / Waiting to Done on the Data-Platform-SRE board.

At this point, I am confident that we have enough information to properly manage the service, and I believe we are ready to start deploying in staging. As such, I'm resolving this ticket. However, if @dcausse, @pfischer, @gmodena or anyone else feels more testing is needed, please feel free to reopen.

@bking thanks for taking care of this! Something I can't remember whether we did is simulating a k8s upgrade and seeing if the job recovers from its latest checkpoint (hopefully still available in ZK).
The procedure could look like this (sketched after the list):

  • have the job running
  • undeploy
  • inspect the namespace and make sure that it's empty
  • redeploy the job and test if it picks up the latest checkpoint from zookeeper
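
In shell terms, that test might look roughly like the following (the release selector and namespace are taken from later comments on this task; note that the follow-up below found that undeploying via helmfile discards the HA metadata):

  # job is running; undeploy it
  helmfile -e dse-k8s-eqiad --selector name=wikidata -i destroy
  # make sure nothing is left behind in the namespace
  kubectl -n rdf-streaming-updater get all
  # redeploy and check whether it resumes from the latest checkpoint recorded in ZooKeeper
  helmfile -e dse-k8s-eqiad --selector name=wikidata -i apply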

Tested this procedure above and it did not work.
Undeploying via helmfile probably told the flink-k8s-operator to cancel the job, and it might have discarded the H/A metadata, because when re-deploying it attempted to start from the initialSavepointPath provided in the values.yaml file instead of the latest known checkpoint.
Recovering the job required going through the manual recovery procedure (identifying the checkpoint manually) and deploying with:
helmfile -e dse-k8s-eqiad --selector name=wikidata -i apply --set app.job.initialSavepointPath=s3://rdf-streaming-updater-staging-k8s-op-test-dse/wikidata/checkpoints/57136eb7d596a1739171fe7b360efd02/chk-37424
So I think we still need to figure out what the procedure should be during k8s cluster upgrades.

I think the semantics of destroying the flinkdeployment resource are to get rid of the job, and this is what the flink k8s operator will do.
So the k8s upgrade procedure must not undeploy these (at least not while the k8s operator is running).
We might ask service ops what happens during k8s upgrades and how they bring down the cluster: if during that step they undeploy individual services (via helmfile destroy), this might not work for flink-app. If they don't, we should try to simulate what they'll be doing to see whether the native recovery mechanism survives a re-deploy.

@JMeybohm Is it possible for us to simulate a cluster upgrade (maybe by running the upgrade cookbook against a testing cluster)? Based on what @elukey mentioned in #wikimedia-k8s-sig on IRC, it sounds like we'll be OK. But it would definitely be better to have more certainty before we're in production.

It should be possible to do the cluster upgrade procedure without an actual update, yes ("simulation" does not seem appropriate though as you will still delete all of etcd). But I still wonder if what @dcausse describes is desired. Maybe the operator can be instructed to never "purge" data? Seems a bit more safe to me.

The operator will purge the H/A metadata if the flinkdeployment is explicitly removed; I think this makes sense from the flink-k8s-operator perspective. It does not appear to have an option to keep the H/A metadata even though we tell it to delete the deployment; the sole option it has is to take a very last savepoint before the cleanup.

@JMeybohm are all services (after being depooled) explicitly "undeployed" (via helmfile destroy) as part of the procedure to bring the upgraded k8s cluster down, or are they going to simply die because of the lack of available worker nodes?

Makes sense indeed.

No actions will be taken to undeploy anything at all. After depooling the workload, the etcd contents will be wiped and the workload will just die at some point (at the latest, when the node it's running on is re-imaged).

Thanks! So I believe we should be good: the H/A metadata can't go away if the operator is not explicitly told to remove it. It was my test scenario (using helmfile destroy) that was not correct.

To be certain that it'll work, I guess we could manually bring this workload down (the operator and the rdf-streaming-updater job) and then manually clean up the rdf-streaming-updater namespace while the operator is down (rough sketch below).
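
A rough sketch of that manual teardown test (the namespace and the resource types to wipe are assumptions; run the helmfile commands from the admin_ng directory):

  # 1. stop the operator
  helmfile -e dse-k8s-eqiad -l name=flink-operator -i destroy
  # 2. with the operator down, delete the workload's k8s resources directly,
  #    leaving the FlinkDeployment custom resource in place
  #    (the HA metadata lives in ZooKeeper, so it should survive this)
  kubectl -n rdf-streaming-updater delete deployment,pod,configmap --all
  # 3. bring the operator back; it should reconcile the FlinkDeployment and the job
  #    should resume from the latest checkpoint recorded in ZooKeeper
  helmfile -e dse-k8s-eqiad -l name=flink-operator -i apply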

bking moved this task from Done to In Progress on the Data-Platform-SRE board.

We might still want to test a k8s upgrade, but this should not block moving to production. Worst case, we can do a manual recovery.