In T209453#10632894, @xcollazo wrote:
> But, IIUC, there is no SQL syntax for adding columns to struct types inside of arrays or map values.
Iceberg does support it:
spark.sql(""" SELECT * FROM xcollazo.mediawiki_content_history_v1 """).printSchema() root |-- page_id: long (nullable = true) ... |-- revision_content_slots: map (nullable = true) | |-- key: string | |-- value: struct (valueContainsNull = true) | | |-- content_body: string (nullable = true) | | |-- content_format: string (nullable = true) | | |-- content_model: string (nullable = true) | | |-- content_sha1: string (nullable = true) | | |-- content_size: long (nullable = true) |-- revision_content_is_visible: boolean (nullable = true) |-- wiki_id: string (nullable = true) ... spark.sql(""" -- add a field to the struct within an array. Using keyword 'value' to access the array's element column. ALTER TABLE xcollazo.mediawiki_content_history_v1 ADD COLUMN revision_content_slots.value.origin_rev_id bigint; """).show() spark.sql(""" SELECT * FROM xcollazo.mediawiki_content_history_v1 """).printSchema() root |-- page_id: long (nullable = true) ... |-- revision_content_slots: map (nullable = true) | |-- key: string | |-- value: struct (valueContainsNull = true) | | |-- content_body: string (nullable = true) | | |-- content_format: string (nullable = true) | | |-- content_model: string (nullable = true) | | |-- content_sha1: string (nullable = true) | | |-- content_size: long (nullable = true) | | |-- origin_rev_id: long (nullable = true) |-- revision_content_is_visible: boolean (nullable = true) |-- wiki_id: string (nullable = true) ... spark.sql(""" SELECT revision_content_slots['main'].content_size, revision_content_slots['main'].origin_rev_id FROM xcollazo.mediawiki_content_history_v1 LIMIT 5 """).show() [Stage 13:=====================================================>(190 + 1) / 191] +-----------------------------------------+------------------------------------------+ |revision_content_slots[main].content_size|revision_content_slots[main].origin_rev_id| +-----------------------------------------+------------------------------------------+ | 3867| null| | 18782| null| | 11710| null| | 2655| null| | 2825| null| +-----------------------------------------+------------------------------------------+However, on Iceberg 1.6.1, I did find a bug in which, after adding that field, if I select it exclusively, we get a stack:
spark.sql(""" SELECT * FROM xcollazo.mediawiki_content_history_v1 """).printSchema() 25/03/13 13:44:59 WARN TaskSetManager: Lost task 19.0 in stage 16.0 (TID 824) (an-worker1119.eqiad.wmnet executor 46): java.lang.IllegalArgumentException: [revision_content_slots, key_value, key] required binary key (STRING) = 22 is not in the store: [] 276000 at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ColumnChunkPageReadStore.getPageReader(ColumnChunkPageReadStore.java:288) at org.apache.iceberg.parquet.ParquetValueReaders$PrimitiveReader.setPageSource(ParquetValueReaders.java:224) at org.apache.iceberg.parquet.ParquetValueReaders$RepeatedKeyValueReader.setPageSource(ParquetValueReaders.java:572) at org.apache.iceberg.parquet.ParquetValueReaders$OptionReader.setPageSource(ParquetValueReaders.java:408) at org.apache.iceberg.parquet.ParquetValueReaders$StructReader.setPageSource(ParquetValueReaders.java:724) at org.apache.iceberg.parquet.ParquetReader$FileIterator.advance(ParquetReader.java:156) at org.apache.iceberg.parquet.ParquetReader$FileIterator.next(ParquetReader.java:126) at org.apache.iceberg.io.FilterIterator.advance(FilterIterator.java:65) at org.apache.iceberg.io.FilterIterator.hasNext(FilterIterator.java:49) at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:135) at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:119) at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:156) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63) at scala.Option.exists(Option.scala:376) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97) at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) at org.apache.spark.scheduler.Task.run(Task.scala:136) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)I'll investigate this issue separately, but it looks like a repro of an old one: 
https://github.com/apache/iceberg/issues/2962.
Event Timeline
Comment Actions
Here is a test environment to replicate the problem: https://gitlab.wikimedia.org/aqu/iceberg_nested_map_test
I tested different versions; even the most recent combination (Spark 3.5, Iceberg 1.9, Parquet 1.13) still shows the bug.
I've toyed with options and dived into the Iceberg code, without success.
I've presented the problem on the Iceberg Slack, hoping for a lead on a fix or a confirmation of the behavior.
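The shape of the repro is roughly the following (a minimal sketch in a Scala spark-shell with the Iceberg runtime on the classpath; the database and table names are hypothetical, and the authoritative repro lives in the GitLab repo above):

```scala
// Hypothetical minimal setup: a map column whose value is a struct, evolved after data was written.
spark.sql("""
  CREATE TABLE tmp.map_struct_evolution_repro (
    id    BIGINT,
    slots MAP<STRING, STRUCT<content_size: BIGINT>>
  ) USING iceberg
""")
spark.sql("""
  INSERT INTO tmp.map_struct_evolution_repro
  VALUES (1, map('main', named_struct('content_size', 42L)))
""")

// Add a field to the struct nested inside the map value (the schema evolution in question).
spark.sql("""
  ALTER TABLE tmp.map_struct_evolution_repro
  ADD COLUMN slots.value.origin_rev_id BIGINT
""")

// Reading only the newly added nested field from data files written before the evolution
// is what triggers the ColumnChunkPageReadStore error on the affected versions.
spark.sql("""
  SELECT slots['main'].origin_rev_id FROM tmp.map_struct_evolution_repro
""").show()
```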
Comment Actions
I asked the Iceberg community:
- Mailing list: https://lists.apache.org/thread/z6pkxqh4qbymlxo2mqjrz3wzn451p2q8
- Slack: https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1757315682467989
- Github: https://github.com/apache/iceberg/issues/14043
In the meantime, we identified two options:
- adding the new field at the root of the schema and keeping this issue as tech debt.
- creating a copycat table with the field added, inserting into it from the old table, then swapping the table names (sketched below). The temporary duplication of this 42 TB table would eat into our margin of HDFS free space (~1 PB), and the backfill may take on the order of a day depending on the resources we allow it.
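For the second option, the rewrite of the map column during the copy would look roughly like this (illustration only, run against the existing table; the column list is trimmed to the map itself, and the actual migration job is pasted further down in this thread):

```scala
// Spark SQL's transform_values rebuilds each slot struct so that every data file written to the
// copy carries origin_rev_id (populated here from revision_id, as the migration job below ends up doing).
spark.sql("""
  SELECT
    transform_values(
      revision_content_slots,
      (k, v) -> named_struct(
        'content_body',   v.content_body,
        'content_format', v.content_format,
        'content_model',  v.content_model,
        'content_sha1',   v.content_sha1,
        'content_size',   v.content_size,
        'origin_rev_id',  revision_id
      )
    ) AS revision_content_slots
  FROM wmf_content.mediawiki_content_history_v1
""").printSchema()
```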
Comment Actions
Change #1191982 had a related patch set uploaded (by Aqu; author: Aqu):
[analytics/refinery/source@master] WIP: Script to manually add a map-column to mediawiki_content_history_v1
Comment Actions
@Antoine_Quhen @xcollazo If we are about to do a big migration (WIP: Script to manually add a map-column to mediawiki_content_history_v1 (1191982)) to add origin_rev_id, would it be worth adding a user_central_id field while we do it? E.g. T365648: Add user_central_id to mediawiki_history and mediawiki_history_reduced Hive tables, but for mediawiki_content_history?
Maybe that is scope creep, but this would be a very useful piece of data to have!
(Or, I suppose, perhaps a 'migration' is not needed, since e.g. user_central_id wouldn't be in a map field?)
Comment Actions
Some notes from a meeting with @Antoine_Quhen:
We confirmed that an ALTER TABLE x RENAME TO y works for Iceberg tables. This appears to be a metadata-only transaction in the Hive Metastore and doesn't affect any underlying Iceberg content.
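For example (a minimal sketch of the final swap in step (6) of the plan below; metadata only, so it should be quick):

```scala
// Rename the current table out of the way, then promote the rewritten copy under the original name.
// Both statements are Hive Metastore metadata operations; no Iceberg data files are touched.
spark.sql("ALTER TABLE wmf_content.mediawiki_content_v1 RENAME TO wmf_content.mediawiki_content_v1_backup")
spark.sql("ALTER TABLE wmf_content.mediawiki_content_v2 RENAME TO wmf_content.mediawiki_content_v1")
```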
We figured we need to follow this plan:
1. Antoine to continue testing a script that will copy data wiki by wiki while adding the new origin_rev_id column. The script will copy revision_id into the new origin_rev_id column. This data will be correct for all wikis except commonswiki.
2. Xabriel to start modifying the reconcile script to take origin_rev_id into account. We expect a future run in which commonswiki will have to reconcile all of its wrong origin_rev_id values, but all other wikis should be fine.
3. Xabriel to also start modifying the ingestion scripts to account for origin_rev_id. We will not merge this yet, but want to have it ready.
4. Once we are happy with the testing from (1), we will launch the job to start writing to wmf_content.mediawiki_content_v2.
5. After (4) is done, we will do spot checks and count checks on the data.
6. If we are happy, we rename mediawiki_content_v1 to mediawiki_content_v1_backup, and mediawiki_content_v2 to mediawiki_content_v1.
7. Deploy the changes from (2) and (3).
8. Observe behavior.
Considering the plan above is specifically related to wmf_content.mediawiki_content_v1 and we are not changing Iceberg, I will open a separate ticket for this work.
Comment Actions
Adding user_central_id will require a different strategy. For origin_rev_id the vast majority of data is just copying revision_id. But for user_central_id we need to figure out a source for the data that hopefully doesn't require reconciling all rows (which would be really, really expensive).
Thus I'd rather tackle this separately. As you mention though, adding user_central_id, since it will be on the root of the table, is indeed supported by Iceberg.
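For reference, the root-level addition itself is a plain, metadata-only schema evolution (a minimal sketch; the BIGINT type for user_central_id is an assumption):

```scala
// Adding a top-level column is metadata-only in Iceberg: existing data files simply read the new
// column as NULL, so no table rewrite is needed for the add itself (only for backfilling values).
spark.sql("""
  ALTER TABLE wmf_content.mediawiki_content_history_v1
  ADD COLUMN user_central_id BIGINT
""")
```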
Comment Actions
This is what I did to run the table copy, after creating the copy of the table with:
https://gitlab.wikimedia.org/repos/data-engineering/dumps/mediawiki-content-dump/-/blob/main/hql/create-wmf_content_mediawiki_content_history_v1.hql?ref_type=heads
And compiling the jar with the job I'm pasting next:
```
SPARK_CONF_DIR=/etc/spark3/conf \
SPARK_HOME=venv/lib/python3.10/site-packages/pyspark \
spark3-submit \
  --driver-cores 8 \
  --driver-memory 32G \
  --executor-cores 1 \
  --executor-memory 16g \
  --conf spark.executor.memoryOverhead=3GB \
  --conf spark.dynamicAllocation.maxExecutors=160 \
  --conf spark.executorEnv.SPARK_HOME=venv/lib/python3.10/site-packages/pyspark \
  --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf \
  --master yarn \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.yarn.maxAppAttempts=1 \
  --conf spark.shuffle.service.name=spark_shuffle_3_3 \
  --conf spark.shuffle.service.port=7338 \
  --conf spark.sql.shuffle.partitions=10240 \
  --conf write.spark.accept-any-schema=true \
  --conf spark.yarn.archive=hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip \
  --conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.6.1 \
  --conf spark.jars.ivySettings=/etc/maven/ivysettings.xml \
  --conf 'spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp/table_maintenance_iceberg_monthly/ivy_spark3/cache -Divy.home=/tmp/table_maintenance_iceberg_monthly/ivy_spark3/home -Dlog4j.configuration=file:quieter_spark_log4j.properties' \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.sql.iceberg.locality.enabled=false \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
  --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
  --conf spark.yarn.appMasterEnv.SPARK_HOME=venv/lib/python3.10/site-packages/pyspark \
  --archives 'hdfs:///wmf/cache/artifacts/airflow/analytics/spark-3.3.2-1.0.0.conda.tgz#venv' \
  --name cp_mediawiki_content_history_v2 \
  --deploy-mode cluster \
  --class org.wikimedia.analytics.refinery.job.MediawikiContentHistoryMigration \
  ~/analytics_refinery_source/refinery-job/target/refinery-job-0.3.3-SNAPSHOT-shaded.jar \
  --source-table wmf_content.mediawiki_content_history_v1 \
  --target-table aqu.mediawiki_content_history_v1
```
```scala
package org.wikimedia.analytics.refinery.job

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct}
import org.wikimedia.analytics.refinery.tools.LogHelper
import scopt.OptionParser

object MediawikiContentHistoryMigration extends LogHelper {

  case class Config(
    sourceTable: String = "wmf_content.mediawiki_content_history_v1",
    targetTable: String = "aqu.mediawiki_content_history_v1",
    dryRun: Boolean = false,
    appName: String = "MediawikiContentHistoryMigration"
  )

  val argsParser = new OptionParser[Config]("MediawikiContentHistoryMigration") {
    head("MediawikiContentHistoryMigration", "1.0")
    note("Migrate mediawiki_content_history_v1 table to add origin_rev_id field")

    opt[String]("source-table")
      .required()
      .action((x, c) => c.copy(sourceTable = x))
      .text("Source table name (default: wmf_content.mediawiki_content_history_v1)")

    opt[String]("target-table")
      .required()
      .action((x, c) => c.copy(targetTable = x))
      .text("Target table name (default: wmf_content.mediawiki_content_history_v1_tmp)")

    opt[Unit]("dry-run")
      .action((_, c) => c.copy(dryRun = true))
      .text("Dry run mode - only show what would be done")

    opt[String]("app-name")
      .action((x, c) => c.copy(appName = x))
      .text("Application name for Spark job (default: MediawikiContentHistoryMigration)")

    help("help").text("Print this usage text")
  }

  def main(args: Array[String]): Unit = {
    argsParser.parse(args, Config()) match {
      case Some(config) =>
        val spark = SparkSession.builder()
          .appName(config.appName)
          .enableHiveSupport()
          .getOrCreate()
        try {
          runMigration(spark, config)
        } finally {
          spark.stop()
        }
      case None =>
        sys.exit(1)
    }
  }

  def runMigration(spark: SparkSession, config: Config): Unit = {
    log.info(s"Starting migration from ${config.sourceTable} to ${config.targetTable}")
    log.info(s"Dry run: ${config.dryRun}")

    if (config.dryRun) {
      log.info("DRY RUN: Would migrate data using 4-batch strategy")
      return
    }

    // Batch 1: All wikis except the big 3
    log.info("Processing Batch 1: All wikis except enwiki, wikidatawiki, commonswiki")
    migrateWikiBatch(spark, config, notWiki = List("enwiki", "wikidatawiki", "commonswiki"))

    // Batch 2: enwiki
    log.info("Processing Batch 2: enwiki")
    migrateWikiBatch(spark, config, wiki = "enwiki")

    // Batch 3: wikidatawiki
    log.info("Processing Batch 3: wikidatawiki")
    migrateWikiBatch(spark, config, wiki = "wikidatawiki")

    // Batch 4: commonswiki
    log.info("Processing Batch 4: commonswiki")
    migrateWikiBatch(spark, config, wiki = "commonswiki")

    // Verification: Compare source and target table stats
    log.info("Verifying migration by comparing table statistics...")
    verifyMigration(spark, config)

    log.info("Migration completed successfully!")
  }

  def getWikiStats(spark: SparkSession, sourceTable: String): Array[(String, Long)] = {
    log.info("Getting wiki statistics...")
    val wikiStatsDf = spark.sql(
      s"""SELECT wiki_id, COUNT(*) as row_count
          FROM $sourceTable
          GROUP BY wiki_id
          ORDER BY row_count DESC""")
    wikiStatsDf.collect().map(row => (row.getString(0), row.getLong(1)))
  }

  def migrateWikiBatch(
    spark: SparkSession,
    config: Config,
    wiki: String = null,
    notWiki: List[String] = List("enwiki", "wikidatawiki", "commonswiki")
  ): Unit = {
    val sourceDf = if (wiki != null) {
      // Process specific wiki
      log.info(s"Processing single wiki: $wiki")
      spark.table(config.sourceTable).filter(col("wiki_id") === wiki)
    } else {
      // Process all wikis except the ones in notWiki list
      log.info(s"Processing all wikis except: ${notWiki.mkString(", ")}")
      spark.table(config.sourceTable).filter(!col("wiki_id").isin(notWiki: _*))
    }

    val migratedDf = addOriginRevIdToContentSlots(sourceDf)
    migratedDf.writeTo(config.targetTable).append()
    log.info(s"Completed batch migration")
  }

  def verifyMigration(spark: SparkSession, config: Config): Unit = {
    val sourceStats = getWikiStats(spark, config.sourceTable)
    val targetStats = getWikiStats(spark, config.targetTable)

    val sourceMap = sourceStats.toMap
    val targetMap = targetStats.toMap

    log.info("=== MIGRATION VERIFICATION ===")
    log.info(s"Source table: ${sourceStats.length} wikis, ${sourceStats.map(_._2).sum} total rows")
    log.info(s"Target table: ${targetStats.length} wikis, ${targetStats.map(_._2).sum} total rows")

    // Check for differences
    val allWikiIds = (sourceMap.keys ++ targetMap.keys).toSet
    var differences = 0

    allWikiIds.foreach { wikiId =>
      val sourceCount = sourceMap.getOrElse(wikiId, 0L)
      val targetCount = targetMap.getOrElse(wikiId, 0L)
      if (sourceCount != targetCount) {
        log.warn(s"DIFFERENCE: Wiki $wikiId - Source: $sourceCount, Target: $targetCount, Diff: ${targetCount - sourceCount}")
        differences += 1
      }
    }

    if (differences == 0) {
      log.info("✓ All wiki row counts match between source and target tables")
    } else {
      log.warn(s"⚠ Found $differences wikis with row count differences")
    }
    log.info("=== END VERIFICATION ===")
  }

  def addOriginRevIdToContentSlots(df: DataFrame): DataFrame = {
    df.select(
      df.schema.fields.map { field =>
        if (field.name == "revision_content_slots") {
          transformContentSlotsMap(col(field.name), col("revision_id")).as("revision_content_slots")
        } else {
          col(field.name)
        }
      }: _*
    )
  }

  def transformContentSlotsMap(
    contentSlotsCol: org.apache.spark.sql.Column,
    revisionIdCol: org.apache.spark.sql.Column
  ): org.apache.spark.sql.Column = {
    org.apache.spark.sql.functions.transform_values(
      contentSlotsCol,
      (key: org.apache.spark.sql.Column, valueStruct: org.apache.spark.sql.Column) => {
        struct(
          valueStruct.getField("content_body").as("content_body"),
          valueStruct.getField("content_format").as("content_format"),
          valueStruct.getField("content_model").as("content_model"),
          valueStruct.getField("content_sha1").as("content_sha1"),
          valueStruct.getField("content_size").as("content_size"),
          revisionIdCol.as("origin_rev_id")
        )
      }
    )
  }
}
```
Comment Actions
Change #1195258 had a related patch set uploaded (by Aqu; author: Aqu):
[analytics/refinery/source@master] Add job to duplicate mediawiki_content_history_table
Comment Actions
The tables have been migrated with a copy + rename strategy, and the new column is present in mediawiki_content_history_v1.
The migration script is kept for reference here, even if we don't merge it: 1195258.
Here is the incantation to launch a spark3-sql console with Iceberg:
https://phabricator.wikimedia.org/P83757
Very useful for testing.