
[Refine refactoring] Refine jobs should be scheduled by Airflow: implementation
Open, Needs Triage · Public · 16 Estimated Story Points

Description

Goals

  • Table management (creation, evolution) should be its own Airflow task
  • Refine code refactored to support Hive and Iceberg
  • Dynamic Airflow DAG to launch Refine jobs

Table Management Story

As a software engineer, I would like the capability to programmatically manage schemas of Refined tables.

As part of the refine refactor, we should extract table management into a dedicated tool.

The tool should be able to (see the sketch after this list):

  1. read from config which databases and schemas should exist in the metastore (JSONSchema vs. Calcite?)
    • db -> schema URI
  2. perform several checks and actions:
    • validate table existence
    • validate schema coherence
    • migrate the schema
      • update table properties
  3. be executable from the CLI and via Airflow
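
A hypothetical sketch of such a tool's core loop (all names, the config shape, and the example schema URI below are illustrative, not the actual refinery CLI):

import org.apache.spark.sql.SparkSession

// Hypothetical config entry: which table should exist, and which schema URI defines it.
case class ManagedTable(database: String, table: String, schemaUri: String)

object TableManager {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // 1. Read from config the db -> schema URI mapping (hard-coded here for illustration).
    val config = Seq(
      ManagedTable("event", "navigationtiming", "/analytics/legacy/navigationtiming/latest")
    )

    config.foreach { t =>
      val fqName = s"${t.database}.${t.table}"
      if (!spark.catalog.tableExists(fqName)) {
        // 2a. Table is missing: create it from the schema at t.schemaUri.
      } else {
        // 2b. Validate coherence between the table and the schema at t.schemaUri,
        //     then migrate if needed (ALTER TABLE ... ADD COLUMNS / SET TBLPROPERTIES).
      }
    }
  }
}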

Success criteria


Event Timeline

Hello! I'm not entirely sure what this ticket is trying to do, but here's some hopefully useful information:

RefineTarget has a useMergedSchemaForRead option which, if true (and it is for our Refine jobs), will merge the incoming raw event's schema with the target table's Hive schema before reading the DataFrame from the raw data. The reasoning for this is documented here.

We'd really prefer not to do this. In the Event Platform world, it isn't needed, because schemas are backwards compatible and do not remove fields. However, IIRC, there is a bug where, for array and map fields whose values are structs, a different ordering of the struct fields in the raw event schema vs. the target Hive schema will cause the write to fail.
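
To make the failure mode concrete, here is a minimal illustration (the field names are hypothetical) of the kind of mismatch described above:

import org.apache.spark.sql.types._

// Raw event schema: an array whose element struct declares (name, value)...
val rawSchema = StructType(Seq(
  StructField("tags", ArrayType(StructType(Seq(
    StructField("name", StringType),
    StructField("value", StringType)
  ))))
))

// ...while the target Hive table declares the same element struct as (value, name).
val hiveSchema = StructType(Seq(
  StructField("tags", ArrayType(StructType(Seq(
    StructField("value", StringType),
    StructField("name", StringType)
  ))))
))

// Top-level columns are matched by name, but struct fields nested inside array/map
// element types end up matched by position on write, so data read with rawSchema
// fails (or mis-assigns values) when written into a table declared with hiveSchema.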

@JAllemandou has a patch to fix this, but we were waiting for Spark 3 to fix it for map types. We are on Spark 3 now, so we should be able to go?

See:

lbowmaker set the point value for this task to 5. Feb 16 2024, 7:54 PM

Change #1016808 had a related patch set uploaded (by Aqu; author: Aqu):

[analytics/refinery/source@master] Add CLI to create or update Iceberg tables

https://gerrit.wikimedia.org/r/1016808

Just read Antoine's patch and I think I'm missing something, so I thought I could ask here.

I understand why we want to have separate jobs for evolving tables and for 'refining' (inserting) data into those tables.

Is there also a reason for separating the library code in this way?

I'm asking because the RefineDataset patch has a writeToIcebergOutputTable function, and Antoine's very cool EvolveIcebergTable patch has an "IcebergTableDDLStatementsGenerator", which feels to me like an awkward name and home for this code.

(I'll paste some of what I wrote in the review and we can discuss here?)

Added comment here.

Could this logic live in a public library function somewhere? It seems useful especially from the CLI, and for other jobs than just RefineDataset.

Both Refine and RefineSanitized jobs use DataFrameToHive to do what they need to do. I think having something similar for Iceberg would be useful (I understand that we don't want a single function to do both evolve + insert).

I'm not sure what we'd call such a utility class, though. Maybe DataFrameToIceberg is good. However, unlike DataFrameToHive, there would be no implicit apply function that both evolves the table and inserts into it.
We'd just throw an exception if someone calls DataFrameToIceberg.write (or whatever) and their schema was not compatible.
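
A hedged sketch of that idea (DataFrameToIceberg and the exact compatibility check are hypothetical, taken from this discussion rather than existing refinery code):

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameToIceberg {
  // write() never evolves the table: it only verifies that the DataFrame's fields
  // exist in the target table with the same types, then appends.
  def write(spark: SparkSession, tableName: String, df: DataFrame): Unit = {
    val tableSchema = spark.table(tableName).schema
    val incompatible = df.schema.fields.filterNot { f =>
      tableSchema.exists(t => t.name.equalsIgnoreCase(f.name) && t.dataType == f.dataType)
    }
    if (incompatible.nonEmpty) {
      throw new IllegalStateException(
        s"DataFrame schema is not compatible with $tableName, " +
        s"mismatched or missing fields: ${incompatible.map(_.name).mkString(", ")}"
      )
    }
    df.writeTo(tableName).append()
  }
}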

Thoughts?

I have been wondering about how to organize this code.
I was not willing to replicate the DataFrameToHive pattern, partly because of the apply function, and partly to avoid putting schema and data management code in the same place, since we are trying to split them functionally.
Should we have 2 lib files, one for schema and one for data, for both Hive and Iceberg? Or one file doing both as it is now?

Antoine_Quhen renamed this task from [NEEDS GROOMING][SPIKE] Extract refine schema management into a dedicated tool to Extract refine schema management into a dedicated tool. Apr 4 2024, 3:56 PM
Antoine_Quhen claimed this task.
Antoine_Quhen updated the task description.

Should we have 2 lib files, one for schema and one for data, for both Hive and Iceberg? Or one file doing both as it is now?

Either could work! I think it would be nicer to keep Hive code with a Hive class, Iceberg code with an Iceberg class, but if you'd prefer to organize around the functionality (an Evolve class and a Writer class) instead, that could be fine too. It'd just be nice if the interfaces were roughly consistent (function names? even if parameters are not, e.g. tableProperties?)

So I guess either

// organized around storage
class DataFrameToHive {
   def evolve(spark, tableName, schema, ...)
   def write(tableName, dataFrame, ...)
}
class DataFrameToIceberg {
   def evolve(spark, tableName, schema, ...)
   def write(tableName, dataFrame, ...)
}

Or:

// Organized around functionality:

class TableSchemaManager {   // (name TBD of course)
   def evolveIcebergTable(spark, tableName, schema, ...)
   def evolveHiveTable(spark, tableName, schema, ...)
}

class DataFrameToTable {
   def writeHive(tableName, dataFrame, ...)
   def writeIceberg(tableName, dataFrame, ...)
   // OR: do these need to be different functions? I think not because tableName must already exist, and we can inspect it to find its storage.
}

Or even a combo? One utility class that contains these functions and storages?

class SparkTableManager { // name?: DataFrameToTable?, SparkTableUtilities?
  def evolveHiveTable(spark, tableName, schema, ...)
  def evolveIcebergTable(spark, tableName, schema, ...) 
  
  // only need one public function: storage is known because tableName already exists, so we can vary behavior accordingly
  def write(tableName, dataFrame, ...)
}

I prefer the "by functionality" organization, for separating schema vs data code.
I think we need the two different functions so that the Iceberg one can delete data before inserting. And actually, this could be discussed as well: I think we want this by default in the Iceberg write function - do you agree?

If ok for you @Ottomata, (ping @Antoine_Quhen and @gmodena as well if you have a minute :), let's go with that?

I prefer the "by functionality" organization

Yap, cool with me. Let the naming bikeshed begin.

I think we need the 2 different functions to make the Iceberg one delete data before inserting

Behavior needs to be different, but the public API function might not need to be. It might be nice for callers to not have to think too hard about it. E.g.

def write(tableName, dataFrame, ...) = {
   val table = spark.table(tableName)
   table.getStorageSomehow() match {
        case "iceberg" => writeIceberg(tableName, dataFrame, ...)
        case "hive"    => writeHive(tableName, dataFrame, ...)
        // ...
   }
}

Unless of course, there are very different parameters that need to be passed to write for iceberg vs hive vs etc. In that case, let's just make different public functions for each.
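
For what it's worth, one possible way to fill in the getStorageSomehow() placeholder, assuming DESCRIBE TABLE EXTENDED reports a Provider row for the table (a sketch only, not the agreed implementation):

import org.apache.spark.sql.{DataFrame, SparkSession}

def write(spark: SparkSession, tableName: String, df: DataFrame): Unit = {
  // Look up the table's storage provider from the catalog metadata.
  val provider = spark.sql(s"DESCRIBE TABLE EXTENDED $tableName")
    .where("col_name = 'Provider'")
    .select("data_type")
    .collect()
    .headOption
    .map(_.getString(0).toLowerCase)
    .getOrElse("hive")

  provider match {
    // Iceberg: dynamically replace the partitions present in df.
    case "iceberg" => df.writeTo(tableName).overwritePartitions()
    // Hive (and anything else): plain append into the existing table.
    case _         => df.write.mode("append").insertInto(tableName)
  }
}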

Ahoelzl renamed this task from Extract refine schema management into a dedicated tool to [Refine refactoring] Extract refine schema management into a dedicated tool. Apr 11 2024, 6:31 PM
Ahoelzl updated the task description.
Ahoelzl changed the point value for this task from 5 to 8.

I've encountered an issue with our current production Airflow setup where the scheduler is not executing tasks in a depth-first manner as expected. Upon investigation, I found that the depth-first execution feature isn't supported in our current Airflow version (2.7.3). This functionality was introduced in Airflow 2.8.0, as per this pull request: Apache Airflow PR #27827 (I've tried it locally with success).

To resolve this, I propose upgrading our Airflow distribution to version 2.9.1, which supports depth-first task execution and includes other beneficial updates.

Additionally, from my testing, I discovered that to effectively utilize depth-first execution, we must limit the number of active tasks per DAG. Without this limit, the scheduler might fill the queue with instances of the initial tasks, which prevents later tasks from starting promptly. For the 'Refine' DAG, setting max_active_tasks to our desired level of parallelization (10) would optimize our task execution sequence.

Moreover, one more setting needed for a DAG to execute in a depth-first manner is to add weight_rule="upstream" to its default_args (or to some tasks' args).

I've encountered an issue with our current production Airflow setup where the scheduler is not executing tasks in a depth-first manner as expected. Upon investigation, I found that the depth-first execution feature isn't supported in our current Airflow version (2.7.3). This functionality was introduced in Airflow 2.8.0, as per this pull request: Apache Airflow PR #27827 (I've tried it locally with success).

To resolve this, I propose upgrading our Airflow distribution to version 2.9.1, which supports depth-first task execution and includes other beneficial updates.

We are already planning an upgrade to Airflow version 2.9.1 in T353884 although it isn't currently prioritised for the next few weeks. Would it help you significantly if we can get this upgrade done sooner?

To check the output of the new Refine process, I've been working with diff from https://github.com/G-Research/spark-extension/ to verify that my output matches what was generated by the old Refine process. Here are the problems I've encountered and circumvented:

Alternatively, we could update the legacy process to run a fresher code version...

What do you think?
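
For reference, a sketch of the kind of comparison described above, assuming spark-extension's Dataset.diff extension; the table names are placeholders, and the dropped columns correspond to the workarounds discussed below:

// assumes a SparkSession named spark is in scope, e.g. in spark-shell
import uk.co.gresearch.spark.diff._

val oldOut = spark.table("event.navigationtiming_legacy_refine")   // placeholder names
val newOut = spark.table("event.navigationtiming_new_refine")

// Columns excluded from the comparison because their differences are expected (see below).
val excluded = Seq("_schema", "geocoded_data", "meta")

oldOut.drop(excluded: _*)
  .diff(newOut.drop(excluded: _*))
  .where("diff != 'N'")   // keep only rows that differ between the two outputs
  .show(false)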

The user_agent structure depends on the existing table schema

Yeah, this is a legacy EventLogging problem. Def unfortunate.

The _schema column is not filled by the current Refine process. It is, pleasingly, filled by the new code. The workaround is to remove _schema from the comparison. But I can't find where it gets filled in the new code.

Really!? T255818: Refine drops $schema field values, specifically T255818#7992353.

The workaround is to remove geocoded_data in the comparison

Sounds fine. Geocoded data outputs can change as MaxMind DBs are updated. Even different runs of the old Refine on the same data might produce different output.

The deduplicate transform function runs on a DF that was not previously sorted

Should we change it to sort? If that would make this more deterministic, then let's go for it.
But what to sort on? Hm. I suppose on meta.dt? Meh. I don't love it.
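
A sketch of one way to make the deduplication deterministic: within each group of duplicates, keep the row that sorts first (using meta.id as the dedup key and meta.dt / meta.request_id as the ordering is an assumption here, not necessarily what the merged patch does):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

def deduplicateDeterministically(df: DataFrame): DataFrame = {
  // Rank duplicates within each meta.id group by a stable ordering and keep the first row.
  val w = Window.partitionBy(col("meta.id")).orderBy(col("meta.dt"), col("meta.request_id"))
  df.withColumn("_rank", row_number().over(w))
    .where(col("_rank") === 1)
    .drop("_rank")
}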

The duplications I've seen mostly involved identical lines differing only in meta.dt and meta.request_id.

Weird, I guess from retries from clients where X-Request-Id is not set before eventgate.

The workaround is to remove the whole meta from the comparison

If the chosen deduped record was random before, then random in the new code is not worse. I think it's fine to simplify your comparison like this.

we could update the legacy process to run a fresher code version

Hm, not following, can you expand on this?

Ahoelzl changed the point value for this task from 8 to 16. Jun 28 2024, 1:45 PM

Change #1051061 had a related patch set uploaded (by Aqu; author: Aqu):

[analytics/refinery/source@master] Make TransformFunction Deduplication Deterministic

https://gerrit.wikimedia.org/r/1051061

Ottomata renamed this task from [Refine refactoring] Extract refine schema management into a dedicated tool to [Refine refactoring] Refine jobs should be scheduled by Airflow: implementation. Jul 18 2024, 5:01 PM
Ottomata updated the task description.

As discussed in standup, I've rewritten this task to encompass the work attached to it.

Change #1059429 had a related patch set uploaded (by Ottomata; author: Ottomata):

[analytics/refinery/source@master] Refine refactor - Use ExplicitSchemaLoader when constructing RefineTarget

https://gerrit.wikimedia.org/r/1059429

Change #1051061 merged by Aqu:

[analytics/refinery/source@master] Make TransformFunction deduplication deterministic

https://gerrit.wikimedia.org/r/1051061

Change #1070582 had a related patch set uploaded (by Aqu; author: Aqu):

[operations/puppet@production] [Analytics][Refine][test-cluster] Bump Refine job version for test cluster

https://gerrit.wikimedia.org/r/1070582

Change #1070582 merged by Ottomata:

[operations/puppet@production] [Analytics][Refine][test-cluster] Bump test cluster Refine job version

https://gerrit.wikimedia.org/r/1070582

Change #1071871 had a related patch set uploaded (by Ottomata; author: Ottomata):

[operations/puppet@production] refine - bump to refinery version 0.2.49

https://gerrit.wikimedia.org/r/1071871

Change #1071871 merged by Ottomata:

[operations/puppet@production] refine - bump to refinery version 0.2.49

https://gerrit.wikimedia.org/r/1071871

Change #1016808 merged by jenkins-bot:

[analytics/refinery/source@master] Refactor Refine to be triggered by Airflow

https://gerrit.wikimedia.org/r/1016808

Change #1075636 had a related patch set uploaded (by Aqu; author: Aqu):

[schemas/event/secondary@master] Add useragent to eventlogging legacy fragement eventcapsule

https://gerrit.wikimedia.org/r/1075636

Change #1075637 had a related patch set uploaded (by Aqu; author: Aqu):

[schemas/event/secondary@master] Add useragent to analytics/legacy events

https://gerrit.wikimedia.org/r/1075637

Change #1075636 merged by Aqu:

[schemas/event/secondary@master] Add useragent to eventlogging legacy fragement eventcapsule

https://gerrit.wikimedia.org/r/1075636

Change #1075637 merged by Aqu:

[schemas/event/secondary@master] Add useragent to analytics/legacy events

https://gerrit.wikimedia.org/r/1075637