
Audit and fix observability (logging and metrics) for pyflink jobs
Open, Needs Triage, Public

Description

We have some doubts about how the logs and metrics produced by our Flink pipelines are reaching Grafana and OpenSearch.

We need to ensure that both the logs produced by Flink itself and those produced by our Python code appear in OpenSearch.

We also need to ensure that all metrics are shown in Grafana. We have a dashboard for Flink there, but other dashboards also show interesting metrics related to our pipelines, such as the number of calls to the HTTP APIs.

Task is done if:

  • All logs appear in OpenSearch, and we have documented how they get there.
  • We have documented where the metrics appear in Grafana.

Ideally, we could improve this to:

  • Ensure we use a common log format, like JSON, and that the fields produced are parsed by logstash/OpenSearch.
  • Ensure there is an Index Pattern that can understand our logs and extract valuable variables.
  • We build an OpenSearch dashboard for the HTML (or general Flink?) pipeline, with important counts and grouped metrics, like:
    • How many messages were rejected/failed by reason.
    • How many messages were rejected/failed by "change_type_kind"
    • ... TBD
  • We build a Grafana dashboard that shows all the important metrics for our HTML (or all Flink?) pipelines together.
    • General Flink status information
    • Number of calls to the HTTP APIs, with codes returned
    • Latency calling the HTTP APIs
    • Avg latency of messages (from source to sink)
      • Avg latency split by "change_type_kind" (to know if there are issues calling the APIs for specific change types, maybe not hitting cache?)
      • Avg latency split by "Flink process step" (to know if a process is slower than others)

Maybe some of these ideas can be split into other tickets; some of them might require producing more custom logs or metrics (see the sketch below).
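
As a minimal sketch of how some of these custom metrics could be produced from a PyFlink function, assuming the standard DataStream metrics API is reachable through our eventutilities_python wrappers (the metric names and the call_http_api helper are hypothetical):

  import time
  from pyflink.datastream.functions import MapFunction, RuntimeContext

  class EnrichWithHttp(MapFunction):
      """Hypothetical enrichment step that counts HTTP calls and records latency."""

      def open(self, runtime_context: RuntimeContext):
          group = runtime_context.get_metrics_group()
          self.http_calls = group.counter("http_api_calls")        # total calls
          self.http_errors = group.counter("http_api_errors")      # responses >= 400
          self.http_latency_ms = group.distribution("http_api_latency_ms")

      def map(self, value):
          start = time.monotonic()
          response = call_http_api(value)  # hypothetical helper
          self.http_latency_ms.update(int((time.monotonic() - start) * 1000))
          self.http_calls.inc()
          if response.status_code >= 400:
              self.http_errors.inc()
          return value

For the split by "change_type_kind", one option could be per-kind metric groups (e.g. group.add_group("change_type_kind", kind)), which I believe the Prometheus reporter exposes as labels.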

Details

Related Changes in GitLab:
Title: Warnings when exception caught
Reference: repos/data-engineering/eventutilities-python!98
Author: javiermonton
Source Branch: feature/warning-errors
Dest Branch: main

Event Timeline

Nice!

We build a Grafana dashboard that shows all the important metrics for our HTML (or all Flink?) pipelines together.

The Flink App dashboard should be the one. We can surely add more metrics there.

Avg latency split by "change_type_kind" (to know if there are issues calling the APIs for specific change types, maybe not hitting cache?)

This kind of stuff is pretty app specific. If we do this, we should either add it to a dedicated page_change enrichment related section of the dashboard, or a separate dashboard altogether.

Ensure there is an Index Pattern that can understand our logs and extract valuable variables.

This should be ECS. T234565: Standardize the logstash logging format - ECS
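
On the Python side, a minimal sketch of emitting ECS-formatted logs, assuming Elastic's ecs-logging package would be acceptable in our images (whether logstash/OpenSearch then indexes the fields still depends on the index pattern):

  import logging
  import sys

  import ecs_logging  # pip install ecs-logging

  handler = logging.StreamHandler(sys.stdout)
  # StdlibFormatter renders each record as a single ECS JSON line,
  # including the real log.level and the logger name.
  handler.setFormatter(ecs_logging.StdlibFormatter())
  logging.basicConfig(level=logging.INFO, handlers=[handler])

  logging.getLogger("enrich").warning("slow response from the MW API")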

These are logs taken from a PyFlink application. They mix JSON with plain text, and many errors are reported as INFO. Real ERROR logs only appear at the very end; it looks like Flink is reporting all logs coming from the Python code (or Java?) as INFO, until it finally breaks.

{"@timestamp":"2026-03-06T10:23:57.726Z","log.level": "INFO","message":"           ^^^^^^^^^^^", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-13","log.logger":"org.apache.flink.client.python.PythonDriver"}
  File "/usr/local/lib/python3.11/dist-packages/py4j/protocol.py", line 326, in get_return_value
{"@timestamp":"2026-03-06T10:23:57.726Z","log.level": "INFO","message":"  File \"/usr/local/lib/python3.11/dist-packages/py4j/protocol.py\", line 326, in get_return_value", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-13","log.logger":"org.apache.flink.client.python.PythonDriver"}
    raise Py4JJavaError(
{"@timestamp":"2026-03-06T10:23:57.726Z","log.level": "INFO","message":"    raise Py4JJavaError(", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-13","log.logger":"org.apache.flink.client.python.PythonDriver"}
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
{"@timestamp":"2026-03-06T10:23:57.726Z","log.level": "INFO","message":"py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-13","log.logger":"org.apache.flink.client.python.PythonDriver"}
: java.net.MalformedURLException: no protocol: ['file:/usr/local/lib/python3.11/dist-packages/pyflink/opt/flink-python-1.20.2.jar', 'file:/usr/local/lib/python3.11/dist-packages/pyflink/opt/flink-python-1.20.2.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/flink-connector-kafka-3.3.0-1.20.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/eventutilities-flink-1.4.0-jar-with-dependencies.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/flink-python-1.20.0-tests.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/kafka-clients-3.4.0.jar']
{"@timestamp":"2026-03-06T10:23:57.726Z","log.level": "INFO","message":": java.net.MalformedURLException: no protocol: ['file:/usr/local/lib/python3.11/dist-packages/pyflink/opt/flink-python-1.20.2.jar', 'file:/usr/local/lib/python3.11/dist-packages/pyflink/opt/flink-python-1.20.2.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/flink-connector-kafka-3.3.0-1.20.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/eventutilities-flink-1.4.0-jar-with-dependencies.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/flink-python-1.20.0-tests.jar', 'file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/kafka-clients-3.4.0.jar']", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-13","log.logger":"org.apache.flink.client.python.PythonDriver"}
	at java.base/java.net.URL.<init>(URL.java:674)
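
Side note on the MalformedURLException itself: the "no protocol" value is a Python list repr, which suggests the whole jar list was stringified into a single URL somewhere; as far as I know, pipeline.jars expects individual file:// URLs, joined with ";" when set as a config string. A minimal sketch of the form I would expect to work, using two of the paths from the log:

  from pyflink.datastream import StreamExecutionEnvironment

  env = StreamExecutionEnvironment.get_execution_environment()
  # Register jars one by one instead of str()-ing a Python list;
  # Flink joins these into a ";"-separated pipeline.jars value.
  env.add_jars(
      "file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/flink-connector-kafka-3.3.0-1.20.jar",
      "file:///opt/lib/venv/lib/python3.11/site-packages/eventutilities_python/lib/eventutilities-flink-1.4.0-jar-with-dependencies.jar",
  )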

Another interesting example: it looks like logs emitted in Python are picked up by Flink and reported as INFO, no matter whether they are DEBUG, ERROR or anything else:

Debug as Info:

{"@timestamp":"2026-03-12T15:48:57.088Z","log.level": "INFO","message":"DEBUG:root:No need to resolve https://meta.wikimedia.org/w/api.php, it is already absolute", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-11","log.logger":"org.apache.flink.client.python.PythonDriver"}

Error as Info:

{"@timestamp":"2026-03-12T15:48:57.093Z","log.level": "INFO","message":"error: Option 'enrich.mediawiki_api_endpoint_template' is not accepted", "ecs.version": "1.2.0","process.thread.name":"flink-pekko.actor.default-dispatcher-11","log.logger":"org.apache.flink.client.python.PythonDriver"}
Ottomata renamed this task from "Logs and Monitoring for the HTML pipeline" to "Audit and fix observability (logging and metrics) for pyflink jobs". Mar 16 2026, 1:08 PM

Wanted to bring up whether we need to consider alerting for these pipelines, and whether to add it to this ticket. I am not sure what we have for other streaming apps: defining and alerting on SLA misses, data loss, etc. I assume this data would also be needed later for reconciliation purposes.

There is a possible improvement here, not related to logs or monitoring, that I found while exploring the logs:

Config uses deprecated configuration key 'state.backend' instead of proper key 'state.backend.type'

I'm not sure whether this warning appears because the application was restarting and the metric name already existed, or because we are conflicting with something else:

Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.[10.67.25.169, taskmanager, flink-app-production-taskmanager-1-4, mw-page-html-content-change-enrich, kafka__mediawiki.page_html_content_change.dev5: Committer, 1]

More warnings to investigate:

The operator name process_error_enrich_with_parent_html, convert_python_to_pyflink__$null__stream.error exceeded the 80 characters length limit and was truncated.
The operator name kafka__mediawiki.page_change.v1__convert_flink_to_python, filter_remove_canary_events, _stream_key_by_map_operator exceeded the 80 characters length limit and was truncated.
Cannot create recoverable writer due to This s3 file system implementation does not support recoverable writers., will use the ordinary writer.
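
The truncation warnings should be cosmetic, but if we want short, stable operator names in the Flink UI and in metric scopes, something like this should work wherever we can reach the DataStream objects under eventutilities_python (the filter function is hypothetical, named after the operator in the warning):

  # Explicit names stay under Flink's 80-character operator name limit;
  # uid() additionally keeps operator state addressable across restarts.
  stream = (
      stream
      .filter(filter_remove_canary_events)
      .name("filter_canary")
      .uid("filter-canary")
  )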

Possible issues with metrics being reported:

Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3230b364: java.lang.String, only number types and booleans are supported by this reporter.
│ {"@timestamp":"2026-03-31T13:48:18.638Z","log.level":"DEBUG","message":"Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$1569/0x00007f3b0fedb530@1cae3576 │
│ : java.lang.String, only number types and booleans are supported by this reporter.", "ecs.version": "1.2.0","process.thread.name":"prometheus-http-1-2","log.logger":"org.apache.flink.metrics.prometheus.Promet │
│ heusReporter"}
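
That DEBUG line looks harmless in itself: the Prometheus reporter only exports number- and boolean-typed gauges, and gauges like the latest completed checkpoint external path return a String, so the reporter skips them. For our own metrics the takeaway would be to keep gauge callbacks numeric; a minimal sketch with a hypothetical gauge name:

  from pyflink.datastream.functions import MapFunction, RuntimeContext

  class TrackPending(MapFunction):
      """Hypothetical operator exposing a numeric gauge."""

      def open(self, runtime_context: RuntimeContext):
          self._pending = 0
          # Gauge callbacks must return a number (or bool) to be scraped.
          runtime_context.get_metrics_group().gauge(
              "enrich_pending_requests", lambda: self._pending)

      def map(self, value):
          return value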

Config uses deprecated configuration key 'state.backend' instead of proper key 'state.backend.type'

Is this possibly related to https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#migrate-from-flink-confyaml-to-configyaml? I don't recall where we landed on that with the recent Flink upgrades.