In this task, we want to implement a mechanism that takes care of the following:
A) Iceberg provides Spark procedures that let us optimize table performance by removing old snapshots, compacting small files, and more. (For an example see T335305#8877037). We want to trigger these procedures on a regular cadence.
B) Additionally, at WMF we keep analytics data around for at most 90 days. For regular Hive tables, we have developed a Python helper script that takes care of data deletion; runs of this script are typically scheduled via systemd timers. Iceberg does not support partitions in the same way Hive does, but it does support DELETEs.
Some ideas:
Both (A) and (B) are doable via Spark SQL statements.
For (A), we can do something like CALL spark_catalog.system.rewrite_data_files('wmf_traffic.referrer_daily');
For (B), DELETE FROM wmf_traffic.referrer_daily WHERE day <= '2023-01-01'; Note that the DELETE is a metadata operation, and to really scrub the data out of HDFS we have to compact files (and expire old snapshots) after the DELETE.
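Put together, a full scrub could be sequenced roughly as follows (a sketch; the table name and retention cutoff are illustrative, and the procedures assumed are Iceberg's standard rewrite_data_files and expire_snapshots):

```sql
-- 1. Logical delete: rows are marked deleted via new metadata/snapshots,
--    but the underlying data files still exist on HDFS.
DELETE FROM wmf_traffic.referrer_daily WHERE day <= '2023-01-01';

-- 2. Compaction: rewrite data files so the deleted rows are physically
--    absent from the new files.
CALL spark_catalog.system.rewrite_data_files(table => 'wmf_traffic.referrer_daily');

-- 3. Snapshot expiry: drop old snapshots so the pre-delete data files
--    become unreferenced and can be removed from HDFS.
CALL spark_catalog.system.expire_snapshots(table => 'wmf_traffic.referrer_daily');
```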
We can then implement an Airflow DAG (or DAGs) that takes configuration describing what to do with any particular Iceberg table, perhaps similar to the Cassandra Loading DAGs.
Config could look like so:
{
  "schema1.table1" : {
    "procedure_calls" : {
      "rewrite_data_files" : { "extraParamKey1": "extraParamVal1", ... },
      "another_procedure" : ...
    },
    "data_deletion" : {
      "where" : "day <= '{{ data_interval_start.subtract(days=45) }}'"
    }
  },
  "schema1.table2" : {
    ...
  }
}

We should make sure data deletion runs before the procedure calls to guarantee that data files are gone.
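To make the ordering concrete, here is a minimal Python sketch that turns a config like the one above into an ordered list of Spark SQL statements, with the DELETE emitted before any procedure calls. The function name (build_statements) and the exact config shape are assumptions for illustration, not an existing WMF API.

```python
# Hypothetical sketch: per-table config -> ordered Spark SQL statements.
# Data deletion is emitted first so subsequent compaction can drop the
# deleted rows' data files.

def build_statements(table: str, conf: dict) -> list[str]:
    statements = []
    deletion = conf.get("data_deletion")
    if deletion:
        statements.append(f"DELETE FROM {table} WHERE {deletion['where']}")
    for proc, params in conf.get("procedure_calls", {}).items():
        # Iceberg procedures accept named arguments; pass the table plus
        # any extra parameters from the config.
        args = ", ".join(
            [f"table => '{table}'"]
            + [f"{k} => '{v}'" for k, v in params.items()]
        )
        statements.append(f"CALL spark_catalog.system.{proc}({args})")
    return statements

config = {
    "wmf_traffic.referrer_daily": {
        "procedure_calls": {"rewrite_data_files": {}},
        "data_deletion": {"where": "day <= '2023-01-01'"},
    }
}

for table, conf in config.items():
    for sql in build_statements(table, conf):
        print(sql)
```

Each statement would then be run via spark.sql() in its own Airflow task, with the deletion task upstream of the procedure-call tasks.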
These SQL statements generate result sets with metadata about what was done. Perhaps that is interesting information we should keep around?