
mediawiki-event-enrichment: issue async requests from ProcessFunction
Closed, Resolved · Public

Description

Goal

To improve throughput of real-time pipelines, we should consider making the enrichment function asynchronous. Initial tuning work (https://phabricator.wikimedia.org/project/view/6387/) suggests that:

  1. aiohttp/asyncio coroutines seem to be non-picklable.
  2. requests-futures (concurrent.futures-based) does not play nicely with beam's ThreadPool/pickling either.

TNG suggested performing manual microbatching/windowing, dispatching the entire batch/window together into the chosen async construct, and forwarding the results once everything has been collected.

Use the open method of the python MapFunction to create an operator-local ThreadPool, and in .flat_map() submit the HTTP call to the ThreadPool; block until the call finishes and use the plain result (not the future) as output. This avoids the pickling issue, since everything happens locally in the operator without serde.

In the simplest form, where we microbatch and wait for all HTTP requests in the microbatch to finish, the checkpoint marker only proceeds once the batch is done. This is safe (no data is lost on a crash), and on restart at least the current microbatch is retried.
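As a sanity check, a minimal sketch of this idea could look like the following (the endpoint, field names, and pool size are hypothetical placeholders, not the actual mediawiki-event-enrichment code): the ThreadPool is created in open(), so it never crosses a serde boundary, and only the plain HTTP result leaves the operator.

from concurrent.futures import ThreadPoolExecutor

import requests
from pyflink.datastream.functions import FlatMapFunction, RuntimeContext


class HttpEnrichFunction(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
        # Operator-local pool, created on the worker after deserialization,
        # so it is never pickled.
        self.pool = ThreadPoolExecutor(max_workers=10)

    def close(self):
        self.pool.shutdown()

    def flat_map(self, event):
        # Submit the blocking HTTP call to the pool, then block on the
        # result so only the plain value (not the Future) is emitted.
        future = self.pool.submit(
            requests.get,
            f"https://example.org/enrich?rev_id={event['rev_id']}",  # hypothetical endpoint
            timeout=10,
        )
        event["content"] = future.result().text
        yield event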

Success criteria

  • we should validate the approach works as expected.
  • mediawiki-event-enrichment has been tuned and works reliably on YARN and DSE k8s
  • changes specific to mediawiki-event-enrichment have been upstreamed to eventutilities-python and documented in the design doc.
  • mediawiki_page_content_change enrichment is deployed and running in DSE

Event Timeline

Restricted Application added a subscriber: Aklapper.
gmodena renamed this task from [NEEDS GROOMING] eventutilities-python: issue async request from FlatMap context to [NEEDS GROOMING] eventutilities-python: issue async requests from FlatMap context. Mar 23 2023, 9:02 PM
gmodena renamed this task from [NEEDS GROOMING] eventutilities-python: issue async requests from FlatMap context to [NEEDS GROOMING] eventutilities-python: issue async requests from MapFunction context. Mar 23 2023, 9:24 PM
gmodena moved this task from Backlog to Investigate on the Event-Platform board.
gmodena moved this task from Investigate to Sprint 10 on the Event-Platform board.
gmodena edited projects, added Event-Platform (Sprint 10); removed Event-Platform.
gmodena renamed this task from [NEEDS GROOMING] eventutilities-python: issue async requests from MapFunction context to eventutilities-python: issue async requests from MapFunction context. Mar 24 2023, 8:39 AM
gmodena updated the task description.

I have been able to validate that the idea can work. I was able to init a thread pool local to an operator and execute parallel requests on a batch of elements (PoC implementation here). No pickling issues, because what is put on the wire is the function output (instead of the closure). I think we could fall back to using a ProcessWindowFunction for compute + managing side output, without too much impact on our API. I have a rough implementation outside of eventutilities-python that I got to work locally (docker/minikube). Next step will be measuring latency/throughput on YARN and possibly tuning settings (batch size, thread pool size). If this works, we can look at integrating it into eventutilities-python.
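For reference, a rough sketch of what the ProcessWindowFunction variant might look like (the enrichment call, field names, and pool size are hypothetical; the actual PoC may differ): the whole window is handed to an operator-local thread pool and the results are forwarded once the batch has been collected.

from concurrent.futures import ThreadPoolExecutor

import requests
from pyflink.datastream.functions import ProcessWindowFunction, RuntimeContext


def fetch_content(event):
    # Hypothetical per-event enrichment call.
    response = requests.get(
        f"https://example.org/enrich?rev_id={event['rev_id']}", timeout=10
    )
    event["content"] = response.text
    return event


class WindowedHttpEnrich(ProcessWindowFunction):
    def open(self, runtime_context: RuntimeContext):
        self.pool = ThreadPoolExecutor(max_workers=10)

    def close(self):
        self.pool.shutdown()

    def process(self, key, context, elements):
        # Enrich the whole window in parallel; only plain results leave
        # the operator, so nothing unpicklable is put on the wire.
        yield from self.pool.map(fetch_content, elements)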

gmodena updated the task description.
gmodena renamed this task from eventutilities-python: issue async requests from MapFunction context to mediawiki-event-enrichment issue async requests from MapFunction context. Apr 3 2023, 6:19 PM
gmodena updated the task description.

> Next step will be measuring latency/throughput on YARN and possibly tuning settings (batch size, thread pool size). If this works, we can look at integrating it into eventutilities-python.

Performance tuning is still in progress. The current implementation batches using a count window on a non-keyed datastream. The main things that need tuning (and experimentation) are:

  • window size (number of events per partition)
  • number of worker threads in the Process function thread pool
  • beam python function bundle size
  • taskmanager memory settings (process memory vs java heap vs managed vs consumer-weights).
gmodena renamed this task from mediawiki-event-enrichment issue async requests from MapFunction context to mediawiki-event-enrichment: issue async requests from MapFunction context. Apr 3 2023, 6:31 PM

We have a seemingly stable job running on YARN. Using an ad hoc process function that combines count and time triggers on a keyed data stream seems to have reduced memory pressure on the beam workers.
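For context, a rough, hypothetical sketch of the count + time trigger idea (batch size, timeout, and the flush step are placeholders; the actual process function in mediawiki-event-enrichment may differ): a KeyedProcessFunction buffers events in keyed state and flushes either when the buffer reaches a count or when a processing-time timer fires.

from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ListStateDescriptor


class CountOrTimeBatcher(KeyedProcessFunction):
    """Buffer events per key; flush after max_count events or max_wait_ms."""

    def __init__(self, max_count=100, max_wait_ms=10_000):
        self.max_count = max_count
        self.max_wait_ms = max_wait_ms

    def open(self, runtime_context: RuntimeContext):
        self.buffer = runtime_context.get_list_state(
            ListStateDescriptor("buffer", Types.PICKLED_BYTE_ARRAY())
        )

    def process_element(self, event, ctx):
        self.buffer.add(event)
        buffered = list(self.buffer.get())
        if len(buffered) == 1:
            # First element of a new batch: arm the time-based trigger.
            ctx.timer_service().register_processing_time_timer(
                ctx.timer_service().current_processing_time() + self.max_wait_ms
            )
        if len(buffered) >= self.max_count:
            yield from self._flush(buffered)

    def on_timer(self, timestamp, ctx):
        buffered = list(self.buffer.get())
        if buffered:
            yield from self._flush(buffered)

    def _flush(self, buffered):
        self.buffer.clear()
        # Here the batch would be handed to the operator-local thread pool
        # (as in the earlier sketches) and the enriched results yielded.
        for event in buffered:
            yield event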

Currently testing on k8s. I had to disable jemalloc and default to the glibc allocator to prevent container OOMs. Metrics (container memory saturation, partition lag) seem to have stabilized in that environment too.

Other than that, the container is running Flink with default memory settings.

Ottomata renamed this task from mediawiki-event-enrichment: issue async requests from MapFunction context to mediawiki-event-enrichment: issue async requests from ProcessFunction. Apr 26 2023, 2:11 PM

Change 913245 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change_enrichment - update with latest image and parameterization

https://gerrit.wikimedia.org/r/913245

Change 913245 merged by jenkins-bot:

[operations/deployment-charts@master] page_content_change_enrichment - update with latest image and parameterization

https://gerrit.wikimedia.org/r/913245

Change 913252 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change_enrichment - apply values currently running in DSE

https://gerrit.wikimedia.org/r/913252

Change 913252 merged by Ottomata:

[operations/deployment-charts@master] page_content_change_enrichment - apply values currently running in DSE

https://gerrit.wikimedia.org/r/913252

Change 913973 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change - bump image to v0.13.0 for bugfix

https://gerrit.wikimedia.org/r/913973

Change 913973 merged by Ottomata:

[operations/deployment-charts@master] page_content_change - bump image to v0.13.0 and disable jemalloc

https://gerrit.wikimedia.org/r/913973

Finally got the latest code deployed (without the stupid bug), and I've got memory issues again (I think).

I'm guessing there is some difference in settings somewhere from what Gabriele was running. It isn't the same as Gabriele's code, but it should be similar enough; I didn't expect memory issues.

> the container is running Flink with default memory settings.

I'm not sure if this was true; there were some settings I noticed were applied but not in the helmfile. I added them:

"jobmanager.memory.process.size": "1500m"
"taskmanager.memory.process.size": "3072m"
"python.fn-execution.bundle.size": "5000"

Today I deployed with Flink 1.17 and Python 3.9. I did this mostly because things looked like they were behaving better with Flink 1.17, without the double-task issue.

And things did behave better, for about 3 hours. I had restarted this job from the earliest Kafka offset, so it was trying to process the last week's worth of data as fast as possible. For those 3 hours, it was doing pretty well!

This OOM looks different from the previous ones: in this case the container memory limit was not reached. It looks like an internal Flink buffer OOM (Java stack trace below). The TaskManager was not killed, but some internal process (the Python worker?) looks like it died, and all processing just stopped.

Also strange: one of the last log messages in the TaskManager is a single, hugely long message with a list of over 20,000 RequestExceptions and stack traces, like:

requests.exceptions.RequestException: Failed requesting wiki content from fr.wikipedia.org for page_id 15610861, rev_id 203713756
[...]

This exception is from error handling I added in order to have a nicer error message and error event in the common case where a rev_id is not available from the MW API, usually due to a quick page deletion or revision suppression.

It looks like somehow (all of?) the Exceptions are being kept in some memory buffer?! I'm not sure why, but perhaps as a result of some concurrent.futures thing? They should be caught, logged, emitted as error events to the side output, and then forgotten.
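For context, the intended behaviour is roughly the following (a sketch with a placeholder enrich step and a hypothetical output tag, not the actual job code): the exception is caught, logged, turned into an error event on a side output, and no reference to it is kept afterwards. Downstream, the error stream would be read via get_side_output(ERROR_TAG) and produced to an error topic.

import logging

import requests
from pyflink.common import Types
from pyflink.datastream import OutputTag
from pyflink.datastream.functions import ProcessFunction

# Hypothetical tag; the real job may name and type its error stream differently.
ERROR_TAG = OutputTag("enrichment-errors", Types.STRING())


def enrich(event):
    # Placeholder for the real HTTP enrichment; may raise RequestException.
    return event


class EnrichOrSideOutput(ProcessFunction):
    def process_element(self, event, ctx):
        try:
            yield enrich(event)
        except requests.exceptions.RequestException as error:
            logging.warning("Enrichment failed: %s", error)
            # Emit an error event to the side output and drop the exception.
            yield ERROR_TAG, str(error)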

I'll try to investigate more tomorrow.

Java OOM stack:

May 02, 2023 10:28:37 PM org.apache.beam.vendor.grpc.v1p48p1.io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
java.lang.OutOfMemoryError: Direct buffer memory
	at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:187)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:541)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:97)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:277)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.beam.vendor.grpc.v1p48p1.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

I wonder if we should explore Python worker thread mode? Although from reading the article it doesn't seem like it is recommended for us, and I'm not sure it applies to anything other than Table API UDFs.

The docs here indicate that this OOM is due to a 'direct memory leak'... I'd say so! All the Exceptions are in memory!

Change 914780 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change - bump image to v1.15.0-dev0 to debug OOM

https://gerrit.wikimedia.org/r/914780

Change 914780 merged by Ottomata:

[operations/deployment-charts@master] page_content_change - bump image to v1.15.0-dev0 to debug OOM

https://gerrit.wikimedia.org/r/914780

I deployed a version of the code that does not raise exceptions, and still got the same Direct Memory OOM.

This time the error log did not include 20K+ exception messages in one log line, so I suppose the Direct Memory issue is not related to that.

Perhaps I should just try increasing Direct Memory a bit? If we have a leak this won't help, but maybe we don't? Direct Memory allocation does look a bit spiky.

Full error log here.

I decided to try again, resetting python.fn-execution.bundle.size to its default of 1000. At first I thought this might have an effect on direct memory usage, but after reading the docs on python.fn-execution.memory.managed I'm not so sure. It looks like Python workers use task slot managed memory by default, so I don't think this will affect direct memory usage? Not sure though. I'll let it run and see if it fails again in another couple of hours.

Still running! And also still backfilling!

Python RSS crept up but seems to have stabilized.

I added a graph that estimates the time until max Kafka lag is 0, based on the current rate of consumption. Looks like with this single task we can process a week in about 40 hours. 20 more hours to go right now.

Glad this hasn't OOMed! I guess if we ever go back to increasing python.fn-execution.bundle.size, we should also increase some other memory section: either taskmanager.memory.task.off-heap.size or taskmanager.memory.managed.size, or change taskmanager.memory.managed.fraction to weigh more heavily toward Python.
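If we do revisit bumping the bundle size, the adjustment might look something like the following in the helmfile settings (the values are illustrative placeholders, not recommendations):

"python.fn-execution.bundle.size": "5000"
"taskmanager.memory.task.off-heap.size": "512m"
# or, instead of off-heap, weigh managed memory more heavily toward Python:
"taskmanager.memory.managed.fraction": "0.5"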

I want to add the exception handling back in, but since the previous time it died there were 20K exceptions in memory, I'm worried that something bad will happen. Need to troubleshoot that.

Change 915817 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change - bump to v1.15.0-dev2

https://gerrit.wikimedia.org/r/915817

Change 915817 merged by Ottomata:

[operations/deployment-charts@master] page_content_change - bump to v1.15.0-dev2

https://gerrit.wikimedia.org/r/915817

Change 915834 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/deployment-charts@master] page_content_change - remove python.fn-execution.bundle.size setting

https://gerrit.wikimedia.org/r/915834

Change 915834 merged by Ottomata:

[operations/deployment-charts@master] page_content_change - remove python.fn-execution.bundle.size setting

https://gerrit.wikimedia.org/r/915834

Redeployed with the exception being raised. Instead of raising a RequestException (from the requests lib) with the response instance, I'm raising a custom MediaWikiApiError. I'm hoping that something was holding on to the RequestExceptions in memory because they had a reference to the response? I'm not really sure.
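A guess at the general shape of such an exception (the real MediaWikiApiError may differ): it carries only plain values, so anything holding on to the exception cannot also retain a requests.Response and its buffers.

class MediaWikiApiError(Exception):
    """Raised when the MW API cannot serve the requested page content.

    Carries only plain strings/ints (no requests.Response reference), so a
    retained exception cannot pin response buffers in memory.
    """

    def __init__(self, message: str, domain: str, page_id: int, rev_id: int):
        super().__init__(f"{message} ({domain}, page_id={page_id}, rev_id={rev_id})")
        self.domain = domain
        self.page_id = page_id
        self.rev_id = rev_id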

I started consuming from a day ago instead of from last week.

Backfilled the full day; the job is still running. Things look okay... except Python RSS is still slowly creeping up. I can't say for sure, but it looks like it is creeping higher compared to the previous run (without raising the exception), which seems to have leveled out at around 750MB even when it was processing a full week of data.

The fact that Python RSS is creeping up, even though we are not backfilling atm, is not good. Going to let this run until Monday...

Pretty sure this task is done. Resolving.