
RFC: Modern Event Platform: Stream Intake Service
Closed, Resolved · Public · 5 Estimated Story Points

Description

T185233: Modern Event Platform describes the high-level components that will make up the Modern Event Platform program in FY2018-2019.
T201068: Modern Event Platform: Stream Intake Service is the parent task for Stream Intake Service planning and implementation work. It also collects the user stories/requirements for this service that were gathered during stakeholder interviews in Q4 of FY2017-2018.

This RFC outlines the current state of our event intake systems and some possible ways forward. The outcome of this RFC should be to choose an implementation option. This RFC is not intended as a design document.

How things work now

The original EventLogging system uses Varnish and URL-encoded JSON in HTTP query parameters to intake events from external clients. varnishkafka then sends a log of the request to Kafka, which is consumed by EventLogging python processes that decode and validate the events. The events are then produced as JSON strings to Kafka topics for downstream systems.
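For illustration, here is a minimal Python sketch of what that GET-based intake path amounts to: the event JSON is URL-encoded into the query string of a /beacon/event request, logged by varnishkafka, and later URL-decoded and parsed by the EventLogging consumers. The helper and field names below are hypothetical, not the actual EventLogging parser code.

```python
# Minimal sketch (not the real EventLogging parser): decode an event that was
# submitted URL-encoded in the query string of a /beacon/event GET request.
import json
from urllib.parse import unquote_plus

def decode_beacon_query_string(query_string: str) -> dict:
    """URL-decode and JSON-parse an event submitted as a query string."""
    return json.loads(unquote_plus(query_string))

# Roughly what a beacon request carries (illustrative field names):
raw = '%7B%22schema%22%3A%22TestSchema%22%2C%22event%22%3A%7B%22action%22%3A%22click%22%7D%7D'
event = decode_beacon_query_string(raw)
print(event['schema'], event['event']['action'])  # TestSchema click
```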

The EventBus HTTP Proxy service is an internal Tornado HTTP server that listens for POSTed JSON event bodies. The EventBus service itself is part of the EventLogging python codebase. Incoming events are validated against a schema and then produced to Kafka for downstream systems.
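As a simplified illustration of the EventBus interaction, here is roughly what an internal producer does. This is a hedged sketch: the host, port, and event fields are hypothetical, though the service does accept a POSTed JSON array of events.

```python
# Rough sketch of producing to the EventBus HTTP proxy. Host/port, the event id,
# and field names are illustrative; validation failures come back as non-2xx.
import json
import requests  # assumed available; any HTTP client works

events = [{
    'meta': {
        'topic': 'test.event',                         # destination stream
        'id': '00000000-0000-0000-0000-000000000000',  # illustrative event id
        'dt': '2018-09-26T20:00:00Z',
    },
    'test_field': 'some value',
}]

resp = requests.post(
    'http://eventbus.example.org:8085/v1/events',  # hypothetical endpoint
    data=json.dumps(events),
    headers={'Content-Type': 'application/json'},
    timeout=5,
)
resp.raise_for_status()
```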

Eventlogging/Architecture has a description of how the original EventLogging event intake system works.
EventBus has some more information on the evolution of this original idea in EventLogging for production (non-analytics) services.

What we want

More detailed user story/requirements are in T201068: Modern Event Platform: Stream Intake Service.

We want a scalable, unified event stream intake system for both analytics and production events. Currently, analytics events must be submitted one at a time asynchronously (with no client feedback) to Varnish as URL-encoded query parameters on an HTTP GET, while 'production' events are POSTed to the separate EventBus service. We'd like to unify these systems and add some features.

This stream intake service will retrieve schemas from the Schema Repository described in T201063 to validate events.

I'd really like it if this service were generic and well-written enough to be used by organizations other than Wikimedia. There is a need in the Kafka community for an open-source JSONSchema-based component like this.

Some history

EventBus as it is today grew out of discussions in T88459 and T114443. Much of the debate there was around the issue this RFC wants to address: whether to adapt EventLogging or to write something new. (In hindsight, it would have been difficult to write a NodeJS-based service at the time, due to the lack of a functioning Kafka client; fyi node-rdkafka has been working great for us for the last year or so.) I was on the fence between these options, but was swayed by arguments for using an existing system, as were many others. We chose to implement the EventBus service in EventLogging. In doing so, we added code to make EventLogging understand how to use a new non-Mediawiki-specific schema type (with no EventCapsule). This was done with the intention of hopefully one day unifying the analytics and Mediawiki schemas into more generic, event-schemas-like conventions. If that EventLogging codebase unification work is to be done, this Stream Intake component would include that work.

However, I think we should take this moment to re-evaluate our decision to use the EventLogging codebase for event stream intake (especially since we want to expose this endpoint publicly), and consider implementing something more specific to the use cases described in T201068.

Decision points

Many of the desired features are either already handled by the EventBus service (part of EventLogging python) or could be added to it. However, we should take this moment and use the RFC to decide whether we want to do so, or whether we should start fresh with something else, either leveraging service-template-node or adapting an existing tool like Confluent's Kafka REST Proxy.

This RFC should be used to decide whether we want to continue with EventBus (and the EventLogging python codebase), or to use something else.

Option 1: Continue to use EventLogging + EventBus service

To do this, I'd include the following work:

  • Refactor EventLogging service.py code to support Python 3 and the latest Tornado, and review the async/coroutine handlers.
  • Move topic/config mapping out of the event-schemas repository, and make service.py support remote topic configuration (from the Stream Configuration service).
  • Refactor some EventLogging internals to decouple the Mediawiki-specific parts.
  • Refactor some EventLogging internals to decouple the on-wiki schema registry.
  • Solve this Kafka timeout + EventBus bug: T180017: Timeouts on event delivery to EventBus
  • (possibly) add client authentication and authorization

Option 2: Write a new service with service-template-node

In 2015, our service-template-node-based production services were fewer, and we had zero NodeJS Kafka client usages deployed in production. In 2018, we have several more NodeJS production services, are moving towards deploying them in Kubernetes, and have at least 2 NodeJS production services using Kafka. Given this situation, I believe that implementing a new service would not be more work than the changes needed to EventLogging.

Option 3: Adapt Confluent Kafka REST Proxy

I've been experimenting with REST Proxy to see if this would be possible, and I've concluded that it is.

To do this, we'd need to fork Kafka REST Proxy to add the following features:

  • Pluggable JSONSchema validation
    • This would be a pluggable API to allow JSONSchema lookup from a JSON value (and/or topic name) in order to validate that value.
  • Configurable topic name transformation
    • EventBus does some topic name transformation when it produces an event, prefixing it with the datacenter name (see the sketch after this list).
  • Restricted topic -> schema mapping (possibly via the Stream Configuration service)
  • Restricted API usage for the frontend (no consumer API, /topics only)
    • This might be done with a separate front-end proxy that only allows certain REST URIs (e.g. POST /topics) through.
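To make the topic handling pieces above concrete: REST Proxy itself is Java, but the Python fragment below sketches the kind of logic the fork would need for datacenter prefixing and a restricted topic -> schema mapping. The names and the static mapping are hypothetical; in practice the mapping would come from the Stream Configuration service.

```python
# Illustration only (the real REST Proxy is Java): datacenter prefixing of topic
# names plus a restricted topic -> schema mapping. Mapping contents are made up.
DATACENTER = 'eqiad'

TOPIC_SCHEMAS = {
    'mediawiki.revision-create': 'mediawiki/revision/create',
    'test.event': 'test/event',
}

def prefixed_topic(topic: str) -> str:
    """Prefix the logical topic with the local datacenter name, as EventBus does."""
    return f'{DATACENTER}.{topic}'

def schema_for_topic(topic: str) -> str:
    """Only topics with a configured schema may be produced to."""
    try:
        return TOPIC_SCHEMAS[topic]
    except KeyError:
        raise ValueError(f'Topic {topic!r} is not configured for event intake')

print(prefixed_topic('test.event'))    # eqiad.test.event
print(schema_for_topic('test.event'))  # test/event
```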

Recommendation

Stop using EventLogging/EventBus in favor of something else.

Since this RFC's goal is to decide whether we should use something other than EventLogging/EventBus, it is not necessary to officially decide between Option 2 and Option 3 here. After exploring the REST Proxy codebase, I currently lean towards adapting it for our needs (Option 3). We may still encounter something that would make us change our mind and go with Option 2 instead (SRE pushback on the JVM, bad performance, etc.). REST Proxy is pretty widely used by the Kafka community, and by using it and building in JSONSchema support we'd be doing this community a favor. We may or may not be able to upstream our changes to Confluent.

Event Timeline

Ottomata triaged this task as Medium priority. Aug 14 2018, 9:07 PM
Ottomata created this task.
Ottomata renamed this task from [WIP] RFC: Modern Event Platform: Scalable Event Intake Service to RFC: Modern Event Platform: Scalable Event Intake Service. Aug 22 2018, 2:07 PM
Ottomata added a project: TechCom-RFC.
Ottomata changed the point value for this task from 0 to 5. Aug 29 2018, 2:18 PM

@daniel I heard you might have some questions/thoughts about this! Find me in IRC to chat?

@Ottomata I'm neck-deep in SDC stuff, with deadlines looming... I don't have anything concrete to ask, just the general question of how much this impacts or is influenced by the "dependency engine" design we discussed a while ago. That discussion would be a lot easier if I had written a proper spec for it already - that'll probably happen in a few weeks, once SDC is wrapped up.

For this component, I don't think it matters much. The event intake is just a standalone HTTP API that accepts and validates events. Later on we'll think about stream processing platforms. That'll be much more relevant to dep tracking engine requirements.

That's what I hoped, thanks for confirming!

Heya @fgiunchedi, could you take a look at this (and the parent) task and tell me what you think? We are considering scheduling this work for Q2. My current hope/suggestion is to use Kafka REST Proxy for this, which would mean eventually (probably not in next quarter) a publicly exposed Kafka REST Proxy (JVM based) HTTP produce endpoint. Thoughts?

I agree with the general sentiment that adapting EL/EventBus isn't going to be a whole lot less work than either writing a new service or adapting Kafka REST Proxy. So I'm +1 on this as far as the scope of the RFC goes.

A few questions/considerations in more detail wrt options 2/3: in case we go with the REST Proxy fork and not all our changes are accepted upstream, I'm assuming we'd have to keep maintaining our fork which I don't think is desirable. Is Kafka REST Proxy something you'd envision having in production on its own to allow Kafka consumption/production via HTTP? I think it'd be nice to have, especially to gain Kafka "support" for HTTP-only clients, the idea being that we're much more comfortable with HTTP than Kafka.

If REST Proxy is something we'd want in production anyway, what about a blend of option 2 and option 3? Namely, we'd run stock REST Proxy to allow generic Kafka production/consumption via HTTP. For event intake, however, the entry point would be a separate service handling what Kafka REST Proxy can't do out of the box, namely at least schema validation, topic restriction, validating frontend clients, etc. After validation, said service would then pass the request on to REST Proxy or reject it.

I'm assuming we'd have to keep maintaining our fork which I don't think is desirable.

Yeah, we'd likely be maintaining a fork, but hopefully most of the changes would be in an extended API class. It does require some modification to the HTTP routes, but that logic is minimal.

Is Kafka REST Proxy something you'd envision having in production on its own to allow Kafka consumption/production via HTTP?

I think the publicly exposed instances would be restricted to producing only. But there's no reason we couldn't use other deployed instances of it to do both consuming and producing. We also have EventStreams, which is an HTTP consumer interface in front of Kafka too. EventStreams and Kafka REST Proxy for consuming work very differently, so there might be a reason to use Kafka REST Proxy (internally) for consuming instead of EventStreams. However in most cases, I'd likely recommend folks just use a Kafka client if they can.

If REST Proxy is something we'd want in production anyway, what about a blend of option 2 and option 3? Namely, we'd run stock REST Proxy to allow generic Kafka production/consumption via HTTP. For event intake, however, the entry point would be a separate service handling what Kafka REST Proxy can't do out of the box, namely at least schema validation, topic restriction, validating frontend clients, etc. After validation, said service would then pass the request on to REST Proxy or reject it.

No, I don't think so. The only reason I can see that we'd go through the effort of deploying REST Proxy is for this scalable event intake component. Mayyyybe someone will want it for something else (consuming over HTTP) and we could use it for that, but I don't think there is a huge reason to deploy REST Proxy just for that. If we don't use Kafka REST Proxy for this Scalable Event Intake Service, it wouldn't be part of this year's Modern Event Platform program.

TechCom will be hosting an IRC discussion on 26 September at 2pm PST (21:00 UTC, 23:00 CET) in #wikimedia-office.

BTW, in case you missed it, I just updated T185233: Modern Event Platform, the main parent task that describes the different components of Modern Event Platform, including this one. The rationale for the task update ^ is in this comment: https://phabricator.wikimedia.org/T185233#4611779

I'll write down here some questions I'd like to discuss in this evening's meeting:

  • In general, rewriting a service that works from scratch doesn't come without risks. So, if the amount of work is comparable, what are the reasons to dismiss our current software stack?
  • If we want to go for the full rewrite, are there specific limitations we foresee if we use existing software such as the Kafka REST Proxy?
  • Why would a re-implementation use node instead of python? Do we have specific reasons for that? Also, why stick to a single-threaded concurrency model for the execution environment? Neither node nor python/tornado seems like the best choice for a high-volume intake system. Any language with good support for multithreading would probably work better than those; even PHP (on php-fpm or HHVM) handles concurrent requests better. Did you evaluate the Kafka client libraries in various programming languages?
  • If (as I expect) we could just maintain a "fork" of the REST proxy that extends its API a bit, wouldn't it be possible to make the ability to extend the default behaviour a feature of the proxy that we contribute upstream, allowing us to just maintain a proper extension to the basic code with a stable API to work with?

I'll try to answer these in order of increasing difficulty:

Why would a re-implementation use node instead of python?

We could! As said, this RFC is to choose whether we should use something different than EventLogging/EventBus, whether that is REST Proxy or something new. If we chose to write something new, perhaps there is a better choice than NodeJS. I mostly chose NodeJS as an example since we have a lot of service-based infrastructure supporting NodeJS now and it seemed like the lowest-hanging solution. But maybe there is something better!

If (as I expect) we could just maintain a "fork" of the REST proxy that extends its API a bit, wouldn't it be possible to make the ability to extend the default behaviour a feature of the proxy that we contribute upstream, allowing us to just maintain a proper extension to the basic code with a stable API to work with?

It might! But it might be more difficult than it sounds, and require a bunch of refactoring of the upstream code. E.g. here I am adding a route to do JSONSchema validation based on a different Content-Type: https://github.com/confluentinc/kafka-rest/compare/master...ottomata:jsonschema#diff-cfbc94ec5dce805681e63a6375730df5

If we want to go for the full rewrite, are there specific limitations we foresee if we use existing software such as the Kafka REST Proxy?

I don't think so, other than that Kafka REST Proxy is not invented here (which I think is a pro). It could turn out that some of the features we need (e.g. configurable topic datacenter name prefixing) are difficult in Kafka REST Proxy, whereas if we wrote something new (or continued to use EventBus), we could more flexibly add whatever we want.

In general, rewriting a service that works from scratch doesn't come without risks. So, if the amount of work is comparable, what are the reasons to dismiss our current software stack?

EventLogging is very Mediawiki- and installation-specific. It is possible to refactor it, but I'd estimate the risks of doing so to be comparable to the risks of writing something new, and greater than the risk of using something that is broadly used in places other than just WMF.

Ottomata renamed this task from RFC: Modern Event Platform: Scalable Event Intake Service to RFC: Modern Event Platform: Stream Intake Service. Sep 26 2018, 8:51 PM

One of the action items from the RFC meeting was:

  • investigate if we need to support GET event intake anymore

I just searched webrequest logs over the last 3 days to see if there were any /beacon/event.gif usages, and I found 0. I conclude that we don't need to support GET intake at all! Yeehaw!

I spoke a little too soon ^. It looks like EventLogging extension code does not use the event.gif endpoint; it uses the same /beacon/event in an img src if sendBeacon is not available: https://github.com/wikimedia/mediawiki-extensions-EventLogging/blob/master/modules/ext.eventLogging/core.js#L264-L267

Still, this is JavaScript, not a server-rendered <img> element, so we can still do POST. We can modify this conditional to POST the event (XMLHttpRequest? fetch? submit form? whatever) instead of rendering a hidden img tag that will GET the event.

Chatted with @Joe today. He followed up on a point of misunderstanding from the IRC meeting. He wanted to know specifically why it would be more work to modify EventLogging as-is to do what we want rather than to write something new. I'll try to elaborate on this point.

In the IRC meeting, another action item for me was to modify Option 2 to remove the NodeJS / service-template-node specific parts of it, and make it all about just writing something new. I'd like to push back on this, and keep service-template-node as Option 2. To write something 100% brand new would be more work and risk than to use EventLogging. service-template-node is not new, nor is production use of node-rdkafka. Option 2 is about using those two things together to build the Stream Intake Service.

As for Option 1, there are many places we could draw the line for the code refactor. If we wanted to, we could even deploy a separate instance of eventlogging-service (eventbus) for analytics event production with the code as it is right now. It would work. (Still gotta solve T180017.) However, if we are going to keep using EventLogging, I'd like to do two main 'refactors' of the codebase to make things more maintainable and not Mediawiki-specific.

Refactor eventlogging/service.py
This is the server code that accepts events over HTTP POST, validates them against a JSONSchema, and produces them to Kafka. It currently uses Python 2.7, Tornado 4, and kafka-python. For this refactor, I'd want to upgrade to (at least) Python 3.5 and to a web framework that uses the Python asyncio paradigm. Tornado 5 can use Python asyncio, but I find the Tornado API (including my own usage of it, e.g. convert_kafka_future) to be fairly confusing and not easy to understand or maintain. There are other web frameworks that use asyncio (Sanic might be the most appropriate here?) and if we are refactoring this code, we should consider using them rather than just upgrading Tornado.

Additionally, neither kafka-python nor confluent-kafka-python works well with asyncio. There is aiokafka, which we could add an EventLogging handler for.

A refactor of service.py would require (a rough sketch follows the list):

  • a new Python version
  • a new (or upgraded) web framework
  • a new async event loop (Python 3's built-in asyncio)
  • a new Python Kafka client
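Here is a rough sketch of what the refactored produce path could look like on Python 3 asyncio with aiokafka and jsonschema. The web framework wiring and most error handling are omitted, and the function and topic names are hypothetical rather than taken from the existing codebase.

```python
# Sketch only: validate events against a JSONSchema and produce them to Kafka
# from an asyncio handler, using aiokafka instead of kafka-python/Tornado.
import json

from aiokafka import AIOKafkaProducer
from jsonschema import validate, ValidationError

producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')

async def start() -> None:
    await producer.start()

async def handle_events(events: list, schema: dict, topic: str) -> list:
    """Validate each event; produce valid ones, collect errors for the rest."""
    errors = []
    for event in events:
        try:
            validate(instance=event, schema=schema)
        except ValidationError as e:
            errors.append({'event': event, 'error': e.message})
            continue
        await producer.send_and_wait(topic, json.dumps(event).encode('utf-8'))
    return errors  # empty list means every event was accepted

async def stop() -> None:
    await producer.stop()
```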

Refactor eventlogging schema handling
The original EventLogging codebase expects that schemas are all uniquely identified by a name and a revision in a Mediawiki Schema namespace on meta.wikimedia.org, and that events that use these schemas be encapsulated in a larger schema called the EventCapsule. When we built the eventbus service logic into eventlogging, we added conditionals to differentiate between the different schema repositories (local filesystem/git vs. on-wiki), and also the different types of events (capsuled or meta-subobject). To make a clean, non-Mediawiki-specific EventLogging, we'd want to remove the logic that differentiates between these two systems. This would mean branching or forking the EventLogging codebase to remove the extraneous logic, with the intention of one day porting all analytics events to the new MEP system and schema style. The original EventLogging code would need to be maintained and deployed separately from the new refactored code.
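To show the kind of separation this refactor is after, here is a hedged sketch of the two schema sources behind one interface, so the non-Mediawiki code path never needs to know about EventCapsule or meta.wikimedia.org. Class names are hypothetical and the lookup details are simplified.

```python
# Sketch of decoupled schema lookup: on-wiki (name + revision on meta) vs. a
# local checkout of the event-schemas git repo. Names and API params simplified.
import json
from abc import ABC, abstractmethod
from pathlib import Path

import requests  # assumed available

class SchemaResolver(ABC):
    @abstractmethod
    def get_schema(self, name, revision=None):
        """Return a parsed JSONSchema dict for the given schema name/revision."""

class OnWikiSchemaResolver(SchemaResolver):
    """Original EventLogging style: Schema:<name> at a revision on meta.wikimedia.org."""
    API = 'https://meta.wikimedia.org/w/api.php'

    def get_schema(self, name, revision=None):
        params = {'action': 'jsonschema', 'title': name, 'format': 'json'}
        if revision is not None:
            params['revid'] = revision
        return requests.get(self.API, params=params, timeout=5).json()

class FileSchemaResolver(SchemaResolver):
    """event-schemas style: schemas live in files checked out from git."""
    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def get_schema(self, name, revision=None):
        return json.loads((self.base_dir / f'{name}.json').read_text())
```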


In both of these refactors, I'd argue that the changes required are more of a rewrite than a refactor. I'd also argue that using service-template-node for this is less risky, as we wouldn't be deploying new web frameworks and new Kafka clients to production. A lot of the code in service.py deals with logging, routing, monitoring, etc., all of which is standardized in service-template-node.

Also, along the way, I would really like it if the Stream Intake Service we build were well written and usable in places outside of WMF. EventLogging is full of stream processing building blocks, ingestion systems, very WMF-specific code (e.g. mw bot detection), and other features that a standalone event intake service doesn't need. In the long run, I'd like to replace our usages of other EventLogging components with MEP components, namely Stream Ingestion (into MySQL, files, Hadoop, whatever) with Kafka Connect, and a Stream Processing platform (for augmenting events, UA parsing, etc.). Those MEP components are supersets of what most of EventLogging was built to do many years ago, but with scalability and deployability in mind.

BTW, I am totally open to other suggestions than service-template-node for Option 2. It's just that when balancing risks and amount of work, it seems to me to be the obvious choice if we were to write something new. Perhaps there is another better idea? If so let's hear it!

I think in general it's ok to go with the nodejs rewrite - I only hope we've evaluated carefully that this service will not be very CPU-intensive; as we know, all systems that mock concurrency by using an event loop and non-blocking I/O are inherently limited in the amount of computational work they can do by running on a single CPU core.

We have already run into such issues with MCS, and the solutions proposed there are really ugly, to the point that it might merit a rewrite in another language.

Similar issues are presented by python/tornado, although uwsgi can take care of raising parallelism in the general case.

Was this considered? What is your evaluation in terms of how computationally intensive this process is?

Was this considered? What is your evaluation in terms of how computationally intensive this process is?

I'd have to say no; I'm mostly considering ease of implementation and, even more so, risk. We use both python and nodejs now, so I'm really only evaluating those. A quick rundown of what this service will do for every request:

  • Deserialize a JSON string to a JavaScript event object
  • Make a (possibly remote) request for a JSONSchema (unless it has already been cached), and deserialize it to a JavaScript object.
  • Validate the event against the schema
  • Serialize the event object back to a JSON string and produce it to Kafka

The above should not be CPU intensive, although I suppose a very complicated schema might be intensive to validate. We'll have a lot of rules in place about keeping our JSONSchemas simple, so hopefully that won't be a problem.
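For the schema lookup step in particular (the only possibly-remote call in the list above), a small cache keeps the hot path down to JSON (de)serialization, validation, and the produce. This is a sketch with hypothetical names: the schema base URL and the '$schema' field on the event are assumptions, not settled design.

```python
# Sketch: cache remote JSONSchema lookups so only the first request per schema
# URI leaves the process. Base URL and the '$schema' event field are assumed.
import json
from functools import lru_cache

import requests  # assumed available
from jsonschema import validate

SCHEMA_BASE_URL = 'https://schema.example.org'  # hypothetical schema repo URL

@lru_cache(maxsize=1024)
def fetch_schema(schema_uri):
    """Fetch and cache a JSONSchema; callers must treat the result as read-only."""
    return requests.get(f'{SCHEMA_BASE_URL}/{schema_uri}', timeout=5).json()

def validate_event(raw_event):
    event = json.loads(raw_event)
    schema = fetch_schema(event['$schema'])
    validate(instance=event, schema=schema)
    return event
```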

service-template-node uses service-runner, which is basically a prefork worker model (using nodejs cluster) (right?) to make use of multiple cores. This service will be horizontally scalable (I'd realllly like to deploy it in k8s...gotta talk to Alex there I guess :D ).

I think in general it's ok to go with the nodejs rewrite

Thanks, prototype is proceeding...now we need a name.

service-template-node uses service-runner, which is basically a prefork worker model (using nodejs cluster) (right?) to make use of multiple cores

Correct.

The above should be CPU instensive,

We're running the JobQueue and ChangeProp on nodejs using the same driver, and those two perform much more work, but even for the most high-traffic rules/jobs we hit 30% CPU. Also, I believe consuming from Kafka is much more CPU-intensive than producing, due to all the constant polling that the client does. With producing events we should be much more efficient, given that the polls are essentially on-demand.

Oops, typo, meant to write

The above should not be CPU intensive

(fixed in comment)

Moving to inbox so TechCom members can review if needed, and move to last call if there are no further questions or problems to work out.

TechCom is placing this on Last Call, ending Wednesday December 5th 10pm PST (December 6th 06:00 UTC, 07:00 CET).

Can we close this?

Yes, this ticket is just the RfC itself, and it's been approved. Resolving.