
Fix PyFlink log levels
Open, Needs Triage, Public

Description

Python logs produced with logging are printed by Flink as "INFO" regardless of the level at which they were emitted. It looks like the message produced by Python is wrapped by the Java logger, so it becomes a JSON record with level INFO whose message field contains the whole Python log line as plain text.

Some examples:

Debug as Info:

{"@timestamp":"2026-03-12T15:48:57.088Z","log.level": "INFO","message":"DEBUG:root:No need to resolve https://meta.wikimedia.org/w/api.php, it is already absolute", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-11","log.logger":"org.apache.flink.client.python.PythonDriver"}

Error as Info:

{"@timestamp":"2026-03-12T15:48:57.093Z","log.level": "INFO","message":"error: Option 'enrich.mediawiki_api_endpoint_template' is not accepted", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-11","log.logger":"org.apache.flink.client.python.PythonDriver"}
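Until the root cause is fixed, one stopgap on the consumer side could be to recover the real level from the wrapped message, since Python's default logging format prefixes each line with `LEVEL:logger:`. A minimal sketch, assuming that prefix is present (the function name and heuristic are mine, not part of any existing tooling):

```python
import json
import re

# Python's default logging format is "LEVEL:logger:message", so the real
# level survives as a prefix inside the wrapped Java record's message field.
_LEVEL_PREFIX = re.compile(r"^(DEBUG|INFO|WARNING|ERROR|CRITICAL):[^:]*:")

def real_level(wrapped_record: str) -> str:
    """Return the embedded Python level if present, else the Java level."""
    doc = json.loads(wrapped_record)
    match = _LEVEL_PREFIX.match(doc.get("message", ""))
    return match.group(1) if match else doc.get("log.level", "INFO")
```

On the first example above this would return "DEBUG" even though the Java record says INFO; messages without the prefix (like the `error: Option ...` one) fall back to the wrapped level.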

We aren't able to create alerts based on logs for these PyFlink applications because they always report "INFO". The only errors Flink produces are when the Python application finally dies; at that point Flink produces errors. But we might want to create monitors based on regular and recoverable errors, or warnings, so we need to ensure PyFlink reports the right level.

Task is done if:

  • Python logs are produced with the right level.
  • Python logs are JSON with standard format and fields.
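One way to satisfy both criteria on the Python side is a custom `logging.Formatter` that emits single-line ECS-style JSON with the record's real level. A minimal stdlib-only sketch (field names copied from the examples in this task, not from any official eventutilities-python API):

```python
import json
import logging
from datetime import datetime, timezone

class EcsJsonFormatter(logging.Formatter):
    """Emit one ECS-style JSON line per record, preserving the real level."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {
            "@timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
            "log.level": record.levelname,  # DEBUG/WARNING/ERROR survive, not a blanket INFO
            "message": record.getMessage(),
            "ecs.version": "1.2.0",
            "log.logger": record.name,
        }
        return json.dumps(doc)

handler = logging.StreamHandler()
handler.setFormatter(EcsJsonFormatter())
logging.basicConfig(level=logging.DEBUG, handlers=[handler], force=True)
```

Whether this survives the PyFlink/Beam worker's own log wrapping is exactly the open question in this task; in a plain Python process it produces correctly leveled JSON.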

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title: Bump eventutilities-python
Reference: repos/data-engineering/mediawiki-event-enrichment!128
Author: javiermonton
Source Branch: feature/bump-eventutilities-python-logs
Dest Branch: main

Event Timeline

We are also seeing real Warning messages like:

Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError("HTTPConnectionPool(host='localhost', port=6500): Read timed out. (read timeout=5)")': /w/rest.php/v1/revision/1174258630/html

being reported in Logstash.

Maybe the issue is that logs produced in each pod look like INFO, but Flink finds them and reports them properly? Or maybe Logstash is doing a good job of finding the real level?

I'm trying to check if this is a real issue in Logstash or if it's happening only when looking at the K8s pods directly.

Here is an example where a Flink application failed and the main reason for the failure was reported as INFO:

The log is quite big, and can be found here: https://logstash.wikimedia.org/app/discover#/doc/0fade920-6712-11eb-8327-370b46f9e7a5/ecs-k8s-1-1.11.0-7-2026.12?id=mXuDEZ0BbI6kJ8Wyx5-v

Here we have another example where the logs seem to be produced as multiple lines rather than JSON, causing Logstash to count them as individual logs, when in reality this is a single log with a long stack trace:

image.png (1×2 px, 360 KB)
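The multi-line problem can be avoided at the source by serializing the traceback inside the JSON document, so that `json.dumps` escapes the newlines and each record stays on one line. A hedged sketch (the class and field names are mine, modeled on the ECS examples above):

```python
import json
import logging

class SingleLineJsonFormatter(logging.Formatter):
    """One JSON line per record; tracebacks become an escaped string field."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {
            "log.level": record.levelname,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # json.dumps escapes the newlines, so Logstash sees one event,
            # not one event per traceback line
            doc["error.stack_trace"] = self.formatException(record.exc_info)
        return json.dumps(doc)
```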

Change #1261475 had a related patch set uploaded (by JavierMonton; author: JavierMonton):

[operations/deployment-charts@master] stream: mw-page-html-content-change-enrich-next

https://gerrit.wikimedia.org/r/1261475

Change #1261475 merged by jenkins-bot:

[operations/deployment-charts@master] stream: mw-page-html-content-change-enrich-next

https://gerrit.wikimedia.org/r/1261475

Another log-related issue is that the Flink logger's "extra" fields are being omitted in Logstash. The current logs are sent as a plain string, and the extra fields passed to the logger are ignored.

I think we need to do something like this in if __name__ == "__main__":

setup_logging(
    config_dict=...
)

but this means we need to create a class/function to convert the log message + extras into a format config_dict can understand. I tried looking for examples where this is currently in use but couldn't find one.
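A sketch of what that glue could look like, using only the stdlib (setup_logging's real config_dict contract is unknown to me, so this goes straight to `logging.config.dictConfig` with a formatter that keeps `extra` fields instead of dropping them):

```python
import json
import logging
import logging.config

# Attributes every LogRecord has; anything else was passed via `extra=`.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({})))

class ExtraAwareJsonFormatter(logging.Formatter):
    """Merge `extra` kwargs into the JSON document instead of ignoring them."""

    def format(self, record: logging.LogRecord) -> str:
        doc = {"log.level": record.levelname, "message": record.getMessage()}
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                doc[key] = value
        return json.dumps(doc, default=str)

CONFIG_DICT = {
    "version": 1,
    "formatters": {"json": {"()": ExtraAwareJsonFormatter}},
    "handlers": {"stdout": {"class": "logging.StreamHandler", "formatter": "json"}},
    "root": {"level": "INFO", "handlers": ["stdout"]},
}
logging.config.dictConfig(CONFIG_DICT)
```

With this, `logging.getLogger("enrich").info("fetched", extra={"labels.revision_id": 1186892539})` would keep `labels.revision_id` in the JSON output, at least in a plain Python process.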

Example of the omission:

About the log level: I'm not completely sure what fixed it, but right now the HTML enrichment pipeline is showing Python log levels properly. I added this change in deployment-charts, which forces log4j-console.properties to be included in Flink.

We could add that to other Flink deployments.

About adding extra tags: I tried adding different "extras", using labels.* (which should be ECS standard) and other custom fields, but no matter what we add, the log in the K8s pod console is always the same.
I also tried adding a config_dict and passing it to eventutilities-python, but logs still drop any extra field we add.

Researching a bit, it looks like logs are standardized in the PyFlink/Beam worker, which uses a fixed set of fields; no matter what we add, only those fields are produced.
This is an example of a Log from PyFlink custom process function, taken from the K8s pod:

{
  "@timestamp": "2026-04-01T10:31:00.673Z",
  "log.level": "INFO",
  "message": "Result for revision_id 1186892539 is 200",
  "ecs.version": "1.2.0",
  "process.thread.name": "grpc-default-executor-6",
  "log.logger": "/srv/app/python/mediawiki_event_enrichment/html_enrichment/page_html_content_change.py:193"
}

We are using apache-beam==2.48.0, and it looks like there were some PRs (like this one about structured logging) that might fix this issue, but they were released in newer versions. Maybe upgrading Flink will fix these logging issues at some point.

Another main Error that appears as INFO, just right after a real ERROR:

{
  "_index": "ecs-k8s-1-1.11.0-7-2026.14",
  "_id": "G3jBTp0BDahWTbnKpeXf",
  "_version": 1,
  "_score": null,
  "_source": {
    "normalized": {
      "message": "application failed: ",
      "dropped": {
        "no_such_field": [
          "[time]",
          "[timestamp]",
          "[program]",
          "[stream]",
          "[rsyslog][facility]",
          "[rsyslog][timereported]",
          "[rsyslog][severity]",
          "[rsyslog][programname]",
          "[rsyslog][hostname]",
          "[metadata][filename]",
          "[metadata][fileoffset]",
          "[logsource]",
          "[@version]",
          "[docker][container_id]",
          "[normalized_message]",
          "[kubernetes][creation_timestamp]",
          "[kubernetes][host]",
          "[kubernetes][namespace_id]",
          "[kubernetes][pod_id]",
          "[kubernetes][namespace_labels][app_kubernetes_io/managed-by]",
          "[kubernetes][namespace_labels][release]",
          "[kubernetes][namespace_labels][pod-security_kubernetes_io/warn]",
          "[kubernetes][namespace_labels][heritage]",
          "[kubernetes][namespace_labels][kubernetes_io/metadata_name]",
          "[kubernetes][namespace_labels][pod-security_kubernetes_io/audit]",
          "[kubernetes][namespace_labels][app]",
          "[kubernetes][namespace_labels][chart]",
          "[kubernetes][namespace_labels][pod-security_kubernetes_io/enforce]",
          "[kubernetes][labels][pod-template-hash]",
          "[kubernetes][labels][routed_via]",
          "[kubernetes][labels][component]",
          "[kubernetes][labels][type]"
        ]
      }
    },
    "@timestamp": "2026-04-02T15:13:26.804Z",
    "message": "Application FAILED: ",
    "tags": [
      "input-kafka-k8s",
      "rsyslog-shipper",
      "kafka",
      "es",
      "k8s_containerd_msg_field_parsed",
      "k8s_docker_log_field_parsed",
      "normalized_message_untrimmed"
    ],
    "labels": {
      "release": "production",
      "deployment": "flink-app-production",
      "container_name": "flink-main-container"
    },
    "service": {
      "type": "input-file-kubernetes"
    },
    "process": {
      "thread": {
        "name": "flink-pekko.actor.default-dispatcher-14"
      }
    },
    "host": {
      "name": "dse-k8s-worker1017"
    },
    "orchestrator": {
      "namespace": "mw-page-html-content-change-enrich",
      "resource": {
        "name": "flink-app-production-6d7d95964c-5jrck"
      },
      "type": "kubernetes",
      "cluster": {
        "url": "https://dse-k8s-ctrl.svc.eqiad.wmnet:6443"
      }
    },
    "log": {
      "logger": "org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap",
      "level": "INFO",
      "syslog": {
        "priority": 134,
        "facility": {
          "code": 16,
          "name": "local0"
        },
        "severity": {
          "code": 6,
          "name": "informational"
        }
      }
    },
    "error": {
      "stack_trace": "java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED\n\tat org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$7(ApplicationDispatcherBootstrap.java:403)\n\tat java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)\n\tat java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)\n\tat org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)\n\tat java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)\n\tat java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)\n\tat org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)\n\tat java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)\n\tat java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)\n\tat org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1315)\n\tat org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)\n\tat org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)\n\tat 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)\n\tat java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)\n\tat java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)\n\tat java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)\n\tat java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)\n\tat org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)\n\tat org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)\n\tat org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)\n\tat org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)\n\tat org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)\n\tat scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)\n\tat org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)\n\tat scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)\n\tat scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)\n\tat scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)\n\tat scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)\n\tat org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)\n\tat org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)\n\tat scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)\n\tat scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)\n\tat scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)\n\tat scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)\n\tat 
org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)\n\tat org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)\n\tat org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:110)\n\tat org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59)\n\tat org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61)\n\tat java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)\n\tat java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)\n\tat java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)\n\tat java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)\n\tat java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)\nCaused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED\n\tat org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:71)\n\t... 51 more\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.\n\tat org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)\n\tat org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:60)\n\t... 
51 more\nCaused by: org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 2\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:569)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)\n\tat org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)\n\tat org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301)\n\tat org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172)\n\tat org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)\n\tat org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)\n\t... 7 more\nCaused by: java.lang.RuntimeException: Python process exits with code: 2\n\tat org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)\n\t... 21 more\n",
      "message": "org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: FAILED",
      "type": "java.util.concurrent.CompletionException"
    },
    "ecs": {
      "version": "1.2.0"
    }
  },
  "fields": {
    "@timestamp": [
      "2026-04-02T15:13:26.804Z"
    ]
  },
  "highlight": {
    "orchestrator.cluster.url": [
      "@opensearch-dashboards-highlighted-field@https://dse-k8s-ctrl.svc.eqiad.wmnet:6443@/opensearch-dashboards-highlighted-field@"
    ],
    "orchestrator.namespace": [
      "@opensearch-dashboards-highlighted-field@mw-page-html-content-change-enrich@/opensearch-dashboards-highlighted-field@"
    ],
    "message": [
      "@opensearch-dashboards-highlighted-field@Application@/opensearch-dashboards-highlighted-field@ @opensearch-dashboards-highlighted-field@FAILED@/opensearch-dashboards-highlighted-field@:"
    ]
  },
  "sort": [
    1775142806804
  ]
}

> Another main Error that appears as INFO, just right after a real ERROR:

I think this one is correctly INFO. The message is about the application state being changed to FAILED, yes? The actual error should be the real ERROR that you saw?