
Evaluate Benthos as stream processor
Closed, Resolved · Public

Description

Hi folks!

Filippo has been testing Benthos as a stream processor, and I gave it a try as well for some of the ML use cases. It looks really simple and powerful, and in my opinion it would be nice to have a tool like that on some VMs or on K8s. Since the streaming use case is a big one and there are a lot of things going on, I opened a task so we can discuss what is best to do :)


  • Filippo's use case: enrich webrequest_text with geoip data

The idea has been discussed several times, namely pulling messages from the Kafka topic webrequest_text, adding geoip data and then sending them back to a different Kafka topic. Druid could then pull messages from this new topic and add "real-time" segments to webrequest_128.

  • ML use case - replace ChangeProp for mediawiki.revision-score events

The idea is to read messages from the mediawiki.revision-create Kafka topic, wrap every message in a JSON payload that Lift Wing knows how to process, and call the Lift Wing API accordingly. I have created a simple config to process only enwiki events, and tested it on stat1004. If you are curious: https://phabricator.wikimedia.org/P35321
Some interesting details:

  • TLS is used for both Kafka and HTTP, and it works nicely out of the box.
  • There are "rate_limit" configs that can be added to avoid overwhelming services. In my case, I added one to send only a few requests per second to Lift Wing (see the config sketch after this list).
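
For reference, a stripped-down version of that config would look roughly like the sketch below. This is only illustrative (the real config is in P35321): the broker address, consumer group, Lift Wing URL and envelope shape are placeholders, and field names may differ slightly between Benthos versions.

```yaml
input:
  kafka:
    addresses: [ "kafka-jumbo1001.eqiad.wmnet:9093" ]   # placeholder broker
    topics: [ "eqiad.mediawiki.revision-create" ]
    consumer_group: benthos-liftwing-test               # hypothetical group name
    tls:
      enabled: true

pipeline:
  processors:
    # Keep only enwiki events and wrap each one in an envelope; the real
    # envelope is in P35321, this one is just illustrative.
    - mapping: |
        root = if this.database == "enwiki" {
          {"event": this}
        } else {
          deleted()
        }

rate_limit_resources:
  - label: liftwing_limit
    local:
      count: 5         # at most 5 requests...
      interval: 1s     # ...per second

output:
  http_client:
    url: https://inference.example.wmnet/v1/models/enwiki-goodfaith:predict   # placeholder URL
    verb: POST
    rate_limit: liftwing_limit
    tls:
      enabled: true
```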

It seems to me that Benthos could be used in several places, and it should be really easy to package (it is a Go binary that can be Debianized or built as part of a Docker image and configured via Helm).

What does Data Engineering think? If there are other alternatives let us know; we don't want a proliferation of stream processors/tools, but it would be nice to have something like Benthos for simple use cases.

Event Timeline

Ping @gmodena, as we talked about this exact topic this morning :)

Thanks for the ping @JAllemandou.

This looks really interesting, especially for ease of deployment. @elukey do you know if http_client calls are async? I could not figure it out by skimming through the doc.

FWIW Event-Platform is experimenting with what we call "simple stateless services" that seem to match your example here.
This sprint we are doing PoCs to evaluate pyflink and SQL. Let's compare notes in a couple of weeks.

This looks really interesting, especially for ease of deployment. @elukey do you know if http_client calls are async? I could not figure it out by skimming through the doc.

Afaics there is no specific mention of async in the code, but in theory it should/may use goroutines (I am ignorant about Go though, so I am definitely not authoritative). The http_client module has a lot of functionality like throttling, back pressure, retries, etc., so maybe the author decided to avoid asynchronicity for this use case. Do you have any specific requirements in mind? If so I can try to test them :)
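
For what it's worth, these are the kind of HTTP-side knobs I was referring to; the field names below are from memory and the URL is a placeholder, so take this fragment with a grain of salt:

```yaml
# Illustrative fragment of an http_client output config
http_client:
  url: https://inference.example.wmnet/v1/models/enwiki-goodfaith:predict   # placeholder
  verb: POST
  timeout: 5s
  retries: 3                   # retry failed requests a few times
  retry_period: 1s             # wait between retries
  rate_limit: liftwing_limit   # throttling via a shared rate_limit resource
```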

FWIW Event-Platform is experimenting with what we call "simple stateless services" that seem to match your example here.
This sprint we are doing PoCs to evaluate pyflink and SQL. Let's compare notes in a couple of weeks.

Very nice yes!

I'm in favour of further experiments with Benthos, given that it appears to be so simple to run and so flexible.

We might think of Benthos as the Swiss army knife of stream processing, compared with the CNC milling machine of stream processing provided by Flink.
Therefore, I think that there is probably room for both types of tool in our environment. That's my feeling for now, anyway. I'm happy to review this as we get more experience with the various tools and deployment scenarios.

I had a chat with Filippo last week and it shouldn't be too difficult to package/deploy Benthos somewhere. We could create a Debian package and deploy it to the centrallog nodes for a quick test, or maybe we could create a simple Helm chart and deploy it on k8s (I'm still not sure which cluster is best; maybe it depends on the use case).

Given the simple structure of the Benthos config for the streams, it may make sense to have a Benthos instance for every stream processing pipeline that one wants to set up. In this way we'd have, in k8s-land, a pod for each Benthos stream with its own Prometheus metrics and logging, unmixed from the rest. Keeping all streams in one config may be possible, but it would surely be error-prone in the long term (at least IMHO).

We might think of Benthos as the Swiss army knife of stream processing, compared with the CNC milling machine of stream processing provided by Flink.

Indeed! We should evaluate this. We'd like to be able to do this via Flink if we can, because then we only have to maintain one streaming framework. However, in our experience so far, Flink is very hard, and this will only be possible if we can make using it very easy, perhaps via SQL.

Do you have any specific requirements in mind? If so I can try to test them :)

Our main use case is making it easy to enrich streams with extra data, e.g. via a remote HTTP API call. It would be nice if the stream processor did not have to block on receiving this remote HTTP response before processing the next message, but on the other hand, if we do partitioning and Kafka consumer groups right, we should be able to at least parallelize on partition keys, and perhaps the blocking won't be so bad.

Do you have any specific requirements in mind? If so I can try to test them :)

Our main use case is making it easy to enrich streams with extra data, e.g. via a remote HTTP API call. It would be nice if the stream processor did not have to block on receiving this remote HTTP response before processing the next message, but on the other hand, if we do partitioning and Kafka consumer groups right, we should be able to at least parallelize on partition keys, and perhaps the blocking won't be so bad.

Ack, got it. Benthos is written in Go, so it has good support for concurrency; I see some async HTTP client keywords mentioned in the codebase, but so far I am not expert enough in the language/tool to give a good answer. Some docs to read though:

https://www.benthos.dev/cookbooks/enrichments/
https://www.benthos.dev/docs/guides/streams_mode/about/
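
The enrichments cookbook is built around the branch processor; if I read it correctly, the shape is roughly the following (hypothetical geoip endpoint and field names, assuming the v4 config layout):

```yaml
pipeline:
  processors:
    - branch:
        # Build the request body sent to the enrichment service
        # (hypothetical field, just to show the shape).
        request_map: |
          root.ip = this.ip
        processors:
          - http:
              url: http://geoip.example.internal/lookup   # placeholder endpoint
              verb: POST
        # Merge the service response back into the original message.
        result_map: |
          root.geoip = this
```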

The tricky thing about async calls in streams is that the ordering of the events might get all messed up, as the calls will complete in an undetermined order. If order matters, you'll have to re-order at the end. We'd have to do this in Flink too, but we might be able to abstract it so the developer doesn't have to think about it.

The tricky thing about async calls in streams is that the ordering of the events might get all messed up, as the calls will complete in an undetermined order. If order matters, you'll have to re-order at the end. We'd have to do this in Flink too, but we might be able to abstract it so the developer doesn't have to think about it.

https://www.benthos.dev/docs/configuration/processing_pipelines contains some interesting detail: the number of threads for the processors (basically where data enrichment would happen) can be tuned. I am also planning to ask upstream about the usage of async HTTP calls, but I am pretty sure that the use case you outlined above has been taken into consideration (we are surely not the only ones with this use case in mind). I'll report back if I find anything.
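
For completeness, the knob described on that page is pipeline.threads; a minimal, illustrative example (placeholder processor, untested) would be:

```yaml
pipeline:
  threads: 4                   # run the processors below on 4 parallel threads
                               # (IIRC the docs also mention -1 to match the host's logical CPUs)
  processors:
    - mapping: 'root = this'   # placeholder; the actual enrichment processors would go here
```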

(FYI) For this task and the work in T314981: Add a webrequest sampled topic and ingest into druid/turnilo, we now have Debian packages for Benthos available for Buster and Bullseye.

By the way, any Event Platform producer should do these things (see the sketch after this list for how some of them might look in Benthos): https://wikitech.wikimedia.org/wiki/Event_Platform/EventGate#Purpose

  • Receives events of a specific schema version destined to a specific stream.
  • Uses EventStreamConfig to ensure that only events of the same schema lineage are emitted to a stream.
  • Augments the event with some custom defaults (e.g. setting date time fields or HTTP header data in the event).
  • Looks up the schema of the incoming event, and validates that the event data conforms to the schema.
  • Adds datacenter prefixes to the destination stream name to make the destination Kafka topic name.
  • Produces to Kafka.
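
For illustration only, a few of these steps might map onto Benthos processors roughly like this; the schema path, timestamp field and topic below are hypothetical, and the sketch is untested:

```yaml
pipeline:
  processors:
    # Augment the event with some defaults, e.g. a dt timestamp (hypothetical field name).
    - mapping: |
        root = this
        root.meta.dt = now()
    # Validate the event against its JSON schema.
    - json_schema:
        schema_path: "file://./schemas/mediawiki/revision/create/1.1.0.json"   # hypothetical path
output:
  kafka:
    addresses: [ "kafka-jumbo1001.eqiad.wmnet:9093" ]   # placeholder broker
    topic: eqiad.mediawiki.revision-create              # datacenter-prefixed topic (illustrative)
```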

Like I mentioned before, we're doing a lot of evaluation of how we can make this kind of simple stream processing easier, and indeed, maybe Benthos will really be it! @tchin and @gmodena have been seeing if we can do this with Flink SQL. @tchin is going to try to find some time to play with Benthos in the next 3 weeks and see how it might fit in.

By the way, any Event Platform producer should do these things: https://wikitech.wikimedia.org/wiki/Event_Platform/EventGate#Purpose

  • Receives events of a specific schema version destined to a specific stream.
  • Uses EventStreamConfig to ensure that only events of the same schema lineage are emitted to a stream.
  • Augments the event with some custom defaults (e.g. setting date time fields or HTTP header data in the event).
  • Looks up the schema of the incoming event, and validates that the event data conforms to the schema.
  • Adds datacenter prefixes to the destination stream name to make the destination Kafka topic name.
  • Produces to Kafka.

Like I mentioned before, we're doing a lot of evaluation of how we can make this kind of simple stream processing easier, and indeed, maybe Benthos will really be it! @tchin and @gmodena have been seeing if we can do this with Flink SQL. @tchin is going to try to find some time to play with Benthos in the next 3 weeks and see how it might fit in.

Sounds great! Thank you for the context and the update @Ottomata !

FYI I am working on a more specific requirements spec for event platform producers:

@elukey thanks a lot for this live data! That's awesome!
I went to the Data Engineering office hours today to ask for some hints on how to reproduce what the json-webrequests-stats script does in analytics-land, and got a lot of great suggestions. Also thanks to @Milimetric for giving me an example to start from.

Here is a preview of the current draft (still WIP) result: https://superset.wikimedia.org/superset/dashboard/webrequest-live/
I still need to polish some things, ask Data Eng. how to improve some details, and make another dashboard with the equivalent of the sum N aggregations.
But it is a starting point, and I think it's already kinda usable in case of need during an incident.
Send me any feedback or comments if you have any.

Change 857519 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] prometheus: add benthos jobs

https://gerrit.wikimedia.org/r/857519

Change 857519 merged by Filippo Giunchedi:

[operations/puppet@production] prometheus: add benthos jobs

https://gerrit.wikimedia.org/r/857519

Change 857544 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] benthos: fix service name

https://gerrit.wikimedia.org/r/857544

Change 857545 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] benthos: reload on config changes

https://gerrit.wikimedia.org/r/857545

Change 857619 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] benthos: apply batching to webrequest_live

https://gerrit.wikimedia.org/r/857619

Change 857619 merged by Filippo Giunchedi:

[operations/puppet@production] benthos: apply batching to webrequest_live

https://gerrit.wikimedia.org/r/857619

Change 857544 merged by Filippo Giunchedi:

[operations/puppet@production] benthos: fix service name

https://gerrit.wikimedia.org/r/857544

Change 857545 merged by Filippo Giunchedi:

[operations/puppet@production] benthos: reload on config changes

https://gerrit.wikimedia.org/r/857545

Change 857648 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] benthos: fix required 'content' for absented systemd::service

https://gerrit.wikimedia.org/r/857648

Change 857648 merged by Filippo Giunchedi:

[operations/puppet@production] benthos: fix required 'content' for absented systemd::service

https://gerrit.wikimedia.org/r/857648

@fgiunchedi @elukey I am seeing some strange behaviour in the dashboard data; I'm not sure if it's a me problem or a data problem, but I'm reporting it in case it's the latter.

Basically it seems like the data for the text records is delayed by ~7-8 minutes at the moment, while the upload ones are much less behind.
Is that at all possible?
I'm attaching a screenshot and a quick link to the chart in Superset that graphs the webrequest_source field: https://superset.wikimedia.org/r/2103

hits-web-req-live-2022-11-16T17-14-39.358Z.jpg (362×564 px, 37 KB)

@fgiunchedi @elukey I am seeing some strange behaviour in the dashboard data; I'm not sure if it's a me problem or a data problem, but I'm reporting it in case it's the latter.

Basically it seems like the data for the text records is delayed by ~7-8 minutes at the moment, while the upload ones are much less behind.
Is that at all possible?

Yes, very much possible. I believe the lag has been fixed by adding batching here: https://gerrit.wikimedia.org/r/857619. Can you confirm that's the case?
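
For reference, I believe the batching goes on the Kafka output; an illustrative (not the actual) version of that stanza would be something like:

```yaml
output:
  kafka:
    addresses: [ "kafka-jumbo1001.eqiad.wmnet:9093" ]   # placeholder broker
    topic: webrequest_sampled_live                      # illustrative topic name
    batching:
      count: 500            # flush once 500 messages are buffered...
      period: 1s            # ...or after one second, whichever comes first
```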

Yes, very much possible. I believe the lag has been fixed by adding batching here: https://gerrit.wikimedia.org/r/857619. Can you confirm that's the case?

@fgiunchedi I'm not sure: yesterday night, after that patch was merged, I still saw random delays in the data catching up. Or at least that's what appears in the graph; I can't tell whether it's Benthos or some other part of the pipeline that I don't know about.
Right now, for example, the count(*) query returns up-to-date data, but the sum(response_size) one stops at 8:38, see screenshot.
Both queries are grouped by webrequest_source.

Screenshot 2022-11-17 at 09.45.00.png (1×1 px, 243 KB)

Change 858191 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] benthos: add snappy compression to kafka output settings for webrequest

https://gerrit.wikimedia.org/r/858191

Change 858191 merged by Elukey:

[operations/puppet@production] benthos: add snappy compression to kafka output settings for webrequest

https://gerrit.wikimedia.org/r/858191

And now (before the merge of the above patch) the data is back in sync. Hence it looks to me like something that depends on live factors/load.

The other actor that plays a role in this pipeline is Druid: https://grafana.wikimedia.org/d/000000538/druid?orgId=1&refresh=1m&var-datasource=eqiad%20prometheus%2Fanalytics&var-cluster=druid_analytics&var-druid_datasource=All&from=now-3h&to=now

My knowledge is rusty so I defer the final comments to DE, but IIRC the historical daemons in Druid are the ones responsible for serving queries from segments, and maybe they sometimes get a little overwhelmed if several people query data at the same time (see, in the Historical panel, the metrics related to response timings etc., and also the corresponding GC timings). The "live" segments that the webrequest_sampled_live supervisor creates probably lead to lower performance when computing aggregate metrics, compared to the bigger/less time-granular segments that we obtain with batch jobs.

Another thing that we should discuss is how many Kafka partitions of webrequest_{upload,text} to fetch data from. At the moment the two Benthos instances on the centrallog nodes are handling 12 partitions each (of the 24 total), consuming a ton of messages and discarding most of them. We could think about consuming only 12 or 6 partitions in total, and this would greatly reduce the load on the Benthos instances.

Caveat: how is data distributed across the Kafka webrequest_{upload,text} partitions by varnishkafka? At the moment we don't set any specific algorithm, so librdkafka uses its default, consistent_random (see the partitioner setting):

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties

CRC32 hash of key (Empty and NULL keys are randomly partitioned)

In this case the "key" should be the JSON message that we are sending, but I still need to verify that. Is there any corner case we can think of where clients get "bucketed" into specific partitions, and hence become invisible if we instruct Benthos to pull less data?
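
If we do go that route, reading the kafka input docs it seems that explicit partitions can be listed with a colon/range syntax; a rough, untested sketch (placeholder broker and group name):

```yaml
input:
  kafka:
    addresses: [ "kafka-jumbo1001.eqiad.wmnet:9093" ]   # placeholder broker
    topics:
      - "webrequest_text:0-5"      # consume only an explicit range of partitions
      - "webrequest_upload:0-5"
    # With explicit partitions the consumer group is (IIRC) only used for committing
    # offsets, not for balancing; worth double-checking in the docs.
    consumer_group: benthos-webrequest-live   # hypothetical group name
```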

And now (before the merge of the above patch) the data is back in sync. Hence it looks to me like something that depends on live factors/load.

Seems quite possible indeed. I don't have enough knowledge for an informed answer/investigation and will also defer to DE.

Ok, after refreshing the query a few times in this Superset chart that performs a SUM(request_size) GROUP BY webrequest_source, it seems like the data is loaded in batches of ~5 minutes, or of some number of records that in the end takes ~5 minutes to accumulate.

Basically the data shown stays the same as time passes, until the new "batch" is loaded. For example, in the last few minutes I got the last data point in the graph at:

:46:08
:50:42
:55:15

This doesn't seem to happen with the graph that shows just the COUNT(*) GROUP BY webrequest_source.

elukey claimed this task.

Closing since we have been using Benthos for a while :)