One of the systems that schedules DE's jobs today is the Refine pipeline.
Documentation: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Refine
Code: https://github.com/wikimedia/analytics-refinery-source/tree/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/refine
NOTE: We should probably do T341229 ("ProduceCanaryEvents job should be scheduled by Airflow and/or a k8s service") first, as it will help us learn more about how we'll dynamically schedule with Airflow.
Short context
A single Refine job processes a dynamic number of datasets, all located under the same HDFS base path. It has 3 different aspects:
1. Identification of refine targets: determining which datasets within the base path need to be refined.
2. Schema evolution: making changes to the output Hive tables if schemas have changed.
3. Data refinement: actually processing the input data and writing it to the output table.
Only the identification of refine targets (1) has to be migrated to Airflow; (2) and (3) will still be executed by Refine.
Expected result
The ideal result of this task would be an Airflow factory (i.e. a TaskGroup factory) that dynamically generates a DAG for all the target datasets (or one DAG per target dataset). For each hour and dataset, the DAG would execute a SparkSubmitOperator that calls Refine. This way, we could migrate an existing Refine job very easily, just by calling the TaskGroup factory with some configuration. A rough sketch of what this could look like follows.
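For illustration only, here is a minimal sketch of such a factory. The names here are assumptions: `refine_taskgroup_factory`, the jar path, and the Refine arguments are placeholders; only `SparkSubmitOperator`, `TaskGroup`, and the `Refine` class from the package linked above are real.

```python
# A minimal sketch of the proposed TaskGroup factory. Assumed/placeholder names:
# refine_taskgroup_factory, the jar path, and the Refine CLI arguments.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.task_group import TaskGroup


def refine_taskgroup_factory(datasets: list[str], input_base_path: str) -> TaskGroup:
    """Builds one Refine task per target dataset for the scheduled hour."""
    with TaskGroup(group_id="refine") as group:
        for dataset in datasets:
            SparkSubmitOperator(
                task_id=f"refine_{dataset}",
                application="/path/to/refinery-job.jar",  # placeholder path
                java_class="org.wikimedia.analytics.refinery.job.refine.Refine",
                # Placeholder arguments; the real Refine options are documented
                # at the wikitech link above.
                application_args=[
                    "--input_path", f"{input_base_path}/{dataset}",
                    "--since", "{{ data_interval_start }}",
                    "--until", "{{ data_interval_end }}",
                ],
            )
    return group
```

Called inside a `with DAG(...)` block, the factory would attach one Refine task per dataset to that DAG, so migrating an existing Refine job would only require a call with the job's configuration.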
Gotchas
- The main issue is that the source data for the Refine pipeline can be updated after it has been created. For example: an hourly partition for a given source dataset might be created at 5pm, and at 6pm it might be rewritten (updated) to include some data it was missing. Refine works around this by comparing the modification times of the source and output datasets, and it re-refines if the source mtime is greater than the destination mtime. One big part of this task is to figure out how to implement this in Airflow! (A possible sketch of the check follows after this list.)
- A problem that has bugged us in the past is the pyarrow library (used to interact with HDFS from Airflow). Its older versions were not thread-safe and caused us problems when creating dynamic DAGs. We upgraded to the newest pyarrow release (which is supposed to fix our issues), but have not yet tested it extensively, so it might be another potential blocker for this task. A minimal thread-safety smoke test is sketched below.
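One possible (unvalidated) approach to the mtime check: a ShortCircuitOperator that compares HDFS mtimes via pyarrow and lets the downstream Refine task run only when the source is newer. `pyarrow.fs.HadoopFileSystem` and `FileInfo.mtime` are real APIs; the two paths are placeholders. Note that this only covers the check itself; how to re-trigger an hour that already refined successfully (Airflow does not re-run successful DAG runs on its own) remains the open design question.

```python
# A sketch, assuming pyarrow's HadoopFileSystem for the mtime lookup; the two
# paths are placeholders for the source partition and the refined output.
from airflow.operators.python import ShortCircuitOperator
import pyarrow.fs


def needs_refine(source_path: str, output_path: str) -> bool:
    """True if the output is missing or older than the source partition."""
    fs = pyarrow.fs.HadoopFileSystem(host="default")
    source_info = fs.get_file_info(source_path)
    output_info = fs.get_file_info(output_path)
    if output_info.type == pyarrow.fs.FileType.NotFound:
        return True  # never refined yet
    return source_info.mtime > output_info.mtime


check_mtimes = ShortCircuitOperator(
    task_id="check_mtimes",
    python_callable=needs_refine,
    op_kwargs={
        "source_path": "/path/to/source/partition",  # placeholder
        "output_path": "/path/to/refined/partition",  # placeholder
    },
)
```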
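To de-risk the pyarrow concern, a first smoke test could simply hammer the HDFS client from many threads, mimicking parallel DAG parsing. Again a sketch under assumptions: the base path is a placeholder, and this only exercises concurrent listing, not every code path DAG parsing would hit.

```python
# Smoke test for pyarrow thread-safety: many threads listing the same HDFS
# base path concurrently, as parallel DAG parsing would. Placeholder path.
from concurrent.futures import ThreadPoolExecutor
import pyarrow.fs


def list_datasets(base_path: str) -> int:
    """Counts the entries directly under base_path."""
    fs = pyarrow.fs.HadoopFileSystem(host="default")
    selector = pyarrow.fs.FileSelector(base_path, recursive=False)
    return len(fs.get_file_info(selector))


with ThreadPoolExecutor(max_workers=16) as pool:
    # Any exception raised in a worker thread will surface here.
    counts = list(pool.map(list_datasets, ["/path/to/base"] * 64))
assert len(set(counts)) == 1  # all threads should see the same listing
```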