
Refine to Hive with Airflow – Handle Late-Arrived Events
Closed, ResolvedPublic

Description

We need to take care of late-arrived events coming from Gobblin. Some events can arrive in the raw repository several hours after their hourly partition was first imported.

The systemd-based Refine scheduler handles late arrival of raw events by scanning the past 26 hours for data that needs to be refined. Even if an hourly partition has already been refined, late arrival of raw data within those 26 hours will cause the relevant partitions to be automatically re-refined.

This will not be the case with the new Airflow-based RefineDataset job. We could consider implementing something similar by using per-partition done-flag timestamps for Hive+Parquet-based tables. However, Iceberg-based tables are not compatible with per-partition done flags, so we'll need another mechanism.
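For Hive+Parquet tables, the done-flag idea amounts to a check like the following (a minimal sketch; paths and the flag name are illustrative, and local-filesystem calls stand in for an HDFS client):

```
# Minimal sketch: re-refine an hourly partition if raw data arrived after the
# partition's done flag was written. Paths/flag name are hypothetical, and
# os.* stands in for an HDFS client.
import os

def needs_re_refine(raw_partition_dir: str, done_flag_path: str) -> bool:
    """True if the partition was never refined, or raw data is newer than the flag."""
    if not os.path.exists(done_flag_path):
        return True
    flag_mtime = os.path.getmtime(done_flag_path)
    raw_mtimes = [
        os.path.getmtime(os.path.join(raw_partition_dir, name))
        for name in os.listdir(raw_partition_dir)
    ]
    return bool(raw_mtimes) and max(raw_mtimes) > flag_mtime
```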

To address this, we propose the following solutions:

Alert on late arrival and manually rerun Airflow tasks

It is possible that late arrival happens infrequently enough that an alert and manual rerun would be sufficient.

Run Another Job in Airflow (refine_late_arrived_events):

  • Create a new job in Airflow to check for late-arrived events.
  • Compare the Airflow task run timestamp with the timestamp in the Gobblin done flag.
  • Trigger the necessary refine tasks rerun/scheduling based on this comparison.
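A minimal sketch of the comparison this job would make (assuming the Gobblin done flag's modification time marks when the import finished; names are illustrative, and os.* stands in for an HDFS client):

```
# Sketch: a partition needs a refine rerun if Gobblin (re-)wrote its done flag
# after the refine task last completed.
from datetime import datetime, timezone
import os

def refine_needs_rerun(last_refine_end: datetime, gobblin_flag_path: str) -> bool:
    flag_written = datetime.fromtimestamp(
        os.path.getmtime(gobblin_flag_path), tz=timezone.utc
    )
    return flag_written > last_refine_end
```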

Add a New Step in the Newly Created Refine Airflow DAG:

  • Integrate a new step into the Refine Hive hourly DAG.
  • The step will wait several hours before performing the time difference check.
  • Launch the necessary refine tasks rerun/scheduling based on the time difference check.

Other Solutions

  • Evaluate and implement other potential solutions to ensure the timely processing of late-arrived events.

Details

Related Changes in Gerrit:
Related Changes in GitLab:
(Title, Reference, Author, Source Branch → Dest Branch)

  • Update Gobblin jar for new metrics (repos/data-engineering/airflow-dags!1527, joal, update_gobblin_jar → main)
  • Update gobblin schedules (repos/data-engineering/airflow-dags!1523, joal, update_main_gobblin_schedules → main)
  • Update flaging source folder in dags (repos/data-engineering/airflow-dags!1511, joal, update_processed_flag_in_dags → main)
  • Update webrequest to flag source path at success (repos/data-engineering/airflow-dags!1508, joal, add_webrequest_input_path_mark → main)
  • Grow main gobblin webrequest_frontend map memory (repos/data-engineering/airflow-dags!1497, joal, fix_main_gobblin_resources_3 → main)
  • Second attempt at fixing main gobblin resources (repos/data-engineering/airflow-dags!1496, joal, fix_main_gobblin_resources_2 → main)
  • Fix main gobblin mapper memory (repos/data-engineering/airflow-dags!1495, joal, fix_main_gobblin_resources → main)
  • Add map resource configuration to main gobblin job (repos/data-engineering/airflow-dags!1494, joal, update_main_gobblin_resources → main)
  • Fix main and an_test Gobblin jobs with new jar (repos/data-engineering/airflow-dags!1480, joal, fix_main_gobblin_with_new_jar → main)
  • Update main gobblin jobs for publisher metrics (repos/data-engineering/airflow-dags!1477, joal, update_main_gobblin_jobs → main)
  • Update an_test gobblin job jar (repos/data-engineering/airflow-dags!1474, joal, update_an_test_gobblin_jar → main)
  • Update analytics-test gobblin to use new publisher (repos/data-engineering/airflow-dags!1468, joal, update_an_test_gobblin → main)

Event Timeline


The first task is a custom sensor that succeeds when the source partition has seen an update (by checking file mtime, say).
The second task is a custom operator that clears the dag_run, which would re-execute it all.

This has an additional benefit: a dag (using ExternalTaskSensor) can choose to depend on the first task for quicker results, or if they want to be really sure the data is complete they can depend on the latter task. I suppose cascading reruns would make depending on the second one not really that useful though?
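A rough sketch of those two tasks (assumes Airflow 2.x; hdfs_mtime() is a hypothetical helper standing in for a real HDFS client call):

```
# Sketch of the two custom tasks described above; not production code.
from airflow.models.baseoperator import BaseOperator
from airflow.sensors.base import BaseSensorOperator

class SourceUpdatedSensor(BaseSensorOperator):
    """Succeeds once the source partition has been updated since the last refine."""

    def __init__(self, source_path: str, last_refined_ts: float, **kwargs):
        super().__init__(**kwargs)
        self.source_path = source_path
        self.last_refined_ts = last_refined_ts

    def poke(self, context) -> bool:
        return hdfs_mtime(self.source_path) > self.last_refined_ts  # hypothetical helper

class ClearDagRunOperator(BaseOperator):
    """Clears the current dag_run so that all of its tasks re-execute."""

    def execute(self, context):
        logical_date = context["dag_run"].logical_date
        context["dag"].clear(start_date=logical_date, end_date=logical_date)
```

A downstream dag could then point its ExternalTaskSensor at whichever of the two tasks matches how strict it wants to be about completeness.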

Discussed this with @Antoine_Quhen a bit in a 1:1 today.

I wonder why we have late events of this kind. We partition by ingestion time, meta.dt (see also the comment at T267648#8995454). Perhaps it is because in both eventgate-wikimedia and in wikimedia-event-utilities (used by Flink apps to produce JSON events), we do not set meta.dt if the client has provided it.

Event Platform/Producer Requirements says:

All generic producer clients and libraries [...] SHOULD set the meta.dt field to indicate the event's ISO-8601 'system ingestion time' by the library.

I wonder: if we changed this to a MUST and made these libraries always set meta.dt to the current timestamp, would the 'late' events disappear?

@Antoine_Quhen is going to get a list of streams that have these late events. I think it is unlikely that events from internal producers (Flink, MediaWiki EventBus, etc.) set meta.dt. Events producible via eventgate-analytics-external can come from the internet (bots, browsers, etc.), and it is very possible they could set meta.dt.

If we decide to have the libraries set meta.dt, we should verify with a few users, e.g. Search and Metrics Platform, that this won't be an issue. I doubt it will, but we should check.

Today we did a bit of pair coding but ran out of time. I parallelized Antoine's snippet here:

https://gitlab.wikimedia.org/-/snippets/166
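The gist of that scan is roughly this (not the actual snippet; paths, the path layout, and the grace period are illustrative, and os.* stands in for HDFS listing):

```
# Rough sketch: flag files whose modification time is later than the parent
# folder's calendar hour plus a grace period, scanning hour-folders in parallel.
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
import os
import re

GRACE = timedelta(hours=1, minutes=20)
HOUR_RE = re.compile(r"year=(\d+)/month=(\d+)/day=(\d+)/hour=(\d+)")

def late_files(hour_dir: str) -> list[str]:
    year, month, day, hour = map(int, HOUR_RE.search(hour_dir).groups())
    cutoff = (datetime(year, month, day, hour, tzinfo=timezone.utc) + GRACE).timestamp()
    return [
        name for name in os.listdir(hour_dir)
        if os.path.getmtime(os.path.join(hour_dir, name)) > cutoff
    ]

# hour_dirs would be the precomputed list of .../year=Y/month=M/day=D/hour=H folders
def scan(hour_dirs: list[str]) -> dict[str, list[str]]:
    with ThreadPoolExecutor(max_workers=16) as pool:
        return dict(zip(hour_dirs, pool.map(late_files, hour_dirs)))
```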

Ahoelzl set the point value for this task to 3. Sep 13 2024, 6:32 PM

I ran an analysis over the /wmf/data/raw/event folder (event-platform events not yet refined) on Friday 13th of September.
The code and walk-through results can be found in this phab paste.

Summary of results:

  • 1.3% of files had a modification date later than 1h20 after the calendar time of the parent folder (defined by year/month/day/hour values) [21,545 late-event files out of 1,679,350 total files]
  • Among those late-event files, ~5k were empty (_IMPORTED flags), leaving 0.98% of late files containing data
  • Among those late non-empty files, ~1k contained canary events only (not valuable data), leading to:
    • 0.92% of files containing actual late data (15,308 of 1,679,350)
    • This represents 0.79% of data size (~173 GB of 21.9 TB)
    • We find late data in 96% of our datasets (232 of 269)
    • We find late data in 80% of our folder-hours (1,720 of 2,150)

About the late non-empty files:

  • 17 hours contain late non-empty files for more than 200 datasets. We should investigate those hours (see the list here) to understand what happened, as it feels platform-related (maybe EventGate? Or Gobblin?)
  • 7 datasets containing late non-empty files are worth investigating (see the list here).
  • After removal of the 17 hours and 7 datasets identified above, we're left with 799 late non-empty files representing 2 GB over 186 datasets and 77 folder-hours. Grouping those folder-hours gives us more hours to look at (see the list here), and finally a few standalone files (see the list here)

I think the next steps of this analysis are to investigate why we have late events at specific hours (my gut tells me to look at EventGate glitches), and why some datasets have those late events as well (hint: 6 of 7 are mediawiki-generated events). Something else to consider could be to change the way we ingest data and run Gobblin more frequently, giving us more granularity in the times at which files land on HDFS (currently we run Gobblin once per hour for events).

I think the next steps of this analysis are to investigate why we have late events at specific hours (my gut tells me to look at EventGate glitches), and why some datasets have those late events as well (hint: 6 of 7 are mediawiki-generated events).

Unsure if related, but we recently found that some MW requests might last for several hours (T374662). Depending on how the event is created, this might lead to late events being sent by MW.

Second round of analysis: https://phabricator.wikimedia.org/P69381

TL;DR:

  • When Gobblin has issues or is paused, we have holes
  • Some datasets set their own ingestion time and lead to late events (https://phabricator.wikimedia.org/P69381$247). @Ottomata let's talk about this.
  • A small number of glitches are visible in Gobblin for which I have not found explanations yet :)

After explaining my findings to the team yesterday, here are the next steps: I'm going to have a look at late events in a few streams to try to understand why we have them, and at the same time we are going to change the event-publishing contract: event-publishing libraries should set the meta.dt field and use this field as the Kafka timestamp field, overriding the field value if it is already set by the producer.

Exploration on specific datasets:

  • mediawiki_page_content_change_v1 --> No timestamp-late events, only Gobblin/Kafka hiccups
  • resource_purge --> Uses meta.dt, and there are late timestamps. We don't refine this table (see here), but not because of late events. I wonder if changing the behavior of EventGate to overwrite meta.dt would change anything for other downstream consumers.
  • mediawiki_revision-create --> A regular small number of late events (fewer than 10 per hour), with one spike at ~5k per hour for 4 hours on 2024-08-30 at 8am UTC. I think this is because meta.dt is set to the revision timestamp, and some revision changes can occur for past revisions.
  • mediawiki_recentchange --> A regular number of late events (fewer than 10, usually, with some bumps). I don't really know why this happens; probably the same reason as revision-create.

resource_purge

Good catch. We should ask SRE team, I think ServiceOps.

meta.dt is set to the revision timestamp

Also good catch. Now that you mention it, this is a consequence of never prioritizing T267648: Adopt conventions for server receive and client/event timestamps in non analytics event schemas.

mediawiki_recentchange

Yes, probably same reason.

While talking with @Ottomata, I realized that we can relatively easily monitor late-arrived events in Gobblin when writing _IMPORTED flags: if there already is a flag in the folder to be flagged, report late events instead of overwriting the flag.
As Gobblin has Prometheus metrics integration, it'll be possible to report those using Prometheus :)
This feels like a nice and easy approach.
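Conceptually it would look like this (the real change lives in Gobblin's Java publisher; the metric name and metrics interface here are made up):

```
# Sketch: when flagging a folder, an existing _IMPORTED flag means this run
# delivered late events into an already-flagged folder; report instead of
# silently overwriting. "metrics" is an assumed Prometheus-style interface.
import os

def flag_folder(folder: str, metrics) -> None:
    flag_path = os.path.join(folder, "_IMPORTED")
    if os.path.exists(flag_path):
        metrics.counter("gobblin_late_event_folders_total").inc()  # hypothetical metric
    else:
        open(flag_path, "w").close()
```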

Ahoelzl triaged this task as Medium priority. Jan 24 2025, 5:13 PM
Ahoelzl raised the priority of this task from Medium to High. Apr 3 2025, 3:47 PM

Resuming work on this task, and after rereading the previous discussions, I propose the following plan:
• We already generate a flag at the end of the new refine job — we can include the legacy timestamp in that flag (to do).
• With that timestamp available, we could extend the legacy refine monitor job to write its analysis into a new table.
→ I don’t think this job should send alert emails for every error or late event — only if the global level of late events exceeds a certain threshold.
• We could create a Superset dashboard to present this data clearly.
• In parallel, we can progressively override meta.dt in EventGate to reduce the number of late events to an acceptable margin. (If OK with this proposal, the task needs to be created.)
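For the first bullet, something as simple as this would do (hypothetical flag schema; the real flag format is to be decided):

```
# Sketch: write the refine completion time into the done flag so a monitor job
# can later compare it against raw-data arrival times. Schema is hypothetical.
import json
import time

def write_refine_flag(flag_path: str) -> None:
    with open(flag_path, "w") as f:
        json.dump({"refined_at": time.time()}, f)  # epoch seconds
```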

Thoughts?

Antoine_Quhen renamed this task from Handle Late-Arrived Events from Gobblin into Airflow triggered Refine to Refine to Hive with Airflow – Handle Late-Arrived Events. Apr 25 2025, 3:29 PM

• In parallel, we can progressively override meta.dt in EventGate to reduce the number of late events to an acceptable margin. (If OK with this proposal, the task needs to be created.)

Oh yeah let's just do this! I think we agreed that we can and should just do that asap. T376026: Update event-producing tools to overwrite `meta.dt`

It just hasn't been prioritized yet.

→ I don’t think this job should send alert emails for every error or late event — only if the global level of late events exceeds a certain threshold.

If we have a way to observe the late events per dataset per hour (a Superset dashboard or whatever), then I agree! Perhaps a single daily email if there are any late events would be sufficient? With a link to the dashboard? And maybe an email summary like RefineMonitor does now is good too?

JAllemandou removed the point value 3 for this task. May 2 2025, 6:13 AM

After some thoughtful discussion, here is the proposed plan:

  • Add an Airflow step in the Refine and webrequest jobs to write a _PROCESSED flag in the job source folder when the job kicks in. It makes sense to have this flag set in those jobs, as the triggering logic belongs there.
  • Write a Java/Scala job that:
    • reads a Gobblin state from HDFS (a specific HDFS format)
    • validates touched folders against a flag (configurable). The validation is: for a folder touched by the Gobblin job, was data written AFTER the flag was written, and if yes, how much (measured by file size)?
    • reports on folders touched by Gobblin: how many without late events, how many with late events, and which ones.
  • Add an Airflow step in the Gobblin Airflow jobs running the job written above after the Gobblin import has run. It makes sense to have this job tied to Gobblin, as the Gobblin state provides which folders were touched by the latest Gobblin job, allowing for an exact check.

Let me know if it makes sense!
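To make the validation concrete, here is a minimal sketch of the per-folder check in Python (the actual job is planned in Java/Scala; os.* stands in for an HDFS client, and the flag name would be configurable):

```
# Sketch: for a folder touched by the Gobblin run, count bytes written after
# the flag, using file modification times and sizes.
import os

def late_bytes(folder: str, flag_name: str = "_PROCESSED") -> int:
    """Bytes written to `folder` after its flag was written (0 if unflagged)."""
    flag_path = os.path.join(folder, flag_name)
    if not os.path.exists(flag_path):
        return 0
    flag_mtime = os.path.getmtime(flag_path)
    total = 0
    for name in os.listdir(folder):
        if name == flag_name:
            continue
        path = os.path.join(folder, name)
        if os.path.getmtime(path) > flag_mtime:
            total += os.path.getsize(path)
    return total

def report(touched_folders: list[str]) -> tuple[list[str], list[str]]:
    """Split folders touched by Gobblin into (clean, with-late-events)."""
    late = [f for f in touched_folders if late_bytes(f) > 0]
    clean = [f for f in touched_folders if f not in late]
    return clean, late
```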

I have done a new analysis of late events using the wmf.hdfs_usage table. Here are the findings and links to the analysis code (snapshot used: 2025-05-26; event data between 2025-02-25 and 2025-05-24).

  • Thanks to its 24h-lag recomputation mechanism, the scala refine system has no late events. Every hour of every refined dataset contains data computed AFTER the latest arrival in its raw data folder, except for some canary events generated more than 24h after the refinement, but those don't matter as they are filtered out of the refined data. https://phabricator.wikimedia.org/P77496
  • Using an approximation of what the Airflow sensor would do when triggering airflow-scheduled refine jobs, I found that 10 datasets would have incurred some late events in the past 90 days (https://phabricator.wikimedia.org/P77530). A manual review of those cases showed that
    • The mediawiki_revision_create stream contains late data - my assumption is that sometimes events are regenerated, and the revision timestamp is used as meta.dt (we wish to change that).
    • Gobblin sometimes has hiccups - I don't know if it's Gobblin or Kafka, but on certain runs Gobblin ingested irregular data volumes across runs (see pasted image).

Next steps:

  • Find a way to automate checking whether a Gobblin hiccup correlates with late events
  • Discuss mediawiki_revision_create

Change #1166400 had a related patch set uploaded (by Joal; author: Joal):

[operations/alerts@master] Add data-eng gobblin alert for published files

https://gerrit.wikimedia.org/r/1166400

Change #1166400 merged by Brouberol:

[operations/alerts@master] Add data-eng gobblin alert for published files

https://gerrit.wikimedia.org/r/1166400

Latest update on this before leaving for holidays.

Gobblin now reports publisher metrics to Prometheus:

  • files_count and files_size_total per job and per topic
  • for files published in the future (event timestamp in the future), files published too far in the past (event timestamp more than 60 days before now), files published after a _PROCESSED flag in the folder, and regular files

I have updated our Gobblin dashboard: https://grafana.wikimedia.org/goto/UfU0apsHg?orgId=1

  • On the top row (Last run), at the far right, there are 2 boxes showing how many topics had regular files published, and how many had problematic files published (more on this definition later).
  • On the second row (Main), the bottom 2 charts provide a temporal view of regular-file and problematic-file counts across topics. I separated them to prevent scale-mismatch issues (a low number of problematic files vs. a high number of regular ones).
  • A new row at the bottom (Published files details) shows per-topic charts for regular files, after-flag files big enough to be reported (more on this later), after-flag files small enough to NOT be reported, files in the future, and files too far in the past.

Now for the definition of what I think we should act on, and therefore what counts as "problematic files":

  • Files in the future - we should always alert and have a look; this is not normal
  • Files too far in the past - same, always alert
  • Files published after the _PROCESSED flag - it depends on data size
    • If the regular data size is small (less than 1 MB), always report. Data is small enough that ratios don't make sense.
    • If the regular data size is big enough (more than 1 MB), report only if the ratio (files_after_flag_size / regular_files_size) is bigger than 0.0001. This guarantees us four nines of correctness over the data-size metric.
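In code form, the rule reads roughly like this (thresholds from above; function and field names are illustrative, and the real alert is implemented as a Prometheus rule):

```
# Sketch of the "problematic files" decision per topic and run.
ONE_MB = 1_000_000
AFTER_FLAG_RATIO = 0.0001  # tolerate at most 0.01% of data size after the flag

def topic_is_problematic(files_in_future: int, files_too_old: int,
                         regular_size: int, after_flag_size: int) -> bool:
    if files_in_future > 0 or files_too_old > 0:
        return True  # always alert on these
    if after_flag_size == 0:
        return False
    if regular_size < ONE_MB:
        return True  # too little data for a ratio to be meaningful
    return after_flag_size / regular_size > AFTER_FLAG_RATIO
```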

We have an alert in place (thanks a lot to Balthazar for the Friday deploy) for the above definition (https://gerrit.wikimedia.org/r/c/operations/alerts/+/1166400), so normally we should receive emails when weird files appear.

I have reviewed all the above with @mforns (thank you so much <3); hopefully it's all ok while I'm away :)

This is incredible, thank you Joseph!

I updated the description fields of the 2 problematic files panels in the Grafana dashboard with your description from above, and also a link to this comment. There is now an (i)nfo hover that shows what it means.

I updated the description fields of the 2 problematic files panels in the Grafana dashboard with your description from above, and also a link to this comment. There is now an (i)nfo hover that shows what it means.

Thank you! :)