This task is about checking that Hudi 'works' for the use case and getting a feel for its performance, using existing data.
Description
Status | Subtype | Assigned | Task
---|---|---|---
Declined | None | | T258511 Data Lake incremental Data Updates
Open | None | | T231938 Get "edits hourly" on a daily basis
Resolved | None | Milimetric | T258532 [SPIKE] Prototype of incremental updates for mediawiki history for simplewiki, including reverts using apache hudi
Resolved | None | JAllemandou | T262256 Test hudi and Iceberg as an incremental update system using 2 mediawiki-history snapshots
Event Timeline
Learnt stuff:
- Hudi needs a primary key. We use a hash of field values as a compound key; a sketch of the idea follows this list, and the actual implementation is in the CODE section below.
- One month of changes touches almost all files (true for simplewiki as well as for all wikis). We still need to check smaller time ranges to match real use-case scenarios, but in terms of data size the gain of incremental over snapshot is none. Sorting data within files could help in that regard; an optimization for later.
- Even for small data (one month of updates for simplewiki only), computation time is long (~10 minutes; shorter for insert-only).
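For illustration, a minimal sketch of the compound-key construction using the DataFrame API (an equivalent of the SQL expression used in the CODE section below, not the code that was actually run):

import org.apache.spark.sql.functions.{coalesce, col, concat_ws, lit, md5, unix_timestamp}

// Hash a '|'-separated concatenation of the identifying fields into a single record key.
// coalesce() replaces NULLs with defaults so that every row hashes deterministically.
val withKey = spark.table("wmf.mediawiki_history")
  .where("snapshot = '2020-07' AND event_timestamp IS NOT NULL")
  .withColumn("row_key_hash", md5(concat_ws("|",
    col("wiki_db"), col("event_entity"), col("event_type"),
    unix_timestamp(col("event_timestamp")).cast("string"),
    coalesce(col("event_user_id"), lit(0)),
    coalesce(col("event_user_text_historical"), lit("")),
    coalesce(col("page_id"), lit(0)),
    coalesce(col("page_artificial_id"), lit("")),
    coalesce(col("revision_id"), lit(0)),
    coalesce(col("user_id"), lit(0)))))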
CODE
- Download hudi bundle jars:
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-spark-bundle_2.11/0.6.0/hudi-spark-bundle_2.11-0.6.0.jar
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-hadoop-mr-bundle/0.6.0/hudi-hadoop-mr-bundle-0.6.0.jar
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-presto-bundle/0.6.0/hudi-presto-bundle-0.6.0.jar
- Start Spark with the needed jars:
spark2-shell \
  --master yarn \
  --executor-memory 16G \
  --driver-memory 8G \
  --executor-cores 2 \
  --conf spark.executor.memoryOverhead=2048 \
  --conf spark.dynamicAllocation.maxExecutors=46 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar,/home/joal/hudi-spark-bundle_2.11-0.6.0.jar
- Checking that the chosen primary key is unique:
spark.sql("SET spark.sql.shuffle.partitions = 2048") spark.sql(""" SELECT wiki_db, COUNT(1) as nb_no_single FROM ( select wiki_db, event_entity, event_type, event_timestamp, event_user_id, event_user_text_historical, page_id, page_artificial_id, revision_id, user_id, COUNT(1) as c from wmf.mediawiki_history where snapshot='2020-07' group by wiki_db, event_entity, event_type, event_timestamp, event_user_id, event_user_text_historical, page_id, page_artificial_id, revision_id, user_id having count(1) > 1 ) t GROUP BY wiki_db ORDER BY nb_no_single desc LIMIT 100 """).show(100, false) +---------------+------------+ |wiki_db |nb_no_single| +---------------+------------+ |enwiki |26308 | |idwiki |234 | |commonswiki |111 | |enwiktionary |77 | |ruwiki |74 | |fawiki |56 | |chrwiktionary |35 | |zhwiki |35 | |kywiktionary |28 | |tpiwiktionary |26 | |thwikisource |24 | |eswiki |20 | |itwiki |18 | |dewiki |15 | |miwiktionary |14 | |arwiki |13 | |kuwikiquote |12 | |svwikiversity |12 | |mediawikiwiki |11 | |ptwiki |10 | |testwiki |10 | |bugwiki |9 | |gawiktionary |9 | |shwiki |8 | |glwiki |8 | --> Some duplication, but very small in comparison to the number of rows - To be checked later.
- Data preparation for all events of snapshots 2020-06 and 2020-07, both for all wikis and for simplewiki only - write to Avro with the primary key
import org.apache.spark.sql.SaveMode

// Snapshot 2020-06
val df6 = spark.sql("""
  SELECT
    md5(concat(
      wiki_db, '|',
      event_entity, '|',
      event_type, '|',
      CAST(unix_timestamp(event_timestamp) AS STRING), '|',
      coalesce(event_user_id, 0), '|',
      coalesce(event_user_text_historical, ''), '|',
      coalesce(page_id, 0), '|',
      coalesce(page_artificial_id, ''), '|',
      coalesce(revision_id, 0), '|',
      coalesce(user_id, 0)
    )) AS row_key_hash,
    *
  FROM wmf.mediawiki_history
  WHERE snapshot = '2020-06'
    AND event_timestamp IS NOT NULL
""")

val baseAvroPath6 = "hdfs:///user/joal/test_hudi/mwh_avro_6"
df6.repartition(2048).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath6)
val df6r = spark.read.format("avro").load(baseAvroPath6)

val baseAvroPath6Simplewiki = "hdfs:///user/joal/test_hudi/mwh_avro_6_simplewiki"
df6r.where("wiki_db = 'simplewiki'").repartition(128).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath6Simplewiki)
val df6rs = spark.read.format("avro").load(baseAvroPath6Simplewiki)

// Snapshot 2020-07
val df7 = spark.sql("""
  SELECT
    md5(concat(
      wiki_db, '|',
      event_entity, '|',
      event_type, '|',
      CAST(unix_timestamp(event_timestamp) AS STRING), '|',
      coalesce(event_user_id, 0), '|',
      coalesce(event_user_text_historical, ''), '|',
      coalesce(page_id, 0), '|',
      coalesce(page_artificial_id, ''), '|',
      coalesce(revision_id, 0), '|',
      coalesce(user_id, 0)
    )) AS row_key_hash,
    *
  FROM wmf.mediawiki_history
  WHERE snapshot = '2020-07'
    AND event_timestamp IS NOT NULL
""")

val baseAvroPath7 = "hdfs:///user/joal/test_hudi/mwh_avro_7"
df7.repartition(2048).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7)
val df7r = spark.read.format("avro").load(baseAvroPath7)

val baseAvroPath7Simplewiki = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki"
df7r.where("wiki_db = 'simplewiki'").repartition(128).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7Simplewiki)
val df7rs = spark.read.format("avro").load(baseAvroPath7Simplewiki)
- Data preparation of insert-only and insert+update events between snapshots 2020-06 and 2020-07, both for all wikis and for simplewiki only - write to Avro with the primary key
// Insert only between 2020-06 and 2020-07: rows whose key does not exist in 2020-06
val newIndf7 = df7r.join(df6r, df7r("row_key_hash") === df6r("row_key_hash"), "leftanti")
val baseAvroPath7New = "hdfs:///user/joal/test_hudi/mwh_avro_7_new"
newIndf7.repartition(512).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7New)
val newIndf7r = spark.read.format("avro").load(baseAvroPath7New)

val newIndf7s = df7rs.join(df6rs, df7rs("row_key_hash") === df6rs("row_key_hash"), "leftanti")
val baseAvroPath7SimplewikiNew = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki_new"
newIndf7s.repartition(16).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7SimplewikiNew)
val newIndf7rs = spark.read.format("avro").load(baseAvroPath7SimplewikiNew)
// ---> 1.5% of df7rs total rows

// Insert + update between 2020-06 and 2020-07: rows for which no identical row
// (same key and same values on every compared column, null-safely) exists in 2020-06.
// The compared columns are listed once and the join condition is folded from the list
// (equivalent to the long hand-written chain of '<=>' comparisons).
val comparedColumns = Seq(
  "row_key_hash",
  "event_comment", "event_user_text", "event_user_blocks_historical", "event_user_blocks",
  "event_user_groups_historical", "event_user_groups", "event_user_is_bot_by_historical",
  "event_user_is_bot_by", "event_user_is_created_by_self", "event_user_is_created_by_system",
  "event_user_is_created_by_peer", "event_user_is_anonymous",
  "event_user_registration_timestamp", "event_user_creation_timestamp",
  "event_user_first_edit_timestamp", "event_user_revision_count",
  "event_user_seconds_since_previous_revision",
  "page_title_historical", "page_title", "page_namespace_historical",
  "page_namespace_is_content_historical", "page_namespace", "page_namespace_is_content",
  "page_is_redirect", "page_is_deleted", "page_creation_timestamp",
  "page_first_edit_timestamp", "page_revision_count", "page_seconds_since_previous_revision",
  "user_text_historical", "user_text", "user_blocks_historical", "user_blocks",
  "user_groups_historical", "user_groups", "user_is_bot_by_historical", "user_is_bot_by",
  "user_is_created_by_self", "user_is_created_by_system", "user_is_created_by_peer",
  "user_is_anonymous", "user_registration_timestamp", "user_creation_timestamp",
  "user_first_edit_timestamp",
  "revision_parent_id", "revision_minor_edit", "revision_deleted_parts",
  "revision_deleted_parts_are_suppressed", "revision_text_bytes", "revision_text_bytes_diff",
  "revision_text_sha1", "revision_content_model", "revision_content_format",
  "revision_is_deleted_by_page_deletion", "revision_deleted_by_page_deletion_timestamp",
  "revision_is_identity_reverted", "revision_first_identity_reverting_revision_id",
  "revision_seconds_to_identity_revert", "revision_is_identity_revert",
  "revision_is_from_before_page_creation", "revision_tags")

def sameRowCondition(left: org.apache.spark.sql.DataFrame, right: org.apache.spark.sql.DataFrame) =
  comparedColumns.map(c => left(c) <=> right(c)).reduce(_ && _)

val upsertTodf7 = df7r.join(df6r, sameRowCondition(df7r, df6r), "leftanti")
val baseAvroPath7Upsert = "hdfs:///user/joal/test_hudi/mwh_avro_7_upsert"
upsertTodf7.repartition(512).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7Upsert)
val upsertTodf7r = spark.read.format("avro").load(baseAvroPath7Upsert)

val upsertTodf7s = df7rs.join(df6rs, sameRowCondition(df7rs, df6rs), "leftanti")
val baseAvroPath7SimplewikiUpsert = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki_upsert"
upsertTodf7s.repartition(16).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7SimplewikiUpsert)
val upsertIndf7rs = spark.read.format("avro").load(baseAvroPath7SimplewikiUpsert)
// ---> 4% of df7rs rows
- Now write some Hudi!
Note: syncing the table to Hive fails because of a Hive-version mismatch - maybe we can try the same trick we used for event-refine.
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig

val tableName = "joal.test_hudi_mwh"
val basePath = "hdfs:///user/joal/test_hudi/mwh"
val tableNameSimplewiki = "joal.test_hudi_mwh_simplewiki"
val basePathSimplewiki = "hdfs:///user/joal/test_hudi/mwh_simplewiki"

// All writes share the same table settings, factored into a helper:
// - Copy-on-write table mode (the default, but explicitly set - cool to know how)
// - record key is the hash of fields computed above
// - if two rows have the same key, the one with the greatest 'snapshot' wins (precombine)
// - wiki_db as partition path, with hive-style partitioning
def writeHudi(df: DataFrame, table: String, path: String, operation: String,
              saveMode: SaveMode, extraOptions: Map[String, String] = Map.empty): Unit = {
  df.write.format("hudi").
    option(HoodieWriteConfig.TABLE_NAME, table).
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, operation).
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
    option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
    option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
    option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
    option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    // Adding the table to Hive in addition to HDFS fails for now (hive-version mismatch):
    //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
    options(extraOptions).
    mode(saveMode).
    save(path)
}

// Simplewiki only

// Initial load of 2020-06 data: 'bulk_insert', with smallish parallelism as data is small
writeHudi(df6rs, tableNameSimplewiki, basePathSimplewiki,
  DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, SaveMode.Overwrite,
  Map("hoodie.bulkinsert.shuffle.parallelism" -> "4"))

// Check inserted data with a snapshot query.
// The number of wildcard asterisks must be one greater than the number of partition levels.
val hf6s = spark.read.format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
  load(basePathSimplewiki + "/*/*")
hf6s.groupBy("snapshot").count.show(100, false)

// Insert new rows only
writeHudi(newIndf7rs, tableNameSimplewiki, basePathSimplewiki,
  DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, SaveMode.Append,
  Map("hoodie.insert.shuffle.parallelism" -> "4"))

// Upsert (new + changed rows)
writeHudi(upsertIndf7rs, tableNameSimplewiki, basePathSimplewiki,
  DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append,
  Map("hoodie.upsert.shuffle.parallelism" -> "4"))

// Full reload of 2020-07 data (same as a rewrite, except that deletes are not taken into consideration)
writeHudi(df7rs, tableNameSimplewiki, basePathSimplewiki,
  DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append,
  Map("hoodie.upsert.shuffle.parallelism" -> "16"))

// All wikis

// Initial load of 2020-06 data
writeHudi(df6r, tableName, basePath,
  DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, SaveMode.Overwrite)

// Check inserted data
val hf6 = spark.read.format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
  load(basePath + "/*/*")
hf6.groupBy("snapshot").count.show(100, false)

// Insert new rows only
writeHudi(newIndf7r, tableName, basePath,
  DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, SaveMode.Append)

// Upsert (new + changed rows)
writeHudi(upsertTodf7r, tableName, basePath,
  DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append,
  Map("hoodie.upsert.shuffle.parallelism" -> "4096"))

// Full reload of 2020-07 data (same as a rewrite, except that deletes are not taken into consideration)
writeHudi(df7r, tableName, basePath,
  DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL, SaveMode.Append,
  Map("hoodie.upsert.shuffle.parallelism" -> "8192"))
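The payoff of this table layout would be incremental reads. That was not tested here, but for reference, a sketch of an incremental query with the Hudi 0.6.0 datasource (the begin-instant value is hypothetical; real commit instants are listed under the table's .hoodie directory):

// Read only the records written after a given commit instant ("20200901000000" is made up)
val incremental = spark.read.format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200901000000").
  load(basePathSimplewiki)
incremental.groupBy("snapshot").count.show(100, false)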
Initial testing is done, so I'm calling this task done. As a follow-up we could upgrade to Spark 3 and use Iceberg's snapshot feature, even if data mutation is not available.