
Iceberg Merge strategies with dbt
Closed, Resolved · Public

Description

When working with dbt, different materializations can be used. The simplest are table and view, but the table materialization overwrites the whole table on each execution, so most of our pipelines will use incremental materializations.

Incremental materializations aim to insert data on each execution without rewriting the whole table. They can do so in different ways, depending on configuration and on the adapter used.

Materializations have a config called incremental_strategy. When working with Spark, the default strategy is append; insert_overwrite and merge can be used too.

The append strategy inserts new rows on each execution. If the incremental condition ({% if is_incremental() %}) is correctly set up (e.g. checking select max(updated_date) from {{ this }}), it can work well, but it won't process records that arrive late or out of order at the source.
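As a sketch, an append-only incremental model with that condition could look like this (the source and column names are hypothetical):

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='append',
    file_format='iceberg'
  )
}}

SELECT id, updated_date, value
FROM {{ source('example', 'events') }}  -- hypothetical source
{% if is_incremental() %}
  -- only pick rows newer than what the target table already holds
  WHERE updated_date > (SELECT MAX(updated_date) FROM {{ this }})
{% endif %}
```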

The insert_overwrite strategy uses insert overwrite table ... partition (...), but it can cause problems on Iceberg tables if the source and destination partitions are not based on the same dates.
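For reference, a minimal insert_overwrite model looks roughly like this (column and source names are illustrative); dbt compiles it into an INSERT OVERWRITE ... PARTITION statement that replaces every partition touched by the new batch:

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    file_format='iceberg',
    partition_by=['year', 'month']
  )
}}

-- Compiled roughly as:
--   INSERT OVERWRITE TABLE <target> PARTITION (year, month) SELECT ...
SELECT project, view_count, year, month
FROM {{ source('example', 'events') }}  -- hypothetical source
```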

The merge strategy uses merge into ... statements and requires a unique key to be defined, so it can overwrite matching records, but it can cause other problems on our Iceberg tables.
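With a unique_key configured, dbt compiles the model into something like the following (table and column names are illustrative):

```sql
MERGE INTO target AS t
USING new_batch AS s
  ON t.id = s.id                 -- the configured unique_key column(s)
WHEN MATCHED THEN UPDATE SET *   -- overwrite records with a matching key
WHEN NOT MATCHED THEN INSERT *   -- append records with a new key
```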

We should explore and test the different strategies to define a guide on how and when to use them properly.

The task is done if:

  • We have a document / Readme explaining how and when to use each strategy, with some examples.

Event Timeline

We met with @JMonton-WMF and found that dbt supports building new materializations: https://docs.getdbt.com/guides/create-new-materializations
From a quick look at it, it feels like this could be the solution to our problem (pre-hooks, etc.).
We have decided to try to apply dbt to one of our real examples: reproduce projectview_hourly with dbt and an Iceberg table (and Airflow when feasible).
I suggested this example as its data is much smaller than mediawiki_content.

I looked on the internet, and the idempotent-pipeline problem in dbt is a known and documented issue.
My preferred article is https://techblog.finatext.com/dbt-incremental-strategy-and-idempotency-877993f48448

@JAllemandou suggested that we could try to develop in dbt one of the HQL pipelines we have, so we try it with a real case that might need a custom materialization. The table is https://github.com/wikimedia/analytics-refinery/blob/master/hql/projectview/hourly/aggregate_pageview_to_projectview.hql

I'm going to explore options to do that in the same way. I'll check the insert_by_period materialization, as well as creating a model that accepts arguments passed from Airflow, or a custom materialization if needed.

Here is an example of how to do the aggregate_pageview_to_projectview job with dbt and Iceberg, using monthly partitions and overwriting data only in hourly chunks, so it should be idempotent and follow the approach of running it from Airflow with input dates.

I believe the merge strategy should work for most of these cases. The merge strategy requires a unique_key to be configured, which is used to overwrite data. As this example (and probably others that reduce partitions from hourly to monthly, or similar) uses an aggregation (GROUP BY), the unique key is simply the set of fields used in the aggregation.
This example also used hourly partitions to query the data, so I added those fields to the aggregation but not to the partitioning.

{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='iceberg',
    unique_key=['project','access_method','agent_type','referer_class','continent','country_code','year','month','day','hour'],
    partition_by=['year','month']
  )
}}


-- run: dbt run --vars '{"year": "2025", "month": "11", "day": "5", "hour": "12", "record_version": "0.0.2"}' --select projectview_hourly

SELECT
    project,
    access_method,
    CAST(NULL AS STRING) AS zero_carrier,
    agent_type,
    referer_class,
    continent,
    country_code,
    '{{ var("record_version") }}' AS record_version,
    SUM(view_count) AS view_count,
    year,
    month,
    day,
    hour
FROM
    {{ source('wmf', 'pageview_hourly') }}
WHERE year={{ var("year") }} AND month={{ var("month") }} AND day={{ var("day") }} AND hour={{ var("hour") }}
GROUP BY
    project,
    access_method,
    agent_type,
    referer_class,
    continent,
    country_code,
    year,
    month,
    day,
    hour
;

The original example seems to be prepared to work with arbitrary input and output tables, so this query can be moved to a macro and then used like this:

{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='iceberg',
    unique_key=['project','access_method','agent_type','referer_class','continent','country_code','year','month','day','hour'],
    partition_by=['year','month']
  )
}}

{{ aggregate_hourly_views(source('wmf', 'pageview_hourly')) }}

Running dbt run --vars '{"year": "2025", "month": "11", "day": "5", "hour": "12", "record_version": "0.0.2"}' --select projectview_hourly selects an hourly batch and overwrites it in the Iceberg table, without deleting other batches or using hourly partitions. If it is run twice, it overwrites the hour chunk without removing the monthly partition.
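The macro itself would just wrap the parameterized query from above; a sketch of what aggregate_hourly_views could look like (this sketch is my assumption, not the code in the branch):

```sql
{% macro aggregate_hourly_views(source_relation) %}
SELECT
    project,
    access_method,
    CAST(NULL AS STRING) AS zero_carrier,
    agent_type,
    referer_class,
    continent,
    country_code,
    '{{ var("record_version") }}' AS record_version,
    SUM(view_count) AS view_count,
    year, month, day, hour
FROM {{ source_relation }}
WHERE year={{ var("year") }} AND month={{ var("month") }}
  AND day={{ var("day") }} AND hour={{ var("hour") }}
GROUP BY
    project, access_method, agent_type, referer_class,
    continent, country_code, year, month, day, hour
{% endmacro %}
```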

The main changes from the original pipeline to this one are:

  • Add year, month, day and hour as part of the SELECT and GROUP BY, rather than being partition filters in the query.
  • Add a unique_key with all grouping columns; those should be the unique key used for merges. A GROUP BY defines a unique key by definition, so finding the unique_key should be trivial on aggregations.
  • CAST NULL to a type; otherwise the run fails because the table doesn't exist yet and Iceberg cannot infer the type from NULL.
  • There's no need to create or maintain a DDL.
  • Everything related to "insert into" is removed; dbt handles that.

There is one problem though, and I believe it's @JAllemandou's main concern: if the source table removes data for old batches and we re-run a past date, this approach won't remove that data.

For these cases, dbt has a built-in delete+insert strategy, but it doesn't exist in the Spark adapter.

I'm going to explore how to create it, I believe it won't be difficult.

In any case, I believe we could consider whether merge is enough for these cases. In a normal scenario, runs are incremental, always adding data, and I guess that in most cases, when we need to re-run an old batch, it's because something changed, not because something needs to disappear; maybe a manual clean would be enough for such cases. But I'm not aware of all use cases, so I'm going to investigate the delete+insert custom strategy.

I also think that we could "improve" these queries to work not only in the Airflow way, but also with the default dbt incremental modeling, where data is picked up since the last execution. This approach could remove the need for "backfills" in some cases, while allowing for custom overwrites.
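One way to sketch that hybrid (untested; the var defaults and struct comparison are my assumption) is to fall back to the classic incremental filter when no batch variables are passed:

```sql
{% if var("hour", none) is not none %}
  -- Airflow-style run: process exactly the requested hourly batch
  WHERE year={{ var("year") }} AND month={{ var("month") }}
    AND day={{ var("day") }} AND hour={{ var("hour") }}
{% elif is_incremental() %}
  -- default dbt run: pick up everything newer than the last execution
  WHERE struct(year, month, day, hour) >
        (SELECT MAX(struct(year, month, day, hour)) FROM {{ this }})
{% endif %}
```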

All the code is in this branch.

Also, if we want to consider it, the Trino adapter has the built-in delete+insert strategy, so with Trino it would work out of the box.
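For comparison, with dbt-trino the model config would simply be (a sketch, untested on our setup):

```sql
{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key=['year', 'month', 'day', 'hour']
  )
}}

-- dbt-trino first deletes target rows whose unique_key values appear
-- in the new batch, then inserts the batch.
SELECT ...  -- same model body as before
```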

There is also another strategy, for micro-batches, that allows for retries and backfills, more similar to how Airflow works; but apparently in the Spark adapter it uses insert_overwrite, so it will overwrite entire partitions. It could be considered for some jobs.

I'm trying to give different examples, so we can consider later if they are useful or not.

Here is a way of achieving "delete+insert" using a pre_hook. The delete happens in the pre_hook and the insert happens using the "append" strategy:

{{
  config(
    materialized='incremental',
    incremental_strategy='append',
    file_format='iceberg',
    partition_by=['year','month'],
    pre_hook=[
      "{% if is_incremental() %}DELETE FROM {{ this }} WHERE year={{ var('year') }} AND month={{ var('month') }} AND day={{ var('day') }} AND hour={{ var('hour') }}{% endif %}"
    ]
  )
}}


-- run: dbt run --vars '{"year": "2025", "month": "11", "day": "5", "hour": "12", "record_version": "0.0.2"}' --select projectview_hourly_pre_hook

SELECT
    project,
    access_method,
    CAST(NULL AS STRING) AS zero_carrier,
    agent_type,
    referer_class,
    continent,
    country_code,
    '{{ var("record_version") }}' AS record_version,
    SUM(view_count) AS view_count,
    year,
    month,
    day,
    hour
FROM
    {{ source('wmf', 'pageview_hourly') }}
WHERE year={{ var("year") }} AND month={{ var("month") }} AND day={{ var("day") }} AND hour={{ var("hour") }}
GROUP BY
    project,
    access_method,
    agent_type,
    referer_class,
    continent,
    country_code,
    year,
    month,
    day,
    hour
;

The is_incremental() check is needed in the pre_hook to avoid running the delete on the first run, when the table doesn't exist yet.

This pre_hook solution is great :)
If we can build our own materialization defining such a hook, it would be perfect.
The only concern I see here is defining the time period for the deletion and materialization (this can change between jobs); I'm not sure how we can do that :)

Two other things about the table definition:

  • I think it would be better to take advantage of Iceberg hidden date partitioning. By that I mean we could transform the year/month/day/hour fields into a single dt datetime field and use the Iceberg months(dt) partitioning. With this strategy, users can use partition filtering with a simple filter on dt, which would also work in Superset charts.
  • It would be great to order the data inserted in the table, as Iceberg/Parquet take advantage of this for reading and file-metadata optimizations. I assume the sorting should be done on dt, project, agent_type, access_method, for instance.
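For reference, both points map to plain Iceberg DDL on Spark (the table name and columns are illustrative):

```sql
-- Hidden partitioning: the partition spec lives on the table, and a
-- simple filter on dt is enough for partition pruning.
CREATE TABLE wmf.projectview_hourly_iceberg (
    project     STRING,
    view_count  BIGINT,
    dt          TIMESTAMP
)
USING iceberg
PARTITIONED BY (months(dt));

-- Iceberg can also record a write sort order that engines apply on insert:
ALTER TABLE wmf.projectview_hourly_iceberg
WRITE ORDERED BY dt, project, agent_type, access_method;
```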

Thanks a lot for the good work!

Good points! I might need a bit of your help to work on them. I believe we could create a dt field in the SELECT clause using year/month/day/hour, tell dbt that the partition_by is dt, and it would work, but it might be good if we work together on that at some point, as I'm not very familiar with Iceberg :)

For now, I created a custom materialization for the delete + insert strategy. According to their docs, custom strategies are not supported by Spark, but like any other adapter, it allows custom materializations. For context, strategies are different ways of implementing the same materialization: for example, the incremental materialization has multiple strategies, like merge, insert_overwrite, etc. So we can create a custom materialization that behaves similarly to the incremental materialization, but does the delete + insert.
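A simplified skeleton of what such a materialization can look like (this is a sketch, not the actual macro; among other things, it evaluates the model query twice and skips several details the real materializations handle):

```sql
{% materialization delete_insert, adapter='spark' %}
  {%- set target = this.incorporate(type='table') -%}
  {%- set delete_key = config.get('delete_key') -%}

  {% if load_relation(this) is none %}
    {# first run: the table doesn't exist yet, just create it #}
    {% call statement('main') %}
      {{ create_table_as(False, target, sql) }}
    {% endcall %}
  {% else %}
    {# delete the batches present in the new data, then append it #}
    {% call statement('delete') %}
      DELETE FROM {{ target }}
      WHERE ({{ delete_key | join(', ') }}) IN (
        SELECT DISTINCT {{ delete_key | join(', ') }} FROM ({{ sql }})
      )
    {% endcall %}
    {% call statement('main') %}
      INSERT INTO {{ target }} {{ sql }}
    {% endcall %}
  {% endif %}

  {{ return({'relations': [target]}) }}
{% endmaterialization %}
```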

The materialization is here, it is based both on the official materializations for Spark, and on the delete + insert code used on other adapters.
An example would look like this:

{{
  config(
    materialized='delete_insert',
    file_format='iceberg',
    partition_by=['year','month'],
    delete_key=['year','month','day','hour']
  )
}}

SELECT
    project,
    access_method,
    CAST(NULL AS STRING) AS zero_carrier,
    agent_type,
    referer_class,
    continent,
    country_code,
    '{{ var("record_version") }}' AS record_version,
    SUM(view_count) AS view_count,
    year,
    month,
    day,
    hour
FROM
    {{ source('wmf', 'pageview_hourly') }}
WHERE year={{ var("year") }} AND month={{ var("month") }} AND day={{ var("day") }} AND hour={{ var("hour") }}
GROUP BY
    project,
    access_method,
    agent_type,
    referer_class,
    continent,
    country_code,
    year,
    month,
    day,
    hour
;

The interesting part is that it handles the delete automatically, based on the delete_key, and removes records according to the condition used in the query. I mean, if we change AND hour={{ var("hour") }} to a range of hours, the delete statement will remove everything in that range.
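For the example above, the generated delete would look roughly like this (the temp view name is illustrative; the exact SQL depends on the materialization code):

```sql
DELETE FROM projectview_hourly
WHERE (year, month, day, hour) IN (
    -- the distinct delete_key values produced by the model query
    SELECT DISTINCT year, month, day, hour
    FROM new_batch
);
```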

I believe that if we end up using something like this, it might be good to contribute to the official dbt-spark adapter and add the strategy directly there. delete + insert is something already considered by dbt itself, so adding the strategy to the official repo shouldn't be too difficult, and we wouldn't need a custom macro.

Very cool !
Let's schedule some time to work together on the Iceberg time partitioning.

During a call with @JAllemandou, we came up with a version that would look better for many cases. It does "delete + insert", uses the months(dt) partition on Iceberg, and coalesces the result into one Parquet file. This is the example:

{{
  config(
    materialized='delete_insert',
    file_format='iceberg',
    partition_by=['months(dt)'],
    delete_key=['dt']
  )
}}


-- run: dbt run --vars '{"year": "2025", "month": "11", "day": "5", "hour": "12", "record_version": "0.0.2"}' --select projectview_hourly_delete_insert

SELECT /*+ COALESCE(1) */
    project,
    access_method,
    CAST(NULL AS STRING) AS zero_carrier,
    agent_type,
    referer_class,
    continent,
    country_code,
    '{{ var("record_version") }}' AS record_version,
    SUM(view_count) AS view_count,
    CAST(CONCAT(
            LPAD({{ var("year") }}, 4, '0'), '-',
            LPAD({{ var("month") }}, 2, '0'), '-',
            LPAD({{ var("day") }}, 2, '0'), ' ',
            LPAD({{ var("hour") }}, 2, '0'), ':00:00'
         ) AS TIMESTAMP) as dt
FROM
    {{ source('wmf', 'pageview_hourly') }}
WHERE year={{ var("year") }} AND month={{ var("month") }} AND day={{ var("day") }} AND hour={{ var("hour") }}
GROUP BY
    project,
    access_method,
    agent_type,
    referer_class,
    continent,
    country_code,
    dt
ORDER BY dt, project, access_method, agent_type, referer_class, continent, country_code
;

The solution presented above is completely aligned with our current way of doing things.
We could even think of optimizations allowing to run bigger portions of data than the regular hourly job.
Great work!