
[SPIKE] Evaluation on iceberg sensor for airflow
Closed, Resolved · Public · 5 Estimated Story Points · Spike

Description

We need to be able to trigger new jobs based on new data arrival in iceberg tables. Approaches to evaluate:

  • Adding a piece of metadata that is part of the commit
  • Querying the data: The sensor could be a query
  • Sensing for snapshots
  • Partition sensing

Key tasks

  • Investigate solutions or approaches with the Iceberg community
  • Document pros and cons of each approach
  • Present results and make a final decision

Event Timeline

JAllemandou renamed this task from Investigate Airflow dependency to Investigate iceberg sensor for airflow. Apr 24 2023, 8:15 PM
JAllemandou updated the task description.

I did a deep dive here. Here is what I found:

  1. There are currently no Airflow Sensors available for Iceberg tables.
  2. At first, it looked like we could implement it ourselves by leveraging PyIceberg (an incomplete but active Python interface to Iceberg).
  3. Unfortunately, (2) is missing many pieces that we would need:
    • (A) since we are keeping the hive metastore as our catalog, PyIceberg would need to connect to it to fetch the latest metadata location for a particular table. But PyIceberg does not currently support Kerberos. Looking at code, I speculate implementing this could take a couple days.
    • (B) PyIceberg does not currently support HDFS! Looking at the code, I speculate this could be done in a couple of days. PyIceberg already supports fsspec and pyarrow, and those two FileIO interfaces already support HDFS, so implementing it should be a matter of a bit of code plus a bunch of testing and documentation. Kerberos may throw a wrench into that, though.
    • (C) It does not look like PyIceberg supports listing partitions yet. The metadata is indeed available (as can be seen in the Spark implementation), just not exposed through this library yet. This, however, likely requires a detailed understanding of how Iceberg metadata works. I speculate this would take a week or two.
  4. (3) does seem like a great opportunity to contribute these features, but I also acknowledge that this is likely a full sprint of work or a bit more, considering the learning curve.
  5. An alternative to (3) would be to instead use Iceberg's Java Table API to get the partition info. The learning curve could be smaller, since we would be utilizing an existing API rather than implementing it. We could wrap that API as a utility (think hdfs dfs), and have it spit out the partition info (something like iceberg --partitions --target db1.table1). We would still need to deal with connecting to the Hive Metastore (and figure out whether Kerberos is supported) in this approach, and we would also need to maintain this one-off utility (whereas PyIceberg has a community behind it). But I speculate we could do this in a week or so.
  6. Finally, please note that with either approach (3) or (5), we still need to wrap it into an Airflow Sensor (a rough sketch of such a wrapper follows below), so add a couple more days.
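To make (6) concrete, here is a minimal sketch (not a final implementation) of what the Airflow wrapper could look like. It deliberately delegates the actual existence check to a callable, so the same wrapper would work whether we end up checking via PyIceberg, py4j plus the Java Table API, or a query; all names below are hypothetical.

from typing import Callable

from airflow.sensors.base import BaseSensorOperator


class IcebergDataSensor(BaseSensorOperator):
    """Pokes until the provided check reports that the expected data exists."""

    def __init__(self, check_fn: Callable[[], bool], **kwargs):
        super().__init__(**kwargs)
        self.check_fn = check_fn

    def poke(self, context) -> bool:
        # Airflow calls poke() every poke_interval; returning True ends the wait.
        return self.check_fn()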

CC @lbowmaker, @JAllemandou, @Ottomata, @Milimetric

JArguello-WMF renamed this task from Investigate iceberg sensor for airflow to Decide on iceberg sensor for airflow. May 15 2023, 4:48 PM
JArguello-WMF set the point value for this task to 3.

Iceberg tries to make partitions transparent, plus at some point we will be using partition compaction, which could make the partitioning of a dataset for a given time interval unpredictable.
This makes me think that looking at partitions when probing for the existence of data might be problematic?

An alternative would be to query the tables to check for data for a given time interval, for instance:

select count(*) from <table> where dt between <t1> and <t2>;

Or use Spark directly. We could wrap this in a Spark scala app, that would return whether the data exists or not via exit codes or something.
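To illustrate the "wrap it in a Spark app" idea (the comment above suggests Scala; the sketch below uses PySpark purely for brevity), something along these lines could run the count and report existence via its exit code. Table, column and argument names are placeholders.

import sys

from pyspark.sql import SparkSession


def data_exists(table: str, t1: str, t2: str) -> bool:
    spark = SparkSession.builder.appName("iceberg-data-exists").getOrCreate()
    count = spark.sql(
        f"SELECT count(*) AS c FROM {table} WHERE dt BETWEEN '{t1}' AND '{t2}'"
    ).collect()[0]["c"]
    return count > 0


if __name__ == "__main__":
    # Exit code 0 = data found, 1 = not yet; the Airflow sensor would interpret
    # the exit code of the spark-submit call.
    sys.exit(0 if data_exists(sys.argv[1], sys.argv[2], sys.argv[3]) else 1)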

Iceberg tries to make partitions transparent, plus at some point we will be using partition compaction, which could make the partitioning of a dataset for a given time interval unpredictable.
This makes me think that looking at partitions when probing for the existence of data might be problematic?

When partitioning evolves, we will definitely find different data. Example: if we first partition by date, we will find partition 2020-01-01, but then if we switch to partitioning by hour, we will also find 2020-01-02 01:00:00. Since old data is not rewritten unless you compact the table, we may find both of those partitioning schemes when scanning for metadata.

So you are right that this would be difficult to sense if we evolve the table.

An alternative would be to query the tables to check for data for a given time interval, for instance:

select count(*) from <table> where dt between <t1> and <t2>;

Or use Spark directly. We could wrap this in a Spark scala app, that would return whether the data exists or not via exit codes or something.

Another alternative would be to drop the idea of trying to sense time intervals altogether, and start sensing for Iceberg snapshots. So for every commit to the table, there is a unique snapshot ID. We can keep track of what snapshot is the last, and when that changes, do an incremental read. But this also gets complicated since we now need to keep track of state (snapshot IDs).
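Just to make the state-keeping concrete, here is a tiny sketch of the comparison the sensor would do. How we obtain the current snapshot id (PyIceberg, the Java Table API, a metadata query) and where the last-processed id is stored (Airflow Variable, a state table, ...) are exactly the open questions; the function below only shows the logic.

from typing import Optional


def new_snapshot_available(current_snapshot_id: Optional[int],
                           last_processed_snapshot_id: Optional[int]) -> bool:
    """True when the table has a snapshot we have not processed yet."""
    if current_snapshot_id is None:
        return False  # the table has no snapshots at all
    if last_processed_snapshot_id is None:
        return True   # first run: any snapshot counts as new data
    return current_snapshot_id != last_processed_snapshot_id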

Thanks for bringing this up Marcel, I feel like we need a spike here to consider the options.

Another alternative would be to drop the idea of trying to sense time intervals altogether, and start sensing for Iceberg snapshots.

Oh, wow. This is cool! (I spent a while reading the pasted Iceberg docs, and whoa... looks complicated)

I wonder though if snapshots will cover the use cases of re-runs and backfilling, since old snapshots are garbage-collected after some time.

EDIT:
Also, imagine an hourly Airflow DAG failed at hour 6, but subsequent hours succeeded until now. We'd have the latest Iceberg snapshot indicating data has been updated now, but it would be difficult to know that hour 6 is missing, no?

Restricted Application changed the subtype of this task from "Task" to "Spike". May 23 2023, 5:44 PM
JArguello-WMF renamed this task from Decide on iceberg sensor for airflow to [SPIKE] Evaluation on iceberg sensor for airflow. May 24 2023, 5:24 PM
JArguello-WMF updated the task description.
JArguello-WMF changed the point value for this task from 3 to 5.

Folks from the community gave me some answers:

  1. One person is using the Java Table API to check for data existence. They are calling Java from Python. I asked if they had had issues with that, and they said they hadn't; the only difficulty was "that you need to get yourself familiar with py4j stuffs and put some glue code to connect two sides". I guess this gives plus points to option #5.
  2. Another person suggested using Spark's change data capture API. After reading the docs, I think we could use this.

Both options seem like a potential path forward.

get yourself familiar with py4j stuffs

I've used this API a bunch for pyflink stuff, and pyspark uses this too. Spark/Flink handles a bunch of stuff that could be hard (adding jars to the workers' classpath, etc.), but if you don't need that (I think you don't here), instantiating a JVM with py4j directly is probably fine. As long as you are doing basic stuff, the API is actually pretty easy to work with.
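For reference, a bare-bones sketch of the py4j mechanics (jar locations are placeholders, and the actual Iceberg catalog/table calls still need to be worked out against the Java Table API):

from py4j.java_gateway import JavaGateway

# Launch a local JVM with the Iceberg + Hive metastore jars on its classpath
# and connect to it. The classpath below is a hypothetical location.
gateway = JavaGateway.launch_gateway(
    classpath="/srv/deployment/iceberg-jars/*",
    die_on_exit=True,
)

# Sanity check that the gateway works by calling into the JVM.
print(gateway.jvm.java.lang.System.getProperty("java.version"))

# The sensor logic would reach the Java Table API through the same mechanism,
# i.e. classes under gateway.jvm.org.apache.iceberg (catalog setup, loadTable,
# file/partition inspection), which is the part that still needs glue code.

gateway.shutdown()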

use Spark's change data capture API

Cool!

I received another answer, this time from a committer & PMC member on Apache Avro, Airflow, Druid and Iceberg. He thinks the best way would be to use PyIceberg and implement support for HDFS/Kerberos, since spinning up a full JVM/Spark would be "overkill for just checking the number of rows".

I think if we use PyIceberg, we can use:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
)

[task.file.file_path for task in scan.plan_files()]

Which returns the files that may contain data for the given time interval.
Those file objects indicate whether they are data files or delete files.
We can assume data is complete for a given time interval if the last data file is of type data, no?

We discussed in standup whether it would be OK to spawn JVM/Spark for each poke of the Iceberg sensors.
So here's some modelling:

We currently have 151 sensors running in the analytics Airflow instance.
But not all of them run every 5 minutes; here are the frequencies:

frequency  sensors
30sec            1
5min            10
10min           21
15min           21
20min           15
1h              74
2h               6
6h               2

On average, our sensors poke for data 7.6 times per minute. Initially, that seems like a low amount.

However, the sensors tend to trigger at the same time, because of how dag_runs are triggered.
So it could happen that at the beginning of each hour a good amount of sensors (>100) trigger almost at the same time.

If each of those sensors is spawning Spark, and it's using 1 core and 1G memory for the driver and 1 core and 1G memory for the executor (would we need executor?),
then we'd be using 200 cores and 200G memory of the cluster, once every hour (during a couple minutes?), just for Iceberg sensors.

Is that OK? I guess for now it is?
Would it be OK if Airflow grows, say 10x?

If each of those sensors is spawning Spark, and it's using 1 core and 1G memory for the driver and 1 core and 1G memory for the executor (would we need executor?),
then we'd be using 200 cores and 200G memory of the cluster, once every hour (during a couple minutes?), just for Iceberg sensors.

Is that OK? I guess for now it is?
Would it be OK if Airflow grows, say 10x?

Could we use presto instead of spark?

Could we use presto instead of spark?

That would be lighter and have less latency, right?
But, I thought not all datasets were queryable in the Presto cluster...?

If each of those sensors is spawning Spark, and it's using 1 core and 1G memory for the driver and 1 core and 1G memory for the executor (would we need executor?),
then we'd be using 200 cores and 200G memory of the cluster, once every hour (during a couple minutes?), just for Iceberg sensors.

Is that OK? I guess for now it is?
Would it be OK if Airflow grows, say 10x?

To me, that definitely sounds like a lot of waste, even if only for a few minutes, considering that other folks may see added latency in getting their executors allocated just because we are sensing.

Could we use presto instead of spark?

That would be lighter and have less latency, right?
But, I thought not all datasets were queryable in the Presto cluster...?

All tables declared in the Hive Metastore are (currently) queryable from Presto, and this includes Iceberg tables. See T335314#8852976.

If each of those sensors is spawning Spark

Maybe I misunderstood something, but why do these need to use Spark? I thought just a local JVM would suffice, so we could use "Java Table API"...hm, oh, is this via Spark? Sorry misunderstood. Indeed, I think launching Spark just for sensors would be overkill. Python + py4j to call some Java code could be okay though.

If we use Presto, we wouldn't need to launch anything, since we could use Presto's SQL server. We could even avoid Skein, since we could call Presto SQL from the sensor itself (Airflow machine). Joseph and I did some tests today, and it took 4 seconds to sense for a full week of daily referrer data. I also like the idea of using Java Table API, but at the same time I think the way that Iceberg is intended to be used is via SQL, and not checking internals (manifests, files, partitions...), no?

Because Iceberg guarantees that updates are atomic, we know we won't check for the presence of data in the middle of a write. Also, we know that if we find 1 record in a given update interval, then the whole interval must have data.
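For what it's worth, a hedged sketch of how the Presto idea could look with Airflow's generic SqlSensor, so we would not have to launch anything per poke. The connection id, table and column names are placeholders, and the exact import path depends on the Airflow/provider version.

from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_referrer_daily = SqlSensor(
    task_id="wait_for_referrer_daily",
    conn_id="analytics_presto",  # hypothetical Airflow connection to Presto
    sql="""
        SELECT count(*)
        FROM wmf_traffic.referrer_daily
        WHERE day = DATE '{{ ds }}'  -- 'day' column name is an assumption
    """,
    poke_interval=600,
    mode="reschedule",  # frees the worker slot between pokes
)

SqlSensor succeeds once the first cell of the result is non-zero, which matches the "if we find 1 record, the whole interval has data" reasoning above.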

If we use Presto, we wouldn't need to launch anything, since we could use Presto's SQL server.

I like the simplicity of this approach. It got me thinking of the following though: we'd be coupling the availability of Presto with the availability (or progress) of Airflow, which I think is something to consider.

I also like the idea of using Java Table API, but at the same time I think the way that Iceberg is intended to be used is via SQL, and not checking internals (manifests, files, partitions...), no?

The Java Table API is public, as it is meant to be used by non-SQL systems to query Iceberg. It does expose manifests, etc., but I don't think we'd need to go that deep; we just need the data files.

Also, we know that if we find 1 record in a given update interval, then the whole interval must have data.

A question: Do we ever ingest 0 data rows? Or is this quite unlikely?

I like the simplicity of this approach. It got me thinking of the following though: we'd be coupling the availability of Presto with the availability (or progress) of Airflow, which I think is something to consider.

Yes, agree.

The Java Table API is public, as it is meant to be used by non-SQL systems to query Iceberg. It does expose manifests, etc., but I don't think we'd need to go that deep; we just need the data files.

Yes, it is public; it was inaccurate to say "internals".
But we still need to get the files by condition and check whether they are just data files or data and delete files,
in which case we should look at their order/snapshot_id to determine whether the condition interval has data or not.
Plus, if we decide that a dataset's partitioning is different from its update frequency (e.g. a dataset partitioned monthly but updated daily),
the list of files we'd get by using the API could contain more than the data enclosed in the condition interval,
and we'd still need to read the files and filter for the rows that fulfill the condition, no?
I still think it's fine to do all that (since that's how Iceberg is defined), just explaining why I said "internals" :-)

A question: Do we ever ingest 0 data rows? Or is this quite unlikely?

Huh... very good point!
Indeed we have some tables in the event and event_sanitized databases that are so low-throughput that for some hours they don't receive any events.
They should always receive canary events though, but yeah, we'd depend on the availability of the canary events service.
Apart from those event-based low-throughput datasets, I think all other datasets are very unlikely to receive 0-row updates.
This issue would penalize both the Java Table API and the Iceberg query approaches, no?

The Java Table API is public, as it is meant to be used by non-SQL systems to query Iceberg. It does expose manifests, etc., but I don't think we'd need to go that deep; we just need the data files.

Yes, it is public; it was inaccurate to say "internals".
But we still need to get the files by condition and check whether they are just data files or data and delete files,
in which case we should look at their order/snapshot_id to determine whether the condition interval has data or not.
Plus, if we decide that a dataset's partitioning is different from its update frequency (e.g. a dataset partitioned monthly but updated daily),
the list of files we'd get by using the API could contain more than the data enclosed in the condition interval,
and we'd still need to read the files and filter for the rows that fulfill the condition, no?
I still think it's fine to do all that (since that's how Iceberg is defined), just explaining why I said "internals" :-)

Fair enough, I certainly agree it is more complicated than just doing a query in Presto!

A question: Do we ever ingest 0 data rows? Or is this quite unlikely?

Huh... very good point!
Indeed we have some tables in the event and event_sanitized databases that are so low-throughput that for some hours they don't receive any events.
They should always receive canary events though, but yeah, we'd depend on the availability of the canary events service.
Apart from those event-based low-throughput datasets, I think all other datasets are very unlikely to receive 0-row updates.
This issue would penalize both the Java Table API and the Iceberg query approaches, no?

Right it would be a negative for both of those approaches. With regular Hive tables we don't have this issue since we consume partitions.

This sensing on top of Iceberg business is tough!

Today we discussed the possibility of adding custom metadata to an Iceberg snapshot as a means to replace our _SUCCESS files.

It took a while to find the feature since it doesn't seem to be documented, but the PR that implemented it is https://github.com/apache/iceberg/pull/1241, and the way to use it is:

val df: DataFrame = ...

df.writeTo("t1")
  .option("snapshot-property.key", "value")
  .append()

So again, this would mean we would have to use either Scala or PySpark to write; the feature doesn't work via vanilla SQL.

Another alternative to mimic _SUCCESS that was discussed in today's standup was using Airflow's database, or even ZooKeeper. We hesitated because of the complexity and adding one more dependency.

But, how about having an Iceberg table that keeps track of Iceberg ingestions? Perhaps a schema like this:

database_name : STRING
table_name : STRING
ingested_time_period_start : TIMESTAMP
ingested_time_period_end : TIMESTAMP
....

Then we can have an Airflow operator that INSERTs into this table after a successful ingest into an Iceberg table.

Downstream Airflow jobs would use a sensor that checks whether the target time period has been ingested. Considering this ingestion table will likely be small, a SELECT TRUE FROM ingestions WHERE database_name = 'X' AND table_name = 'Y' AND ingested_time_period_end = {{ ds }} would be quite cheap.
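Illustrative only: the INSERT the "marker" step would run after a successful ingest, and the check the downstream sensor would run. The wmf.iceberg_ingestions name is hypothetical, the columns follow the schema proposed above, and how the SQL is executed (Spark, Presto, ...) is left open.

MARK_INGESTED_SQL = """
    INSERT INTO wmf.iceberg_ingestions
        (database_name, table_name, ingested_time_period_start, ingested_time_period_end)
    VALUES
        ('{db}', '{table}', TIMESTAMP '{start}', TIMESTAMP '{end}')
"""

CHECK_INGESTED_SQL = """
    SELECT TRUE
    FROM wmf.iceberg_ingestions
    WHERE database_name = '{db}'
      AND table_name = '{table}'
      AND ingested_time_period_end = TIMESTAMP '{end}'
"""

The sensor would consider the period ready as soon as the check query returns a row.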

Pros I see here:

  • No ambiguity on whether ingestion happened or not (i.e. the 0-rows issue).
  • Independent of Airflow and/or Airflow upgrades
  • Doesn't add a service dependency.

Cons:

  • Using an analytics table format for doing transactional work.

custom metadata to an Iceberg snapshot

Q: How would this be queried by the sensor?

Iceberg table that keeps track of Iceberg ingestions

Could be nice! Would also be nice to maybe even keep track of some statistics? count of records inserted / modified? etc?

Is there a reason to do this as Iceberg instead of just plain ol' Hive?

Iceberg table that keeps track of Iceberg ingestions

I really like that idea! The data would be small by nature (not necessarily the case if we sensed the actual data rather than a summarized view), it solves the problem of 0-row insertions, and it adds a "reference" of computed jobs.

Could be nice! Would also be nice to maybe even keep track of some statistics? count of records inserted / modified? etc?

I also like the idea of adding statistics - it could be seen as adding data quality metrics! But this wouldn't come for free, as it involves querying the written data to get those stats. Let's discuss this step, as we'd need to define the metrics we'd want to store and check.

Is there a reason to do this as Iceberg instead of just plain ol' Hive?

Yes! We'd want to be able to do parallel insertions for multiple jobs to save their data, and to do data compaction in order to remove small files. If we went for plain ol' Hive, we'd need to partition by dataset and time, but the time partitioning wouldn't be the same for every dataset, so not that easy, and we'd end up with many small files and partitions. Iceberg should help us here :)

Iceberg table that keeps track of Iceberg ingestions

I also like this idea!
A good thing about having it in an Iceberg table is that we can also apply deletes. This would help in case we have some mechanism that triggers cascading re-runs.
Maybe we could implement our own ExternalTaskSensor/ExternalTaskMarker in Airflow that would use this table.
This way we could get cascading re-runs without being limited to 1 Airflow instance, plus the state would not be in the Airflow database. Win-win.
Maybe we could even use this table to aid us in implementing Refine?

custom metadata to an Iceberg snapshot

Q: How would this be queried by the sensor?

We'd still have to use PyIceberg for that. So we would still need to implement some missing functionality discussed in T335306#8830541.
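Once those gaps are closed, the read side could look roughly like this (catalog and table names are placeholders, and the exact accessor for the snapshot summary may differ between PyIceberg versions):

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("wmf_traffic.referrer_daily")

snapshot = table.current_snapshot()
# The custom key written at commit time (via snapshot-property.*) should show
# up among the snapshot's summary properties.
print(snapshot.summary if snapshot else "table has no snapshots yet")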

Could be nice! Would also be nice to maybe even keep track of some statistics? count of records inserted / modified? etc?

I also like the idea of adding statistics - it could be seen as adding data quality metrics! But this wouldn't come for free, as it involves querying the written data to get those stats. Let's discuss this step, as we'd need to define the metrics we'd want to store and check.

Iceberg already writes some statistics to the summary field of a snapshot. They are not part of the spec (the summary field is meant as a generic key-value map), but the Spark implementation definitely writes the following:

xcollazo@stat1007:~/tmp$ hdfs dfs -cat /wmf/data/wmf_traffic/referrer/daily/metadata/00029-807dbf53-8f07-479e-bda8-94b63092d6a4.metadata.json | grep summary -B 4 -A 11 | tail -n 33
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
  }, {
    "snapshot-id" : 3469897287412533829,
    "parent-snapshot-id" : 4132422446651522181,
    "timestamp-ms" : 1686618082805,
    "summary" : {
      "operation" : "overwrite",
      "spark.app.id" : "application_1686244344351_20702",
      "changed-partition-count" : "0",
      "total-records" : "7245325",
      "total-files-size" : "29426584",
      "total-data-files" : "40",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "/wmf/data/wmf_traffic/referrer/daily/metadata/snap-3469897287412533829-1-5c0fcaaf-ff78-4e71-a611-cfdaec589216.avro",
--
  }, {
    "snapshot-id" : 4936701216864538834,
    "parent-snapshot-id" : 3469897287412533829,
    "timestamp-ms" : 1686618258590,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "application_1686244344351_20702",
      "added-data-files" : "1",
      "added-records" : "9225",
      "added-files-size" : "59550",
      "changed-partition-count" : "1",
      "total-records" : "7254550",
      "total-files-size" : "29486134",
      "total-data-files" : "41",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",

As you can see, the statistics depend on the operation type.

Now, we could copy these into the proposed ingestions table, but we could also wait until we pick up a newer Spark (via T338057), and then we will be able to access these statistics with a very cool:

SELECT wmf_traffic.referrer_daily.snapshots;
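A rough PySpark sketch of that, once we are on a newer Spark: Iceberg exposes per-table metadata tables such as <table>.snapshots, whose rows carry the summary map shown above. The table identifier below assumes the session catalog is configured for Iceberg.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect-snapshots").getOrCreate()

# Each row of the snapshots metadata table has committed_at, snapshot_id,
# operation and the summary key-value map written at commit time.
spark.sql(
    "SELECT committed_at, snapshot_id, operation, summary "
    "FROM wmf_traffic.referrer_daily.snapshots"
).show(truncate=False)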

SELECT wmf_traffic.referrer_daily.snapshots;

Wow that is cool!