
[Iceberg Migration] Implement mechanism for automatic Iceberg table maintenance
Closed, ResolvedPublic8 Estimated Story Points

Description

In this task, we want to implement a mechanism that takes care of the following:

A) Iceberg provides Spark procedures that allow us to optimize its performance by removing old snapshots, compacting small files, and more. (For an example see T335305#8877037). We want to trigger these procedures on a regular cadence.

B) Additionally, at WMF we keep analytics data around for at most 90 days. For regular Hive tables, we have developed a Python helper script that takes care of data deletion; runs of this script are typically triggered via systemd timers. Iceberg does not support partitions in the same way as Hive, but it does support DELETEs.

Some ideas:

Both (A) and (B) are doable via Spark SQL statements.
For (A), we can do something like CALL spark_catalog.system.rewrite_data_files('wmf_traffic.referrer_daily');.
For (B), DELETE FROM wmf_traffic.referrer_daily WHERE day <= '2023-01-01';. Note that the DELETE is a metadata operation; to really scrub the data out of HDFS we have to compact files after the DELETE.
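As a sketch of how the 90-day retention from (B) could be turned into such a DELETE statement (the helper function is hypothetical; the table and column come from the example above):

```python
from datetime import date, timedelta

def build_retention_delete(table: str, day_column: str, retention_days: int,
                           today: date) -> str:
    """Build an Iceberg-compatible DELETE that drops rows older than the
    retention window. The DELETE itself is a metadata operation; a
    compaction (e.g. rewrite_data_files) must follow to actually remove
    the underlying files from HDFS."""
    cutoff = today - timedelta(days=retention_days)
    return (f"DELETE FROM {table} "
            f"WHERE {day_column} <= '{cutoff.isoformat()}'")

sql = build_retention_delete("wmf_traffic.referrer_daily", "day", 90,
                             today=date(2023, 4, 1))
# -> "DELETE FROM wmf_traffic.referrer_daily WHERE day <= '2023-01-01'"
```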

We can then implement an Airflow DAG (DAGs?) that will take configuration for what to do with any particular Iceberg table, perhaps similarly to the Cassandra Loading DAGs.

Config could look like so:

{
    "schema1.table1" : {
        "procedure_calls" : {
            "rewrite_data_files" : { "extraParamKey1": "extraParamVal1", ... },
            "another_procedure" : ...
        },
        "data_deletion" : {
            "where" : "day <= {{ data_interval_start.subtract(days=45) }}"
        }
    },
    "schema1.table2" : {
    ...
    }
}

We should make sure data deletion runs before the procedure calls to guarantee that data files are gone.
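A minimal sketch of how a per-table config like the one above could be expanded into an ordered list of Spark SQL statements, with the DELETE placed first (the function and config keys mirror the example; everything else is hypothetical):

```python
def plan_statements(table: str, conf: dict) -> list:
    """Order maintenance statements so the DELETE runs before the
    procedure CALLs, guaranteeing that compaction can drop the
    deleted data files."""
    stmts = []
    deletion = conf.get("data_deletion")
    if deletion:
        stmts.append(f"DELETE FROM {table} WHERE {deletion['where']}")
    for proc, params in conf.get("procedure_calls", {}).items():
        # Every Iceberg procedure takes the table as its first argument.
        args = ", ".join([f"table => '{table}'"] +
                         [f"{k} => {v!r}" for k, v in params.items()])
        stmts.append(f"CALL spark_catalog.system.{proc}({args})")
    return stmts

plan = plan_statements("schema1.table1", {
    "procedure_calls": {"rewrite_data_files": {}},
    "data_deletion": {"where": "day <= '2023-01-01'"},
})
# plan[0] is the DELETE, plan[1] the rewrite_data_files CALL
```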

These SQL statements generate result sets with metadata. Perhaps that metadata is interesting enough that we should keep it around?

Event Timeline

Oh, this looks great! It will simplify the deletion of data so much... 👏👏👏

Regarding the DAGs, I think it would be cool to have a separate DAG for the deletion of each dataset,
unless they are very related: i.e. same data source, same update frequency, same retention period.
This way we can operate them separately.
For instance, if we had to stop deletion for a given dataset, we could do that.
Or if we had to change the retention period for another.
Or in case a dataset produces an error, and it blocks execution of its DAG, it would not affect other datasets.

That said, it would still be cool to group them in DAG files, that could generate multiple DAGs depending on the passed config as @xcollazo suggested!
That would reduce the number of DAG files we maintain.
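The grouping rule suggested above (one DAG per dataset, unless datasets share source, update frequency, and retention) could be sketched like this; the dataset descriptors and naming scheme are hypothetical:

```python
from collections import defaultdict

def group_into_dags(datasets: dict) -> dict:
    """Group datasets that share source, frequency, and retention into
    one DAG; everything else gets its own DAG so it can be paused,
    reconfigured, or fail independently of the others."""
    groups = defaultdict(list)
    for name, d in datasets.items():
        key = (d["source"], d["frequency"], d["retention_days"])
        groups[key].append(name)
    # Include retention in the DAG id so differing retention periods
    # never collapse into the same DAG.
    return {f"maintenance_{src}_{freq}_{ret}d": sorted(names)
            for (src, freq, ret), names in groups.items()}

dags = group_into_dags({
    "schema1.table1": {"source": "webrequest", "frequency": "daily", "retention_days": 90},
    "schema1.table2": {"source": "webrequest", "frequency": "daily", "retention_days": 90},
    "schema2.table1": {"source": "eventlogging", "frequency": "hourly", "retention_days": 45},
})
# -> two DAGs: one for the two related tables, one for the third
```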

With T340861 and T335860 coming to a close, we are now in a place where we should prioritize the work in this ticket.

This work will be generally useful for migrating more tables to Apache Iceberg, but it is also needed for Dumps 2.0 pipeline stability.

CC Test Kitchen team and @VirginiaPoundstone .

Commenting here that I no longer think the config should abstract away the SQL statements, as there may be little to gain there. This could be as simple as just referencing SQL statements to run per table?

Wanted to put this on the record so that it doesn't trip us in the future:

When running Iceberg CALLs in Spark 3.x, Spark insists on broadcasting all joins, and for tables with lots of files and/or snapshots, this OOMs the driver. After much pain, I figured out that the way to run these reliably is to disable auto broadcast with the flag spark.sql.autoBroadcastJoinThreshold=-1.

Here is an example successful spark-sql session:

spark3-sql   --master yarn   --executor-memory 16G   --executor-cores 4   --driver-memory 64G   --driver-cores 8   --conf spark.dynamicAllocation.maxExecutors=64   --conf spark.sql.shuffle.partitions=1024   --conf spark.driver.maxResultSize=32G --conf spark.sql.autoBroadcastJoinThreshold=-1

Here are two example successful CALLs doing maintenance against wmf_dumps.wikitext_raw_rc1:

spark-sql (default)> CALL spark_catalog.system.remove_orphan_files(
                   >     table => 'wmf_dumps.wikitext_raw_rc1',
                   >     max_concurrent_deletes => 5
                   > );
23/09/22 17:01:45 WARN TaskSetManager: Stage 0 contains a task of very large size (1255 KiB). The maximum recommended task size is 1000 KiB.
orphan_file_location
hdfs://analytics-hadoop/user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data/wiki_db=bewiki/revision_timestamp_day=2023-08-23/00023-905-ad1cbb90-4317-4bc8-be08-5c6dff8e974a-00001.avro
hdfs://analytics-hadoop/user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data/wiki_db=dewiki/revision_timestamp_day=2023-08-27/00047-1392-15951939-2c84-4db7-a271-a5a3d0608b5e-00001.avro
hdfs://analytics-hadoop/user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data/wiki_db=frwiki/revision_timestamp_day=2023-08-27/00067-1718-1dc388c1-0168-4953-8573-92b91a13bc81-00001.avro
hdfs://analytics-hadoop/user/hive/warehouse/wmf_dumps.db/wikitext_raw_rc1/data/wiki_db=simplewiki/revision_timestamp_day=2023-08-25/00099-1186-1a84aa9c-5452-48ec-ad69-f9a0693edd6c-00001.avro
...
Time taken: 215.282 seconds, Fetched 10416 row(s)
spark-sql (default)> CALL spark_catalog.system.expire_snapshots(
                   >     table => 'wmf_dumps.wikitext_raw_rc1',
                   >     stream_results => true
                   > );
deleted_data_files_count	deleted_manifest_files_count	deleted_manifest_lists_count
14998	3215	285
Time taken: 273.295 seconds, Fetched 1 row(s)
Ahoelzl renamed this task from Implement mechanism for automatic Iceberg data deletion and optimization to [Iceberg Migration] Implement mechanism for automatic Iceberg data deletion and optimization. Oct 20 2023, 5:05 PM

I would like to add rewrite_manifests to the list of maintenance actions.

xcollazo changed the task status from Open to In Progress. Aug 16 2024, 6:20 PM
xcollazo claimed this task.
xcollazo triaged this task as Medium priority.

Started working on T358365: Implement dataset maintenance for wmf_dumps.wikitext_raw, and figured it would be silly to implement this just for Dumps 2.0.

So I will be sharing a draft implementation for this mechanism shortly. I bet folks will have opinions about it, so let's discuss the implementation over in the MR.

Example task output for table xcollazo.wikitext_inconsistent_rows_rc1:

remove_orphan_files:

[2024-08-22, 19:21:54 UTC] {spark_submit.py:571} INFO - Spark master: yarn, Application Id: application_1719935448343_789531
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - orphan_file_location
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - hdfs://analytics-hadoop/user/hive/warehouse/xcollazo.db/wikitext_inconsistent_rows_rc1/metadata/00010-325cfea5-78ac-4adb-8bd7-0e53ea4c29b3.metadata.json
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - hdfs://analytics-hadoop/user/hive/warehouse/xcollazo.db/wikitext_inconsistent_rows_rc1/metadata/00025-af2c54c5-c2b2-4c38-8284-11d2d42406a1.metadata.json
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - hdfs://analytics-hadoop/user/hive/warehouse/xcollazo.db/wikitext_inconsistent_rows_rc1/metadata/00015-42adfd71-e7c8-4851-8e66-5cc1e01347ed.metadata.json
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - hdfs://analytics-hadoop/user/hive/warehouse/xcollazo.db/wikitext_inconsistent_rows_rc1/metadata/00018-c10f0918-e97c-4923-9018-b56408849428.metadata.json
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - hdfs://analytics-hadoop/user/hive/warehouse/xcollazo.db/wikitext_inconsistent_rows_rc1/metadata/00027-c786c292-b630-4f5c-bbc6-769b9d593b66.metadata.json
...
[2024-08-22, 19:22:56 UTC] {spark_submit.py:571} INFO - Time taken: 62.081 seconds, Fetched 31 row(s)

expire_snapshots:

[2024-08-22, 19:24:21 UTC] {spark_submit.py:571} INFO - Spark master: yarn, Application Id: application_1719935448343_789536
[2024-08-22, 19:25:11 UTC] {spark_submit.py:571} INFO - deleted_data_files_count	deleted_manifest_files_count	deleted_manifest_lists_count
[2024-08-22, 19:25:11 UTC] {spark_submit.py:571} INFO - 0	0	0
[2024-08-22, 19:25:11 UTC] {spark_submit.py:571} INFO - Time taken: 49.782 seconds, Fetched 1 row(s)

rewrite_manifests:

[2024-08-22, 19:26:21 UTC] {spark_submit.py:571} INFO - Spark master: yarn, Application Id: application_1719935448343_789546
[2024-08-22, 19:26:23 UTC] {spark_submit.py:571} INFO - 24/08/22 19:26:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[2024-08-22, 19:26:38 UTC] {spark_submit.py:571} INFO - rewritten_manifests_count	added_manifests_count
[2024-08-22, 19:26:38 UTC] {spark_submit.py:571} INFO - 4	1
[2024-08-22, 19:26:38 UTC] {spark_submit.py:571} INFO - Time taken: 17.714 seconds, Fetched 1 row(s)

delete_data:

...
[2024-08-22, 19:27:36 UTC] {spark_submit.py:401} INFO - Spark-Submit cmd: spark3-submit --master yarn --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 --executor-cores 2 --executor-memory 4G --driver-memory 4G --keytab /etc/security/keytabs/analytics-privatedata/analytics-privatedata.keytab --principal analytics-privatedata/stat1011.eqiad.wmnet@WIKIMEDIA --name table_maintenance_monthly__xcollazo__wikitext_inconsistent_rows_rc1.delete_data__20240822 --class org.apache.spark.sql.hive.thriftserver.WMFSparkSQLCLIDriver --queue default --deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/wmf-sparksqlclidriver-1.0.0.jar -e DELETE FROM xcollazo.wikitext_inconsistent_rows_rc1 WHERE computation_dt <= TIMESTAMP '2024-07-02T00:00:00'
...
[2024-08-22, 19:27:53 UTC] {spark_submit.py:571} INFO - Spark master: yarn, Application Id: application_1719935448343_789549
[2024-08-22, 19:27:55 UTC] {spark_submit.py:571} INFO - Response code
[2024-08-22, 19:27:55 UTC] {spark_submit.py:571} INFO - Time taken: 1.852 seconds

Note we can see the command output directly in the task log because launcher=local. We should discuss if that is what we want for these types of CALL statements.

One thing I have not contemplated so far is a call to rewrite_data_files(), mostly because we have not needed this in Dumps 2.0 as we do copy on write, but I'm sure we have use cases for it generally. CALLs to this procedure are typically more complicated, and look like so:

CALL spark_catalog.system.rewrite_data_files(
    table => 'wmf_dumps.wikitext_raw_rc2',
    options => map(
        'partial-progress.enabled', true,
        'partial-progress.max-commits', 100,
        'rewrite-all', true,
        'max-file-group-size-bytes', 26843545600
    ),
    where => 'wiki_db = "simplewiki"'
)

The options all depend on the target table and on what we find works best for each one.

I wonder if we should define these calls in the configuration, like so:

...
    "wmf_dumps.wikitext_raw_rc2": {
        "do_usual_maintenance": true,
        "rewrite_data_files": {
            "options": [
                "partial-progress.enabled", true,
                "partial-progress.max-commits", 100,
                "rewrite-all", true,
                "max-file-group-size-bytes", 26843545600
            ],
            "where": "wiki_db = \"simplewiki\""
        }
    },
...

Or if we should allow definition of verbatim CALLs:

...
    "wmf_dumps.wikitext_raw_rc2": {
        "do_usual_maintenance": true,
        "rewrite_data_files": "
CALL spark_catalog.system.rewrite_data_files(
    table => 'wmf_dumps.wikitext_raw_rc2',
    options => map(
        'partial-progress.enabled', true,
        'partial-progress.max-commits', 100,
        'rewrite-all', true,
        'max-file-group-size-bytes', 26843545600
    ),
    where => 'wiki_db = "simplewiki"'
)
"
        }
    },
...
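If we go with the structured variant, rendering the CALL from config could look roughly like this (the helper name and the uniform string-quoting of option values are assumptions; Iceberg procedure options are string-typed):

```python
def render_rewrite_data_files(table: str, options: list, where: str) -> str:
    """Render the flat key/value option list from the config into the
    options => map(...) form used by the rewrite_data_files procedure."""
    def lit(v) -> str:
        # Lowercase booleans so True becomes 'true'; quote everything
        # uniformly since the procedure expects string option values.
        if isinstance(v, bool):
            return f"'{str(v).lower()}'"
        return f"'{v}'"
    pairs = ", ".join(lit(v) for v in options)
    return (f"CALL spark_catalog.system.rewrite_data_files("
            f"table => '{table}', "
            f"options => map({pairs}), "
            f"where => '{where}')")

sql = render_rewrite_data_files(
    "wmf_dumps.wikitext_raw_rc2",
    ["partial-progress.enabled", True,
     "partial-progress.max-commits", 100,
     "rewrite-all", True,
     "max-file-group-size-bytes", 26843545600],
    'wiki_db = "simplewiki"',
)
```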
xcollazo renamed this task from [Iceberg Migration] Implement mechanism for automatic Iceberg data deletion and optimization to [Iceberg Migration] Implement mechanism for automatic Iceberg table maintenance. Aug 30 2024, 2:10 PM

It's Friday, so I won't attempt a deploy. Added this to the regular Tuesday analytics train at https://etherpad.wikimedia.org/p/analytics-weekly-train

This was deployed today. I enabled the Airflow job and it is WAD. 🎉