We need a way to identify active streams for ingestion into Hadoop. We also need a way to monitor them, both to (A) be sure that the desired data is being ingested properly and (B) verify that all data for a given hour has been imported.
We currently do ingestion into Hive using a complicated regex-based topic include list with Camus and Refine. This task is about replacing that list with something smarter, and using it to build both automated ingestion and monitoring.
This is relevant to both T249261: Vertical: Migrate SearchSatisfaction EventLogging event stream to Event Platform and T241241: Produce an instrumentation event stream using new EPC and EventGate from client side browsers, and will also help improve our present system. This task is not about replacing Camus; that will be done in T238400: Evaluate possible replacements for Camus: Gobblin, Marmaray, Kafka Connect HDFS, etc.
I'll describe the current state of ingestion and its problems here in the description, and then work out ideas in the comments.
How things work now
Camus: Kafka -> HDFS raw JSON data
We have 3 different Camus jobs for event data. Each job selects which topics to import via topic include and exclude regexes.
Camus Job 'eventlogging'
Topics like 'eventlogging_SearchSatisfaction' are ingested into /wmf/data/raw/eventlogging.
The per-topic imported directories end up looking like
/wmf/data/raw/eventlogging/eventlogging_SearchSatisfaction/hourly/2020/04/10/10
Camus Job 'mediawiki_events'
Topics like 'eqiad.mediawiki.revision-create' are ingested into /wmf/data/raw/event.
The per-topic imported directories end up looking like
/wmf/data/raw/event/eqiad_mediawiki_revision-create/hourly/2020/04/10/10
Camus Job 'mediawiki_analytics_events'
High volume topics like 'eqiad.mediawiki.api-request' are ingested into /wmf/data/raw/event by a separate Camus job. High volume topics take longer to import, so we run a separate job to keep them from starving out the lower volume ones.
The per-topic Camus imported directories end up looking like
/wmf/data/raw/event/eqiad_mediawiki_api-request/hourly/2020/04/10/10
Camus Job 'mediawiki_job'
Topics like 'eqiad.mediawiki.job.refreshLinks' are ingested into /wmf/data/raw/mediawiki_job.
The per-topic imported directories end up looking like
/wmf/data/raw/mediawiki_job/eqiad_mediawiki_job_refreshLinks/hourly/2020/04/10/10
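To make the directory layout above concrete, here is a minimal sketch (in Scala; not actual Camus code, and the helper name is made up) of the topic-to-directory mapping the examples imply: dots in the Kafka topic name become underscores, and each hour's data lands under <base directory>/<topic directory>/hourly/YYYY/MM/DD/HH.

```
object CamusPathSketch {
  // Dots in the Kafka topic name become underscores in the imported directory name.
  def rawHourlyDir(baseDir: String, topic: String, year: Int, month: Int, day: Int, hour: Int): String = {
    val topicDir = topic.replace('.', '_')
    f"$baseDir/$topicDir/hourly/$year%04d/$month%02d/$day%02d/$hour%02d"
  }

  def main(args: Array[String]): Unit = {
    // Prints: /wmf/data/raw/event/eqiad_mediawiki_revision-create/hourly/2020/04/10/10
    println(rawHourlyDir("/wmf/data/raw/event", "eqiad.mediawiki.revision-create", 2020, 4, 10, 10))
    // Prints: /wmf/data/raw/eventlogging/eventlogging_SearchSatisfaction/hourly/2020/04/10/10
    println(rawHourlyDir("/wmf/data/raw/eventlogging", "eventlogging_SearchSatisfaction", 2020, 4, 10, 10))
  }
}
```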
Refine: HDFS raw JSON data -> Hive (parquet) tables (augmented by TransformFunctions)
These jobs import raw JSON data from Camus-imported HDFS directories into Parquet-backed Hive tables. There are separate jobs so we can specify different base Camus directories (e.g. /wmf/data/raw/eventlogging vs /wmf/data/raw/event) and different Hive partition capture group regexes (e.g. eventlogging_(.+) vs (eqiad|codfw)_(.+)).
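As a rough illustration (not the actual Refine implementation), here is how those capture group regexes map a Camus per-topic directory name to a table name and, where applicable, a datacenter:

```
object RefinePartitionRegexSketch {
  // Capture group regexes from the Refine job configs described below.
  val eventloggingPattern = "eventlogging_(.+)".r
  val datacenterPattern   = "(eqiad|codfw)_(.+)".r

  def main(args: Array[String]): Unit = {
    "eventlogging_SearchSatisfaction" match {
      // Prints: table=SearchSatisfaction
      case eventloggingPattern(table) => println(s"table=$table")
    }
    "eqiad_mediawiki_revision-create" match {
      // Prints: datacenter=eqiad, table=mediawiki_revision-create
      case datacenterPattern(dc, table) => println(s"datacenter=$dc, table=$table")
    }
  }
}
```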
Refine job 'eventlogging_analytics'
Uses schemas from meta.wikimedia.org and refines from
/wmf/data/raw/eventlogging -> /wmf/data/event into the Hive event database with some tables like 'Edit' excluded.
Camus per-topic directories in /wmf/data/raw/eventlogging look like 'eventlogging_SearchSatisfaction', and table names are extracted via eventlogging_(.+).
Refine job 'event'
Uses schemas from schema.discovery.wmnet and refines from
/wmf/data/raw/event -> /wmf/data/event into the Hive event database with some tables like 'mediawiki_recentchange' excluded.
Camus per-topic directories in /wmf/data/raw/event look like 'eqiad_mediawiki_revision-create',
and datacenter and table names are extracted via (eqiad|codfw)_(.+).
Refine job 'mediawiki_job_events'
Infers schemas from JSON data and refines from
/wmf/data/raw/mediawiki_job -> /wmf/data/event into the Hive event database with some tables like 'EchoNotificationJob' excluded.
Camus per-topic directories in /wmf/data/raw/mediawiki_job look like 'eqiad_mediawiki_job_refreshLinks',
and datacenter and table names are extracted via (eqiad|codfw)_(.+).
Refine job 'eventlogging_legacy'
This job doesn't exist yet, but I thought I'd include it here to illustrate what we need for the migration process. We'll need to add a new Refine job for T238230: Decommission EventLogging backend components by migrating to MEP that uses schema.discovery.wmnet to look up migrated legacy EventLogging schemas instead of meta.wikimedia.org. This new Refine job will eventually replace 'eventlogging_analytics' once all EventLogging streams are migrated to MEP.
Problems:
- Legacy EventLogging topics are not prefixed with the datacenter name, and their Hive tables do not have a datacenter partition. This makes their ingestion jobs harder to unify with the other event ingestion jobs.
- There is no consistent way to identify which topics should be imported. This is mostly a Camus problem, but it affects Refine too. Refine can be configured to consider all data it finds in a base directory path (with some hardcoded exclude lists), but the data in HDFS that Refine uses is put there by Camus, and Camus sees every topic in Kafka as a potential candidate for ingestion into HDFS. We need a way to dynamically configure each Camus job's topic include list (see the sketch below). I started this work in https://gerrit.wikimedia.org/r/c/analytics/refinery/+/593047, but on further thought I'm not sure that will be enough.
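As a purely hypothetical sketch of that idea (these are not real Camus or refinery APIs), a dynamic include list could be assembled into a single topic include regex from whatever list of topics we discover, e.g. from stream config:

```
object DynamicTopicIncludeSketch {
  // Build one include regex that matches exactly the given topics, quoting any
  // regex metacharacters (like the '.' in 'eqiad.mediawiki.revision-create').
  def topicIncludeRegex(topics: Seq[String]): String =
    topics.map(t => java.util.regex.Pattern.quote(t)).mkString("(", "|", ")")

  def main(args: Array[String]): Unit = {
    val topics = Seq(
      "eventlogging_SearchSatisfaction",
      "eqiad.mediawiki.revision-create",
      "codfw.mediawiki.revision-create"
    )
    // Prints something like: (\Qeventlogging_SearchSatisfaction\E|\Qeqiad.mediawiki.revision-create\E|...)
    println(topicIncludeRegex(topics))
  }
}
```

The interesting part is where that topic list comes from; the Outcome section below describes the EventStreamConfig-based approach we ended up with.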
Outcome
- EventStreamConfig is used to declare streams and whether or not canary events are enabled.
- wikimedia-event-utilities, a Java library for discovering streams and their configs, was developed.
- A ProduceCanaryEvents job was created in refinery-source to produce canary events to specific streams once per hour.
- Camus was modified to integrate with EventStreamConfig via wikimedia-event-utilities to discover streams and topics to ingest.
- CamusPartitionChecker was modified to integrate with EventStreamConfig to automatically check topics that have canary events enabled.
- All 3 event service Camus jobs use EventStreamConfig for stream topic discovery.
- CamusPartitionChecker for eventgate-analytics-external and eventgate-analytics (but not eventgate-main) is configured to only check stream topics that have canary events enabled.
All this together means that:
- An event stream that is added to $wgEventStreams (AKA stream config) will automatically be ingested by a Camus job.
- Any stream with canary_events_enabled: true will automatically have canary events produced to it once an hour, and will be monitored to be sure that some data is ingested for it each hour (see the sketch below).
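To illustrate that end state (illustrative only; this is not wikimedia-event-utilities or CamusPartitionChecker code, and the stream names, settings, and topic naming convention here are made-up stand-ins), the decision logic amounts to: ingest the topics of every declared stream, and check only the topics of streams with canary events enabled.

```
object StreamConfigSketch {
  case class StreamSettings(canaryEventsEnabled: Boolean)

  // Stand-in for the stream declarations in $wgEventStreams; values are illustrative.
  val streams: Map[String, StreamSettings] = Map(
    "mediawiki.revision-create"       -> StreamSettings(canaryEventsEnabled = true),
    "eventlogging_SearchSatisfaction" -> StreamSettings(canaryEventsEnabled = true),
    "mediawiki.job.refreshLinks"      -> StreamSettings(canaryEventsEnabled = false)
  )

  // Simplified topic naming convention: non-legacy streams get datacenter-prefixed
  // topics, while legacy eventlogging_ streams have a single unprefixed topic.
  def topicsFor(stream: String): Seq[String] =
    if (stream.startsWith("eventlogging_")) Seq(stream)
    else Seq(s"eqiad.$stream", s"codfw.$stream")

  def main(args: Array[String]): Unit = {
    // Camus ingests the topics of every declared stream.
    val topicsToIngest = streams.keys.toSeq.sorted.flatMap(topicsFor)
    // CamusPartitionChecker only checks topics of streams with canary events enabled.
    val topicsToCheck = streams.collect {
      case (stream, settings) if settings.canaryEventsEnabled => stream
    }.toSeq.sorted.flatMap(topicsFor)

    println(s"Camus ingests: ${topicsToIngest.mkString(", ")}")
    println(s"CamusPartitionChecker checks: ${topicsToCheck.mkString(", ")}")
  }
}
```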