
Flink on Kubernetes Helm charts
Closed, ResolvedPublic

Description

flink-kubernetes-operator chart

Create a helm chart that we can use to install and manage the flink-kubernetes-operator. This will mean adapting the upstream helm chart to fit in our deployment-charts repo.

This task is similar to the work done for the spark-operator in T318926: Deploy spark-operator to the dse-k8s cluster.

flink application chart

Create a flink-application chart that can be used to deploy Flink applications using the FlinkDeployment CRD deployed as part of the flink-kubernetes-operator.

Details

Other Assignee
bking
Repo | Branch | Lines +/-
operations/deployment-charts | master | +1 -1
operations/deployment-charts | master | +9 -58
operations/docker-images/production-images | master | +0 -1
operations/docker-images/production-images | master | +8 -1
operations/deployment-charts | master | +9 -16
operations/deployment-charts | master | +37 -2
operations/deployment-charts | master | +1 -1
operations/deployment-charts | master | +163 -23
operations/deployment-charts | master | +14 -8
operations/deployment-charts | master | +10 -7
operations/deployment-charts | master | +10 -2
operations/deployment-charts | master | +66 -14
operations/deployment-charts | master | +17 -1
operations/deployment-charts | master | +1 -1
operations/deployment-charts | master | +101 -2
operations/deployment-charts | master | +59 -2
operations/deployment-charts | master | +3 -0
operations/deployment-charts | master | +11 -3
operations/deployment-charts | master | +161 -0
operations/deployment-charts | master | +1 K -0
operations/deployment-charts | master | +5 -5
operations/deployment-charts | master | +145 -20
operations/deployment-charts | master | +10 K -0

Event Timeline


Ah, but the upstream helm chart does not work with this feature because of its use of subPath in volumeMounts. volumeMounts that use subPath are never automatically updated.

We can of course fix this in our copy of the helm chart, but I've gone ahead and filed a bug and a pull request upstream to fix it there. We'll see.
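For context, this is documented Kubernetes behaviour rather than a chart-specific quirk: a ConfigMap volume mounted at a directory is refreshed in place by the kubelet, but a mount that uses subPath is a one-time copy at container start. A minimal sketch (names are hypothetical, not the upstream chart's actual mounts):

```yaml
# Hypothetical pod spec fragment illustrating the difference.
volumeMounts:
  # Directory mount: the projected files are refreshed when the
  # ConfigMap changes, so the pod sees updates without a restart.
  - name: flink-operator-config
    mountPath: /opt/flink/conf
  # subPath mount: the file is copied once at container start and is
  # never updated afterwards, which is what broke the upstream chart.
  - name: flink-operator-config
    mountPath: /opt/flink/conf/log4j-operator.properties
    subPath: log4j-operator.properties
```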

They merged my fix! :D

Q for @dcausse and @gmodena.

I've thus far been making Flink logs go only to the console in ECS format. The console logs are picked up by k8s and available via kubectl logs, and in production should automatically be shipped to logstash (IIUC). Because there are no log files written in the container, the Flink JobManager UI is not able to show logs in the admin interface in the browser. When trying to access TaskManager logs, I get 'The file LOG does not exist on the TaskExecutor.'.

I think I'd prefer not to write log files to the container at all. I think if we can get logs via kubectl logs and logstash, that should be sufficient, right?

I agree, for the wdqs flink job we do not store any log file either. We use both kubectl logs and logstash (the k8s setup will export labels as fields so it's really handy to filter what you want, e.g. kubernetes.labels.component=jobmanager).
As for the flink UI I rarely (never?) open it, the only useful thing you might get from there is a stack dump but I'm sure you can extract that from the flink REST endpoint (everything that the flink UI offers should be available from the REST endpoint anyways).
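As a sketch of that label-based filtering on the kubectl side (the namespace is taken from later in this task, and the label values assume Flink's native Kubernetes integration, which labels pods with app=&lt;deployment-name&gt; and component=jobmanager|taskmanager — not verified against this cluster):

```shell
# Tail recent JobManager logs for a deployment (labels assumed).
kubectl -n stream-enrichment-poc logs -l component=jobmanager --tail=100

# Follow TaskManager logs across all TaskManager pods.
kubectl -n stream-enrichment-poc logs -l component=taskmanager -f
```

The same labels surface in logstash as kubernetes.labels.component, which is what makes the filtering mentioned above handy.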

I think I'd prefer not to write log files to the container at all. I think if we can get logs via kubectl logs and logstash, that should be sufficient, right?

+1. Ideally I would not even want to get logs via kubectl, but access them via Kibana instead.

I wonder if we could run Flink headless in prod. One use case for Web UI is troubleshooting tasks (e.g. identify issues with backpressure).
Maybe if we ship metrics to prometheus we won't need a web interface for that?

I think I'd prefer not to write log files to the container at all. I think if we can get logs via kubectl logs and logstash, that should be sufficient, right?

+1. Ideally I would not even want to get logs via kubectl, but access them via Kibana instead.

The way we have it set up, what you get via kubectl logs is the same thing that ends up in logstash, for what it's worth. That was a design decision.

@JMeybohm re needed ingress and egress.

Ingress: I don't think we need anything, but it would be nice to easily access the Flink admin UI (on default container port 8081). This would be for admin access only. In HA setup, there are multiple JobManager pods that each expose an admin UI. If we had some Service routing/balancing, we'd want to route to the active JobManager, which might be a little complicated to detect? But really, having some easy-enough way for admins to tunnel to a JobManager on port 8081 would suffice.
See also my comment above about built in support for ingress to the JobManager admin UI. I suppose if we can use that, it would be best?

Egress: This is service specific, but commonly Kafka cluster(s) and Swift. If we decide to do HA via Zookeeper instead of ConfigMaps, then we'll need Zookeeper too. Many of our use cases are 'enrichment' pipelines, so they'll talk to the MediaWiki API, and/or possibly other APIs if needed. But, ideally those would be configurable at the helmfile/service level, rather than in the chart itself.

Please advise as to what vendor templates to keep and include in the flink-app chart. Thanks!
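For the admin-only access described above, a manual tunnel would likely suffice; something along these lines (service and host names are hypothetical — Flink's native integration creates a &lt;name&gt;-rest Service for the JobManager UI/REST endpoint):

```shell
# Option 1: port-forward to the JobManager REST/UI Service (assumed name),
# using deploy credentials for the namespace.
kubectl -n stream-enrichment-poc port-forward svc/flink-app-main-rest 8081:8081

# Option 2: plain ssh tunnel via a host that can reach the pod/service
# network (hypothetical host; assumes it can resolve the Service address).
ssh -N -L 8081:flink-app-main-rest.stream-enrichment-poc.svc:8081 deploy1002.eqiad.wmnet

# Then browse http://localhost:8081
```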

Ingress: I don't think we need anything, but it would be nice to easily access the Flink admin UI (on default container port 8081). This would be for admin access only. In HA setup, there are multiple JobManager pods that each expose an admin UI. If we had some Service routing/balancing, we'd want to route to the active JobManager, which might be a little complicated to detect? But really, having some easy-enough way for admins to tunnel to a JobManager on port 8081 would suffice.
See also my comment above about built in support for ingress to the JobManager admin UI. I suppose if we can use that, it would be best?

  1. Kubernetes native Ingress objects

We could potentially get away with just the kubernetes native Ingress object the operator is able to create and the "Simple domain based routing" approach described at https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.3/docs/operations/ingress/ . Anything more complex won't be possible, as our ingress implementation does not support configuration by annotations.

The downside of this is that we have not really tested the kubernetes native Ingress objects, as we decided to go with Istio CRDs instead (context: T287007#7431081). Ultimately both methods will lead to envoy config being generated and traffic being handled by the same envoy instances (istio-ingressgateway), but we have less experience with the former.
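For reference, the operator's Ingress support in this option is driven entirely by the FlinkDeployment spec; the "Simple domain based routing" variant from the linked docs looks roughly like this (the domain is a placeholder, not a real WMF FQDN):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-app-main
spec:
  ingress:
    # {{name}} and {{namespace}} are expanded by the operator,
    # producing e.g. flink-app-main.stream-enrichment-poc.flink.example.wmnet
    template: "{{name}}.{{namespace}}.flink.example.wmnet"
```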

  2. Istio Ingress CRDs

For this to work the ingress module would need to be incorporated somehow, probably kind of like described here: https://wikitech.wikimedia.org/wiki/Kubernetes/Ingress#More_complex_setups
Actually, the operator chart could be extended to deploy a generic ingress config (like release "one" in the docs above) while the flink-app chart could make use of the generic config (being release "two"). Multiple flink jobs could then be addressed via subpaths under the same, generic FQDN (like flink.dse-k8s.discovery.wmnet/namespace/flinkjob).

Egress: This is service specific, but commonly Kafka cluster(s) and Swift. If we decide to do HA via Zookeeper instead of ConfigMaps, then we'll need Zookeeper too. Many of our use cases are 'enrichment' pipelines, so they'll talk to the MediaWiki API, and/or possibly other APIs if needed. But, ideally those would be configurable at the helmfile/service level, rather than in the chart itself.

For Egress access the Pod executing the actual job should be extended with the service-proxy (https://wikitech.wikimedia.org/wiki/Envoy#Services_Proxy) like we do with regular services (so that everything external can be accessed via the service-proxy, providing metrics and consistency with everything else). This will require adding mesh.deployment.container and its dependencies to the podTemplate.
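A sketch of what that could look like at the values level, assuming the flink-app chart wires the standard mesh module into the FlinkDeployment podTemplate (the key names follow common deployment-charts conventions but are not verified for this chart, and the listener name is hypothetical):

```yaml
# values.yaml sketch for a flink-app release (hypothetical keys)
mesh:
  enabled: true   # renders mesh.deployment.container as a sidecar
discovery:
  listeners:
    - mw-api-int-async-ro   # hypothetical service-proxy listener for the MediaWiki API
```

Kafka and Swift egress would then be handled either via additional listeners or via chart-level egress rules, depending on how the service is set up.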

Ingress: Okay, let's put off working on ingress for the jobmanager UI port until later. Accessing it via an ssh tunnel manually is fine for now.

Egress: Okay, I added the mesh.deployment.container. But how to test? I think I can't test this locally.

Change 876200 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-operator - add admin_ng helmfile

https://gerrit.wikimedia.org/r/876200

Change 865100 merged by jenkins-bot:

[operations/deployment-charts@master] flink-kubernetes-operator - Initial commit of upstream helm chart

https://gerrit.wikimedia.org/r/865100

Change 865158 merged by jenkins-bot:

[operations/deployment-charts@master] flink-kubernetes-operator - modify for WMF

https://gerrit.wikimedia.org/r/865158

Change 876203 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] Bump flink-kubernetes-operator chart versions to 1.3.0 to match image and upstream version

https://gerrit.wikimedia.org/r/876203

Change 876203 merged by jenkins-bot:

[operations/deployment-charts@master] Bump flink-kubernetes-operator chart versions to 1.3.0 to match image and upstream version

https://gerrit.wikimedia.org/r/876203

Change 866510 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app chart

https://gerrit.wikimedia.org/r/866510

Change 876200 merged by jenkins-bot:

[operations/deployment-charts@master] flink-operator - add admin_ng helmfile

https://gerrit.wikimedia.org/r/876200

Change 878116 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-kubernetes-operator - use chart version as wmf version

https://gerrit.wikimedia.org/r/878116

Change 878116 merged by jenkins-bot:

[operations/deployment-charts@master] flink-kubernetes-operator - use chart version as wmf version

https://gerrit.wikimedia.org/r/878116

Change 878126 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] admin_ng/flink-operator - crds release depends on kube-system/namespaces

https://gerrit.wikimedia.org/r/878126

Change 878126 merged by Ottomata:

[operations/deployment-charts@master] admin_ng/flink-operator - crds release depends on kube-system/namespaces

https://gerrit.wikimedia.org/r/878126

Change 878134 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-kubernetes-operator - networkpolicies for k8s API

https://gerrit.wikimedia.org/r/878134

Change 878134 merged by Ottomata:

[operations/deployment-charts@master] flink-kubernetes-operator - networkpolicies for k8s API

https://gerrit.wikimedia.org/r/878134

Change 878210 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] Add flink-app-example service

https://gerrit.wikimedia.org/r/878210

Change 878210 merged by jenkins-bot:

[operations/deployment-charts@master] Add flink-app-example service in the dse-k8s-eqiad cluster

https://gerrit.wikimedia.org/r/878210

Change 879116 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app-example - use correct patch to kubeconfig file in stream-enricnment-poc

https://gerrit.wikimedia.org/r/879116

Change 879116 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app-example - use correct patch to kubeconfig file in stream-enricnment-poc

https://gerrit.wikimedia.org/r/879116

Hm, trying to deploy flink-app-example errors out; I think we need some extra permissions:

$ helmfile -n stream-enrichment-poc -e dse-k8s-eqiad diff
...
Error: rendered manifests contain a resource that already exists. Unable to continue with install: could not get information about the resource FlinkDeployment "flink-app-main" in namespace "stream-enrichment-poc": flinkdeployments.flink.apache.org "flink-app-main" is forbidden: User "stream-enrichment-poc-deploy" cannot get resource "flinkdeployments" in API group "flink.apache.org" in the namespace "stream-enrichment-poc"

Also:

$ kube_env stream-enrichment-poc dse-k8s-eqiad

$ kubectl get flinkdeployments

Error from server (Forbidden): flinkdeployments.flink.apache.org is forbidden: User "stream-enrichment-poc" cannot list resource "flinkdeployments" in API group "flink.apache.org" in the namespace "stream-enrichment-poc"

Oops, yeah. We are pretty restrictive with permissions for deployment users (only allowing particular object types etc.).
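The eventual RBAC change presumably amounts to a rule along these lines (a sketch for one namespace, not the actual admin_ng template):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: deploy-flinkdeployments
  namespace: stream-enrichment-poc
rules:
  # Let the deploy user manage FlinkDeployment objects, which the
  # flink-app chart renders and helmfile needs to get/diff/apply.
  - apiGroups: ["flink.apache.org"]
    resources: ["flinkdeployments"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
```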

Change 879122 had a related patch set uploaded (by JMeybohm; author: JMeybohm):

[operations/deployment-charts@master] admin_ng RBAC: Permit deploy users to interact with more resources

https://gerrit.wikimedia.org/r/879122

Or, hm, @JMeybohm @BTullis, is this because I am deploying into a namespace and not using the 'flink' ServiceAccount? I'm not totally sure how the 'stream-enrichment-poc-deploy' user defined(?) in e.g. /etc/kubernetes/stream-enrichment-poc-deploy-dse-k8s-eqiad.config relates to ServiceAccounts.

Or, hm, @JMeybohm @BTullis, is this because I am deploying into a namespace and not using the 'flink' ServiceAccount? I'm not totally sure how the 'stream-enrichment-poc-deploy' user defined(?) in e.g. /etc/kubernetes/stream-enrichment-poc-deploy-dse-k8s-eqiad.config relates to ServiceAccounts.

No. It's because "you" (as in the deploy user you are authenticating as to the k8s API) are not allowed to create FlinkDeployment objects.

Change 879122 merged by jenkins-bot:

[operations/deployment-charts@master] admin_ng RBAC: Permit deploy users to interact with more resources

https://gerrit.wikimedia.org/r/879122

Getting somewhere! App was deployed, but:

{
  "@timestamp": "2023-01-11T19:22:07.776Z",
  "log.level": "INFO",
  "message": "Could not resolve ResourceManager address akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_*.",
  "ecs.version": "1.2.0",
  "process.thread.name": "flink-akka.actor.default-dispatcher-14",
  "log.logger": "org.apache.flink.runtime.jobmaster.JobMaster"
}

Need more networkpolicy? ...

Need more networkpolicy? ...

Hm no.

I think 'flink-app-main.stream-enrichment-poc:6123' should be the correct jobmanager.rpc.address. The name of my JobManager pod is flink-app-main-579dbf9b98-f8vtl, and that pod has:

IP:           10.67.25.10
IPs:
  IP:           10.67.25.10
Controlled By:  ReplicaSet/flink-app-main-579dbf9b98
Containers:
  flink-main-container:
    Container ID:  docker://cb3c6f859854efb09e7362f5f43083082a5b558fd7df31ddd6723297ea853789
    Image:         docker-registry.discovery.wmnet/flink:1.16.0-wmf2
    Image ID:      docker-pullable://docker-registry.discovery.wmnet/flink@sha256:1e2e0cc1c66c31699f7455b0e211126ca4e6d5cda160a92d5d39c77ad4d0fffa
    Ports:         8081/TCP, 6123/TCP, 6124/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP

Ah, but failing to connect to the ResourceManager is a symptom of it not actually starting. Another log entry says "Could not start the ResourceManager":

Could not start the ResourceManager akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_1
...
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.watchPodsAndDoCallback(Fabric8FlinkKubeClient.java:245)
    at org.apache.flink.kubernetes.KubernetesResourceManagerDriver.watchTaskManagerPods(KubernetesResourceManagerDriver.java:373)
    at org.apache.flink.kubernetes.KubernetesResourceManagerDriver.initializeInternal(KubernetesResourceManagerDriver.java:113)
    at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:92)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:171)
...
Caused by: java.net.SocketTimeoutException: connect timed out
...
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.platform.Platform.connectSocket(Platform.java:130)

So, I think the JobManager is not able to talk to the k8s API to watch for pod starts?

Maybe I need to set KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT env vars in the flink pods too, and/or also allow egress to them?

So, I think the JobManager is not able to talk to the k8s API to watch for pod starts?

Maybe I need to set KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT env vars in the flink pods too, and/or also allow egress to them?

That makes sense I guess as, IIUC, the operator creates pods for flink and flink creates pods for jobs. So both need to interact with the kubernetes API at some point.
Getting the NetworkPolicy right might be a bit tricky, though. That one would need to be in the namespace of the flink cluster and match the pods that the operator manages. Maybe the easier way out is to have the operator chart create a GlobalNetworkPolicy selecting the watchNamespaces (https://projectcalico.docs.tigera.io/archive/v3.16/reference/resources/globalnetworkpolicy) or specific pod selectors regardless of the namespace?
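A Calico GlobalNetworkPolicy along the lines suggested might look like this (the pod-label selector assumes Flink's native labels, and the apiserver CIDRs/ports are illustrative placeholders taken from later in this task):

```yaml
apiVersion: projectcalico.org/v3
kind: GlobalNetworkPolicy
metadata:
  name: flink-pods-to-k8s-api
spec:
  # Match Flink-managed pods regardless of namespace (labels assumed).
  selector: component in {'jobmanager', 'taskmanager'}
  types:
    - Egress
  egress:
    - action: Allow
      protocol: TCP
      destination:
        nets:
          - 10.64.0.228/32     # apiserver addresses, placeholders
          - 10.64.16.237/32
        ports: [443, 6443]
```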

Maybe the easier way out is to have the operator chart create a GlobalNetworkpolicy selecting the watchNamespaces

This could work, and would make sense as the operator is so far responsible for creating some of these kinds of resources in the watchNamespace anyway.

So, would it be okay if all pods in the watchNamespace are allowed to talk to k8s?

Getting the NetworkPolicy right might be a bit tricky, though. That one would need to be in the namespace of the flink cluster and match the pods that the operator manages

Actually, why is this tricky? I had thought this would be just adding an egress rule in the flink-app to .Values.kubernetesMasters.cidrs, just like we did for the operator.

Getting the NetworkPolicy right might be a bit tricky, though. That one would need to be in the namespace of the flink cluster and match the pods that the operator manages

Actually, why is this tricky? I had thought this would be just adding an egress rule in the flink-app to .Values.kubernetesMasters.cidrs, just like we did for the operator.

Yeah. You are right. As the flink-app chart is deployed to the watchNamespace, it could also create/add this additional rule.
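A sketch of that rule as a NetworkPolicy template fragment in the flink-app chart, assuming a .Values.kubernetesMasters.cidrs list like the one the operator chart uses:

```yaml
# Egress to the kubernetes apiservers from flink-app pods (sketch).
egress:
  {{- range .Values.kubernetesMasters.cidrs }}
  - to:
      - ipBlock:
          cidr: {{ . }}
    ports:
      - protocol: TCP
        port: 443
      - protocol: TCP
        port: 6443
  {{- end }}
```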

Change 879618 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-kubernetes-operator - allow flink-app pods to talk to k8s API

https://gerrit.wikimedia.org/r/879618

Change 879618 merged by jenkins-bot:

[operations/deployment-charts@master] flink-kubernetes-operator - allow flink-app pods to talk to k8s API

https://gerrit.wikimedia.org/r/879618

Change 880991 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app - set KUBERNETES_SERVICE_{HOST,PORT} in flink-main-container

https://gerrit.wikimedia.org/r/880991

Change 880991 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app - set KUBERNETES_SERVICE_{HOST,PORT} in flink-main-container

https://gerrit.wikimedia.org/r/880991

Rats, neither the NetworkPolicy nor the KUBERNETES_SERVICE_HOST changes worked.

I'm not totally sure where to go from here. I'm pretty sure the problem is that the flink containers can't talk to the k8s API, but I'm not sure whether that's because of a misconfigured host/port, a networking rule, or something else.

Janis is OOO this week. I'll try to get someone else from ServiceOps to take a look.

Change 881424 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app-example Set egress enabled in dse-k8s-eqiad

https://gerrit.wikimedia.org/r/881424

Change 881424 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app-example Set egress enabled in dse-k8s-eqiad

https://gerrit.wikimedia.org/r/881424

@JMeybohm hm, is the extra NetworkPolicy we made the flink-operator chart install redundant with the default-network-policy if egress.enabled = true? We made flink operator add kubernetesMasters egress:

Allowing egress traffic:
  To Port: 443/TCP
  To Port: 6443/TCP
  To:
    IPBlock:
      CIDR: 10.64.0.228/32
      Except:
  To:
    IPBlock:
      CIDR: 2620:0:861:101:10:64:0:228/128
      Except:
  To:
    IPBlock:
      CIDR: 10.64.16.237/32
      Except:
  To:
    IPBlock:
      CIDR: 2620:0:861:102:10:64:16:237/128
      Except:
Policy Types: Egress

And default egress has:

  Allowing egress traffic:
    To Port: <any> (traffic allowed to all ports)
    To:
      IPBlock:
        CIDR: 10.64.64.0/21
        Except:
    ----------
...

(I don't see any corresponding IPv6 addresses in the default policy, though.)

Change 881455 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app 0.1.0

https://gerrit.wikimedia.org/r/881455

Change 881455 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app 0.1.0

https://gerrit.wikimedia.org/r/881455

FYI, I'm reverting the KUBERNETES_SERVICE_HOST change in https://gerrit.wikimedia.org/r/c/operations/deployment-charts/+/881455. I think this is probably not needed.

Change 881458 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-operator: bump version to 1.3.1

https://gerrit.wikimedia.org/r/881458

@akosiaris manually edited the flink-pod-k8s-api NetworkPolicy we added to grant access to the kubernetes apiserver Service on ClusterIP 10.67.32.1. This allowed me to telnet to that IP via nsenter in the flink container network namespace ([@dse-k8s-worker1006:/home/otto] $ sudo nsenter -t 1824562 telnet 10.67.32.1 443), whereas before I could not.

However, the JobManager still fails to start with the same error. I'll paste it in full.

{
  "@timestamp": "2023-01-18T19:38:29.625Z",
  "log.level": "ERROR",
  "message": "Fatal error occurred in ResourceManager.",
  "ecs.version": "1.2.0",
  "process.thread.name": "flink-akka.actor.default-dispatcher-5",
  "log.logger": "org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager",
  "error.type": "org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException",
  "error.message": "Could not start the ResourceManager akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_1",
  "error.stack_trace": "org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_1
    at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:246)
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Cannot initialize resource provider.
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:177)
    at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:269)
    at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:241)
    ... 25 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.watchPodsAndDoCallback(Fabric8FlinkKubeClient.java:245)
    at org.apache.flink.kubernetes.KubernetesResourceManagerDriver.watchTaskManagerPods(KubernetesResourceManagerDriver.java:373)
    at org.apache.flink.kubernetes.KubernetesResourceManagerDriver.initializeInternal(KubernetesResourceManagerDriver.java:113)
    at org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver.initialize(AbstractResourceManagerDriver.java:92)
    at org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager.initialize(ActiveResourceManager.java:171)
    ... 27 more
Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:191)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:129)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:122)
    at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.lambda$run$2(WatchConnectionManager.java:133)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.fabric8.kubernetes.client.okhttp.OkHttpWebSocketImpl$BuilderImpl$1.onFailure(OkHttpWebSocketImpl.java:72)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.java:570)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$1.onFailure(RealWebSocket.java:216)
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:180)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    ... 3 more
    Suppressed: java.lang.Throwable: waiting here
        at io.fabric8.kubernetes.client.utils.Utils.waitUntilReady(Utils.java:169)
        at io.fabric8.kubernetes.client.utils.Utils.waitUntilReadyOrFail(Utils.java:180)
        at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager.waitUntilReady(WatchConnectionManager.java:96)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:572)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:547)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.watch(BaseOperation.java:83)
        at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$3(Fabric8FlinkKubeClient.java:236)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.platform.Platform.connectSocket(Platform.java:130)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:263)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:183)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientBuilderImpl$InteceptorAdapter.intercept(OkHttpClientBuilderImpl.java:62)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientBuilderImpl$InteceptorAdapter.intercept(OkHttpClientBuilderImpl.java:62)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientBuilderImpl$InteceptorAdapter.intercept(OkHttpClientBuilderImpl.java:62)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientBuilderImpl$InteceptorAdapter.intercept(OkHttpClientBuilderImpl.java:62)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:172)
    ... 4 more
"
}

@akosiaris to reproduce, just delete the flink-app-main pod in stream-enrichment-poc and tail the logs.

@akosiaris manually edited the flink-pod-k8s-api NetworkPolicy we added to grant access to the kubernetes apiserver Service on ClusterIP 10.67.32.1. This allowed me to telnet to that IP via nsenter in the flink container network namespace ([@dse-k8s-worker1006:/home/otto] $ sudo nsenter -t 1824562 telnet 10.67.32.1 443), whereas before I could not.

You forgot the -n flag to nsenter ;-). So telnet was running in the node's network namespace and using the node's IP. And no, my change was actually not useful and is not needed.

However, the JobManager still fails to start with the same error. I'll paste it in full.

{
  "@timestamp": "2023-01-18T19:38:29.625Z",
  "log.level": "ERROR",
  "message": "Fatal error occurred in ResourceManager.",
  "ecs.version": "1.2.0",
  "process.thread.name": "flink-akka.actor.default-dispatcher-5",
  "log.logger": "org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager",
  "error.type": "org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException",
  "error.message": "Could not start the ResourceManager akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_1",
  "error.stack_trace": "org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@flink-app-main.stream-enrichment-poc:6123/user/rpc/resourcemanager_1
  at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
[snip]
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientBuilderImpl$InteceptorAdapter.intercept(OkHttpClientBuilderImpl.java:62)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
    at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:172)
    ... 4 more
"
}

Ah yes, the beauty of debugging JVM applications that spew tons of backtraces one needs to dig deep into to find the one useful message. And in this case, that one useful message is not particularly verbose: a connection timed out, but to which endpoint... unclear.

Anyway, I did manage to proceed a bit. It was indeed trying to reach the Kubernetes API (a tcpdump showed that). The problem was label matching after all:

kubectl -n stream-enrichment-poc describe pods flink-app-main-794b6767c7-dsqj4 
Name:         flink-app-main-794b6767c7-dsqj4
Namespace:    stream-enrichment-poc
Priority:     0
Node:         dse-k8s-worker1006.eqiad.wmnet/10.64.132.8
Start Time:   Thu, 19 Jan 2023 10:48:28 +0000
Labels:       app=flink-app-main <============ Note this one
              component=jobmanager
              pod-template-hash=794b6767c7
              release=main
              routed_via=main

vs

kubectl -n stream-enrichment-poc describe netpol flink-pod-k8s-api
Name:         flink-pod-k8s-api
Namespace:    stream-enrichment-poc
Created on:   2023-01-17 18:41:28 +0000 UTC
Labels:       app.kubernetes.io/managed-by=Helm
              app.kubernetes.io/name=flink-kubernetes-operator
              app.kubernetes.io/version=1.3.0
              helm.sh/chart=flink-kubernetes-operator-2.2.0
Annotations:  meta.helm.sh/release-name: flink-operator
              meta.helm.sh/release-namespace: flink-operator
Spec:
  PodSelector:     app=flink-app <======= This is different, doesn't match the above

I hand edited it to match and now the app speaks to the API.
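The general rule here: a NetworkPolicy only applies to pods whose labels satisfy its podSelector, and a selector that matches zero pods fails silently. A minimal sketch of the hand-edit, using the values from the output above (the egress rule itself is an assumption for illustration, not the chart's actual template):

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: flink-pod-k8s-api
  namespace: stream-enrichment-poc
spec:
  # Must match the labels actually on the pods:
  # app=flink-app-main, not the app=flink-app the chart rendered.
  podSelector:
    matchLabels:
      app: flink-app-main
  policyTypes:
    - Egress
  egress:
    - to:
        - ipBlock:
            cidr: 10.67.32.1/32  # apiserver Service ClusterIP
      ports:
        - protocol: TCP
          port: 443
```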

And of course it then croaked because:

Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname 10.67.32.1 not verified:
    certificate: sha256/Z4R87wO1q3QtSUHZaWh+Ph8UinMuoTamUhpaHr2AiI8=
    DN: CN=dse-k8s-ctrl.svc.eqiad.wmnet
    subjectAltNames: [dse-k8s-ctrl1002.eqiad.wmnet, dse-k8s-ctrl.svc.eqiad.wmnet, kubernetes.default.svc.cluster.local, dse-k8s-ctrl001.eqiad.wmnet]

That error was pretty verbose and nice.

@Ottomata, I think you need to re-revert https://gerrit.wikimedia.org/r/c/operations/deployment-charts/+/881455 and also fix the label matching and you should be good to go.

YES! Thank you so much @akosiaris!

Okay, just so I understand: setting KUBERNETES_SERVICE_HOST to kubernetes.default.svc.cluster.local is always needed when getting pods to talk to the k8s API, because otherwise kubelet injects the apiserver's ClusterIP as the value, which fails certificate hostname verification since only those DNS names appear in the cert's subjectAltNames?

Really appreciate it!
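For reference, a hedged sketch of what the re-revert amounts to in a FlinkDeployment podTemplate (podTemplate is the flink-kubernetes-operator CRD's mechanism for patching pod specs; the exact placement in our chart is an assumption). An explicitly declared env var takes precedence over the kubelet-injected service variable of the same name:

```yaml
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            # By default kubelet injects the apiserver's ClusterIP here,
            # which is absent from the cert's subjectAltNames and so fails
            # hostname verification. The cluster-internal DNS name IS in
            # the SANs, so clients can verify it.
            - name: KUBERNETES_SERVICE_HOST
              value: kubernetes.default.svc.cluster.local
```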

Change 881605 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink - fix access to k8s api

https://gerrit.wikimedia.org/r/881605

Something is messing with the app label I'm setting, and we suspect it is the flink-kubernetes-operator. Just sent an email to the mailing list.

Response: https://lists.apache.org/thread/dont796lp84vfqnovolryw0y0470mqsv

The app label itself is used by Flink internally for a different purpose, so it's overridden. This is completely expected.

I think it would be better to use some other label :)

@akosiaris I don't love it, but I added an engine: flink label to the pods, and am selecting them using that label in the NetworkPolicy.

'engine' is the best I came up with, please bikeshed away (if you care?) :)

Change 881907 had a related patch set uploaded (by Bking; author: Bking):

[operations/docker-images/production-images@master] flink-kubernetes-operator: bump version to 1.3.1

https://gerrit.wikimedia.org/r/881907

Change 881605 merged by jenkins-bot:

[operations/deployment-charts@master] flink - fix access to k8s api

https://gerrit.wikimedia.org/r/881605

Change 881910 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] sync flink-kubernetes-operator-crds and flink-kubernetes-operator versions

https://gerrit.wikimedia.org/r/881910

Change 881910 merged by Ottomata:

[operations/deployment-charts@master] sync flink-kubernetes-operator-crds and flink-kubernetes-operator versions

https://gerrit.wikimedia.org/r/881910

'engine' is the best I came up with, please bikeshed away (if you care?) :)

I changed this to k8s_api_allowed: true.

I'm trying to apply this to admin_ng/flink-operator in dse-k8s-eqiad, but am getting an annoying helmfile error. I'm sure I'm just doing something wrong.

Trying to do:

root@deploy1002:/srv/deployment-charts/helmfile.d/admin_ng/flink-operator# kube_env admin dse-k8s-eqiad
root@deploy1002:/srv/deployment-charts/helmfile.d/admin_ng/flink-operator# helmfile --debug -n flink-operator  diff

...

template syntax error: template: stringTemplate:18:44: executing "stringTemplate" at <get "wmf-stable/flink-kubernetes-operator-crds" "">: error calling get: unexpected type(<nil>) of value for key "wmf-stable/flink-kubernetes-operator-crds": it must be either map[string]interface{} or any struct

What am I doing wrong?

What am I doing wrong?

You're just running the command from the wrong working directory. For admin_ng stuff, everything needs to run through the "master helmfile" in /srv/deployment-charts/helmfile.d/admin_ng. You can limit the scope with -l name=<release name>:

root@deploy1002:/srv/deployment-charts/helmfile.d/admin_ng# helmfile -e dse-k8s-eqiad -l name=flink-operator diff

Change 882662 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink-app - explicitly set Flink ports and configure ingress netpol for them

https://gerrit.wikimedia.org/r/882662

Change 882680 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] flink - avoid adding an extra 'k8s_api_enabled' label by using component label instead

https://gerrit.wikimedia.org/r/882680

Change 882662 merged by jenkins-bot:

[operations/deployment-charts@master] flink-app - explicitly set Flink ports and configure ingress netpol for them

https://gerrit.wikimedia.org/r/882662

Change 882680 merged by jenkins-bot:

[operations/deployment-charts@master] flink - avoid adding an extra 'k8s_api_enabled' label by using component label instead

https://gerrit.wikimedia.org/r/882680
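With that change, the NetworkPolicy selects Flink pods on the component label the operator already sets, instead of an extra custom label. A hedged sketch of what that selector might look like (the exact template in the merged chart may differ):

```yaml
spec:
  # Select on the operator-managed component label rather than
  # adding our own label, since the operator owns pod labeling.
  podSelector:
    matchExpressions:
      - key: component
        operator: In
        values:
          - jobmanager
          - taskmanager
```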

FINALLY GOT flink-example-app running. YESSSS!

Change 881907 merged by Ottomata:

[operations/docker-images/production-images@master] flink-kubernetes-operator: bump version to 1.3.1

https://gerrit.wikimedia.org/r/881907

Change 883657 had a related patch set uploaded (by Bking; author: Bking):

[operations/docker-images/production-images@master] flink-operator: remove unnecessary newline

https://gerrit.wikimedia.org/r/883657

Change 883657 merged by Ottomata:

[operations/docker-images/production-images@master] flink-operator: remove unnecessary newline

https://gerrit.wikimedia.org/r/883657

Change 881458 merged by Bking:

[operations/deployment-charts@master] flink-operator: bump version to 1.3.1

https://gerrit.wikimedia.org/r/881458

Change 884983 had a related patch set uploaded (by Bking; author: Bking):

[operations/deployment-charts@master] flink-k8s-operator: bump internal version

https://gerrit.wikimedia.org/r/884983

Change 884983 merged by Bking:

[operations/deployment-charts@master] flink-k8s-operator: bump internal version

https://gerrit.wikimedia.org/r/884983