
Update eventutilities_python wrappers to support Flink 1.20
Closed, Resolved (Public)

Description

We should update bundled dependencies and py4j wrappers to support Flink 1.20

Details

Related Changes in GitLab:
Title: Version bump wikimedia-event-utilities and add support to Flink 1.20
Reference: repos/data-engineering/eventutilities-python!88
Author: gmodena
Source Branch: version-bump
Dest Branch: main

Event Timeline

I timeboxed some investigation during my latest ops week, and here's some notes on the current state of our Flink support.

The eventutilities_python wrapper currently requires Flink 1.17. However, pyflink 1.17 does not support apple silicon, making docker the only viable option for local development and experimentation.
The docker dev image we provide is x86_64 linux, the same platform we target in production (k8s).

Native pyflink on apple silicon

The integration test suite fails with a Beam-related type error:

TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)

Triggered in fn_execution/beam/beam_coder_impl_slow.py.

Docker on apple silicon

On my machine (an M3) I ran into issues when running x86_64 images with Docker Desktop (and Rosetta). The runtime is flaky: with the test image variant, runs take anywhere from 30+ seconds to never completing (Flink TMs hang indefinitely). Under the hood we run jdk11, which is known to be iffy with Rosetta.
As a workaround, https://github.com/abiosoft/colima fixed (seemingly) all my issues with unstable performance of x86_64 java docker containers on apple silicon. Note that this workaround requires substantial changes to a user's runtime.
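For reference, the colima setup that worked for me looks roughly like this. The flags are standard colima options, but the resource sizes are just an example; adjust to taste:

```shell
# create an x86_64 VM (emulated via QEMU on apple silicon)
colima start --arch x86_64 --cpu 4 --memory 8

# colima registers its own docker context; switch to it if needed
docker context use colima
```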

Flink version bump

Flink 1.20 does not provide a kafka-connector. The 1.18 and 1.19 connectors are not compatible with wikimedia-event-utilities, at least at runtime: if I bundle either version with eventutilities_python, py4j fails to load and initialize wikimedia-event-utilities objects. Further investigation is needed on the java side of things.
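For context, the pinning experiment amounts to forcing the connector line in the build, roughly like this (the coordinates are the upstream Apache ones, but the exact 1.17.x patch version here is illustrative):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <!-- pinned to the 1.17 line while flink core is bumped to 1.20 -->
    <version>1.17.2</version>
</dependency>
```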

When pinning flink-kafka-connector to 1.17 and flink to 1.20, the wikimedia-event-utilities jars are loaded and the wrapper instantiates objects properly. The test suite fails 4 cases out of 26 with a bunch of Py4JJavaErrors, which hint at an API mismatch between our python wrapper and the current Flink implementation.

Next steps

To make things a bit more complicated, wikimedia-event-utilities is built against Flink 1.16. That version is bundled in the fat jar we depend on in eventutilities_python. All our docker (Java and Python) deployments, however, are currently targeting Flink 1.17.

Naively, I would like to move forward with this upgrade path:

  1. Version bump wikimedia-event-utilities to the latest stable flink and kafka connector deps. This will likely require some API changes/updates.
  2. Version bump eventutilities_python to align with our Flink target. This will require updating the Python wrappers to match API changes.
  3. Version bump docker images. Here we need to assess whether the k8s operator needs updates too (currently we maintain a fork of the apache one).

> Naively, I would like to move forward with this upgrade path:

+1

gmodena renamed this task from Update eventutilities_python wrappers to support Flink 1.20 to Update eventutilities_python wrappers to support Flink 1.19.Oct 15 2024, 11:16 AM
gmodena updated the task description. (Show Details)
gmodena moved this task from Incoming to Kanban Board on the Dumps 2.0 board.
gmodena edited projects, added Dumps 2.0 (Kanban Board); removed Dumps 2.0.
gmodena renamed this task from Update eventutilities_python wrappers to support Flink 1.19 to Update eventutilities_python wrappers to support Flink 1.20.Oct 22 2024, 9:57 AM

Quick update on this. I built the wrappers locally, using a SNAPSHOT of wikimedia-event-utilities that bundles Flink 1.20.

There is some bytecode incompatibility I need to figure out. Some tests fail with:

  File "/Users/gmodena/repos/data-engineering/eventutilities-python/.tox/test/lib/python3.10/site-packages/pyflink/fn_execution/stream_slow.py", line 107, in write_int32
    self.write(struct.pack('>i', v))
struct.error: required argument is not an integer

I tried patching the python writers at python3.10/site-packages/pyflink/fn_execution/stream_slow.py to cast their input to int, and this makes tests pass:

--------- coverage: platform darwin, python 3.10.15-final-0 ----------
Name                                         Stmts   Miss Branch BrPart  Cover   Missing
----------------------------------------------------------------------------------------
eventutilities_python/__init__.py                8      2      0      0    75%   6-7
eventutilities_python/flink.py                 142     30     48      9    71%   60->62, 63, 84-87, 205, 270, 405-416, 463-492, 593->597, 606->613, 607->611, 619-627, 720-727
eventutilities_python/stream/__init__.py         4      0      0      0   100%
eventutilities_python/stream/descriptor.py     105     18     32     10    74%   50-54, 142-149, 192->198, 194, 208->212, 251, 260-267, 306, 308->318, 319, 320->323
eventutilities_python/stream/error.py           30      5     10      5    75%   55, 72, 74->78, 78->96, 92-94
eventutilities_python/stream/functions.py       19      0      2      1    95%   42->45
eventutilities_python/stream/manager.py        221     12     42      8    92%   154, 335->338, 411-413, 416, 503-507, 573->580, 582, 813, 868, 879->886, 893->exit
eventutilities_python/testing/__init__.py        0      0      0      0   100%
eventutilities_python/testing/utils.py          28      3      8      2    86%   21-22, 25
eventutilities_python/type_aliases.py           49      6     14      4    84%   71-72, 77->93, 78->91, 83-89, 94
eventutilities_python/utils.py                  42      4      8      3    86%   38, 41, 83-84
----------------------------------------------------------------------------------------
TOTAL                                          648     80    164     42    82%
Coverage XML written to file coverage.xml

=========================================================================== 26 passed in 40.10s ===========================================================================
  test: OK (44.43=setup[3.94]+cmd[40.50] seconds)
  congratulations :) (44.52 seconds)
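The patch amounts to a one-line change in the writer. Here is a minimal stand-in that mirrors pyflink's stream_slow.OutputStream (names match the real class, but this is a sketch, not the actual pyflink code):

```python
import struct


class OutputStream:
    """Minimal stand-in for pyflink.fn_execution.stream_slow.OutputStream."""

    def __init__(self):
        self._buf = bytearray()

    def write(self, data: bytes):
        self._buf.extend(data)

    def write_int32(self, v):
        # patched: cast to int, since struct.pack('>i', ...) rejects floats
        # with "struct.error: required argument is not an integer"
        self.write(struct.pack(">i", int(v)))


out = OutputStream()
out.write_int32(42.0)  # would raise struct.error without the int() cast
print(len(out._buf))  # 4 bytes: big-endian signed int32
```

This papers over the symptom in site-packages, which is why it only served as a debugging aid rather than a fix.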

Now I'm backtracking the root cause of this type mismatch.

> Now I'm backtracking the root cause of this type mismatch.

The code path is triggered when a python function throws an exception. Eg.

def enrich_with_error(test_event: dict) -> dict:
    assert isinstance(test_event, dict)
    if test_event["test_int"] == 2:
        raise ValueError("GOT AN ERROR HERE")

    test_event["enriched_field"] = "I am enriched"
    return test_event

All good in the happy case.

After some more digging, I think the issue comes from the return value of flink_instant_of_datetime() (our method), which is triggered when generating an error event on the DLQ. You can reproduce it with:

from pyflink.common import Instant
from datetime import datetime

dt = datetime.now()
# dt.timestamp() returns a float, so the milliseconds argument below is a float too
value = Instant.of_epoch_milli(dt.timestamp() * 1000)

Down in beam, value ends up in a code path that triggers out_stream.write_int64(value.seconds) (in pyflink/fn_execution/coder_impl_slow.py) and fails with a type error, because value.seconds is a float and can't be packed with the >q format. If I cast with int(value.seconds), the packing works fine.

This might have already been fixed by https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/85.

> Down in beam, value ends up in a code path that triggers out_stream.write_int64(value.seconds) (in pyflink/fn_execution/coder_impl_slow.py) and fails with a type error, because value.seconds is a float and can't be packed with the >q format. If I cast with int(value.seconds), the packing works fine.

I fixed this in https://gitlab.wikimedia.org/repos/data-engineering/eventutilities-python/-/merge_requests/88/diffs?commit_id=6797c99524013d313607acaeb28ea9ec5408c55f. I'll need to backport the change in MR85.
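The fix boils down to casting before the value crosses into py4j/beam. A sketch of the idea (datetime_to_epoch_milli is a hypothetical helper for illustration; the real change lives in flink_instant_of_datetime and feeds Instant.of_epoch_milli):

```python
from datetime import datetime, timezone


def datetime_to_epoch_milli(dt: datetime) -> int:
    # datetime.timestamp() returns a float; cast to int so the resulting
    # Instant has integer seconds and struct.pack('>q', ...) in the Beam
    # coder does not fail with "required argument is not an integer"
    return int(dt.timestamp() * 1000)


ms = datetime_to_epoch_milli(datetime(2024, 10, 22, tzinfo=timezone.utc))
print(ms)  # 1729555200000
```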

All tests are now passing locally against wikimedia-event-utilities 1.3.7-SNAPSHOT, Flink 1.20, Java 11, and Python 3.10.

--------- coverage: platform darwin, python 3.10.15-final-0 ----------
Name                                         Stmts   Miss Branch BrPart  Cover   Missing
----------------------------------------------------------------------------------------
eventutilities_python/__init__.py                8      2      0      0    75%   6-7
eventutilities_python/flink.py                 142     30     48      9    71%   60->62, 63, 84-87, 205, 270, 405-416, 463-492, 593->597, 606->613, 607->611, 619-627, 720-727
eventutilities_python/stream/__init__.py         4      0      0      0   100%
eventutilities_python/stream/descriptor.py     105     18     32     10    74%   50-54, 142-149, 192->198, 194, 208->212, 251, 260-267, 306, 308->318, 319, 320->323
eventutilities_python/stream/error.py           30      5     10      5    75%   55, 72, 74->78, 78->96, 92-94
eventutilities_python/stream/functions.py       19      0      2      1    95%   42->45
eventutilities_python/stream/manager.py        221     12     42      8    92%   156, 337->340, 413-415, 418, 505-509, 575->582, 584, 815, 870, 881->888, 895->exit
eventutilities_python/testing/__init__.py        0      0      0      0   100%
eventutilities_python/testing/utils.py          28      3      8      2    86%   21-22, 25
eventutilities_python/type_aliases.py           49      6     14      4    84%   71-72, 77->93, 78->91, 83-89, 94
eventutilities_python/utils.py                  42      4      8      3    86%   38, 41, 83-84
----------------------------------------------------------------------------------------
TOTAL                                          648     80    164     42    82%
Coverage XML written to file coverage.xml

=========================================================================================== 26 passed in 37.28s ===========================================================================================

> If I cast int(value.seconds)

Nice find!