
Add Zookeeper config to 'rdf-streaming-updater' test service on DSE cluster
Closed, Resolved · Public · Estimated Story Points: 5

Description

In T328675 we created a test app for running Flink with the operator pattern. Our next step is to integrate Zookeeper for Flink HA.

AC:

  • Update the flink-app deployment chart to use Zookeeper and confirm operation.

Event Timeline

Gehel triaged this task as High priority. Aug 21 2023, 3:13 PM
Gehel moved this task from needs triage to Current work on the Discovery-Search board.
Gehel edited projects, added Discovery-Search (Current work); removed Discovery-Search.
Gehel set the point value for this task to 5. Aug 21 2023, 3:38 PM

Example configuration from the Flink website:

high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery

bking renamed this task from Add Zookeeper config to 'flink-app' test service to Add Zookeeper config to 'rdf-streaming updater' test service on DSE cluster. Aug 21 2023, 3:57 PM
bking renamed this task from Add Zookeeper config to 'rdf-streaming updater' test service on DSE cluster to Add Zookeeper config to 'rdf-streaming-updater' test service on DSE cluster. Aug 21 2023, 4:25 PM

We have a couple of different places for rdf-streaming-updater test config within the deployment-charts repo: helmfile.d/dse-k8s-services/rdf-streaming-updater/values-dse-k8s-eqiad.yaml and helmfile.d/services/rdf-streaming-updater/values-staging.yaml.

I'm going to start with the DSE cluster, as its config seems more complete.
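
Roughly, the values I expect to end up with look like this. A sketch only: the app.flinkConfiguration nesting is an assumption about the chart's schema (the flink-kubernetes-operator's FlinkDeployment CRD takes a flinkConfiguration string map, which the chart presumably feeds); hosts and paths are the ones we plan to use:

app:                       # assumed top-level key; check the chart's actual values schema
  flinkConfiguration:
    high-availability.type: zookeeper
    high-availability.zookeeper.quorum: "flink-zk1001.eqiad.wmnet:2181,flink-zk1002.eqiad.wmnet:2181,flink-zk1003.eqiad.wmnet:2181"
    high-availability.zookeeper.path.root: /flink
    high-availability.storageDir: s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage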

Change 951551 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] rdf-streaming-updater-dse-k8s: Add Zookeeper HA

https://gerrit.wikimedia.org/r/951551

Change 951551 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater-dse-k8s: Add Zookeeper HA

https://gerrit.wikimedia.org/r/951551

Change 952891 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-app: Increment chart version

https://gerrit.wikimedia.org/r/952891

Change 952891 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app: Increment chart version

https://gerrit.wikimedia.org/r/952891

Change 952927 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-app: change HA config for test

https://gerrit.wikimedia.org/r/952927

Change 952927 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app: change HA config for test

https://gerrit.wikimedia.org/r/952927

Change 952929 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-app: remove "high-availability.cluster-id" config key

https://gerrit.wikimedia.org/r/952929

Change 952929 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app: remove "high-availability.cluster-id" config key

https://gerrit.wikimedia.org/r/952929

I've added the config and redeployed rdf-streaming-updater via helmfile on the DSE cluster, but I don't see any evidence that the new deployment is using Zookeeper. I haven't had time to troubleshoot in depth, but here are some ideas:

  • Look at working Zookeeper instances, such as the ones that manage Druid HA

Not too much to show here. Basically, when ZK is properly managing HA, there are "znodes" (paths) referencing the managed service.

  • Determine which container is the JobManager leader and tcpdump for traffic from flink-zk hosts

We've enabled egress traffic, and I can confirm that the kube workers can reach the ZK cluster hosts (see the policy sketch after this list).

  • Keep looking in Logstash/kubectl logs for further hints

I haven't found anything useful in Logstash or the kube logs yet.
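
Regarding the egress point above: the rendered policy needs a rule roughly like the following. This is a hedged sketch in plain Kubernetes NetworkPolicy terms — the chart generates the real policy from its own values, and the name and CIDR below are placeholders:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-zookeeper-egress        # placeholder name
  namespace: rdf-streaming-updater
spec:
  podSelector: {}                     # all pods in the namespace
  policyTypes:
    - Egress
  egress:
    - to:
        - ipBlock:
            cidr: 10.64.0.0/22        # placeholder for the flink-zk hosts' range
      ports:
        - protocol: TCP
          port: 2181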

I can confirm that the Flink app is writing to its HA storageDir by looking at Swift:

swiftly get rdf-streaming-updater-staging | grep zook
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/blob/job_66699c279a65adebcdb367d1396e09bb/blob_p-00843054bdf5fce5eec60b4d7555b8d598424013-8822f62734185d3dc261a96a887128ac
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/completedCheckpoint5d6b5562e23f
k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph03706fb54771

Still running down the leads I mentioned in the last update.

Change 953688 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] rdf-streaming-updater: Update egress rules for ZK

https://gerrit.wikimedia.org/r/953688

Change 953688 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: Update egress rules for ZK

https://gerrit.wikimedia.org/r/953688

I don't see znodes on the new cluster:

elukey@flink-zk1001:~$ sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[zookeeper]

From the logs on the taskmanager etc., I'd expect to find a /flink znode:

{"@timestamp":"2023-09-01T06:54:02.056Z","log.level": "INFO","message":"Loading configuration property: high-availability.zookeeper.path.root, /flink", "ecs.version": "1.2.0","process.thread.name":"main","log.logger":"org.apache.flink.configuration.GlobalConfiguration"}

I checked via nsenter and both taskmanager and app pods are able to contact the zookeeper nodes on port 2181.

Something odd:

root@deploy1002:~# kubectl logs flink-app-wdqs-c7f6bff77-xlgs5 -n rdf-streaming-updater flink-main-container | grep RESOURCE_PARAMS | sed -e "s/\[\] \-/'\n'/g" | grep high
' Loading configuration property: high-availability.cluster-id, flink-app-wdqs\nINFO  '
' Loading configuration property: high-availability.zookeeper.path.root, /flink\nINFO  '
' Loading configuration property: high-availability.storageDir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage\nINFO  '
' Loading configuration property: high-availability.zookeeper.quorum, flink-zk1001.eqiad.wmnet:2181,flink-zk1002.eqiad.wmnet:2181,flink-zk1003.eqiad.wmnet:2181\nINFO  '
' Loading configuration property: high-availability.type, ZOOKEEPER\nINFO  '
' Loading configuration property: high-availability.jobmanager.port, 6123\nINFO  '
' Loading configuration property: high-availability, KUBERNETES\nINFO  '

The high-availability property seems to be set to KUBERNETES after ZOOKEEPER; is that expected? In theory the high-availability property shouldn't have any effect (I can't find any trace of it in the docs), but it is confusing nonetheless.
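
In other words, the effective configuration apparently contains both of these keys (reconstructed from the RESOURCE_PARAMS output above; where the KUBERNETES value comes from is not clear, possibly a chart or operator default):

high-availability.type: ZOOKEEPER   # the value we set
high-availability: KUBERNETES       # loaded after it, source unknown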

Change 954957 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] rdf-streaming-updater: Resolve contradictory configs

https://gerrit.wikimedia.org/r/954957

Change 954957 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: Resolve contradictory configs

https://gerrit.wikimedia.org/r/954957

Change 954985 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] rdf-streaming-updater: reduce job managers from 3 to 1

https://gerrit.wikimedia.org/r/954985

Change 954985 merged by jenkins-bot:

[operations/deployment-charts@master] rdf-streaming-updater: reduce job managers from 3 to 1

https://gerrit.wikimedia.org/r/954985

Change 955969 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s-services: init rdf-streaming-updater

https://gerrit.wikimedia.org/r/955969

Change 955969 merged by jenkins-bot:

[operations/deployment-charts@master] dse-k8s-services: init rdf-streaming-updater

https://gerrit.wikimedia.org/r/955969

The flink-app in dse-k8s is healthy again, but I have no evidence that it's talking to Zookeeper. I ran the same logging command as @elukey did above, but unlike him I couldn't find any references to high availability at all.

Firewall rules could be a factor too: we supposedly enabled inbound access to the ZK hosts from dse-k8s in this patch, and I can see the rules from that patch on flink-zk1001 at /etc/ferm/conf.d/10_zookeeper. But iptables -L -v -n does not show the IP range for dse-k8s-worker1001 (where the flink job manager is running).

Regardless, I'm not seeing any port 2181 traffic generated by the dse-k8s-worker; even if the FW rules were wrong, we should still see the connection attempts.

> The flink-app in dse-k8s is healthy again, but I have no evidence that it's talking to Zookeeper. I ran the same logging command as @elukey did above, but unlike him I couldn't find any references to high availability at all.

I checked the pods' logs and I see the high-availability settings; they look ok afaics.

> Firewall rules could be a factor too: we supposedly enabled inbound access to the ZK hosts from dse-k8s in this patch, and I can see the rules from that patch on flink-zk1001 at /etc/ferm/conf.d/10_zookeeper. But iptables -L -v -n does not show the IP range for dse-k8s-worker1001 (where the flink job manager is running).

> Regardless, I'm not seeing any port 2181 traffic generated by the dse-k8s-worker; even if the FW rules were wrong, we should still see the connection attempts.

The firewall rules are ok; they use network ranges rather than host-specific IPs (the following is the DSE range registered in puppet):

elukey@flink-zk1001:~$ sudo iptables -L -v -n | grep 10.67.24.0/21
    2   120 ACCEPT     6    --  *      *       10.67.24.0/21        0.0.0.0/0            tcp dpt:2181
    0     0 ACCEPT     6    --  *      *       10.67.24.0/21        0.0.0.0/0            tcp dpt:2182
    0     0 ACCEPT     6    --  *      *       10.67.24.0/21        0.0.0.0/0            tcp dpt:2183

I also tested some pods via nsenter, and I can connect from them to port 2181 on flink-zk1001, so it shouldn't be a firewall issue.

The flink-zk Zookeeper ensemble in eqiad looks healthy:

elukey@flink-zk1001:~$ echo "srvr" | nc localhost 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 0/0.0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: follower
Node count: 6

elukey@flink-zk1001:~$ echo "srvr" | nc flink-zk1002.eqiad.wmnet 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 1/5.1/13
Received: 11
Sent: 10
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: follower
Node count: 6

elukey@flink-zk1001:~$ echo "srvr" | nc flink-zk1003.eqiad.wmnet 2181
Zookeeper version: 3.8.0-${mvngit.commit.id}, built on 2023-04-06 13:44 UTC
Latency min/avg/max: 0/0.7987/9
Received: 448
Sent: 447
Connections: 1
Outstanding: 0
Zxid: 0x100000043
Mode: leader
Node count: 6
Proposal sizes last/min/max: 48/48/109

@bking another thing to verify:

root@deploy1002:~# kubectl logs flink-app-wdqs-54cd5c5567-zjq7r -n rdf-streaming-updater flink-main-container | grep RESOURCE_PARAMS | sed -e "s/\[\] \-/'\n'/g" | grep s3
' Loading configuration property: high-availability.storageDir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage\nINFO  '
' Loading configuration property: s3.secret-key, ******\nINFO  '
' Loading configuration property: execution.savepoint.path, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/savepoints/init_20230904\nINFO  '
' Loading configuration property: s3.access-key, wdqs:savepoints\nINFO  '
' Loading configuration property: job-result-store.storage-path, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/job-result-store/flink-app-wdqs/2b196e5c-3e73-4160-a5fd-732f38a177d3\nINFO  '
' Loading configuration property: state.savepoints.dir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/savepoints\nINFO  '
' Loading configuration property: s3.endpoint, thanos-swift.discovery.wmnet\nINFO  '
' Loading configuration property: s3.path.style.access, true\nINFO  '
' Loading configuration property: state.checkpoints.dir, s3://rdf-streaming-updater-staging/k8s_op_test_dse/wikidata/checkpoints\nINFO  '

I checked the above and I do see s3 being used correctly:

elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage
                    DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/

elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/
                    DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/

elukey@stat1004:~$ sudo s3cmd -H -c dse-rdf-thanos.cfg ls s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/
                    DIR  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/blob/
2023-08-30 15:56   444K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/completedCheckpoint4f5270d4cd2a
2023-08-28 20:58   678K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph03706fb54771
2023-09-01 06:49   678K  s3://rdf-streaming-updater-staging/k8s_op_test_dse/wdqs/zookeeper_ha_storage/flink-app-wdqs/submittedJobGraph2aa41b8edb69

It is a little weird to see rdf-streaming-updater-staging used for DSE too. As far as I can see the paths shouldn't collide with wikikube's staging, so in theory it should be fine, but I would differentiate them more clearly.

IIUC, metadata is saved on the s3 path and the JobManagers coordinate via Zookeeper when needed. My suggestion is to start a tcpdump for port 2181 on the flink-zk nodes and force Flink on DSE to do something (process data, etc.). It may be that the znodes are only added afterwards, not right from the start.

@bking I had a chat with @dcausse and from https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#example-configuration (the 1.16 docs) it seems that an example config is:

high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: hdfs:///flink/recovery

We set high-availability.type, which is not supported in 1.16. We should also set the cluster-id.

> We set high-availability.type, which is not supported in 1.16. We should also set the cluster-id.

FWIW we have base Flink 1.17 docker images available (that's what the content enrichment app builds on) at https://docker-registry.wikimedia.org/flink/tags/.
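
So the fix, per the 1.16 docs quoted above, is to switch to the legacy key. A hedged sketch (the cluster-id value is the one seen in the earlier RESOURCE_PARAMS log):

# Before: ignored, since Flink 1.16 does not support high-availability.type
high-availability.type: zookeeper

# After: the key 1.16 actually reads, plus an explicit per-cluster id
high-availability: zookeeper
high-availability.cluster-id: flink-app-wdqs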

Change 957298 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-app: Correct ZK config for dse-k8s

https://gerrit.wikimedia.org/r/957298

Change 957298 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app: Correct ZK config for dse-k8s

https://gerrit.wikimedia.org/r/957298

Change 957303 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-app: remove high-availability.cluster-id

https://gerrit.wikimedia.org/r/957303

Change 957303 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app: remove high-availability.cluster-id

https://gerrit.wikimedia.org/r/957303

Change 957951 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] dse-k8s: Add egress rules for flink-operator

https://gerrit.wikimedia.org/r/957951

Change 957951 merged by Bking:

[operations/deployment-charts@master] dse-k8s: Add egress rules for flink-operator

https://gerrit.wikimedia.org/r/957951

I'm happy to report that the flink-operator is connected to Zookeeper!

We can see the znodes now:
ls -R /flink/flink-app-wdqs/jobs

/ad699cfb0eb2c53365df1d982b806b70
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoint_id_counter
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks
/flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks/83d10ca5-765b-46fc-83e7-00a1f5273e20
get /flink/flink-app-wdqs/jobs/ad699cfb0eb2c53365df1d982b806b70/checkpoints/0000000000000000013/locks/83d10ca5-765b-46fc-83e7-00a1f5273e20

Thanks to everyone who helped out on this.

Moving this ticket to done. Operations/chaos engineering testing continues in T342149.