
Move more of refine_hive_hourly dag logic into RefineConfiguration
Open, Needs Triage, Public

Description

Following https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/677, we noticed an opportunity to improve the Airflow code related to Refine. Quoting @Ottomata:

It would be nice if we could keep any adjustable EvolveTable and RefineDataset job parameters named as closely to settings in analytics_*_ingestion as possible.
Additionally, you are doing some config interpretation in this class, e.g. _hive_table_name_from_stream_name, but you are also doing some interpretation in the dag factory, e.g.:

```
"--schema_uri": f"/{config['schema_uri']}",
        [f"{column}:{sql_type}" for column, sql_type in config["hive_partition_columns"].items()]
)
```

Rather than doing this kind of thing in two places, could we make this RefineConfiguration class handle all translations from EventStreamConfig entries (and maybe DagProperties? not sure) to most (if not all?) of the SparkSubmitOperator kwargs needed to launch the jobs?
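
As a rough illustration of that direction, the translation could live entirely on the configuration class. Everything below is a hedged sketch with hypothetical names (to_refine_application_args and the dataclass fields are inventions for this example), not the actual MR code:

```python
from dataclasses import dataclass, field


@dataclass
class RefineConfiguration:
    """Sketch: one instance per stream, built from an EventStreamConfig entry."""

    stream_name: str
    schema_uri: str
    # e.g. {"year": "bigint", "month": "bigint"}
    hive_partition_columns: dict = field(default_factory=dict)

    def to_refine_application_args(self) -> dict:
        # All EventStreamConfig -> Spark argument interpretation happens here,
        # so the dag factory never needs to inspect raw config entries.
        return {
            "--stream": self.stream_name,
            "--schema_uri": f"/{self.schema_uri}",
            "--partition_columns": ",".join(
                f"{column}:{sql_type}"
                for column, sql_type in self.hive_partition_columns.items()
            ),
        }
```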

More simply, should we move the logic in prepare_evolve_table_script and prepare_refine_task_params into this class, and simplify the dag factory so that it just handles converting a RefineConfiguration into a DAG?
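
With the translation centralized as sketched above, the factory side could then collapse to something like the following (again only a sketch building on the hypothetical class; the operator wiring is simplified and the application path is a placeholder):

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


def refine_task_for(config: RefineConfiguration) -> SparkSubmitOperator:
    # The dag factory only converts a RefineConfiguration into an operator;
    # it no longer interprets EventStreamConfig fields itself.
    args = config.to_refine_application_args()
    return SparkSubmitOperator(
        task_id=f"refine_{config.stream_name}",
        application="refine_dataset.py",  # placeholder, not the real artifact
        # Flatten {"--flag": "value"} pairs into the CLI-style list that
        # application_args expects.
        application_args=[token for pair in args.items() for token in pair],
    )
```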

I kind of ask this because I wonder if we could avoid hardcoding a lot of the application_args for the SparkSubmitOperator (like self.stream, analytic_hive_ingestion_enabled, etc.), and instead make RefineConfiguration just a mapper from an EventStreamConfig entry (and maybe a datasets config?) to SparkSubmitOperator kwargs, or maybe just application_args; not sure.
That would allow us to more easily add switches to the config, e.g. overriding the table name via EventStreamConfig, without having to modify this Airflow code.
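
For example, a table-name override could then become a pure config concern. This is only a sketch: the hive_table_name field and the fallback derivation shown here are assumptions for illustration, not the actual EventStreamConfig schema or the real _hive_table_name_from_stream_name logic:

```python
def hive_table_name_for(stream_config: dict) -> str:
    # Prefer an explicit override carried in the EventStreamConfig entry
    # (field name hypothetical), so no Airflow code change is needed.
    override = stream_config.get("hive_table_name")
    if override:
        return override
    # Illustrative fallback: derive a table name from the stream name.
    return stream_config["stream"].replace(".", "_").replace("-", "_")
```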

We detached this task from the base MR so that the staging version could be deployed first.