
Refine: Use Spark SQL instead of Hive JDBC
Open, Medium, Public, 8 Estimated Story Points

Description

We work around a limitation in Spark that disallows ALTER TABLE CHANGE COLUMN statements by issuing the DDL through a manually created Hive JDBC connection instead of through spark.sql(). This recently caused a bug: T209407: EventLogging Hive Refine broken after upgrade to CDH 5.15.0. We should fix this by submitting a fix for https://issues.apache.org/jira/browse/SPARK-23890 upstream in Apache Spark.

WIP Pull Request here: https://github.com/apache/spark/pull/21012

If we do this, I believe that event schema column descriptions/comments will finally make their way into the Hive tables, and then up into DataHub.
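
For context, a minimal sketch of the two code paths in question (the JDBC URL, table name, and DDL below are illustrative, not the actual Refine implementation, and the Hive JDBC driver is assumed to be on the classpath):

import java.sql.DriverManager

// Illustrative DDL; the real Refine job derives this from the evolved event schema.
val ddl =
  "ALTER TABLE db.table CHANGE COLUMN `event` `event` " +
  "STRUCT<`f1`: STRING, `added_f2`: STRING>"

// What we would like to do: issue the DDL through the SparkSession.
// Today this fails for struct columns (SPARK-23890):
//   spark.sql(ddl)

// The current workaround: open our own Hive JDBC connection and run the same DDL there.
val conn = DriverManager.getConnection("jdbc:hive2://hive-server.example.net:10000/default")
try {
  conn.createStatement().execute(ddl)
} finally {
  conn.close()
}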

Related Objects

Event Timeline

fdans triaged this task as Medium priority. Nov 15 2018, 6:06 PM
fdans moved this task from Incoming to Smart Tools for Better Data on the Analytics board.
Ottomata set the point value for this task to 8.

Yesterday I updated my pull request and pinged more Apache Spark folks for review. I was told that they are in the process of completely refactoring the SQL catalog code, and that the refactor would include support for this. From what I can tell, this refactor is targeting Spark 3.0.0, so we will have to keep waiting a while longer.

This task can remain open until we are able to change the Refine code to alter tables via Spark directly.

This was supposed to be fixed in Spark 3 with the new v2 datasource. I just tried to add a field to a nested column on an Iceberg table via spark3-sql CLI:

CREATE TABLE otto.test_table03 (
    `s1` STRUCT<`s1_f1`: STRING>
)
USING ICEBERG;

ALTER TABLE otto.test_table03 CHANGE COLUMN `s1` `s1` STRUCT<`s1_f1`: STRING, `s1_added_field0`: STRING>;
Error in query: Cannot update spark_catalog.otto.test_table03 field s1 type: update a struct by updating its fields; line 1 pos 0;
AlterTable org.apache.iceberg.spark.SparkSessionCatalog@1af78e37, otto.test_table03, RelationV2[s1#78] spark_catalog.otto.test_table03, [org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnType@346f80ce]

I posted on my old Spark GitHub PR to see if we could reopen the issue:
https://github.com/apache/spark/pull/21012#issuecomment-1856336189

I've also reopened the Jira:
https://issues.apache.org/jira/browse/SPARK-23890

A few more investigations:

@JAllemandou suggested that we might be able to use the v2 alter table syntax on our existing Hive Parquet tables. This does not work (at least with our current Spark version).

spark-sql (default)> ALTER TABLE otto.eventerror ADD COLUMN event.added_field0 STRING;
Error in query: AlterTableAddColumnsCommand does not support nested column: event.added_field0

That is an explicit check rejecting ADD COLUMN on nested columns for v1 tables. I don't know how Spark determines whether a table is v1 or v2.

I just used Spark SQL to create a table with the datasource v2 syntax in two different ways, and got the same error:

-- using 'hive'
CREATE TABLE otto.test_table04 (
  `s1` STRUCT<`s1_f1`: STRING COMMENT 's1_f1 is inside of struct s1'>
)
USING hive OPTIONS(fileFormat 'parquet') 
PARTITIONED BY (
  `f1` STRING
);
ALTER TABLE otto.test_table04 ADD COLUMN s1.added_field0 STRING;

-- using 'parquet'
CREATE TABLE otto.test_table05 (
  `s1` STRUCT<`s1_f1`: STRING COMMENT 's1_f1 is inside of struct s1'>
)
USING parquet
PARTITIONED BY (
  `f1` STRING
);
ALTER TABLE otto.test_table05 ADD COLUMN s1.added_field0 STRING;

These both fail with AlterTableAddColumnsCommand does not support nested column.

However, USING iceberg works just fine:

CREATE TABLE otto.test_table06 (
  `s1` STRUCT<`s1_f1`: STRING COMMENT 's1_f1 is inside of struct s1'>
)
USING iceberg
PARTITIONED BY (
  `f1` STRING
);
ALTER TABLE otto.test_table06 ADD COLUMN s1.added_field0 STRING;
spark-sql (default)> describe otto.test_table06;
col_name	data_type	comment
s1	struct<s1_f1:string,added_field0:string>
f1	string

I'm curious as to how Spark knows the difference between a v2 and a v1 table. I would have thought that USING parquet would be a v2 table, but clearly not? I'm sure there is something magical here about Spark catalogs and how the datasource version is inferred. From trying to read the code, it looks like a lot of this has changed between Spark 3.1.2 and Spark 3.5, so it might be worth investigating on newer versions.
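
One thing that at least shows what Spark recorded for each table (a sketch; it doesn't answer the v1/v2 question, but it exposes the provider from the USING clause):

// DESCRIBE TABLE EXTENDED includes a "Provider" row with the datasource name
// (hive, parquet, iceberg, ...) that was given in the USING clause.
spark.sql("DESCRIBE TABLE EXTENDED otto.test_table05").show(100, false)

// SHOW CREATE TABLE prints the full DDL, including the USING clause.
spark.sql("SHOW CREATE TABLE otto.test_table05").show(false)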

Anyway, going to park this for now and work on other things.

Was able to try this in Spark 3.5 via @xcollazo's instructions.

USING hive:

scala> spark.sql("ALTER TABLE otto.test_table04 ADD COLUMN s1.added_field0 STRING")
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `spark_catalog`.`otto`.`test_table04` does not support ADD COLUMN with qualified column. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".

USING parquet:

scala> spark.sql("ALTER TABLE otto.test_table05 ADD COLUMN s1.added_field0 STRING")
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `spark_catalog`.`otto`.`test_table05` does not support ADD COLUMN with qualified column. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".

USING iceberg works as before.

So, the error handling looks like it has been refactored in Spark 3.5, but the behavior is the same.

I made some progress modifying Spark to support adding nested columns. I'll stop here and wait for feedback from upstream before I clean it up and try a little harder.

However, I'm not sure that this fix will be sufficient. We support e.g. map fields with structs as values, and arrays with structs as element types. Even if we make ADD COLUMN able to add fields to structs, I don't see how we could use it to add fields to structs nested inside arrays and maps.
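
To make that concrete, a sketch of the two shapes in question (hypothetical table and field names):

// Hypothetical table showing the nested shapes our event schemas use:
// a map whose values are structs, and an array whose elements are structs.
spark.sql("""
CREATE TABLE otto.nested_shapes_example (
  `slots`    MAP<STRING, STRUCT<`content_model`: STRING>>,
  `versions` ARRAY<STRUCT<`rev_id`: BIGINT>>
)
USING parquet
""")

// Even with nested ADD COLUMN support for plain structs, it is not obvious what
// syntax would target the struct inside the map values or the array elements.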

(Came here to test this on Iceberg, but I can see from T209453#9407647 and T209453#9409199 that this test has been done, and the conclusion is that there is no issue with Iceberg.)

But, IIUC, there is no SQL syntax for adding columns to struct types inside of arrays or map values.

Iceberg does support it:

spark.sql("""
SELECT *
FROM xcollazo.mediawiki_content_history_v1
""").printSchema()
root
 |-- page_id: long (nullable = true)
...
 |-- revision_content_slots: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- content_body: string (nullable = true)
 |    |    |-- content_format: string (nullable = true)
 |    |    |-- content_model: string (nullable = true)
 |    |    |-- content_sha1: string (nullable = true)
 |    |    |-- content_size: long (nullable = true)
 |-- revision_content_is_visible: boolean (nullable = true)
 |-- wiki_id: string (nullable = true)
...

spark.sql("""
-- add a field to the struct within the map's values. Using keyword 'value' to access the map's value column.
ALTER TABLE xcollazo.mediawiki_content_history_v1
ADD COLUMN revision_content_slots.value.origin_rev_id bigint;
""").show()

spark.sql("""
SELECT *
FROM xcollazo.mediawiki_content_history_v1
""").printSchema()
root
 |-- page_id: long (nullable = true)
...
 |-- revision_content_slots: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- content_body: string (nullable = true)
 |    |    |-- content_format: string (nullable = true)
 |    |    |-- content_model: string (nullable = true)
 |    |    |-- content_sha1: string (nullable = true)
 |    |    |-- content_size: long (nullable = true)
 |    |    |-- origin_rev_id: long (nullable = true)
 |-- revision_content_is_visible: boolean (nullable = true)
 |-- wiki_id: string (nullable = true)
...


spark.sql("""
SELECT revision_content_slots['main'].content_size, revision_content_slots['main'].origin_rev_id
FROM xcollazo.mediawiki_content_history_v1
LIMIT 5
""").show()
+-----------------------------------------+------------------------------------------+
|revision_content_slots[main].content_size|revision_content_slots[main].origin_rev_id|
+-----------------------------------------+------------------------------------------+
|                                     3867|                                      null|
|                                    18782|                                      null|
|                                    11710|                                      null|
|                                     2655|                                      null|
|                                     2825|                                      null|
+-----------------------------------------+------------------------------------------+

However, on Iceberg 1.6.1, I did find a bug: after adding that field, if I select it exclusively, we get a stack trace:

spark.sql("""
SELECT *
FROM xcollazo.mediawiki_content_history_v1
""").printSchema()

25/03/13 13:44:59 WARN TaskSetManager: Lost task 19.0 in stage 16.0 (TID 824) (an-worker1119.eqiad.wmnet executor 46): java.lang.IllegalArgumentException: [revision_content_slots, key_value, key] required binary key (STRING) = 22 is not in the store: [] 276000
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:288)
	at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:224)
	at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:572)
	at org.apache.iceberg.parquet.ParquetValueReaders$OptionReader.setPageSource(ParquetValueReaders.java:408)
	at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:724)
	at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:156)
	at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:126)
	at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65)
	at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:49)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:135)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

I'll investigate this issue separately, but it looks like a repro of an old one: https://github.com/apache/iceberg/issues/2962.
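
(For reference, a sketch of the query shape that triggered the stack above, assuming the newly added map-value field is selected on its own:)

// Selecting only the field that was just added inside the map's value struct
// is the access pattern that hits the Parquet reader error above.
spark.sql("""
SELECT revision_content_slots['main'].origin_rev_id
FROM xcollazo.mediawiki_content_history_v1
LIMIT 5
""").show()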

ADD COLUMN revision_content_slots.value.origin_rev_id bigint;

\( ゚ヮ゚)/

Where did you find that syntax?!?!

Does it work for arrays too, not just maps?

Where did you find that syntax?!?!

https://iceberg.apache.org/docs/nightly/spark-ddl/#alter-table-add-column

Does it work for arrays too, not just maps?

Yes, but there is a similar bug as before when the new field is SELECTed exclusively:

spark.sql("""
CREATE TABLE xcollazo.test_add_column_to_array (
  a array<struct<one: string, two: bigint>>
)
USING ICEBERG
""").show()
++
||
++
++

spark.sql("""
INSERT INTO xcollazo.test_add_column_to_array
VALUES array(struct('1', 1)), array(struct('2', 2))
""").show()                                                                 
++
||
++
++

spark.sql("""
SELECT * FROM xcollazo.test_add_column_to_array
""").show()
+--------+
|       a|
+--------+
|[{1, 1}]|
|[{2, 2}]|
+--------+

                                                                                
spark.sql("""
ALTER TABLE xcollazo.test_add_column_to_array
ADD COLUMN a.element.three string
""").show()
++
||
++
++

spark.sql("""
SELECT * FROM xcollazo.test_add_column_to_array
""").show()
+--------------+
|             a|
+--------------+
|[{1, 1, null}]|
|[{2, 2, null}]|
+--------------+

                                                                                
spark.sql("""
SELECT a.two, a.three FROM xcollazo.test_add_column_to_array
""").show()
+---+------+
|two| three|
+---+------+
|[1]|[null]|
|[2]|[null]|
+---+------+

spark.sql("""
SELECT a.three, a.three IS NOT NULL FROM xcollazo.test_add_column_to_array
""").show()
+-----+---------------------+
|three|(a.three IS NOT NULL)|
+-----+---------------------+
|   []|                 true|                        <<<< BUG
|   []|                 true|
+-----+---------------------+

spark.sql("""
INSERT INTO xcollazo.test_add_column_to_array
VALUES array(struct('3', 3, '3'))
""").show()
++
||
++
++

spark.sql("""
SELECT * FROM xcollazo.test_add_column_to_array
""").show()
+--------------+
|             a|
+--------------+
|   [{3, 3, 3}]|
|[{1, 1, null}]|
|[{2, 2, null}]|
+--------------+

spark.sql("""
SELECT a.three FROM xcollazo.test_add_column_to_array
""").show()
+-----+
|three|
+-----+
|  [3]|
|   []|                        <<<< BUG
|   []|
+-----+

Amazing. So we just need to get that bug fixed, convert everything to Iceberg, and then we can stop using JDBC! ;)

BTW, I updated the upstream Spark GitHub PR and the Jira.