
Refine jobs should be scheduled by Airflow
Open, High, Public

Description

One of the systems that schedules DE's jobs today is the Refine pipeline.
Documentation: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Refine
Code: https://github.com/wikimedia/analytics-refinery-source/tree/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/refine

NOTE: We should probably do T341229: ProduceCanaryEvents job should be scheduled by Airflow and/or a k8s service first as it will help us learn more about how we'll dynamically schedule with Airflow.

Short context

A Refine job processes a dynamic number of datasets, all located under the same HDFS base path. It has three different aspects:

  1. Identification of refine targets. Determining which datasets within the base path need to be refined.
  2. Schema evolution. Making changes to output Hive tables if schemas have changed.
  3. Data refinement. Actually processing the input data and writing to the output table.

Only the identification of refine targets (1) has to be migrated to Airflow; (2) and (3) will still be executed by Refine.

Expected result

The ideal result of this task would be an Airflow factory (i.e. a TaskGroup factory) that dynamically generates a DAG covering all the target datasets (or one DAG per target dataset). For each hour and dataset, the DAG would execute a SparkSubmitOperator that calls Refine. This way, an existing Refine job could be migrated very easily, just by calling the TaskGroup factory with some configuration.

Gotchas
  • The main issue is that the source data for the Refine pipeline can be updated after it is created. For example, an hourly partition for a given source dataset might be created at 5pm and then rewritten (updated) at 6pm to include data it was missing. Refine works around this by comparing the modification times of the source dataset and the output dataset, and re-refines if the source mtime is greater than the destination mtime. One big part of this task is to figure out how to implement this in Airflow!
  • A problem that has bugged us in the past is the pyarrow library (used to interact with HDFS from Airflow). Its older version was not thread-safe and caused problems when creating dynamic DAGs. We upgraded to the newest pyarrow release, which is supposed to fix those issues, but have not yet tested it extensively. This is another potential blocker for this task.

Done is

AC: There are no more systemd-scheduled Refine jobs.

  • main Refine jobs scheduled by Airflow, systemd timers removed.
  • Refine sanitized jobs scheduled by Airflow, systemd timers removed.


Event Timeline

Aklapper added a subscriber: NOkafor-WMF.

Resetting inactive assignee. Please reassign tasks when offboarding - thanks.

Migrating Refine to Airflow may trigger upgrading the Refine jobs to Spark 3.

The latest version of refinery-source includes more error logs, which will ship at the same time.

Ottomata renamed this task from Migrate 1+ Refine jobs to Refine jobs should be scheduled by Airflow. Jul 6 2023, 1:55 PM
Ottomata updated the task description.
Ottomata edited subscribers, added: Milimetric, JAllemandou; removed: NOkafor-WMF.

We (Data-Platform-SRE) have been working on updating the alerting system so that all emails sent by automated monitoring systems use routable domains. This work is being carried out under T358675: Update the From: addresses of all email from DPE pipelines so that they use routable addresses

With regard to refinery, this is happening as a two stage process:
Firstly, I updated all of the references in puppet to the systemd timers that launch refinery jobs: https://gerrit.wikimedia.org/r/c/operations/puppet/+/1014001 - adding options to override the default email address.
This is deployed and working.

I have also created a patch to refinery-source itself, here: https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/1014004
This should update the default email address to be the same as the one I have set on every instantiated job via puppet.
That patch is still awaiting code review.

So in the meantime, if you start to run any Refine jobs from Airflow, you may find that their emails come from refine@an-launcher1002.eqiad.wmnet instead of noreply@wikimedia.org.
You should be able to override the from_email for these jobs with configuration parameters, in the same way as the systemd timers currently do.

@Antoine_Quhen can you remind me? How did we overcome the 'gotcha' described in the task description here?

The main issue is that the source data for the Refine pipeline can be updated after it is created. For example, an hourly partition for a given source dataset might be created at 5pm and then rewritten (updated) at 6pm to include data it was missing. Refine works around this by comparing the modification times of the source dataset and the output dataset, and re-refines if the source mtime is greater than the destination mtime. One big part of this task is to figure out how to implement this in Airflow!

How did we overcome the 'gotcha' described in the task description here?

The main issue is that the source data for the Refine pipeline can be updated after it is created

Answering for posterity: This is not yet overcome: T370665: Refine to Hive with Airflow – Handle Late-Arrived Events

Ahoelzl changed the task status from Open to In Progress. Apr 25 2025, 4:24 PM
Ahoelzl triaged this task as High priority.
Ahoelzl added a project: Epic.
Ahoelzl moved this task from Backlog to Tag with Roadmap on the Data-Engineering board.
Ahoelzl edited projects, added Data-Engineering-Roadmap; removed Data-Engineering.
Ahoelzl moved this task from Backlog to Q4 FY24-25 on the Data-Engineering-Roadmap board.
Ahoelzl claimed this task.

@Ahoelzl I came across this after seeing T414107: Inventory of SystemD timer based jobs and pipelines and realized that this task is not done.

The AC includes "Refine sanitized jobs scheduled by Airflow, systemd timers removed.". RefineSanitize is still using systemd with the old version of Refine from before Antoine's refactor.