Description
Okay, what we want is to abstract away the Flink-y bits from writing Flink UDFs in python. (and also probably in Java/Scala but that is not this task :) )
Right now, to make your Table UDF, you do:
```python
@udf(input_types=DataTypes.BIGINT(), result_type=DataTypes.STRING())
def get_images(page_id: int):
```
There are two possibilities for how one might want to write a UDF.
- A. arbitrary input to output types (like you have)
- B. input stream to output stream
Here's how I would imagine these would look. Let's do B. first because I think it is simpler to describe (we already have most of the code needed).
B. input stream to output stream
This is what I was trying in this gist here, but wasn't able to get working.
Let's assume we have streams and schemas for 'mediawiki.page_change' and 'mediawiki.page_change_with_list_of_images' declared.
You'd use eventutilities-flink to go from a stream name to a Flink DataType.
We might need to write a new specific function in eventutilities-flink to do this, but the logic exists. See JsonSchemaFlinkConverter#toDataType and also EventTableDescriptorBuilder#getSchemaBuilder. We want a little wrapper that goes directly from streamName -> DataType.
Anyway, assuming we have that:
```python
# This is a Python decorator function.
def flink_udf(input_stream: str, output_stream: str):
    # use eventstream-utilities to go from stream_name -> Flink DataType
    input_datatype = jvm.blabla.getDataTypeForStream(input_stream)
    output_datatype = jvm.blabla.getDataTypeForStream(output_stream)

    def wrap_udf(user_func):
        # This is abstracting away Flinkiness from the user.
        # We declare call_udf_func as the Flink @udf, but with the
        # input and output DataTypes we got above. All the function
        # has to do now is call user_func() with whatever Flink will pass it.
        @udf(input_types=input_datatype, result_type=output_datatype)
        def call_udf_func(*args, **kwargs):
            # IIUC, *args will end up being called by Flink as [input_row, output_row] here,
            # so we might be able to pass them directly as func(args[0], args[1])? not sure.
            # Q: I'm not sure what the types of input_row, output_row will be. I guess Flink Row()?
            # It might be nice if we could use native python stuff, like dict, lists, etc,
            # but I'm not sure.
            return user_func(*args, **kwargs)
        return call_udf_func

    return wrap_udf


# Now, we can use our @flink_udf decorator to abstract any need to declare DataTypes.
# page_change_to_page_change_with_images_udf will be passed as user_func in the code above.
@flink_udf(input_stream='mediawiki.page_change', output_stream='mediawiki.page_change_with_list_of_images')
def page_change_to_page_change_with_images_udf(event_row):
    event_row['images'] = custom_logic_to_get_images(event_row['page_id'])
    return event_row
```
Then, I guess in SQL we'd hope to do something like:
```sql
insert into mediawiki_page_change_with_list_of_images
select page_change_to_page_change_with_images_udf(*)
from mediawiki_page_create
```
I don't know if any of that would work. But it is worth a try. BTW, here's some info on how to write decorators with arguments.
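In case that link rots: the pattern itself is tiny. A generic toy example (nothing Flink-specific, all names made up), just to show the three levels of nesting the @flink_udf sketch above relies on:

```python
def tagged(tag: str):                  # takes the decorator's arguments
    def decorator(user_func):          # takes the function being decorated
        def wrapper(*args, **kwargs):  # what actually gets called
            print(f"[{tag}] calling {user_func.__name__}")
            return user_func(*args, **kwargs)
        return wrapper
    return decorator

@tagged("demo")
def add(a, b):
    return a + b

add(1, 2)  # prints "[demo] calling add", returns 3
```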
Okay let's try it with arbitrary input and output types:
A. arbitrary input to output types
This is closer to what you already have, but I'm not exactly sure how best to abstract away the Flink bits. In my gist UDF example here, I do it by getting the DataType of one field from the output stream's JSONSchema.
The coolest way would be to get it using Python type annotations, inspect.signature and maybe either NamedTuple or @dataclass.
```python
import json
from typing import List, NamedTuple

class WikiImageInfo(NamedTuple):
    ns: str
    title: str

@flink_udf()
def get_wiki_images_info(page_id: int) -> List[WikiImageInfo]:
    # get_images_from_mw_api is a placeholder for the actual MW API call.
    images = json.loads(get_images_from_mw_api(page_id))
    result = []
    for info in images:
        result.append(WikiImageInfo(info['ns'], info['title']))
    return result
```
The code to do above might look something like:
```python
import inspect

def python_to_flink_datatype(python_type):
    # to be implemented.
    ...

def flink_udf(user_func):
    user_func_sig = inspect.signature(user_func)
    result_type = python_to_flink_datatype(user_func_sig.return_annotation)
    # ...
```
The implementation of python_to_flink_datatype might be sort of similar to JsonSchemaConverter#convert, except iterating over Python types rather than JSONSchema. The idea is the same though: convert from one type system to another.
Also note that pyflink has something similar to this, except for explicitly converting between pyflink DataType and Java Flink DataType. We'd want conversion between python typing and pyflink DataType.
Or, if we can't be so fancy as to auto convert between python type and pyflink DataType, we could require that every output type has a JSONSchema, even if not a full event schema. The user could provide this in code themselves, or refer to a specific field like I have in my example. If provided themselves, this would look something like:
```python
image_info_jsonschema = """
type: object
properties:
  ns:
    type: integer
  title:
    type: string
"""

@flink_udf(result_jsonschema=image_info_jsonschema)
def get_wiki_images_info(page_id: int):
    # ...
```
In this case, the flink_udf decorator function would use the eventutilities-flink JsonSchemaFlinkConverter to go directly from the result_jsonschema to a DataType. This might be easier to do now, since we already have JSONSchema conversion code.
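For illustration only, a rough sketch of what that decorator could look like. `jsonschema_to_datatype` here is a made-up, pure-python stand-in that only understands flat object schemas; in practice it would probably delegate to eventutilities-flink's JsonSchemaFlinkConverter instead:

```python
import yaml

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Made-up, minimal mapping: just enough for flat object schemas.
_JSONSCHEMA_TYPE_TO_DATATYPE = {
    'string': DataTypes.STRING(),
    'integer': DataTypes.BIGINT(),
    'number': DataTypes.DOUBLE(),
    'boolean': DataTypes.BOOLEAN(),
}

def jsonschema_to_datatype(jsonschema: str):
    schema = yaml.safe_load(jsonschema)
    if schema['type'] == 'object':
        return DataTypes.ROW([
            DataTypes.FIELD(name, _JSONSCHEMA_TYPE_TO_DATATYPE[prop['type']])
            for name, prop in schema['properties'].items()
        ])
    return _JSONSCHEMA_TYPE_TO_DATATYPE[schema['type']]

def flink_udf(result_jsonschema: str):
    result_type = jsonschema_to_datatype(result_jsonschema)

    def wrap_udf(user_func):
        # Declare the Flink @udf with the derived result_type; the user
        # function never has to mention DataTypes at all.
        @udf(result_type=result_type)
        def call_udf_func(*args, **kwargs):
            return user_func(*args, **kwargs)
        return call_udf_func
    return wrap_udf
```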
Anyway, as you can see, there are lots of ways we might do this, but whatever they are, ideally users wouldn't have to think TOO much about Flink Types. Ideally users would write regular ol' functions, and we could call them from Flink. To do this, we have to abstract away the Flink type conversions.
Thanks for this write up @Ottomata!
+1 for leveraging on the decorator pattern to hide implementation details.
IMHO Option A is interesting, since it could be orthogonal to Option B (which we could further abstract if we manage to implement a Flink Catalog).
I’m not sure if <stream in> / <stream out> is the best abstraction for UDFs though. AFAIK pyflink supports table functions, which might be a better fit for this use case (rough sketch below). As discussed, at that point we might want to just use a pyflink job / python script. The latter does not preclude having a set of helpers/decorators to make the developer experience nicer.
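e.g. something like this (untested, all names made up; `fetch_images` stands in for whatever actually calls the MW API):

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udtf

# Placeholder for the actual MediaWiki API call.
def fetch_images(page_id: int):
    return [{'ns': 6, 'title': f'File:Example_{page_id}.jpg'}]

# A table function: one page_id in, zero or more (ns, title) rows out.
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def explode_images(page_id: int):
    for image in fetch_images(page_id):
        yield image['ns'], image['title']

# Once registered, it would be joined laterally in SQL, e.g.:
#   SELECT p.page_id, i.ns, i.title
#   FROM pages AS p, LATERAL TABLE(explode_images(p.page_id)) AS i(ns, title)
```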
Okay, what we want is to abstract away the Flink-y bits from writing Flink UDFs in python. (and also probably in Java/Scala but that is not this task :) )
Not tested: if we leverage on Table API, we might be able to invoke Python UDFs from JVM. No idea yet about potential performance bottlenecks / limitations.
Anyway, as you can see, there are lots of ways we might do this, but whatever they are, ideally users wouldn't have to think TOO much about Flink Types. Ideally users would write regular ol' functions, and we could call them from Flink. To do this, we have to abstract away the Flink type conversions.
Python 3.10 introduced typing.TypeAlias, which might be helpful to establish a mapping (along the lines of what we do in the Json SerDE). This requires some thinking though. I think that what is referred to here as Flink types are actually SQL types. I wonder if using this (abstracted) UDF in a SQL query could create ambiguity.
In terms of design, maybe a good entry point would be looking at how a user would interact with the flink cluster and eventstream factories. As a first approximation, I'd be keen to explore Flink Catalogs (I'll create a phab).
Just a thought: to understand the point of departure of SQL vs Table APIs for this spike, a start could be:
- Packaging / decorating that setup logic to make it more pythonic.
- Introducing an interface for python functions, that implementers would have to adopt (basically what is described in OP).
- Comparing a SQL script vs. a Pyflink job that uses the Table API (which you did in T318859).
@tchin what do you think? Does this track with what you have in mind?
I definitely feel like the biggest issue here is how we'd map from python types to pyflink DataTypes. Would a python int turn into a DataTypes.INT or perhaps a DataTypes.BIGINT? It's hard to say how well abstracting away the types would go considering the DataTypes are supposed to represent the columns that are being sunk to.
I've only used the Json Schema converter when I was typing the source, so I could make transformations to morph the data into the right structure for the sink (which I typed manually). But here it's more like we're trying to automatically type the sink. In this case, any mismatch can be fixed with casting in SQL, but that defeats the point of abstracting away the DataTypes anyway.
For example, let's say int turns into a DataTypes.INT. If someone had some_table with a some_int TINYINT column and wrote a udf def some_udf() -> int, would it be good UX if INSERT INTO some_table VALUES (some_udf()); crashes and they actually have to do CAST(some_udf() AS TINYINT)?
For reference, here's the docs that list the mappings between python types and Flink DataTypes
To be fair, the actual idea is easy enough to implement for simple mappings
```python
from inspect import signature

from pyflink.table import DataTypes
from pyflink.table.types import DataType
from pyflink.table.udf import udf

def python_to_flink_datatype(val: type) -> DataType:
    if val is str:
        return DataTypes.STRING()
    elif val is int:
        return DataTypes.INT()
    elif val is bool:
        return DataTypes.BOOLEAN()

def flink_udf(user_func):
    user_func_sig = signature(user_func)
    result_type = python_to_flink_datatype(user_func_sig.return_annotation)

    @udf(result_type=result_type)
    def call_udf_func(*args, **kwargs):
        return user_func(*args, **kwargs)

    return call_udf_func
```
But when considering my example of get_image, given my lack of python knowledge, I have no idea how this translates into a return annotation nor how to dissect it into the right DataType:
```python
import requests

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.ARRAY(
    DataTypes.ROW([
        DataTypes.FIELD("ns", DataTypes.INT()),
        DataTypes.FIELD("title", DataTypes.STRING())
    ])
))
def get_images(page_id: int):
    response = requests.get(
        f"https://en.wikipedia.org/w/api.php?action=query&format=json&prop=images&pageids={page_id}")
    if response.status_code == 200:
        try:
            images = response.json()['query']['pages'][str(page_id)]['images']
            return [Row(ns=int(image["ns"]), title=image["title"]) for image in images]
        except KeyError:
            pass
    return []
```
Mapping python to SQL will be tricky, since as you point out there is no 1:1 relationship (floating-point and decimal will be funky too). The db-api doc could give some pointers on how database drivers approach this; but AFAIK there is not one standard way to do this mapping. E.g. SQL Server vs PostgreSQL.
nit: did you mean to use isinstance for comparisons?
This is a bit of a risky function IMHO, because it mixes types and objects. If I understand it correctly, the proposal is to use a val: type function annotation to inform the user (and type checker), but under the hood we discard it and cast based on instance. That can be confusing, and lead to some nasty bugs to troubleshoot.
how we'd map from python types to pyflink DataTypes. Would a python int turn into a DataTypes.INT or perhaps a DataTypes.BIGINT
This is just a choice we have to make. We make this choice in the JSONSchema converters too. E.g. we convert all integers to BIGINT, and all numbers to DOUBLE.
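i.e. something like the mapping below. The widening here mirrors the JSONSchema converter convention, but it is still just a convention we'd pick:

```python
from pyflink.table import DataTypes

# One possible convention, widening to the safest SQL type:
PYTHON_TYPE_TO_DATATYPE = {
    str: DataTypes.STRING(),
    int: DataTypes.BIGINT(),    # all python ints -> BIGINT
    float: DataTypes.DOUBLE(),  # all python floats -> DOUBLE
    bool: DataTypes.BOOLEAN(),
    bytes: DataTypes.BYTES(),
}
```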
I’m not sure if <stream in> / <stream out> is the best abstraction for UDFs though.
On second thought, I think you are right. If this was the abstraction, all the logic is in python and what's the point of doing SQL vs just a pyflink job then?
when considering my example of get_image, given my lack of python knowledge, I have no idea how this translates into a return annotation nor how to dissect it into the right DataType:
Yeah, because python doesn't have a 'struct/row' type, we need to pick a type that it does have that might represent this. I think the closest is NamedTuple, but this means that folks would have to declare their types in a NamedTuple (kind of like a POJO), which is starting to feel like just more boilerplate... and why not just learn how to use Flink DataType then?
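That said, the dissection itself wouldn't be too bad. A rough, untested sketch of how python_to_flink_datatype could handle List[...] and NamedTuple annotations with the stdlib typing helpers (Python 3.8+):

```python
import typing

from pyflink.table import DataTypes

_SIMPLE = {str: DataTypes.STRING(), int: DataTypes.BIGINT(),
           float: DataTypes.DOUBLE(), bool: DataTypes.BOOLEAN()}

def python_to_flink_datatype(py_type):
    if py_type in _SIMPLE:
        return _SIMPLE[py_type]
    # List[X] -> ARRAY(convert(X))
    if typing.get_origin(py_type) is list:
        (element_type,) = typing.get_args(py_type)
        return DataTypes.ARRAY(python_to_flink_datatype(element_type))
    # NamedTuple subclasses -> ROW of their annotated fields
    if isinstance(py_type, type) and issubclass(py_type, tuple) and hasattr(py_type, '_fields'):
        return DataTypes.ROW([
            DataTypes.FIELD(name, python_to_flink_datatype(hint))
            for name, hint in typing.get_type_hints(py_type).items()
        ])
    raise TypeError(f"Don't know how to map {py_type!r} to a Flink DataType")

# e.g. python_to_flink_datatype(List[WikiImageInfo]) with the WikiImageInfo
# from the earlier example would yield ARRAY(ROW(ns STRING, title STRING)).
```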
Maybe we should double down on the JSONSchema idea and just always rely on that instead of converting from python type annotations?
E.g. this example from my comment:
```python
@flink_udf(result_jsonschema=image_info_jsonschema)
def get_wiki_images_info(page_id: int):
```
Nice @tchin!
FYI:
- Create the virtual environment
We don't use conda [...]
A problem with virtualenvs is that they don't include the python executable. Most of the time this is fine, but as the system python and also pyflink dependencies change over time, eventually older virtualenvs stop working.
Also stacked environments can cause some confusion
Agreed. We are moving towards deprecating anaconda-wmf and stacked environments: T302819: Replace anaconda-wmf with smaller, non-stacked Conda environments
there *will* be some lag every time you execute a UDF since Flink has to unzip and initialize the virtual environment.
Hm! Is this true? I would assume that Flink would have to unzip the virtualenv for every new taskmanager, but not every time you execute the UDF?
A problem with virtualenvs is that they don't include the python executable.
Can you elaborate on that? I thought the executable is venv/bin/python3
Hm! Is this true? I would assume that Flink would have to unzip the virtualenv for every new taskmanager, but not every time you execute the UDF?
I should rephrase that since I just assumed a single UDF is being called per a query. If let's say you have 2 UDFs get_images and get_backlinks and did SELECT get_images(1221227) as images, get_backlinks(1221227) as backlinks;, then yes from what I can tell it creates a single pyflink gateway and executes both UDFs in the same python environment. However, it doesn't seem like a UDF can be 'warmed up' in advance, so calling SELECT get_images(1221227) as images; twice in a row will bootstrap the python environment twice, making it annoyingly time-consuming when messing around in the SQL Client CLI
Can you elaborate on that? I thought the executable is venv/bin/python3
Uh hm. I just checked and I also see the executable in a virtualenv. I must be remembering incorrectly! I thought this was one of the whole reasons we embarked on the conda support instead...but I guess not? Perhaps it has more to do with dynamically linked binary dependencies? Or maybe the virtualenv's un-relocatability?
Okay well scratch that reason then.
What OS and python version are you using? The env should contain a symlink to the system's python install (at least on posix, not sure how windows behaves).
I just tried this on macOS:
```
$ python -V
Python 3.10.7
$ python -m venv venv
$ ls -l ./venv/bin/python
lrwxr-xr-x 1 gmodena staff 33 Nov 1 09:49 ./venv/bin/python -> /run/current-system/sw/bin/python*
```
From https://docs.python.org/3/library/venv.html: one issue of distributing venv is that
_[...] scripts installed in environments should not expect the environment to be activated, their shebang lines contain the absolute paths to their environment’s interpreters. Because of this, environments are inherently non-portable, in the general case. You should always have a simple means of recreating an environment ( [...]_
This is usually mitigated on a cluster where workers/drivers have consistent python installations and the same operating system.
@tchin Sparks' doc explains things nicely https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html
Hm! Is this true? I would assume that Flink would have to unzip the virtualenv for every new taskmanager, but not every time you execute the UDF?
I should rephrase that since I just assumed a single UDF is being called per a query. If let's say you have 2 UDFs get_images and get_backlinks and did SELECT get_images(1221227) as images, get_backlinks(1221227) as backlinks;, then yes from what I can tell it creates a single pyflink gateway and executes both UDFs in the same python environment. However, it doesn't seem like a UDF can be 'warmed up' in advance, so calling SELECT get_images(1221227) as images; twice in a row will bootstrap the python environment twice, making it annoyingly time-consuming when messing around in the SQL Client CLI
Woah. We need to investigate this. Good catch!
When you say "bootstrapping the python env", do you mean starting up a new python process (which is indeed time consuming)? Could you walk me through the debugging steps?
Do you maybe know if the sql client is executing your UDFs in a JVM thread or as a standalone python process? See https://flink.apache.org/2022/05/06/pyflink-1.15-thread-mode.html#test-environment for details.
The UDFs appear to be executing inside of a process.
I've been trying to wrap my head around what exactly happens when executing a UDF, and what I've gathered so far (don't quote me) is that when it hits a UDF, it creates a py4j gateway in a separate thread, and then spins up a python sub-process that connects to the gateway (by telling python the port through a file which I find funny). There it does all the extracting of file archives and setting up the environment. It then submits python UDFs to that sub-process with a shutdown callback to let it know when it's done. Still don't really know where it does the submitting, but PythonFunctionFactory is involved somehow. Once I actually feel like I know what's going on I should make a diagram.
Functionally, it would make sense if it spins up an entirely new process, because configuration is read at runtime, so you can theoretically give it different virtual environments in the middle of executing. I haven't tried it, so maybe there's something special about those specific config options that prevents it, but if it does work it would look something like the below:
```sql
SELECT get_images(1221227) as images;

SET 'python.archives' = 'file:///home/path/to/pyflink_venv2.zip';
SET 'python.client.executable' = 'pyflink_venv2.zip/bin/python3';
SET 'python.executable' = 'pyflink_venv2.zip/bin/python3';

SELECT get_images(1221227) as images;
```
Maybe the config option to set it to thread mode would work too? hmmm...
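For reference, the switch the linked post describes is python.execution-mode (Flink >= 1.15). Untested here, but from a pyflink job it would be set something like this (and presumably via another SET statement in the SQL client):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Ask for 'thread' execution mode instead of the default 'process' mode,
# so UDFs run embedded in the JVM rather than in a separate python process.
t_env.get_config().get_configuration().set_string("python.execution-mode", "thread")
```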
Huh we've been doing everything on Yarn so far so I guess I overlooked this, but if I wanted to produce to Kafka or Hadoop using the SQL Cli I would need to generate a Kerberos keytab. Yarn is the only deployment method where you can forego the keytab and use the ticket cache. Looking at wikitech, it seems like generating a keytab is a non-trivial task? Or at least it's something I don't have permission to do. I feel like this is starting to get into the 'figuring out deployment' issue. @Ottomata
If you want to run Flink in k8s and write to HDFS, then this will be a problem: this is the k8s "kerbarrier".
However, we don't (yet?) have any Kerberos authentication on the Kafka side, so you shouldn't need a keytab here. In production, we will want to produce/consume using the TLS kafka port on 9093, and for that we will need some custom client certificates, but we can deal with that when we get around to deploying anything in k8s.
The exact error I get is org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed when trying to produce to a topic. Tried the test topic and the platform-wiki-image-links topic from Gabriele's first event POC. Consuming from the topic works, so I wonder if there's different default permissions for producing/consuming.
Hm, maybe you can do a different topic? It might be better to do a temp topic with your name in it, so it is clear that is just you testing things. 'tchin.test0'?
If you are getting that, then something is weird!
I got sidetracked into getting a working content enrichment pyflink UDF SQL thing working. Finally got it!
https://gist.github.com/ottomata/bc583fac4cafc4d7651db463dc755c9e
My SQL query from ^ is:
```sql
-- Insert into the ottotest0.rc0.mediawiki.page_content_change topic.
INSERT INTO ottotest0_mediawiki_page_content_change
-- The results of the following query.
-- Note that the fields selected here MUST be in the exact order of the sink table schema.
-- They are inserted by field order in the result, not by name.
SELECT
    meta,
    `$schema`,
    dt,
    changelog_kind,
    page_change_kind,
    wiki_id,
    page,
    performer,
    -- We need to create the new revision (row) field manually in order to
    -- supply one that has content_slots with content_bodies.
    -- All the fields here are the same except for enriched_content_slots
    row(
        rev_id,
        rev_parent_id,
        rev_dt,
        `comment`,
        comment_html,
        is_comment_visible,
        is_editor_visibile,
        is_content_visible,
        is_minor_edit,
        rev_sha1,
        rev_size,
        -- Use the enriched_content_slots field for content_slots
        -- as returned by the UDF in the subquery below.
        enriched_content_slots,
        editor
    ) as `revision`,
    prior_state
FROM (
    SELECT
        -- Get all the fields from mediawiki_page_change
        *,
        -- Hoist the revision row fields to the top level sub query result so we can use them
        -- in the new revision row() field construction above.
        -- (Apparently we can't use '.' in the row() construction function. Flink errors otherwise.)
        revision.rev_id,
        revision.rev_parent_id,
        revision.rev_dt,
        revision.`comment`,
        revision.comment_html,
        revision.is_comment_visible,
        revision.is_editor_visibile,
        revision.is_content_visible,
        revision.is_minor_edit,
        revision.rev_sha1,
        revision.rev_size,
        -- We don't need the original revision.content_slots. Instead, include a new 'enriched_content_slots' field
        -- as returned by the enrich_content_slots_with_body UDF.
        enrich_content_slots_with_body(meta.domain, revision.rev_id, revision.content_slots) as enriched_content_slots
    FROM mediawiki_page_change
    WHERE meta.domain <> 'canary'
) enriched;
```
If we had a catalog, a lot of that CREATE TABLE boilerplate would be reduced.
But not all of it! We apparently can't use the eventutilities-flink Java code from pyflink UDFs. If you try to access the py4j JVM gateway in python UDF code, this error will be thrown.
So, if we wanted to automate declaring result_types of pyflink UDFs, we'd have to do so in pure python, rather than try to re-use our Java code.
Or, we could implement UDFs in Java/Scala and use Flink's custom type inference.
So I was using Kafka Client 3.2.3, but I noticed you were using 2.4.1. Switched to that and it solves the cluster authorization issue. Gonna have to note that somewhere
I think the custom type inference in Java/Scala is really powerful, but if someone is already at the point where they're writing UDFs in Java, then they probably already have working knowledge of DataTypes. When it comes to UX for people who only want to write python, it might be worth just reimplementing JsonSchemaFlinkConverter, although I don't know how difficult that would be. Having something like
@flink_udf(output_schema="fragment/mediawiki/state/entity/revision_slots")
looks very sleek. (although it feels a bit wrong to have 2 codebases that do the same thing)
But now that I think about it, if the UDFs have to return a value with the specific structure of return_type anyways, would we be doing a disservice to the user writing the UDF by hiding what the return type has to be behind an abstraction? It's one thing if we derive the return_type from the type annotations since the user would be defining that (although because of ambiguous types that's not the best idea), but if we're deriving it from schemas then the user would have to go to the schema repo and figure out what they have to return anyways
I'm reminded of when I implemented the image suggestion feedback pipeline: the schema converter helped when typing the input event, but did not help in typing the output, and I had to manually type the Cassandra table.
(When it comes to this task of making an example reading/writing with Flink SQL and a UDF: with Andrew's example and also a more simplified example in the example repo, this can be marked as done, although there are still good conversations here.)
if we're deriving it from schemas then the user would have to go to the schema repo and figure out what they have to return anyways
Yeah, you are right, and it is kind of weird to associate the return value of a UDF with an event JSONSchema. It makes total sense for the inputs and outputs of the streaming pipelines, but not so much for intermediate steps, like function calls.