
Migrate referrer_daily to Iceberg
Closed, ResolvedPublic5 Estimated Story Points

Description

If you don't know about the Iceberg format, please read some docs (https://iceberg.apache.org/)

  • Prepare the data model
    • We want to partition the data in Iceberg by time, using hidden partitioning, with the partition size to be defined based on data size.
    • The dataset might be missing a timestamp field, since time data would previously have been encoded in the partition values.
    • In that case we need to add a timestamp field to the new table.
  • Convert old data to the new data model in Iceberg
    • We will duplicate the dataset, creating a new table with the Iceberg format and new model.
    • We will load the old data into the new table as a one-off Spark job.
  • Add a new Airflow job to automatically insert new data into the new table
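For anyone new to hidden partitioning: unlike Hive-style tables, where readers must filter on explicit partition columns (`year`/`month`/`day`), an Iceberg table is partitioned on a transform of a regular column, and readers just filter on that column. A minimal illustrative sketch (table and column names are made up, not the final schema):

```sql
-- Hive-style tables require predicates like:
--   WHERE year = 2023 AND month = 5 AND day = 1
-- With Iceberg hidden partitioning, the partition spec is a transform of a
-- regular date column, and readers never see partition columns:
CREATE TABLE example_db.example_table (
    some_metric  bigint,
    dt           date
)
USING ICEBERG
PARTITIONED BY (months(dt));  -- Iceberg derives monthly partitions from dt

-- Readers filter on dt directly; Iceberg prunes partitions automatically:
-- SELECT * FROM example_db.example_table WHERE dt = DATE '2023-05-01';
```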

After T335306 and T335314 are done and we know we can deprecate the old dataset:
- Communicate the deprecation
- Update Airflow jobs dependent on the non-Iceberg table to use the Iceberg table
- Deprecate the old dataset
Edit: Let's do deprecation and removal in a separate task. That way, we can compartmentalize these deprecations by table group (say, all tables migrated to wmf_traffic).

For this first exercise, we have chosen a small dataset, referrer_daily: https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Traffic/referrer_daily

Details

Related Changes in Gerrit:
Related Changes in GitLab:
Title: Add iceberg version of referrer_daily table.
Reference: repos/data-engineering/airflow-dags!378
Author: xcollazo
Source Branch: T335305-migrate-referrer-daily-to-iceberg
Dest Branch: main

Event Timeline

xcollazo renamed this task from Update the dataset to an Iceberg compatible format and convert to iceberg (new datasets) to Migrate referrer_daily to Iceberg. May 1 2023, 3:31 PM
xcollazo updated the task description.
xcollazo set the point value for this task to 5.

Change 917404 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[analytics/refinery@master] Add iceberg version of referrer_daily table.

https://gerrit.wikimedia.org/r/917404

On my first go at this table, I followed the current partitioning strategy of partitioning by day:

CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily_iceberg_part_by_date`(
    `country`             string  COMMENT 'Reader country per IP geolocation',
    `lang`                string  COMMENT 'Wikipedia language -- e.g., en for English',
    `browser_family`      string  COMMENT 'Browser family from user-agent',
    `os_family`           string  COMMENT 'OS family from user-agent',
    `search_engine`       string  COMMENT 'One of ~20 standard search engines (e.g., Google)',
    `num_referrals`       int     COMMENT 'Number of pageviews from the referral source',
    `day`                 date    COMMENT 'The date of the request'
)
USING ICEBERG
PARTITIONED BY (day);

@JAllemandou suggested trying partitioning by month, both to experiment with Iceberg hidden partitioning and to tackle the small-files problem, since this and many others of our tables are small:

CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily_iceberg_part_by_month`(
    `country`             string  COMMENT 'Reader country per IP geolocation',
    `lang`                string  COMMENT 'Wikipedia language -- e.g., en for English',
    `browser_family`      string  COMMENT 'Browser family from user-agent',
    `os_family`           string  COMMENT 'OS family from user-agent',
    `search_engine`       string  COMMENT 'One of ~20 standard search engines (e.g., Google)',
    `num_referrals`       int     COMMENT 'Number of pageviews from the referral source',
    `day`                 date    COMMENT 'The date of the request'
)
USING ICEBERG
PARTITIONED BY (months(day));
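Worth noting: with `months(day)` hidden partitioning, readers still filter on the `day` column itself and Iceberg handles the month-level pruning. A hypothetical query sketch (not part of the migration):

```sql
-- Readers filter on the regular `day` column; Iceberg translates the
-- predicate into month-level partition pruning, so only the 2023-05
-- partition files are scanned:
SELECT country,
       SUM(num_referrals) AS total_referrals
FROM referrer_daily_iceberg_part_by_month
WHERE day BETWEEN DATE '2023-05-01' AND DATE '2023-05-31'
GROUP BY country;
```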

To experiment with the two versions, I backfilled them with data available from May 2023:

for i in {1..22}
do
   spark3-sql --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64 \
              -f compute_referer_daily_iceberg.hql                                                    \
              -d min_num_daily_referrals=500                                                  \
              -d source_table=wmf.pageview_actor                                              \
              -d referer_daily_destination_table=xcollazo_iceberg.referrer_daily_iceberg_part_by_date                    \
              -d coalesce_partitions=4                                                        \
              -d year=2023                                                                    \
              -d month=5                                                                      \
              -d day="${i}"

   spark3-sql --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64 \
              -f compute_referer_daily_iceberg.hql                                                    \
              -d min_num_daily_referrals=500                                                  \
              -d source_table=wmf.pageview_actor                                              \
              -d referer_daily_destination_table=xcollazo_iceberg.referrer_daily_iceberg_part_by_month                    \
              -d coalesce_partitions=4                                                        \
              -d year=2023                                                                    \
              -d month=5                                                                      \
              -d day="${i}"
done

Confirmed the data is equal:

spark-sql (default)> select count(1) from referrer_daily_iceberg_part_by_month;
count(1)
205709
Time taken: 11.977 seconds, Fetched 1 row(s)
spark-sql (default)> select count(1) from referrer_daily_iceberg_part_by_date;
count(1)
205709

Total number of Parquet files and total file size:

date:

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_date/data$ ls ./* | grep parquet | wc -l
88

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_date/data$ ls -lsa ./* | grep parquet | awk '{print $6}' | awk '{ sum+=$1; }END{print sum;}'
1514942

month:

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_month/data$ ls ./* | grep parquet | wc -l
88

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_month/data$ ls -lsa ./* | grep parquet | awk '{print $6}' | awk '{ sum+=$1; }END{print sum;}'
1515813

Note that this is a very small table: ~1515813 bytes, or about 1.44 MB.
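Aside: the `ls | awk` pipelines above work, but parsing `ls` output is brittle. A sketch of the same size tally with `find` instead (GNU find assumed, as on our stat hosts):

```shell
# Sum the sizes of all Parquet data files under the current directory.
# find emits one size (in bytes) per file; awk accumulates the total.
# `sum+0` guards against printing an empty string when no files match.
total_bytes=$(find . -name '*.parquet' -printf '%s\n' | awk '{ sum += $1 } END { print sum+0 }')
echo "${total_bytes}"
```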

Running rewrite_data_files yields:

spark-sql (default)>  CALL spark_catalog.system.rewrite_data_files('xcollazo_iceberg.referrer_daily_iceberg_part_by_month');
rewritten_data_files_count	added_data_files_count	rewritten_bytes_count
88	1	1515813
Time taken: 15.262 seconds, Fetched 1 row(s)

spark-sql (default)>  CALL spark_catalog.system.rewrite_data_files('xcollazo_iceberg.referrer_daily_iceberg_part_by_date');
rewritten_data_files_count	added_data_files_count	rewritten_bytes_count
0	0	0
Time taken: 0.211 seconds, Fetched 1 row(s)

The monthly-partitioned version benefits from the rewrite, while the date-partitioned version does not. At least not with the default parameters.
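For reference, rewrite_data_files also takes an options map that controls which files are eligible for compaction. If I recall the defaults correctly, the procedure requires at least 5 input files per file group, and the date-partitioned table has only ~4 files per daily partition (22 days x 4 = 88), which would explain the no-op above. An illustrative call (values are examples, not recommendations):

```sql
-- Lowering min-input-files would make the daily-partitioned table eligible
-- for compaction too; target-file-size-bytes sets the output file size.
CALL spark_catalog.system.rewrite_data_files(
  table => 'xcollazo_iceberg.referrer_daily_iceberg_part_by_date',
  options => map(
    'min-input-files', '2',
    'target-file-size-bytes', '134217728'  -- 128 MB
  )
);
```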

Let's now expire snapshots, which will also remove data files that are not referenced anymore:

spark-sql (default)> select now();
now()
2023-05-24 01:32:52.621

spark-sql (default)> CALL spark_catalog.system.expire_snapshots('xcollazo_iceberg.referrer_daily_iceberg_part_by_month', TIMESTAMP '2023-05-24 01:32:52.621');
deleted_data_files_count	deleted_manifest_files_count	deleted_manifest_lists_count
88	22	44
Time taken: 38.351 seconds, Fetched 1 row(s)

spark-sql (default)> CALL spark_catalog.system.expire_snapshots('xcollazo_iceberg.referrer_daily_iceberg_part_by_date', TIMESTAMP '2023-05-24 01:32:52.621');
deleted_data_files_count	deleted_manifest_files_count	deleted_manifest_lists_count
0	0	43
Time taken: 18.661 seconds, Fetched 1 row(s)
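For an automated maintenance job we would presumably not expire everything up to now() as this experiment does; expire_snapshots also accepts a retain_last argument that keeps a safety margin of recent snapshots regardless of age. A sketch with illustrative values:

```sql
-- Expire snapshots older than a cutoff, but always keep the last 5
-- so recent writes remain available for time travel / rollback.
-- The cutoff and retain count here are examples, not a recommendation:
CALL spark_catalog.system.expire_snapshots(
  table => 'xcollazo_iceberg.referrer_daily_iceberg_part_by_month',
  older_than => TIMESTAMP '2023-05-17 00:00:00',
  retain_last => 5
);
```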

Now we can count files again:

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_month/data$ ls ./* | grep parquet | wc -l
1

xcollazo@stat1007:/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/referrer_daily_iceberg_part_by_date/data$ ls ./* | grep parquet | wc -l
88

Takeaways:

  • In particular for the referrer_daily table, one way to avoid having many files in the first place is to tune down the COALESCE parameter. In the Airflow job script, it is currently set to 8! But given the ~20KB files, it looks like we really should be writing a single file.
  • In general, both big and small tables will benefit from automating runs of rewrite_data_files() and expire_snapshots(), helping us tackle the small files problem.
  • In general, our smaller tables will benefit from weekly, monthly, or even yearly hidden partitioning since, as we can see from the example above, file sizes are very small, and scanning one ~1.5MB file with one executor is more efficient than spinning up many executors to read ~20KB files.

Yay!

Change 917404 merged by Milimetric:

[analytics/refinery@master] Add iceberg version of referrer_daily table.

https://gerrit.wikimedia.org/r/917404

Executed the following to create a new HDFS folder for the wmf_traffic database:

ssh an-coord1001.eqiad.wmnet
sudo -u hdfs bash

kerberos-run-command hdfs hdfs dfs -mkdir /wmf/data/wmf_traffic
kerberos-run-command hdfs hdfs dfs -chown analytics:analytics-privatedata-users /wmf/data/wmf_traffic
kerberos-run-command hdfs hdfs dfs -chmod 755 /wmf/data/wmf_traffic

Confirming everything looks good:

hdfs@an-coord1001:/home/xcollazo$ kerberos-run-command hdfs hdfs dfs -ls /wmf/data | grep traffic
drwxr-xr-x   - analytics          analytics-privatedata-users          0 2023-05-30 15:44 /wmf/data/wmf_traffic

Ran the following manual steps.

Create the new wmf_traffic database:

sudo -u analytics bash

kerberos-run-command analytics hive
hive (default)> create database wmf_traffic;
OK
Time taken: 0.091 seconds

Create the table:

kerberos-run-command analytics spark3-sql
spark-sql (default)> use wmf_traffic;
Response code
Time taken: 2.248 seconds
spark-sql (default)> CREATE EXTERNAL TABLE IF NOT EXISTS `referrer_daily`(
                   >     `country`             string  COMMENT 'Reader country per IP geolocation',
                   >     `lang`                string  COMMENT 'Wikipedia language -- e.g., en for English',
                   >     `browser_family`      string  COMMENT 'Browser family from user-agent',
                   >     `os_family`           string  COMMENT 'OS family from user-agent',
                   >     `search_engine`       string  COMMENT 'One of ~20 standard search engines (e.g., Google)',
                   >     `num_referrals`       int     COMMENT 'Number of pageviews from the referral source',
                   >     `day`                 date    COMMENT 'The date of the request'
                   > )
                   > USING ICEBERG
                   > PARTITIONED BY (months(day))
                   > LOCATION '/wmf/data/wmf_traffic/referrer/daily'
                   > ;
Response code
Time taken: 0.968 seconds

Backfill the table with all data available at wmf.referrer_daily:

analytics@an-coord1001:/home/xcollazo$ kerberos-run-command analytics spark3-sql --master yarn --executor-memory 8G --executor-cores 4 --driver-memory 2G --conf spark.dynamicAllocation.maxExecutors=64


INSERT INTO wmf_traffic.referrer_daily
SELECT /*+ COALESCE(1) */
       country,
       lang,
       browser_family,
       os_family,
       search_engine,
       num_referrals,
       TO_DATE(CONCAT_WS('-', LPAD(year, 4, '0'), LPAD(month, 2, '0'), LPAD(day, 2, '0')), 'yyyy-MM-dd') AS day
FROM wmf.referrer_daily
ORDER BY day;

Confirming the data looks legit:

spark-sql (default)> select count(1) from wmf.referrer_daily;
count(1)
7127894
Time taken: 19.58 seconds, Fetched 1 row(s)
spark-sql (default)> select count(1) from wmf_traffic.referrer_daily;
count(1)
7127894
Time taken: 3.982 seconds, Fetched 1 row(s)
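Matching totals can still hide per-day discrepancies. A per-day comparison sketch (hypothetical verification query, not something I ran as part of the migration):

```sql
-- Per-day counts from both tables, full outer joined; any row returned
-- is a day where the Hive and Iceberg tables disagree.
WITH hive_counts AS (
  SELECT TO_DATE(CONCAT_WS('-', LPAD(year, 4, '0'), LPAD(month, 2, '0'), LPAD(day, 2, '0'))) AS day,
         COUNT(1) AS cnt
  FROM wmf.referrer_daily
  GROUP BY 1
),
iceberg_counts AS (
  SELECT day, COUNT(1) AS cnt
  FROM wmf_traffic.referrer_daily
  GROUP BY 1
)
SELECT COALESCE(h.day, i.day) AS day, h.cnt AS hive_cnt, i.cnt AS iceberg_cnt
FROM hive_counts h
FULL OUTER JOIN iceberg_counts i ON h.day = i.day
WHERE NOT (h.cnt <=> i.cnt);  -- null-safe comparison catches missing days too
```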

Mentioned in SAL (#wikimedia-operations) [2023-05-30T19:48:39Z] <xcollazo@deploy1002> Started deploy [airflow-dags/analytics@cd667c2]: Deplot Iceberg version of referrer_daily on analytics Airflow instance. T335305.

Mentioned in SAL (#wikimedia-operations) [2023-05-30T19:48:49Z] <xcollazo@deploy1002> Finished deploy [airflow-dags/analytics@cd667c2]: Deplot Iceberg version of referrer_daily on analytics Airflow instance. T335305. (duration: 00m 09s)

With https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/378 now merged and deployed, I will wait till the next daily run to confirm things look good.

The Iceberg SparkSqlOperator run failed because I forgot that refinery-source must be deployed first...!

@mforns did the deployment. Thanks Marcel!

The DAG run for 2023-05-30 is now successful, and I can confirm that we have the data on the Iceberg table and it matches the original Hive table:

spark-sql (default)> select count(1) from referrer_daily where day = '2023-05-30';
count(1)
9278
Time taken: 1.927 seconds, Fetched 1 row(s)
spark-sql (default)> select count(1) from wmf.referrer_daily where year = 2023 and month = 5 and day = 30;
count(1)
9278
Time taken: 1.5 seconds, Fetched 1 row(s)

Before we close this though, I should document this table.

I have started the general documentation effort by creating an Iceberg page here https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Iceberg.

I have also documented the Iceberg version of referrer_daily at https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Traffic/referrer_daily. I've marked it as experimental for now, until we settle the database naming being discussed at T337562.

xcollazo updated the task description.

We are done with our first production level Iceberg table! 🎉 🎉 🎉