We should update bundled dependencies and py4j wrappers to support Flink 1.20
Description
Details
| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| Version bump wikimedia-event-utilities and add support to Flink 1.20 | repos/data-engineering/eventutilities-python!88 | gmodena | version-bump | main |
| Status | Subtype | Assigned | Task |
|---|---|---|---|
| Open | None | | T376812 EPIC: Update flink jobs to support Flink 1.20 |
| Resolved | | gmodena | T374359 Update eventutilities_python wrappers to support Flink 1.20 |
| Resolved | | tchin | T401725 Deploy mediawiki-event-enrichment Flink jobs running 1.20 |
| Resolved | | tchin | T397330 mediawiki.content_history: flink applications experiencing frequent restarts due to JobManager OOMs |
| Resolved | | BTullis | T402886 Alert in need of triage: KubernetesContainerReachingMemoryLimit (instance wikikube-worker1119.eqiad.wmnet) |
Event Timeline
gmodena opened https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/86
flink: pin version to 1.17
gmodena merged https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/86
flink: pin version to 1.17
I timeboxed some investigation during my latest ops week, and here's some notes on the current state of our Flink support.
The eventutilities_python wrapper currently requires Flink 1.17. However, pyflink 1.17 does not support Apple silicon, making Docker the only viable option for local development and experimentation.
The docker dev image we provide is x86_64 linux, the same platform we target in production (k8s).
Native pyflink on apple silicon
The integration test suite fails with a Beam-related type error:
TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
Triggered in fn_execution/beam/beam_coder_impl_slow.py.
Docker on apple silicon
On my machine (M3) I ran into issues running x86_64 images with Docker Desktop (and Rosetta). The runtime is flaky: the test image variant takes anywhere from 30+ seconds to never completing (Flink TaskManagers hang indefinitely). Under the hood we run JDK 11, which is known to be iffy with Rosetta.
As a workaround, https://github.com/abiosoft/colima fixed (seemingly) all my issues with unstable performance of x86_64 Java Docker containers on Apple silicon. This workaround does, however, require substantial changes to a user's runtime.
Flink version bump
Flink 1.20 does not provide a Kafka connector. The 1.18 and 1.19 connectors are not compatible with wikimedia-event-utilities, at least at runtime: if I bundle either version with eventutilities_python, py4j fails to load and initialize wikimedia-event-utilities objects. Further investigation is needed on the Java side of things.
When pinning flink-kafka-connector to 1.17 and Flink to 1.20, the wikimedia-event-utilities jars are loaded and the wrapper instantiates objects properly. The test suite fails 4 cases out of 26 with a bunch of Py4JJavaErrors that hint at an API mismatch between our Python wrapper and the current Flink implementation.
Next steps
To make things a bit more complicated, wikimedia-event-utilities is built against Flink 1.16. That version is bundled in the fat jar we depend on in eventutilities_python. All our docker (Java and Python) deployments, however, are currently targeting Flink 1.17.
Naively, I would like to move forward with this upgrade path:
- Version bump wikimedia-event-utilities to the latest stable Flink and Kafka connector deps. This will likely require some API changes/updates.
- Version bump eventutilities_python to align with our Flink target. This will require updating the Python wrappers to match API changes.
- Version bump docker images. Here we need to assess whether the k8s operator needs updates too (currently we maintain a fork of the Apache one).
Quick update on this. I built the wrappers locally, using a SNAPSHOT of wikimedia-event-utilities that bundles Flink 1.20.
There is some bytecode incompatibility I need to figure out. Some tests fail with:
File "/Users/gmodena/repos/data-engineering/eventutilities-python/.tox/test/lib/python3.10/site-packages/pyflink/fn_execution/stream_slow.py", line 107, in write_int32
self.write(struct.pack('>i', v))
struct.error: required argument is not an integer

I tried patching the Python writers at python3.10/site-packages/pyflink/fn_execution/stream_slow.py to cast their input to int, and this makes tests pass:
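The failure is reproducible with the standard library alone, independent of pyflink: packing a float with an integer format code raises exactly this error, and an explicit int() cast (the patch applied to the stream_slow.py writers) makes it succeed. A minimal sketch:

```python
import struct

# Packing a float with the big-endian int32 format code ('>i') raises
# struct.error, mirroring the failure seen in pyflink's write_int32.
try:
    struct.pack('>i', 1.5)
except struct.error as e:
    print(e)  # required argument is not an integer

# Casting to int first, as in the patched writers, packs cleanly.
assert struct.pack('>i', int(1.5)) == b'\x00\x00\x00\x01'
```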
--------- coverage: platform darwin, python 3.10.15-final-0 ----------
Name                                         Stmts   Miss Branch BrPart  Cover   Missing
----------------------------------------------------------------------------------------
eventutilities_python/__init__.py                8      2      0      0    75%   6-7
eventutilities_python/flink.py                 142     30     48      9    71%   60->62, 63, 84-87, 205, 270, 405-416, 463-492, 593->597, 606->613, 607->611, 619-627, 720-727
eventutilities_python/stream/__init__.py         4      0      0      0   100%
eventutilities_python/stream/descriptor.py     105     18     32     10    74%   50-54, 142-149, 192->198, 194, 208->212, 251, 260-267, 306, 308->318, 319, 320->323
eventutilities_python/stream/error.py           30      5     10      5    75%   55, 72, 74->78, 78->96, 92-94
eventutilities_python/stream/functions.py       19      0      2      1    95%   42->45
eventutilities_python/stream/manager.py        221     12     42      8    92%   154, 335->338, 411-413, 416, 503-507, 573->580, 582, 813, 868, 879->886, 893->exit
eventutilities_python/testing/__init__.py        0      0      0      0   100%
eventutilities_python/testing/utils.py          28      3      8      2    86%   21-22, 25
eventutilities_python/type_aliases.py           49      6     14      4    84%   71-72, 77->93, 78->91, 83-89, 94
eventutilities_python/utils.py                  42      4      8      3    86%   38, 41, 83-84
----------------------------------------------------------------------------------------
TOTAL                                          648     80    164     42    82%
Coverage XML written to file coverage.xml

=========================== 26 passed in 40.10s ===========================
test: OK (44.43=setup[3.94]+cmd[40.50] seconds)
congratulations :) (44.52 seconds)
Now I'm backtracking the root cause of this type mismatch.
The code path is triggered when a python function throws an exception. Eg.
def enrich_with_error(test_event: dict) -> dict:
    assert isinstance(test_event, dict)
    if test_event["test_int"] == 2:
        raise ValueError("GOT AN ERROR HERE")
    test_event["enriched_field"] = "I am enriched"
    return test_event

All good in the happy case.
After some more digging, I think the issue comes from the return value of flink_instant_of_datetime() (our method) triggered when generating an error event on the DLQ. You can reproduce with:
from pyflink.common import Instant
from datetime import datetime

dt = datetime.now()
value = Instant.of_epoch_milli(dt.timestamp() * 1000)
Down in beam, value will end up in a code path that triggers out_stream.write_int64(value.seconds) (in pyflink/fn_execution/coder_impl_slow.py), and fails with a type error because value.seconds is float, and can't be packed with >q format. If I cast int(value.seconds), bin packing works fine.
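The chain can be sketched without pyflink. Assuming Instant.of_epoch_milli derives seconds via floor division of the milliseconds (which is what the repro above suggests), a float timestamp stays a float all the way down to struct.pack('>q', ...):

```python
import struct
from datetime import datetime

# datetime.timestamp() returns a float, so milliseconds are a float too.
millis = datetime.now().timestamp() * 1000

# Floor division preserves the float type: this models what (we assume)
# Instant.of_epoch_milli ends up storing as `seconds`.
seconds = millis // 1000
assert isinstance(seconds, float)

# Beam's writer then effectively does struct.pack('>q', value.seconds),
# which rejects the float...
try:
    struct.pack('>q', seconds)
except struct.error as e:
    print(e)  # required argument is not an integer

# ...while casting to int first, as in the eventual fix, packs fine.
assert len(struct.pack('>q', int(seconds))) == 8
```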
This might have already been fixed by https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/85.
gmodena opened https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/88
Draft: Version bump wikimedia-event-utilities and add support to Flink 1.20
> Down in beam, value will end up in a code path that triggers out_stream.write_int64(value.seconds) (in pyflink/fn_execution/coder_impl_slow.py), and fails with a type error because value.seconds is float, and can't be packed with >q format. If I cast int(value.seconds), bin packing works fine.
I fixed this in https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/88/diffs?commit_id=6797c99524013d313607acaeb28ea9ec5408c55f. I'll need to backport the change in MR85.
All tests are now passing locally against wikimedia-event-utilities 1.3.7-SNAPSHOT, Flink 1.20, Java 11, and Python 3.10.
--------- coverage: platform darwin, python 3.10.15-final-0 ----------
Name                                         Stmts   Miss Branch BrPart  Cover   Missing
----------------------------------------------------------------------------------------
eventutilities_python/__init__.py                8      2      0      0    75%   6-7
eventutilities_python/flink.py                 142     30     48      9    71%   60->62, 63, 84-87, 205, 270, 405-416, 463-492, 593->597, 606->613, 607->611, 619-627, 720-727
eventutilities_python/stream/__init__.py         4      0      0      0   100%
eventutilities_python/stream/descriptor.py     105     18     32     10    74%   50-54, 142-149, 192->198, 194, 208->212, 251, 260-267, 306, 308->318, 319, 320->323
eventutilities_python/stream/error.py           30      5     10      5    75%   55, 72, 74->78, 78->96, 92-94
eventutilities_python/stream/functions.py       19      0      2      1    95%   42->45
eventutilities_python/stream/manager.py        221     12     42      8    92%   156, 337->340, 413-415, 418, 505-509, 575->582, 584, 815, 870, 881->888, 895->exit
eventutilities_python/testing/__init__.py        0      0      0      0   100%
eventutilities_python/testing/utils.py          28      3      8      2    86%   21-22, 25
eventutilities_python/type_aliases.py           49      6     14      4    84%   71-72, 77->93, 78->91, 83-89, 94
eventutilities_python/utils.py                  42      4      8      3    86%   38, 41, 83-84
----------------------------------------------------------------------------------------
TOTAL                                          648     80    164     42    82%
Coverage XML written to file coverage.xml

=========================== 26 passed in 37.28s ===========================
Are we still hoping to merge https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/85#152012d68e4daf2dbab132526851e859f98a98dd in this next major version?
gmodena merged https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/88
Version bump wikimedia-event-utilities and add support to Flink 1.20