
[Refine refactoring] Extract refine schema management into a dedicated tool
Open, Needs Triage, Public, 8 Estimated Story Points

Description

As a software engineer, I would like the capability to programmatically manage schemas of Refined tables.

As part of the refine refactoring, we should extract schema management into a dedicated tool.

The tool should be able to (a rough sketch follows the list below):

  1. read from config the databases and schemas that should exist in the metastore (JSONSchema vs Calcite?)
    • db -> schema URI
  2. perform several operations:
    • validate table existence
    • validate coherence
    • migrate the schema
      • update table properties
  3. be executable from the CLI and via Airflow
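
A rough, hypothetical sketch of how the tool could be wired (all names, config values, and commented-out helpers are illustrative placeholders, not the actual implementation):

import org.apache.spark.sql.SparkSession

object EvolveTablesSketch {

  // Hypothetical config: fully qualified table name -> schema URI
  val tableSchemas: Map[String, String] = Map(
    "event.some_stream" -> "https://schema.example.org/some/stream/1.0.0"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("evolve-tables-sketch")
      .enableHiveSupport()
      .getOrCreate()

    val dryRun = args.contains("--dry-run")

    tableSchemas.foreach { case (tableName, schemaUri) =>
      // 1. validate table existence
      val exists = spark.catalog.tableExists(tableName)

      // 2. validate coherence and 3. migrate: load the declared schema,
      // diff it against the current table schema, and generate DDL.
      // loadSparkSchema and generateDdl are placeholders for whatever the
      // real tool ends up using.
      // val targetSchema = loadSparkSchema(schemaUri)
      // val ddlStatements = generateDdl(spark, tableName, targetSchema, exists)

      if (dryRun) println(s"[dry-run] $tableName ($schemaUri): exists=$exists")
      // else ddlStatements.foreach(spark.sql)
    }
  }
}

Keeping it as a plain main entry point keeps it runnable both from the CLI via spark-submit and from an Airflow task that wraps spark-submit.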
Success criteria
  • Code
  • Unit Tests
  • Migrate existing code to be compatible with Iceberg tables
  • Manual dry run on currently refined tables
  • Released and used from the Airflow refine DAG

Event Timeline

Hello! I'm not entirely sure what this ticket is trying to do, but here's some hopefully useful information:

RefineTarget has a useMergedSchemaForRead option which, if true (which it is for our Refine jobs), will merge the incoming raw event's schema with the target table's Hive schema before reading the DataFrame from the raw data. The reasoning for this is documented here.

We'd really prefer not to do this. In the Event Platform world, this is not needed, because schemas are backwards compatible and do not remove fields. However, IIRC, there is a bug where, for array and map fields whose values are structs, a different ordering of the struct fields in the raw event schema vs the target Hive schema will cause the write to fail.
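
To make that bug concrete, here's a small illustration (not the actual Refine code; just hand-written schemas) of the kind of mismatch:

import org.apache.spark.sql.types._

// Target Hive table schema: items: array<struct<a: int, b: string>>
val tableSchema = StructType(Seq(
  StructField("items", ArrayType(StructType(Seq(
    StructField("a", IntegerType),
    StructField("b", StringType)
  ))))
))

// Incoming raw event schema: same fields, but the struct is ordered b, a
val rawSchema = StructType(Seq(
  StructField("items", ArrayType(StructType(Seq(
    StructField("b", StringType),
    StructField("a", IntegerType)
  ))))
))

// Inserting a DataFrame read with rawSchema into a table declared with
// tableSchema can fail (or mis-assign values) when struct fields inside
// array/map element types are resolved by position rather than by name,
// which is why Refine merges/aligns the read schema with the table schema.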

@JAllemandou has a patch to fix this, but we were waiting for Spark 3 to fix it for map types. We are on Spark 3 now, so we should be able to go?

See:

lbowmaker set the point value for this task to 5. Feb 16 2024, 7:54 PM

Change #1016808 had a related patch set uploaded (by Aqu; author: Aqu):

[analytics/refinery/source@master] Add CLI to create or update Iceberg tables

https://gerrit.wikimedia.org/r/1016808

Just read Antoine's patch and I think I'm missing something, so I thought I could ask here.

I understand why we want to have separate jobs for evolving tables and for 'refining' (inserting) data into those tables.

Is there also a reason for separating the library code in this way?

I'm asking because the RefineDataset patch has a writeToIcebergOutputTable function, and Antoine's very cool EvolveIcebergTable patch has an "IcebergTableDDLStatementsGenerator", which feels to me like an awkward name and home for this code.

(I'll paste some of what I wrote in the review and we can discuss here?)

Added comment here.

Could this logic live in a public library function somewhere? It seems useful especially from the CLI, and for other jobs than just RefineDataset.

Both Refine and RefineSanitized jobs use DataFrameToHive to do what they need to do. I think having something similar for Iceberg would be useful (I understand that we don't want a single function to do both evolve + insert).

I'm not sure what we'd call such a utility class though. Maybe DataFrameToIceberg is good. However unlike DataFrameToHive, there would be no implicit apply function that both evolved the table and inserted into it.
We'd just throw an exception if someone calls DataFrameToIceberg.write (or whatever) and their schema was not compatible.
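
Something roughly like this, for example (just a sketch; the name and the naive top-level field check are placeholders, not a proposal for the real compatibility logic):

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameToIcebergSketch {

  // No implicit evolution: callers must run the evolve step first.
  // A real implementation would compare types and nested fields,
  // not just top-level field names.
  def write(spark: SparkSession, tableName: String, df: DataFrame): Unit = {
    val tableFields = spark.table(tableName).schema.fieldNames.toSet
    val unknownFields = df.schema.fieldNames.toSet.diff(tableFields)

    if (unknownFields.nonEmpty) {
      throw new IllegalStateException(
        s"DataFrame schema is not compatible with $tableName, " +
        s"unknown fields: ${unknownFields.mkString(", ")}"
      )
    }

    // Spark 3 DataFrameWriterV2 append; whether the default should also
    // delete/overwrite existing data is discussed further down this thread.
    df.writeTo(tableName).append()
  }
}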

Thoughts?

I have been wondering about how to organize this code.
Indeed, I was reluctant to replicate the DataFrameToHive pattern because of the apply function, and I also wanted to avoid putting schema-management and data-management code in the same place, since we are trying to split them functionally.
Should we have 2 lib files, one for schema and one for data, for both Hive and Iceberg? Or one file doing both as it is now?

Antoine_Quhen renamed this task from [NEEDS GROOMING][SPIKE] Extract refine schema management into a dedicated tool to Extract refine schema management into a dedicated tool. Thu, Apr 4, 3:56 PM
Antoine_Quhen claimed this task.
Antoine_Quhen updated the task description. (Show Details)

Should we have 2 lib files, one for schema and one for data, for both Hive and Iceberg? Or one file doing both as it is now?

Either could work! I think it would be nicer to keep Hive code with a Hive class, Iceberg code with an Iceberg class, but if you'd prefer to organize around the functionality (an Evolve class and a Writer class) instead, that could be fine too. It'd just be nice if the interfaces were roughly consistent (function names? even if parameters are not, e.g. tableProperties?)

So I guess either

// organized around storage
class DataFrameToHive {
   def evolve(spark, tableName, schema, ...)
   def write(tableName, dataFrame, ...)
}
class DataFrameToIceberg {
   def evolve(spark, tableName, schema, ...)
   def write(tableName, dataFrame, ...)
}

Or:

// Organized around functionality:

class TableSchemaManager {   // (name TBD of course)
   def evolveIcebergTable(spark, tableName, schema, ...)
   def evolveHiveTable(spark, tableName, schema, ...)
}

class DataFrameToTable {
   def writeHive(tableName, dataFrame, ...)
   def writeIceberg(tableName, dataFrame, ...)
   // OR: do these need to be different functions? I think not because tableName must already exist, and we can inspect it to find its storage.
}

Or even a combo? One utility class that contains these functions and storages?

class SparkTableManager { // name?: DataFrameToTable?, SparkTableUtilities?
  def evolveHiveTable(spark, tableName, schema, ...)
  def evolveIcebergTable(spark, tableName, schema, ...) 
  
  // only need one public function, storage is known because tableName already exists, can vary behavior accordingly
  def write(tableName, dataFrame, ...)
}

I prefer the "by functionality" organization, for separating schema vs data code.
I think we need the 2 different functions to make the Iceberg one delete data before inserting. And actually this could be discussed as well: I think we wish to have this by default in the Iceberg write function - do you agree?
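
For illustration, the delete-before-insert default could look roughly like this (sketch only; the predicate parameter and its default are assumptions to discuss):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: delete the slice being (re)refined, then append, so re-runs of the
// same hour/partition stay idempotent. Iceberg supports row-level DELETE
// through Spark SQL.
def writeIceberg(
  spark: SparkSession,
  tableName: String,
  df: DataFrame,
  deletePredicate: Option[String] = None // e.g. Some("year = 2024 AND month = 4 AND day = 1 AND hour = 0")
): Unit = {
  deletePredicate.foreach { predicate =>
    spark.sql(s"DELETE FROM $tableName WHERE $predicate")
  }
  df.writeTo(tableName).append()
}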

If ok for you @Ottomata, (ping @Antoine_Quhen and @gmodena as well if you have a minute :), let's go with that?

I prefer the "by functionality" organization

Yap, cool with me. Let the naming bikeshed begin.

I think we need the 2 different functions to make the Iceberg one delete data before inserting

Behavior needs to be different, but the public API function might not need to be. It might be nice for callers to not have to think too hard about it. E.g.

def write(tableName, dataFrame, ...) = {
   val table = spark.table(tableName)
   table.getStorageSomehow() match {
        case "iceberg" => writeIceberg(tableName, dataFrame, ...)
        case "hive" => writeHive(tableName, dataFrame, ...)
        // ...
   }
}

Unless of course, there are very different parameters that need to be passed to write for iceberg vs hive vs etc. In that case, let's just make different public functions for each.
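
And for the getStorageSomehow() placeholder above, one option (an assumption, not settled) would be to read the provider from the catalog metadata:

import org.apache.spark.sql.SparkSession

// Sketch: Iceberg tables created through the Spark catalog typically report
// provider "iceberg"; classic Hive tables report "hive" (or a file format).
def tableProvider(spark: SparkSession, tableName: String): String = {
  val identifier = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
  spark.sessionState.catalog
    .getTableMetadata(identifier)
    .provider
    .getOrElse("hive")
    .toLowerCase
}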

Ahoelzl renamed this task from Extract refine schema management into a dedicated tool to [Refine refactoring] Extract refine schema management into a dedicated tool. Thu, Apr 11, 6:31 PM
Ahoelzl updated the task description. (Show Details)
Ahoelzl changed the point value for this task from 5 to 8.