
[EPIC] Streaming and event driven Python services
Closed, Resolved · Public

Description

This is a parent task to capture efforts related to the development and operation of Python-based stateless streaming services.

Background

This Epic was informed by the following SPIKEs:

Goals

This Epic spans the following tasks:

  • Flink wrappers and helper libraries should be moved into a dedicated git repo with packaging and CI. https://phabricator.wikimedia.org/T324746
  • Flink wrappers and helper libraries should integrate with Table API. We should allow injection of UDFs (ideally cross language). https://phabricator.wikimedia.org/T324953
  • We should provide scaffolding to bootstrap Python-based services.
  • We should provide utilities for local experimentation and unit testing. For instance, I would like to be able to inject mocked Sources/Sinks and operate with local JSON files before rolling out to YARN. https://phabricator.wikimedia.org/T324951
  • We should streamline packaging of pyflink applications, and ideally integrate with the shared Flink Docker images.
  • Side-output error reporting should be made composable and more robust.
  • Metrics and monitoring should be standardized.
  • Deployment should be standardized using WMF's Deployment Pipeline.

Done is:

  • Implement a 'production' version of the MediaWiki enrichment service in PyFlink, using the utilities and capabilities implemented as part of this Epic ticket, running on YARN
  • The Java/Scala implementation of the enrichment service is archived/switched off

Details

Title: Add link to WIP documentation
Reference: repos/data-engineering/eventutilities-python!11
Author: gmodena
Source Branch: add-doc-link
Dest Branch: main

Event Timeline

Restricted Application added a subscriber: Aklapper.
gmodena renamed this task from "[EPIC] Streaming and event eriven Python services" to "[EPIC] Streaming and event driven Python services". · Dec 7 2022, 4:46 PM

@gmodena, I've been trying to write tests for eventutilities-python, so that we could more easily improve and add things (like error event side output, etc.).

I've spent two days struggling with writing and testing a simple enrichment pipeline. I've even simplified further, and am now just trying to write a simple Python DataStream enrichment map function that goes from one stream with a schema to an output with a different schema. I could not get this to work!

It took me forever to realize how this works for the page content enrich pipeline: the output schema is the same as the input schema! If the output schema is different, we can't just treat the Row in the map function as a dict. Doing so results in a ValueError being thrown when trying to assign a field to a Row that the Row doesn't already know about.
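For illustration, here's a minimal sketch of that failure mode (assuming pyflink's Row from pyflink.common; the field names are made up):

from pyflink.common import Row

# A Row built with named fields only knows those fields.
event = Row(page_id=1, page_title='Example')

event['page_title'] = 'Renamed'  # fine: the field already exists

# Assigning a field the Row doesn't already know about raises ValueError,
# because the field name can't be found in the Row's field list.
event['enriched_field'] = 'enriched value'  # ValueError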

I tried to work around this by recursively converting the Row to a dict before providing it to the map function. This works fine, but we have no way to convert a nested dict back to a Row recursively (unless we write it ourselves). (I tried RowTypeInfo.from_internal_type, but I couldn't get it to work?).

I think if we want to be able to treat the event like a Python dict, we are going to have to implement custom recursive converters in Python between pyflink Row and Python dict.

I'd hope we can do something like this:

def enrich_fn(event: dict) -> dict:
    event['enriched_field'] = 'enriched value'
    return event

# gets the source and sink row types via EventDataStreamFactory
with stream_manager(source_stream_name='...', sink_stream_name='...', sink='kafka...?') as stream:
    stream.map(enrich_fn)
    stream.execute()

To do this, the functions users implement all have to work with dicts, which will require conversion of the input data stream to dicts, and also conversion of the final output datastream back to Rows of the output RowTypeInfo.

Okay, I think I have something working?

We can already easily recursively convert Rows to dicts.

dict_to_row will recursively convert any dicts that should be Rows to Rows using the RowTypeInfo.
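Roughly, the converter looks like this (a sketch of the idea, not the final code; it assumes RowTypeInfo exposes get_field_names()/get_field_types() and that Row supports set_field_names()):

from pyflink.common import Row
from pyflink.common.typeinfo import RowTypeInfo

def dict_to_row(event: dict, type_info: RowTypeInfo) -> Row:
    """Recursively convert a dict to a Row matching the given RowTypeInfo."""
    field_names = type_info.get_field_names()
    field_types = type_info.get_field_types()
    values = []
    for name, field_type in zip(field_names, field_types):
        value = event.get(name)
        # Nested struct fields arrive as dicts; convert them to Rows too.
        if isinstance(field_type, RowTypeInfo) and isinstance(value, dict):
            value = dict_to_row(value, field_type)
        values.append(value)
    # Build positionally to preserve the RowTypeInfo field order, then attach names.
    row = Row(*values)
    row.set_field_names(field_names)
    return row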

This allowed me to make the stream manager stuff always pre-convert the datastream to dicts, and then convert back to the output Row type before sending to the sink.

In this way, all user provided map, filter, etc. functions work with dicts.
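Concretely, the wiring is something like this (sketch only; row_stream, enrich_fn and sink_type_info are placeholder names, dict_to_row is the converter sketched above, and Row.as_dict(recursive=True) is pyflink's built-in Row-to-dict conversion):

# Pre-convert right after the source: user code only ever sees plain dicts.
dict_stream = row_stream.map(lambda row: row.as_dict(recursive=True))

# User-provided functions stay plain dict -> dict.
enriched_stream = dict_stream.map(enrich_fn)

# Post-convert right before the sink, back to the output Row type.
output_stream = enriched_stream.map(
    lambda event: dict_to_row(event, sink_type_info),
    output_type=sink_type_info)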

Still SUPER WIP, but I got it working with our Kafka sink stuff, and was able to output using a different schema.

> It took me forever to realize how this works for the page content enrich pipeline: the output schema is the same as the input schema! If the output schema is different, we can't just treat the Row in the map function as a dict. Doing so results in a ValueError being thrown when trying to assign a field to a Row that the Row doesn't already know about.
>
> I tried to work around this by recursively converting the Row to a dict before providing it to the map function. This works fine, but we have no way to convert a nested dict back to a Row recursively (unless we write it ourselves). (I tried RowTypeInfo.from_internal_type, but I couldn't get it to work?)
>
> I think if we want to be able to treat the event like a Python dict, we are going to have to implement custom recursive converters in Python between pyflink Row and Python dict.

Correct, this is not implemented yet. Last quarter we ended up prioritising the rollout of a Python version of MediaWiki stream enrichment over API completeness.
Let's address this missing bit ASAP, though. We'll need it, among other things, to produce messages to an error topic.

I was wondering if we could reuse the projection/row creation primitives from JVM eventutilities, but it turned out that would require additional (non-trivial) wrapping. Doing the conversion in Python should be more straightforward.

> Okay, I think I have something working?

Terrific!

LGTM. Maybe we can test it out within the scope of https://phabricator.wikimedia.org/T326536?
Would be great to have some unit tests (more in general), but that's tricky with the current CI setup / Java deps. It may be time to fix that properly.

I do wonder how much overhead this conversion will introduce. It should not be too bad (especially at low throughput), but it's something we might want to instrument.
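Even something as simple as a timing wrapper around the user function and converters would give us a first signal (a stdlib-only sketch; record_latency stands in for whatever metrics hook we standardize on):

import time

def timed(fn, record_latency):
    """Wrap a per-record function, reporting wall-clock latency in seconds."""
    def wrapper(event):
        start = time.monotonic()
        result = fn(event)
        record_latency(time.monotonic() - start)
        return result
    return wrapper

# e.g. stream.map(timed(enrich_fn, record_latency=print))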

Ottomata claimed this task.
Ottomata updated the task description.

Being bold and resolving.