In T328675 we created a test app for running Flink with the operator pattern. Our next step is to integrate Zookeeper for Flink HA.
AC:
- Update the flink-app deployment chart to use Zookeeper and confirm operation.
Related tasks:

| Status | Assigned | Task |
| --- | --- | --- |
| Open | None | T317045 [Epic] Re-architect the Search Update Pipeline |
| Open | None | T340548 [EPIC] Deployment of the Search Update Pipeline on Flink / k8s |
| Open | lbowmaker | T328561 Flink Operations |
| Open | bking | T326409 Migrate the wdqs streaming updater flink jobs to flink-k8s-operator deployment model |
| Open | bking | T342149 Test common operations in the flink operator/k8s/Flink ZK environment |
| Resolved | bking | T344614 Add Zookeeper config to 'rdf-streaming-updater' test service on DSE cluster |
| Resolved | bking | T345957 Restore dse-k8s' rdf-streaming-updater from savepoint/improve bootstrapping process |
| Resolved | bking | T346048 Troubleshoot rdf-streaming-updater/dse-k8s cluster |
Example configuration from the Flink website:

```
high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery
```
We have a couple of different places for rdf-streaming-updater test config within the deployment-charts repo: helmfile.d/dse-k8s-services/rdf-streaming-updater/values-dse-k8s-eqiad.yaml and helmfile.d/services/rdf-streaming-updater/values-staging.yaml.
I'm going to start with the dse cluster, as it seems more complete.
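For context, here's a minimal sketch of what such a values change could look like, assuming the flink-app chart forwards a flinkConfiguration map into the operator's FlinkDeployment spec (the app.flinkConfiguration placement is a guess; check the chart's values.yaml). Only the flink-zk hostnames and the Swift storage path are taken from this ticket:

```yaml
# Hypothetical values-dse-k8s-eqiad.yaml fragment; key placement is assumed.
app:
  flinkConfiguration:
    # This key later turns out to be wrong for Flink 1.16 (see below);
    # it mirrors the example config quoted at the top of the ticket.
    high-availability.type: zookeeper
    high-availability.zookeeper.quorum: "flink-zk1001.eqiad.wmnet:2181,flink-zk1002.eqiad.wmnet:2181,flink-zk1003.eqiad.wmnet:2181"
    high-availability.zookeeper.path.root: /flink
    high-availability.storageDir: "s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage"
```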
Change 951551 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] rdf-streaming-updater-dse-k8s: Add Zookeeper HA
Change 951551 merged by jenkins-bot:
[operations/deployment-charts@master] rdf-streaming-updater-dse-k8s: Add Zookeeper HA
Change 952891 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] flink-app: Increment chart version
Change 952891 merged by jenkins-bot:
[operations/deployment-charts@master] flink-app: Increment chart version
Change 952927 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] flink-app: change HA config for test
Change 952927 merged by jenkins-bot:
[operations/deployment-charts@master] flink-app: change HA config for test
Change 952929 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] flink-app: remove "high-availability.cluster-id" config key
Change 952929 merged by jenkins-bot:
[operations/deployment-charts@master] flink-app: remove "high-availability.cluster-id" config key
I've added config and redeployed rdf-streaming-updater via helmfile on the DSE cluster, but I don't see any evidence that the new deployment is using Zookeeper. I haven't had time to troubleshoot in depth, but here are some ideas:
Not too much to show here. Basically, when ZK is properly managing HA, there are "znodes" (paths) referencing the managed service.
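For reference, a quick way to check for those znodes from one of the flink-zk hosts (same zkCli.sh invocation that appears later in this ticket; the -server flag is standard zkCli):

```
# List the ZK root; with Flink HA working we'd expect a /flink znode
# (the configured high-availability.zookeeper.path.root) next to /zookeeper.
sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh -server localhost:2181
ls /
```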
We've enabled egress traffic, and I can confirm that the kube workers can reach the ZK cluster hosts.
I still haven't found anything useful in logstash or kube logs yet.
I can confirm that the Flink app is writing to its HA storageDir by looking at Swift:

```
swiftly get rdf-streaming-updater-staging | grep zook
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/blob/job_66699c279a65adebcdb367d1396e09bb/blob_p-00843054bdf5fce5eec60b4d7555b8d598424013-8822f62734185d3dc261a96a887128ac
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/completedCheckpoint5d6b5562e23f
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph03706fb54771
```
Still running down the leads I mentioned in the last update.
Change 953688 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] rdf-streaming-updater: Update egress rules for ZK
Change 953688 merged by jenkins-bot:
[operations/deployment-charts@master] rdf-streaming-updater: Update egress rules for ZK
I don't see znodes on the new cluster:
```
elukey@flink-zk1001:~$ sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

ls /
[zookeeper]
```
From the logs on the taskmanager etc., I'd expect to find a /flink znode:

```
{"@timestamp":"2023-09-01T06:54:02.056Z","log.level": "INFO","message":"Loading configuration property: high-availability.zookeeper.path.root, /flink", "ecs.version": "1.2.0","process.thread.name":"main","log.logger":"org.apache.flink.configuration.GlobalConfiguration"}
```
I checked via nsenter and both taskmanager and app pods are able to contact the zookeeper nodes on port 2181.
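For anyone reproducing this, the check was presumably something along these lines (the exact PID lookup varies by container runtime; $PID here is hypothetical):

```
# On the dse-k8s worker hosting the pod: $PID is the host PID of any process
# in the pod (from e.g. `crictl inspect` or `docker inspect`). nsenter -n
# runs nc inside the pod's network namespace to test the ZK client port.
sudo nsenter -t "$PID" -n nc -vz flink-zk1001.eqiad.wmnet 2181
```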
Something odd:
```
root@deploy1002:~# kubectl logs flink-app-wdqs-c7f6bff77-xlgs5 -n rdf-streaming-updater flink-main-container | grep RESOURCE_PARAMS | sed -e "s/\[\] \-/'\n'/g" | grep high
' Loading configuration property: high-availability.cluster-id, flink-app-wdqs\nINFO '
' Loading configuration property: high-availability.zookeeper.path.root, /flink\nINFO '
' Loading configuration property: high-availability.storageDir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage\nINFO '
' Loading configuration property: high-availability.zookeeper.quorum, flink-zk1001.eqiad.wmnet:2181,flink-zk1002.eqiad.wmnet:2181,flink-zk1003.eqiad.wmnet:2181\nINFO '
' Loading configuration property: high-availability.type, ZOOKEEPER\nINFO '
' Loading configuration property: high-availability.jobmanager.port, 6123\nINFO '
' Loading configuration property: high-availability, KUBERNETES\nINFO '
```
The high-availability property seems to be set to KUBERNETES after ZOOKEEPER; is that expected? In theory the plain high-availability property shouldn't have any effect (I don't find traces of it in the docs), but it is confusing anyway.
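One way to see which value Flink actually settled on would be to query the JobManager's effective configuration over the REST API; a sketch, assuming the operator's usual <deployment>-rest service name (flink-app-wdqs-rest is a guess):

```
# Forward the Flink REST port, then dump the effective configuration;
# /jobmanager/config returns the config keys the JobManager is running with.
kubectl -n rdf-streaming-updater port-forward svc/flink-app-wdqs-rest 8081:8081 &
curl -s http://localhost:8081/jobmanager/config | grep -i high-availability
```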
Change 954957 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] rdf-streaming-updater: Resolve contradictory configs
Change 954957 merged by jenkins-bot:
[operations/deployment-charts@master] rdf-streaming-updater: Resolve contradictory configs
Change 954985 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] rdf-streaming-updater: reduce job managers from 3 to 1
Change 954985 merged by jenkins-bot:
[operations/deployment-charts@master] rdf-streaming-updater: reduce job managers from 3 to 1
Change 955969 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] dse-k8s-services: init rdf-streaming-updater
Change 955969 merged by jenkins-bot:
[operations/deployment-charts@master] dse-k8s-services: init rdf-streaming-updater
The flink-app in dse-k8s is healthy again, but I have no evidence that it's talking to Zookeeper. I ran the same logging command as @elukey did above, but unlike him I couldn't find any references to high availability at all.
Firewall rules could be a factor too: we supposedly enabled inbound access to the ZK hosts from dse-k8s in this patch, and I can see the rules from that PR on flink-zk1001 at /etc/ferm/conf.d/10_zookeeper. But iptables -L -v -n does not show the IP range for dse-k8s-worker1001 (where the flink job manager is running).
Regardless, I'm not seeing any port 2181 traffic generated by the dse-k8s-worker at all; even if the FW rules were wrong, we should still see connection attempts.
I checked the pods' logs and I see the high-availability settings; it seems OK afaics.
> Firewall rules could be a factor too: we supposedly enabled inbound access to the ZK hosts from dse-k8s in this patch, and I can see the rules from that PR on flink-zk1001 at /etc/ferm/conf.d/10_zookeeper. But iptables -L -v -n does not show the IP range for dse-k8s-worker1001 (where the flink job manager is running).
>
> Regardless, I'm not seeing any port 2181 traffic generated by the dse-k8s-worker at all; even if the FW rules were wrong, we should still see connection attempts.
The firewall rules are OK; they don't use host-specific IPs but ranges (the following is the DSE range registered in Puppet):

```
elukey@flink-zk1001:~$ sudo iptables -L -v -n | grep 10.67.24.0/21
    2   120 ACCEPT  6  --  *  *  10.67.24.0/21  0.0.0.0/0  tcp dpt:2181
    0     0 ACCEPT  6  --  *  *  10.67.24.0/21  0.0.0.0/0  tcp dpt:2182
    0     0 ACCEPT  6  --  *  *  10.67.24.0/21  0.0.0.0/0  tcp dpt:2183
```
I also tested some pods via nsenter and I can connect to flink-zk1001's IP on port 2181, so it shouldn't be a firewall issue.
The flink-zk Zookeeper cluster in eqiad looks healthy:
```
elukey@flink-zk1001:~$ echo "srvr" | nc localhost 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: follower
Node count: 6
elukey@flink-zk1001:~$ echo "srvr" | nc flink-zk1002.eqiad.wmnet 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 1/5.1/13
Received: 11
Sent: 10
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: follower
Node count: 6
elukey@flink-zk1001:~$ echo "srvr" | nc flink-zk1003.eqiad.wmnet 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 0/0.7987/9
Received: 448
Sent: 447
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: leader
Node count: 6
Proposal sizes last/min/max: 48/48/109
```
@bking another thing to verify:
```
root@deploy1002:~# kubectl logs flink-app-wdqs-54cd5c5567-zjq7r -n rdf-streaming-updater flink-main-container | grep RESOURCE_PARAMS | sed -e "s/\[\] \-/'\n'/g" | grep s3
' Loading configuration property: high-availability.storageDir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage\nINFO '
' Loading configuration property: s3.secret-key, ******\nINFO '
' Loading configuration property: execution.savepoint.path, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/savepoints/init_20230904\nINFO '
' Loading configuration property: s3.access-key, wdqs:savepoints\nINFO '
' Loading configuration property: job-result-store.storage-path, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/job-result-store/flink-app-wdqs/2b196e5c-3e73-4160-a5fd-732f38a177d3\nINFO '
' Loading configuration property: state.savepoints.dir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/savepoints\nINFO '
' Loading configuration property: s3.endpoint, thanos-swift.discovery.wmnet\nINFO '
' Loading configuration property: s3.path.style.access, true\nINFO '
' Loading configuration property: state.checkpoints.dir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/checkpoints\nINFO '
```
I checked the above and I do see s3 being used correctly:
```
elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage
                 DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/
elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/
                 DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/
elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/
                 DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/blob/
2023-08-30 15:56   444K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/completedCheckpoint4f5270d4cd2a
2023-08-28 20:58   678K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph03706fb54771
2023-09-01 06:49   678K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph2aa41b8edb69
```
It is a little weird to see rdf-streaming-updater-staging used for DSE too, but afaics the paths shouldn't mess with wikikube's staging, so in theory it should be fine. I would still differentiate them more clearly, though.
IIUC, metadata is saved on the s3 path, and the JobManagers coordinate via Zookeeper when needed. My suggestion is to start a tcpdump for port 2181 on the flink-zk nodes, and force Flink on dse to do something (like process data, etc.). It may be that the zookeeper znodes are only added afterwards, and not right from the start.
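Concretely, something like this on each flink-zk host would do (plain tcpdump; only the port is taken from this ticket):

```
# Any connection attempt from the DSE pod range (10.67.24.0/21) to the
# ZK client port will show up here while Flink is poked on dse-k8s.
sudo tcpdump -ni any 'tcp port 2181'
```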
@bking I had a chat with @dcausse and from https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#example-configuration (the 1.16 docs) it seems that an example config is:
```
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery
```
We set high-availability.type, which is not supported in 1.16. Moreover, we should also set the cluster-id.
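For 1.16 the config should presumably look more like this (quorum and storageDir values are the ones already in use above; the cluster-id shown is illustrative, matching the deployment name that later appears in the znode paths):

```
high-availability: zookeeper
high-availability.zookeeper.quorum: flink-zk1001.eqiad.wmnet:2181,flink-zk1002.eqiad.wmnet:2181,flink-zk1003.eqiad.wmnet:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /flink-app-wdqs   # illustrative; customize per cluster
high-availability.storageDir: s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage
```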
> We set high-availability.type, which is not supported in 1.16. Moreover, we should also set the cluster-id.
FWIW we have base Flink 1.17 docker images available (that's what the content enrichment app builds on) at https://docker-registry.wikimedia.org/flink/tags/.
Change 957298 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] flink-app: Correct ZK config for dse-k8s
Change 957298 merged by jenkins-bot:
[operations/deployment-charts@master] flink-app: Correct ZK config for dse-k8s
Change 957303 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] flink-app: remove high-availability.cluster-id
Change 957303 merged by jenkins-bot:
[operations/deployment-charts@master] flink-app: remove high-availability.cluster-id
Change 957951 had a related patch set uploaded (by Bking; author: Bking):
[operations/deployment-charts@master] dse-k8s: Add egress rules for flink-operator
Change 957951 merged by Bking:
[operations/deployment-charts@master] dse-k8s: Add egress rules for flink-operator
I'm happy to report that the flink-operator is connected to Zookeeper!
We can see the znodes now:
```
ls -R /flink/flink-app-wdqs/jobs
/ad699cfb0eb2c53365df1d982b806b70
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoint_id_counter
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks/83d10ca5-765b-46fc-83e7-00a1f5273e20
get /flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks/83d10ca5-765b-46fc-83e7-00a1f5273e20
```
Thanks to everyone who helped out on this.
Moving this ticket to done. Operations/chaos engineering testing continues in T342149.