
[Event Platform] Event Platform and DataHub Integration
Closed, Resolved · Public

Description

Background/Goal

Build a poll-based ingestion that iterates over the declared streams in EventStreamConfig, looks up the schemas for those streams, and then posts the updates to DataHub.

Ideally, DataHub would represent these as 'stream' datasets rather than just Kafka topics, since a 'stream' is made up of multiple Kafka topics. In the interim, we may want to use DataHub's built-in support for Kafka topics and post updates to Kafka topic schemas.

We have a Java library that automates interacting with EventStreamConfig and schema repositories, making it easier to iterate through declared streams and look up schemas. We could use this Java library, or alternatively write Event Platform utility library code in another language (Python?).


There are two types of DataHub ingestion: poll-based and event-based. Thus far, we have not been using event-based ingestion. While event-based ingestion would be much nicer, emitting metadata events for changes to Event Platform streams and schemas might be difficult: we'd need to emit an event anytime EventStreamConfig is deployed, which would somehow have to be linked to MediaWiki config deployments.
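For illustration, a minimal Python sketch of such a poll-based loop is below. The EventStreamConfig and schema repository endpoints, the "event_streams" platform name, and the DataHub address are assumptions for the sketch, not a settled design:

import requests
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

STREAM_CONFIG_API = "https://meta.wikimedia.org/w/api.php"  # assumed endpoint
SCHEMA_REPO = "https://schema.wikimedia.org/repositories/primary/jsonschema"  # assumed endpoint

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # assumed GMS address

# 1. Poll EventStreamConfig for all declared streams.
streams = requests.get(
    STREAM_CONFIG_API,
    params={"action": "streamconfigs", "format": "json"},
).json()["streams"]

# 2. For each stream, look up its schema and post an update to DataHub.
for stream_name, stream_config in streams.items():
    schema_title = stream_config["schema_title"]  # e.g. "analytics/mediawiki/..."
    schema = requests.get(f"{SCHEMA_REPO}/{schema_title}/latest").json()

    dataset_urn = make_dataset_urn(platform="event_streams", name=stream_name, env="PROD")
    emitter.emit(
        MetadataChangeProposalWrapper(
            entityUrn=dataset_urn,
            # Start simple: publish the schema's description as dataset properties.
            # A fuller version would also build a schemaMetadata aspect from the JSON schema.
            aspect=DatasetPropertiesClass(
                name=stream_name,
                description=schema.get("description", ""),
            ),
        )
    )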

User Story
As an event platform engineer, I need to keep DataHub up to date with new and existing event streams and schemas

Key Tasks/Dependencies

  • Add Event Stream custom platform to DataHub
  • Deploy the metadata ingestion pipeline for the event stream schemas
  • Link the corresponding topics and downstream datasets to the stream (TBD: via lineage and/or metadata replication; this falls into the larger consideration of how to propagate metadata between equivalent datasets stored across different platforms and refinements.)

Acceptance Criteria

  • A custom platform called Event Streams is listed in DataHub
  • The Event Streams platform in DataHub lists all the event stream data assets
  • All the schema elements are documented
  • Top level schema description is imported as the top level dataset documentation

Details

Title | Reference | Author | Source Branch | Dest Branch
Support DataHub transformers | repos/data-engineering/airflow-dags!507 | tchin | support-datahub-transformers | main
Add event streams datahub transformation | repos/data-engineering/airflow-dags!498 | tchin | event-platform-integration | main

Event Timeline

Restricted Application added a subscriber: Aklapper.

Hm, not quite!

This task is about cataloging the streams that exist in Kafka, with their event schemas.

T307040 is talking about the event tables in Hive, which are created from the streams, but are not exactly the same thing (e.g. there are enriched fields in the Hive tables, like geocoded_data, etc.). But I agree the descriptions should be propagated from the event schemas into the Hive schemas...and then also into DataHub. I'll comment on that task.

Ohhhh, thank you for clarification!

No clue if this is the right approach, but perhaps we could use ingestion transforms to augment the existing Kafka ingestion with Event Platform event schemas? From 5 minutes of reading docs, I think we'd do this by implementing a transformer that can transform the schemaMetadata aspect of a dataset entity?

    def entity_types(self) -> List[str]:
        return ["dataset"]

    def aspect_name(self) -> str:
        return "schemaMetadata"

@Ottomata I think the above is the right approach (if we decide to do it)

JArguello-WMF updated the task description.

@Htriedman we are picking this work up again. Is the POC that you did available in a repository on gitlab?

Since DataHub has the concept of platforms, I think the best way forward is to have a separate platform called Event Streams, where the datasets under it are the streams defined in the stream config. We can then keep the Kafka platform for all the individual Kafka topics. We can then attach a transform to the current Kafka ingestion recipe that attaches the schemas to the individual topics where supported, and at the same time inserts the streams into the Event Streams platform. This way we can have the schemas on both the stream and its topics.
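
Roughly, attaching such a transform to the Kafka recipe could look like this if the recipe were driven through DataHub's Python Pipeline API (a sketch only; the broker address, transformer module path, and sink address are placeholders, and the real recipe lives in the airflow-dags MRs):

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka",
            "config": {"connection": {"bootstrap": "kafka-jumbo1001.eqiad.wmnet:9092"}},  # placeholder
        },
        "transformers": [
            {
                # Placeholder fully-qualified class name for the custom transformer.
                "type": "event_platform_datahub.EventStreamsTransformer",
                "config": {},
            }
        ],
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},  # placeholder
        },
    }
)
pipeline.run()
pipeline.raise_from_status()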

@tchin as discussed today, that sounds like a good approach. Before deploying to production, let's wipe out the Kafka metadata, given that the original POC was imported under the Kafka platform. I'll add these to the acceptance criteria.


Here are some considerations that we discussed and that we need to further explore and decide on:

  • Explore creating a custom platform for Event Streams
  • Add the top-level event schema description as the dataset documentation. TBD how to accomplish this given the import options.
  • The schema import automatically adds subgroups under Kafka based on the first dot segment of the schema name. In the production instance of DataHub there are also streams with names like analytics/mediawiki/web_ab_test_enrollment. Can “/” be used as a separator to designate the top-level category?
  • Can we import Gobblin lineage to propagate lineage from Kafka -> Hive?
  • There would be value in importing the Hive event_raw database to complete the lineage of events.
  • Can we add a link to the Event Platform schema/DataHub documentation to the Hive tables in the event and event_sanitized databases? Lineage would be one way to trace this. Another would be to add links in the documentation to datasets with equivalent schemas both upstream and downstream. This falls into the larger consideration of how to propagate metadata between equivalent datasets stored across different platforms and refinements.
  • Some of the Kafka topics are remnants of tests and misconfigurations/misnamings. Ideally we'd delete these in Kafka; otherwise, there is an option to add them to an exclusion list.
  • Given that the prod DataHub already has the event streams in the current Kafka metadata, can we delete and reimport all the Kafka metadata? If a fresh backup is not available, it would be good to have one handy.
  • Is there a way to add ownership data to the event schema JSON and import it from there? This would benefit Metrics Platform work and allow alerting the right parties about event publishing errors. Some discussion about adding this data already happened: https://phabricator.wikimedia.org/T201063#4546544
  • What is the best way to ingest the metadata? DataHub transformer vs. Airflow vs. TBD?

After experimenting a lot, I have a DataHub transformer for Kafka that generates an Event Streams platform and adds descriptions, schemas, and paths. However, I don't know if it should be a transformer, since it's doing a bit more than just transforming.

Explore creating a custom platform for Event Streams

Done fairly easily. The documentation shows curl or some ingestion job, but there's an equivalent way to do it through DataHub's Python library.
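
For reference, one way to do it with the Python emitter looks roughly like this (a sketch; the platform ID, platform type, and delimiter are assumptions, not necessarily what the MR does):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DataPlatformInfoClass, PlatformTypeClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # placeholder address

# Register a custom "Event Streams" data platform by emitting its info aspect.
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataPlatform:event_streams",
        aspect=DataPlatformInfoClass(
            name="event_streams",
            displayName="Event Streams",
            type=PlatformTypeClass.MESSAGE_BROKER,  # assumed platform type
            datasetNameDelimiter=".",
        ),
    )
)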

Add the top-level event schema description as the dataset documentation. TBD how to accomplish this given the import options.

I discovered that DataHub actually distinguishes between user-edited metadata and ingestion-created metadata, so re-running ingestion won't clobber manual edits. This makes any kind of ingestion we try worry-free.

The schema import automatically adds subgroups under Kafka based on the first dot segment of the schema name. In the production instance of DataHub there are also streams with names like analytics/mediawiki/web_ab_test_enrollment. Can “/” be used as a separator to designate the top-level category?

The slashes can be used as a separator. However, the slashes in this case are part of the schema names, not Kafka topic names (Kafka doesn't allow slashes in its topic names afaik). I don't know why it's in DataHub. I think it would be cool if there were another platform that showed which streams are using which schemas. Maybe linking them up via lineage?

Speaking of which, connecting the topics to the stream via lineage seems a bit weird to me, but it's probably better than just having links in the description. Are streams considered upstream or downstream of topics? Would it look something like this:

Screenshot 2023-08-22 at 1.10.20 PM.png (mock-up of the proposed stream/topic lineage view)

The streams and the topics might be able to be compressed in the lineage view if they're marked as related to each other. Would have to look into this further.

Given that the prod DataHub already has the event streams in the current Kafka metadata, can we delete and reimport all the Kafka metadata? If a fresh backup is not available, it would be good to have one handy.

From what I can see, there is a way in DataHub to mark things as 'removed' so they don't show up in the UI without actually deleting them. We could perhaps run that over the entirety of the Kafka metadata, and then, when ingesting the topics we want, check whether each one already exists in DataHub and mark it as not removed.
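
As a sketch, that soft-removal flip would be an emit of the Status aspect per entity (assuming the Status aspect is what backs the 'removed' flag; the topic URN below is illustrative):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")  # placeholder address

# Hide a stale topic from the UI without hard-deleting its metadata...
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:kafka,eqiad.some_test_topic,PROD)",
        aspect=StatusClass(removed=True),
    )
)
# ...and emit StatusClass(removed=False) again if the topic is re-ingested and wanted.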

From the recent meeting:

  • Event Streams will be the name of the platform
  • Streams are upstream of Kafka topics (see the lineage sketch below)
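
For illustration, emitting that stream -> topic relationship could look roughly like this with DataHub's lineage aspect (the stream/topic names and the lineage type are assumptions, not the agreed implementation):

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

stream_urn = make_dataset_urn("event_streams", "mediawiki.page_change.v1", "PROD")

# Emit one upstreamLineage aspect per topic, pointing back at its stream.
for topic in ("eqiad.mediawiki.page_change.v1", "codfw.mediawiki.page_change.v1"):
    topic_urn = make_dataset_urn("kafka", topic, "PROD")
    mcp = MetadataChangeProposalWrapper(
        entityUrn=topic_urn,
        aspect=UpstreamLineageClass(
            upstreams=[UpstreamClass(dataset=stream_urn, type=DatasetLineageTypeClass.COPY)]
        ),
    )
    # emitter.emit(mcp), with a DatahubRestEmitter as in the earlier sketches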

@BTullis we'll need the SRE team's help with the deployment of the event platform schema ingestion into DataHub. The deployment involves a) creating the Event Streams custom platform and
b) deploying the ingestion code/transformer.

A couple of questions:

  • Do we have database backups, and/or can we take one before we deploy the above? The risk that something will go wrong is small, but this is a good precaution.
  • Is there an option for us to delete/wipe the Kafka data platform schemas so that they are re-ingested afresh? As part of the previous POC we imported the event schemas under Kafka, and we would like to remove them permanently since they will now reside in a custom platform.

While adding a workaround to T344235, I noticed that additionalProperties isn't very well represented in DataHub.

"custom_data": {
    "additionalProperties": {
        "properties": {
            "data_type": {
                "type": "string",
                "enum": ["number", "string", "boolean", "null"],
            }
        }
    },
    "propertyNames": {
        "maxLength": 255,
        "minLength": 1,
        "pattern": "^[$a-z]+[a-z0-9_]*$",
    },
},

It just shows up in DataHub as a Struct with no defined nested fields (which I guess makes sense, but is not helpful).
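
One possible workaround (a sketch, not what was implemented) would be to model such a field explicitly as a map when building the schemaMetadata aspect, so DataHub at least shows the key/value types:

from datahub.metadata.schema_classes import (
    MapTypeClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
)

# Represent "custom_data" as a map of string keys to small structs instead of
# an opaque Struct with no nested fields.
custom_data_field = SchemaFieldClass(
    fieldPath="custom_data",
    type=SchemaFieldDataTypeClass(type=MapTypeClass(keyType="string", valueType="record")),
    nativeDataType="object (additionalProperties)",
    description="Free-form map of custom data; keys match ^[$a-z]+[a-z0-9_]*$.",
)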

Some of the Kafka topics are remnants of tests and misconfigurations/misnamings. Ideally we'd delete these in Kafka; otherwise, there is an option to add them to an exclusion list.

Could we import a topic only if there is a corresponding event stream config entry?
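
A sketch of that filter, assuming the usual "<datacenter>.<stream>" topic naming (the naming convention is an assumption of the sketch):

def topic_is_declared(topic: str, declared_streams: set[str]) -> bool:
    # Topics are typically either "<stream>" or "<datacenter>.<stream>", e.g.
    # "eqiad.mediawiki.page_change.v1" for the stream "mediawiki.page_change.v1".
    if topic in declared_streams:
        return True
    _, _, without_dc_prefix = topic.partition(".")
    return without_dc_prefix in declared_streams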

Is there a way to add ownership data to the event schema JSON and import it from there?

We shouldn't do this in the schema, but this could be added to event stream config.
https://phabricator.wikimedia.org/T273235#6791925
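
If stream config did grow an owners field, the ingestion side could map it to DataHub ownership roughly like this (the "owners" field name and the group URNs are hypothetical):

from datahub.emitter.mce_builder import make_dataset_urn, make_group_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import OwnerClass, OwnershipClass, OwnershipTypeClass

def ownership_mcp(stream_name: str, stream_config: dict) -> MetadataChangeProposalWrapper:
    # "owners" is a hypothetical stream config field listing owning teams.
    owners = [
        OwnerClass(owner=make_group_urn(team), type=OwnershipTypeClass.TECHNICAL_OWNER)
        for team in stream_config.get("owners", [])
    ]
    return MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn("event_streams", stream_name, "PROD"),
        aspect=OwnershipClass(owners=owners),
    )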

Ahoelzl renamed this task from Event Platform and DataHub Integration to [Event Platform] Event Platform and DataHub Integration. Oct 20 2023, 4:51 PM