Add support for Spark producers in Event Platform
Closed, Resolved · Public

Description

Spark producers and consumers can currently access Event Platform's Kafka topics via EventGate and EventStream.

The goal of this task is to standardize producer/consumer APIs and access patterns.

Use cases

  • Dumps 2.0 reconciliation
  • Search Update Pipeline and weighted tags for image recommendations: T372912
  • WDQS Updater reconcile mechanism (currently uses a simple EventGate client from the spark driver)

Done is

  • A Spark + Event Platform Kafka sink that meets the Event Platform producer requirements is implemented in the wikimedia-event-utilities repository.
  • A new wikimedia-event-utilities version is released.

Usage example based on the implemented Spark sink

dataFrame 
    .write()
    .format("wmf-event-stream")
    .option("event-stream-name", "mediawiki.cirrussearch.page_weighted_tags_change.rc0")
    .option("event-schema-base-uris", "https://schema.wikimedia.org/repositories/primary/jsonschema")
    .option("event-stream-config-uri", "https://meta.wikimedia.org/w/api.php?action=streamconfigs")
    .option("event-schema-version", "1.0.0")
    .option("event-stream-topic-prefix", "eqiad")
    .option("kafka.bootstrap.servers", "kafka.local:9092")
    // this is the default topic used by the kafka sink if a row does not specify one
    .option("topic", "eqiad.mediawiki.cirrussearch.page_weighted_tags_change.rc0")
    .save();

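The above should also work in PySpark with near-identical syntax (there, write is a property rather than a method, so the chain starts with dataFrame.write.format(...), and the comment uses # instead of //).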
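
In the usage example, the value of the "topic" option is the "event-stream-topic-prefix" joined to the "event-stream-name" with a dot. A minimal sketch of that relationship, assuming this convention holds for the stream in question; default_topic is a hypothetical helper name and not part of wikimedia-event-utilities:

```python
def default_topic(stream_name: str, topic_prefix: str) -> str:
    """Build a topic name as '<prefix>.<stream name>' (assumed convention)."""
    return f"{topic_prefix}.{stream_name}"


topic = default_topic(
    "mediawiki.cirrussearch.page_weighted_tags_change.rc0", "eqiad"
)
print(topic)
```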

Event Timeline

I think for both of the listed use cases, what is mostly needed is producer support.

See also:

https://wikitech.wikimedia.org/wiki/Event_Platform/Producer_Requirements

Producing via eventgate would satisfy these requirements. Producing directly to Kafka would be nice though!

@dcausse @pfischer and I had a chat about this phab today. Here are some notes from our conversation:

Search has two use cases. Both of them require a producer, and no spark consumer.

  1. Low volume: very similar to Dumps 2.0 reconciliation.
    • Detection with spark
    • Re-injection into the same Flink pipeline
    • POST events from spark to EventGate.
      • HTTP client, runs from the Spark driver; no rate limiting and no reusable pieces as of now.
      • Same thing Dumps 2 should do, but that implementation would be Python.
  2. High volume: Image Suggestions + Weighted Tags.
    • Replace two topics with one and unify all writes to ES.
    • Image suggestion:
      • Currently batch: Airflow + Spark, create file on Swift, notify Kafka (via daemon), fetch file.
      • Future: users should write to a single topic, single component that writes to ES. Stream is the API boundary between search and growth.
    • If the pattern is to collect data to the Spark driver (single thread) and POST from there, do we need Spark at all?
      • We'll need to write a client anyway, TBD if/how to wrap it in spark.

To support high throughput use cases, we would like a Java library wrapped into a custom Writer (like we did with Flink)

Some things to take into account / investigate:

  • How to SerDe (typeInfo in Flink is tricky)? Serializer in Scala.
  • Do we write directly to Kafka or use EventGate?
  • Rate limit spark sink?

@dcausse @pfischer holler if I missed something :)

FYI, CanaryEventsProducer in wikimedia-event-utilities java has a postEvents method.

This could be factored out to a more generic event service producer.

High volume: Image Suggestions + Weighted Tags.

What is high volume in this case? I would love to have a Spark->Event Platform kafka sink for sure, but if producing to EventGate is easier and the volume isn't too high...this could be fine!

@Ottomata, regarding the volumes to be expected for image recommendations: according to @Cparle, image suggestions would be produced in weekly batches of 160k (90k commons + 70k non-commons) updates. I don't know how easily the batch frequency could be increased to lower the per-batch size. After all, 160k/week would boil down to roughly 1k/hour, which does not sound like much anymore.

I would try to keep eventgate produce throughput under 1000 per second. Both eventgate and kafka are horizontally scalable, so if we need to support more we can always add more eventgate replicas (and more kafka brokers), but I'd be hesitant to do so for weekly batch purposes.
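
As a quick sanity check of the numbers in this exchange, here is a back-of-the-envelope calculation. It only uses the figures quoted in the thread (160k events per week, a ceiling of 1000 events per second); the variable names are illustrative.

```python
# Figures quoted in the thread.
WEEKLY_EVENTS = 160_000      # 90k commons + 70k non-commons
HOURS_PER_WEEK = 7 * 24      # 168 hours
EVENTGATE_MAX_RPS = 1_000    # suggested ceiling for EventGate produce throughput

# Rate if the weekly batch were spread evenly over the week.
events_per_hour = WEEKLY_EVENTS / HOURS_PER_WEEK

# Duration if the whole batch were sent in one burst at the ceiling.
burst_seconds = WEEKLY_EVENTS / EVENTGATE_MAX_RPS

print(f"spread evenly: about {events_per_hour:.0f} events/hour")
print(f"single burst at the ceiling: {burst_seconds:.0f} seconds")
```

So even sent as one burst at the suggested ceiling, the weekly batch would take under three minutes.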

If you want to produce as fast as you can directly to Kafka though, that _should_ be okay. Batching will be better handled by the Kafka client.

But, to do this, you'd need a Spark + Event Platform producer integration.

Pasting from my comment in Slack:

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
what would be needed is a converter from input dataframe -> output kafka dataframe, with value serialized with JsonEventGenerator

Once you have the DataFrame in the correct format, you can just use Spark's Kafka sink to write it.

Spark supports both streaming as well as batch read/writes to Kafka: https://spark.apache.org/docs/3.5.3/structured-streaming-kafka-integration.html.

This is a good example of a custom Kafka sink that builds atop the ForeachWriter API.
https://www.databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

This could implement the conversion logic @Ottomata mentions above, but the same API could also wrap an HTTP client that calls out to EventGate.

If we decide to go down the Kafka Sink path, we must be careful to re-use producer logic (validation, topic mappings, partitioning, etc.) and stay consistent with both EventGate and eventutilities producers.

Once you have the DataFrame in the correct format, you can just use Spark's Kafka sink to write it.

This part could be non-trivial IMHO. But we'll need it regardless of whether we write to Kafka or to EventGate.

For Flink, we piggyback on the type system (RowTypeInfo) to convert a Row to a JSON object. This is done _before_ calling JsonEventGenerator, e.g. in RowDataToJsonConverters and other classes in org.wikimedia.eventutilities.flink.formats.json.

I would like to investigate if this tool can help:
https://github.com/databricks-industry-solutions/json2spark-schema. And we also have a JsonSchemaSparkConverter that might already do some lifting.

Hm, what is needed for producing? We'd need to:

  • Get an event DataFrame corresponding to the event schema. JsonSchemaSparkConverter can help get the schema. We have utilities in Refine code that manage using this to go from stream name to Spark schema. Those could be moved into event-utilities for reusability.

If producing to EventGate:

  • serialize DataFrame to JSON string, collect and POST? Or implement an eventgate Spark sink?

If producing to Kafka:

  • Convert event DataFrame to ObjectNode, and use JsonEventGenerator to validate and serialize to JSON string. There might be some trickiness here with null values and needing to omit them from the final output. I don't remember if we do that in the Flink JSON converter, or if that is done by JsonEventGenerator.
  • Construct a DataFrame according to what Kafka sink requires, with value as event JSON string.
  • write to Kafka sink.
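
The Kafka steps above can be sketched in plain Python, with dicts standing in for DataFrame rows. Spark's Kafka sink reads a required value column plus optional key and topic columns, so each event ends up as one such record. Everything else here is an assumption for illustration: the validate function is a stand-in that only checks required fields (it is not JsonEventGenerator), and the field names and topic are made up.

```python
import json


def validate(event, required):
    """Stand-in for JSON Schema validation: only checks that required fields exist."""
    missing = [name for name in required if name not in event]
    if missing:
        raise ValueError(f"missing required fields: {missing}")


def to_kafka_record(event, key_fields, topic, required=()):
    """Map one event dict to the key/value/topic columns the Kafka sink reads."""
    validate(event, required)
    key = {name: event[name] for name in key_fields}
    return {
        "key": json.dumps(key, sort_keys=True),
        "value": json.dumps(event, sort_keys=True),
        "topic": topic,
    }


# Hypothetical event and topic, for illustration only.
event = {"wiki_id": "enwiki", "page_id": 42, "tags": ["example"]}
record = to_kafka_record(
    event,
    key_fields=["wiki_id", "page_id"],
    topic="eqiad.example.stream",
    required=["wiki_id", "page_id"],
)
print(record["key"])
print(record["value"])
```

Validating before serializing means a bad row fails before anything is handed to the sink.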

Some notes:

  • At least on dumps, we are not using Spark Structured Streaming. It's just a plain old DataFrame.

...
If producing to EventGate:

  • serialize DataFrame to JSON string, collect and POST? Or implement an eventgate Spark sink?

If we are going to the trouble of providing an API, then it would be nice if I just give a dataframe (with presumably proper schema) and a URI to a jsonschema to a method. From that point and on everything is done for me, including validation and (rate limited) submission. Extra points if it uses Spark's Data Source API:

dataFrame
  .write
  .format("eventgate")
  .option("endpoint", "eventgate-analytics")
  .option("jsonschema", "mediawiki/page/change/1.2.0")
  .option("validate", "true")
  .option("batching", "1000")
  .option("rps", "1")
  .save()
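
To make the "batching" and "rps" options in this proposal concrete, here is a minimal sketch of rate-limited batch submission in plain Python. It assumes "batching" means events per request and "rps" means requests per second, which the proposal does not spell out. The post callable is a hypothetical stand-in for an HTTP client; no real EventGate API is used.

```python
import time


def batches(items, size):
    """Yield consecutive chunks of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_rate_limited(events, post, batch_size=1000, rps=1.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Call post(batch) at most `rps` times per second; return the request count."""
    if rps <= 0:
        raise ValueError("rps must be positive")
    min_interval = 1.0 / rps
    sent = 0
    last = None
    for batch in batches(events, batch_size):
        if last is not None:
            # Wait out the remainder of the interval since the previous request.
            wait = min_interval - (clock() - last)
            if wait > 0:
                sleep(wait)
        last = clock()
        post(batch)
        sent += 1
    return sent


# Demo with a fake clock and a recording sleep, so nothing actually waits.
posted = []
sleeps = []
count = send_rate_limited(
    list(range(2500)), posted.append, sleep=sleeps.append, clock=lambda: 0.0
)
print(count, [len(b) for b in posted], sleeps)
```

Injecting sleep and clock keeps the pacing logic testable without real delays.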

If producing to Kafka:

  • Convert event DataFrame to ObjectNode, and use JsonEventGenerator to validate and serialize to JSON string. There might be some trickiness here with null values and needing to omit them from the final output. I don't remember if we do that in the Flink JSON converter, or if that is done by JsonEventGenerator.
  • Construct a DataFrame according to what Kafka sink requires, with value as event JSON string.
  • write to Kafka sink.

Reading above requirements suggests EventGate is plenty? But if not, then:

dataFrame
  .write
  .format("wmf-kafka")
  .option("topic", "t1")
  .option("jsonschema", "mediawiki/page/change/1.2.0")
  .option("validate", "true")
  .option("batching", "1000")
  .option("rps", "1")
  .save()

Note a DataSource API will work transparently for Scala, Java, Python Spark APIs. Even for R!

I started working on a kafka sink for T372912 that leverages JsonSchemaSparkConverter from eventutilities-spark. It expects a Dataset<Row> which complies with the mapped stream-schema DataType. From that, it extracts the key (according to the schema config) and encodes both key and value as JSON in a single Dataset.map() step. Additionally, it also performs validation of the encoded JSON in that step.

So far I wrote that code in Java, but it feels a bit cumbersome when using a Spark API that has been written in Scala. Would it be okay to implement the sink in Scala instead (and expose it as part of eventutilities-spark)?

Early feedback would be highly appreciated: P69440

@Ottomata, I pushed a CR: https://gerrit.wikimedia.org/r/1077113, but my attempt to simply wrap the kafka sink seems to be a dead end. It works for batch processing (.write().format('event-stream').save()) but not for stream processing (.writeStream().format('event-stream').start()). Do we intend to support Spark stream processing? How would you approach providing a wrapper around the kafka sink? Alternatively, we could provide the necessary mapping/validation step (that happens as part of the wrapper) separately, so it could be called via df.map(…), but it feels cumbersome, especially when calling from PySpark.

So far I wrote that code in Java, but it feels a bit cumbersome when using a Spark API that has been written in Scala. Would it be okay to implement the sink in Scala instead (and expose it as part of eventutilities-spark)?

In general, a Java implementation would make Python interop easier. Sometimes we need to get creative when wrapping Scala with py4j. That said, I remember the Spark API not being the nicest to work with in Java.
Works for me if you go for a Scala implementation here; in the worst case we can add some Java boilerplate to make the API Python-friendly.

Do we intend to support Spark stream processing? How would you approach providing a wrapper around the kafka sink?

Chiming in: I would not block on the lack of support for spark-streaming. AFAIK there is no plan / desire to support it anytime in the near future.

Amazing!

I would not block on the lack of support for spark-streaming

Agree. If it is easy to do, then sure, but I don't think that Spark streaming support is in any of our plans or dreams right now.

.write().format('event-stream').save()

I guess we'd need to pass in Kafka cluster properties somehow? Those are just passed through to kafka sink via .options(...)?

so it could be called via df.map(…)

One thing that is slightly nicer about this idea is that it doesn't conflate the output format and sink writer as Spark usually does.

I like how Flink separates the concept of format / serializers from sink writers. E.g. it is possible to write our custom event-json format (with validation) to a file sink, or to a kafka sink. Spark is a little weird about this though.

Hm in https://spark.apache.org/docs/3.5.1/structured-streaming-kafka-integration.html#writing-data-to-kafka, that is basically what they are saying you should do. It is the user's responsibility to make the DataFrame match the requirements for the 'kafka' sink 'format'.

Change #1077113 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[wikimedia-event-utilities@master] Added event-stream-sink wrapper around kafka sink

https://gerrit.wikimedia.org/r/1077113

The batch-only sink is ready. Two questions remain open:

  • Naming: What should we name this sink?
    • eventstream
    • event-stream
    • wmf-eventstream
    • wmf-event-stream
  • Early vs late schema validation: Currently the validation happens per Row, after it has been encoded as JSON during normalisation of the event data. Alternatively, we could leverage the StructType extracted from the schema to validate an input dataset; however, that would only reveal missing fields, so I don't know if it's worth the effort.
  • How can we make sure that Spark does not attempt to retry a stage after a validation error (IllegalArgumentException)? Does that have to be configured when scheduling the Spark task from Airflow? According to @JAllemandou, exceptions thrown from within the Spark code are not retried.
  • Early vs late schema validation: Currently the validation happens per Row, after it has been encoded as JSON during normalisation of the event data. Alternatively, we could leverage the StructType extracted from the schema to validate an input dataset; however, that would only reveal missing fields, so I don't know if it's worth the effort.

Is it true that looking only at the StructType can discover just a subset of the problems, namely missing required fields and type mismatches?
@JAllemandou told me that there is some code in refinery we could possibly externalize and re-use here to do some early schema validation/normalization, but I'm unsure whether it is easy or worthwhile to re-use.

@dcausse, according to @JAllemandou, the validation-related code lives in HiveExtensions.scala that provides extensions to StructTypeField to test assumptions like isMapType. That does not get us much. As far as I am concerned, I would opt for sticking with just JSONSchema-validation. This way we only write compliant data to kafka since there is no way to mess with it afterwards (since we wrap the kafka sink). We could of course provide a dedicated UDF to perform StructType-based validation but we currently do not have a use case for that, so I would like to keep it out of this CR.

@Ottomata, any final thoughts on the naming of the output format? That appears to be the only open question left.

any final thoughts on the naming of the output format?

Either eventstream or wmf-eventstream. eventstream is what we went with for the name of the Flink SQL catalog type, buuut, in hindsight I think wmf-eventstream is a more accurate and less squatty name. So I think I prefer wmf-eventstream.

If we ever get back to using Flink SQL catalog integration (we may not), we can consider renaming to wmf-eventstream then. It isn't used anywhere, so renaming that shouldn't be hard.

@pfischer just curious about your thoughts about the last part of T374341#10208368 comment. Namely about merits of a custom .map step vs the custom sink.

Also, your latest sink code looks much more comprehensive! Were you able to get it working with .writeStream() after all? Not a need, just curious.

Oh, I see the other usages of 'event-stream', e.g. 'event-stream-config-uris'. Ergh, maybe with the hyphen is better after all? wmf-event-stream

Were you able to get it working with .writeStream() after all? Not a need, just curious.

@Ottomata sadly no, I did not put any more effort into this since the Spark APIs are clearly not designed to be wrapped.

I updated the CR with (usage) docs and renamed things for consistent naming.

Change #1083161 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[wikimedia-event-utilities@master] Replace eventutilities-shaded with attached, classified artifact of eventutilities.

https://gerrit.wikimedia.org/r/1083161

Change #1083161 abandoned by Peter Fischer:

[wikimedia-event-utilities@master] Replace eventutilities-shaded with attached, classified artifact of eventutilities.

Reason:

fixed in original CR

https://gerrit.wikimedia.org/r/1083161

Folks, I'm trying to pick up T368755 (Python job that reads from wmf_dumps.wikitext_inconsistent_row and produced reconciliation events) again. On that ticket, I was using EventGate to emit events, but it would definitely be better if I could reuse the work from this ticket and emit directly to Kafka.

I see that https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/1077113 has progressed a lot, but it is not merged yet.

Do we have an ETA? A rough one is fine.

( Also, this ticket has no owner. I'm being bold and assigning to @pfischer )

xcollazo renamed this task from [SPIKE] how can we support Spark producer/consumers in Event Platform to [SPIKE] How can we support Spark producer/consumers in Event Platform. Nov 1 2024, 7:57 PM
xcollazo updated the task description.
xcollazo removed a subscriber: lbowmaker.
gmodena renamed this task from [SPIKE] How can we support Spark producer/consumers in Event Platform to Add support for Spark producers in Event Platform. Nov 5 2024, 10:39 AM

@gmodena @pfischer can you please add a definition of done / deliverables for this task?

Change #1077113 merged by jenkins-bot:

[wikimedia-event-utilities@master] Added event-stream-sink wrapper around kafka sink

https://gerrit.wikimedia.org/r/1077113

@gmodena @pfischer can you please add a definition of done / deliverables for this task?

done.

Change #1087985 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[wikimedia-event-utilities@master] Strip null values as part of normalization

https://gerrit.wikimedia.org/r/1087985

For completeness, this change comes from testing the data source and finding that it could not validate events with optional fields set to NULL. We discussed this in Slack.

Copy pasting from that thread:

I’m getting the following errors from the validation step when attempting to send events of schema https://schema.wikimedia.org/repositories/primary/jsonschema/mediawiki/page/change/1.2.0.yaml :
error: instance type (null) does not match any allowed primitive type (allowed: ["string"])

level: "error"
schema: {"loadingURI":"#","pointer":"/properties/revision/properties/content_slots/additionalProperties/properties/content_body"}
instance: {"pointer":"/revision/content_slots/main/content_body"}
domain: "validation"
keyword: "type"
found: "null"
expected: ["string"]

error: instance type (null) does not match any allowed primitive type (allowed: ["string"])

level: "error"
schema: {"loadingURI":"#","pointer":"/properties/revision/properties/content_slots/additionalProperties/properties/content_format"}
instance: {"pointer":"/revision/content_slots/main/content_format"}
domain: "validation"
keyword: "type"
found: "null"
expected: ["string"]

Note that submitting the events via EventGate is successful.

My question is: These two properties, content_body and content_format are supposed to be NULL, from the perspective of my use case, as they will be enriched later. But then I can see from https://www.json.org/json-en.html, that, strictly, a JSON string cannot be NULL.

I can set them to the empty string but that seems wrong?
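
One way to read the errors above: the schema declares these properties with type string, so an explicit null fails validation, whereas an optional property that is simply absent passes. A minimal sketch of stripping nulls before validation, in plain Python on a dict. This illustrates the idea only and is not the library's Row -> ObjectNode mapping; the slot_role value is included purely as an example of a non-null field that must survive.

```python
import json


def strip_nulls(value):
    """Recursively drop dict entries whose value is None.

    Lists keep their length; None items inside lists are left untouched.
    """
    if isinstance(value, dict):
        return {k: strip_nulls(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [strip_nulls(v) for v in value]
    return value


event = {
    "revision": {
        "content_slots": {
            "main": {
                "content_body": None,    # to be enriched later
                "content_format": None,  # to be enriched later
                "slot_role": "main",     # example non-null field
            }
        }
    }
}

cleaned = strip_nulls(event)
print(json.dumps(cleaned))
```

After stripping, content_body and content_format are absent from the JSON rather than null, which avoids both the type error and the empty-string workaround.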

Change #1088290 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[wikimedia-event-utilities@master] Use functions.to_json over Row.json

https://gerrit.wikimedia.org/r/1088290

Change #1088551 had a related patch set uploaded (by Peter Fischer; author: Peter Fischer):

[wikimedia-event-utilities@master] Spark: Row -> ObjectNode mapping

https://gerrit.wikimedia.org/r/1088551

Change #1088290 abandoned by Peter Fischer:

[wikimedia-event-utilities@master] Use functions.to_json over Row.json

Reason:

Superseded by directly mapping from Row -> ObjectNode, see https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/1088551

https://gerrit.wikimedia.org/r/1088290

Change #1087985 abandoned by Peter Fischer:

[wikimedia-event-utilities@master] Strip null values as part of normalization

Reason:

Superseded by directly mapping from Row -> ObjectNode, see https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/1088551

https://gerrit.wikimedia.org/r/1087985

xcollazo changed the task status from Open to In Progress. Nov 8 2024, 4:04 PM

Change #1088551 merged by jenkins-bot:

[wikimedia-event-utilities@master] Spark: Row -> ObjectNode mapping

https://gerrit.wikimedia.org/r/1088551

Change #1091386 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[wikimedia-event-utilities@master] Shade eventutilities-spark to be able to create a fat jar with all dependencies.

https://gerrit.wikimedia.org/r/1091386

Change #1091386 abandoned by Xcollazo:

[wikimedia-event-utilities@master] Shade eventutilities-spark to be able to create a fat jar with all dependencies.

Reason:

Did not work.

https://gerrit.wikimedia.org/r/1091386