
Refine jobs should be scheduled by Airflow
Open, Needs Triage, Public

Description

One of the systems that schedules DE's jobs today is the Refine pipeline.
Documentation: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Refine
Code: https://github.com/wikimedia/analytics-refinery-source/tree/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/refine

NOTE: We should probably do T341229: ProduceCanaryEvents job should be scheduled by Airflow and/or a k8s service first as it will help us learn more about how we'll dynamically schedule with Airflow.

Short context

A single Refine job processes a dynamic number of datasets, all located under the same HDFS base path. It has three distinct aspects:

  1. Identification of refine targets. Determining which datasets within the base path need to be refined.
  2. Schema evolution. Making changes to output Hive tables if schemas have changed.
  3. Data refinement. Actually processing the input data and writing to the output table.

Only the identification of refine targets (1) has to be migrated to Airflow; schema evolution (2) and data refinement (3) will still be executed by Refine.
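
For illustration only, here is a minimal sketch of how target identification (1) might be done from Airflow, assuming pyarrow's HadoopFileSystem is used to reach HDFS (the base path and layout below are made up, not the real Refine conventions):

```python
# Minimal sketch: discover the datasets sitting under a Refine base path.
# Assumes pyarrow can reach HDFS (HADOOP_CONF_DIR / CLASSPATH configured).
from pyarrow import fs


def list_refine_targets(base_path: str):
    """Return the dataset directories found directly under base_path."""
    hdfs = fs.HadoopFileSystem("default")  # "default" picks up fs.defaultFS
    selector = fs.FileSelector(base_path, recursive=False)
    return [
        info.path
        for info in hdfs.get_file_info(selector)
        if info.type == fs.FileType.Directory
    ]


# e.g. list_refine_targets("/wmf/data/raw/event")  # hypothetical base path
```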

Expected result

The ideal result of this task would be an Airflow factory (i.e. a TaskGroup factory) that dynamically generates a DAG for all the target datasets (or 1 DAG for each target dataset). For each hour and dataset, the DAG would execute a SparkSubmitOperator that would call Refine. This way, we could very easily migrate an existing Refine job, just by calling the TaskGroup factory with some configuration.
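
As a rough illustration of what such a factory could look like (the jar path, Java class name, Refine arguments and dataset names below are placeholders rather than the actual refinery-source interface, and the factory must be called inside a DAG context):

```python
from typing import List

import pendulum
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.task_group import TaskGroup


def refine_task_group(group_id: str, datasets: List[str], hour_template: str) -> TaskGroup:
    """Create one SparkSubmitOperator per target dataset, grouped together."""
    with TaskGroup(group_id=group_id) as group:
        for dataset in datasets:
            SparkSubmitOperator(
                task_id=f"refine_{dataset}",
                application="hdfs:///path/to/refinery-job.jar",  # placeholder
                java_class="org.wikimedia.analytics.refinery.job.refine.Refine",
                application_args=[
                    f"--input_path=/wmf/data/raw/{dataset}/{hour_template}",  # placeholder layout
                    f"--output_table=event.{dataset}",  # placeholder
                ],
            )
    return group


with DAG(
    dag_id="refine_example",
    schedule="@hourly",
    start_date=pendulum.datetime(2023, 7, 1, tz="UTC"),
    catchup=False,
) as dag:
    refine_task_group(
        group_id="refine_event_datasets",
        datasets=["dataset_a", "dataset_b"],  # illustrative names
        hour_template="{{ data_interval_start.strftime('%Y/%m/%d/%H') }}",
    )
```

Since application_args is a templated field of SparkSubmitOperator, the hourly partition path can be rendered per DAG run.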

Gotchas
  • The main issue is that the source data for the Refine pipeline can be updated after it is created. For example: an hourly partition for a given source dataset might be created at 5pm, and at 6pm it might be rewritten (updated) to include some data that was initially missing. Refine works around this by checking the modification times of the source dataset and the output dataset, and it re-refines if the source mtime is greater than the destination mtime. One big part of this task is to figure out how to implement this in Airflow! (A rough sketch of one possible approach follows this list.)
  • A problem that has bugged us in the past is the pyarrow library (used to interact with HDFS from Airflow). Its older version was not thread-safe and caused us problems when creating dynamic DAGs. We upgraded to the newest pyarrow library (which is supposed to fix our issues), but have not yet extensively tested it. This might be another potential blocker of this task.
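
One possible shape for the mtime check described in the first gotcha, sketched as an Airflow short-circuit task; this is only an assumption about how it might be wired up, not a settled design, and the paths are illustrative:

```python
# Sketch: skip the downstream Refine task when the source partition is not
# newer than the refined output, mirroring what Refine does on its own today.
from airflow.operators.python import ShortCircuitOperator
from pyarrow import fs


def needs_refine(source_path: str, destination_path: str) -> bool:
    hdfs = fs.HadoopFileSystem("default")
    source_info, dest_info = hdfs.get_file_info([source_path, destination_path])
    if source_info.type == fs.FileType.NotFound:
        return False  # nothing to refine yet
    if dest_info.type == fs.FileType.NotFound:
        return True   # never refined
    return source_info.mtime > dest_info.mtime  # re-refine if the source was rewritten


# Inside a DAG:
# check = ShortCircuitOperator(
#     task_id="check_source_mtime",
#     python_callable=needs_refine,
#     op_kwargs={"source_path": "...", "destination_path": "..."},
# )
```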


Event Timeline

Aklapper added a subscriber: NOkafor-WMF.

Resetting inactive assignee. Please reassign tasks when offboarding - thanks.

Migrating Refine to Airflow may trigger upgrading the Refine jobs to Spark 3.

The latest version of refinery-source includes more error logs, which will ship at the same time.

Ottomata renamed this task from Migrate 1+ Refine jobs to Refine jobs should be scheduled by Airflow.Jul 6 2023, 1:55 PM
Ottomata updated the task description.
Ottomata edited subscribers, added: Milimetric, JAllemandou; removed: NOkafor-WMF.

We (Data-Platform-SRE) have been working on updating the alerting system so that all emails sent by automated monitoring systems use routable domains. This work is being carried out under T358675: Update the From: addresses of all email from DPE pipelines so that they use routable addresses

With regard to refinery, this is happening as a two-stage process:
First, I updated all of the references in puppet to the systemd timers that launch refinery jobs (https://gerrit.wikimedia.org/r/c/operations/puppet/+/1014001), adding options to override the default email address.
This is deployed and working.

I have also created a patch to refinery-source itself, here: https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/1014004
This should update the default email address to match the one I have set on every instantiated job via puppet.
That patch is still awaiting code review.

So in the meantime, if you start to run any refine jobs from Airflow, you may find that their emails come from refine@an-launcher1002.eqiad.wmnet instead of noreply@wikimedia.org.
You should be able to override the from_email for these jobs with configuration parameters, in the same way as the systemd timers currently do.
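
For example, assuming Refine exposes a from_email option as described above, an Airflow-launched job could pass the override along with its other arguments (the jar path and class are placeholders):

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Inside an existing `with DAG(...):` block:
refine_with_email_override = SparkSubmitOperator(
    task_id="refine_example_dataset",
    application="hdfs:///path/to/refinery-job.jar",  # placeholder
    java_class="org.wikimedia.analytics.refinery.job.refine.Refine",
    application_args=[
        "--from_email=noreply@wikimedia.org",  # override the host-derived default sender
        # ... other Refine options ...
    ],
)
```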

@Antoine_Quhen can you remind me? How did we overcome the 'gotcha' described in the task description here?

The main issue is that the source data for the Refine pipeline can be updated after it is created. For example: an hourly partition for a given source dataset might be created at 5pm, and at 6pm it might be rewritten (updated) to include some data that was initially missing. Refine works around this by checking the modification times of the source dataset and the output dataset, and it re-refines if the source mtime is greater than the destination mtime. One big part of this task is to figure out how to implement this in Airflow!

How did we overcome the 'gotcha' described in the task description here?

The main issue is that the source data for the Refine pipeline can be updated after it is created

Answering for posterity: This is not yet overcome: T370665: Handle Late-Arrived Events from Gobblin into Airflow triggered Refine