
Test hudi and Iceberg as an incremental update system using 2 mediawiki-history snapshots
Closed, Resolved, Public

Description

This task checks that Hudi 'works' for the use case and tries to get a feel for its performance, using existing data.

Event Timeline

Lessons learned:

  • Hudi needs a primary key. We use a hash of several field values as a compound key; see the code below for the implementation.
  • One month of changes touches almost all files (true both for simplewiki and for all wikis). Smaller time ranges still need to be checked to reflect real use cases, but in terms of data size there is no gain between writing a full snapshot and applying an incremental update. Sorting data within files could help here; that optimization is left for later.
  • Even for small data (one month of updates for simplewiki only), computation time is long (~10 minutes), and somewhat shorter for insert-only.

CODE

  • Download the Hudi bundle jars:
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-spark-bundle_2.11/0.6.0/hudi-spark-bundle_2.11-0.6.0.jar
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-hadoop-mr-bundle/0.6.0/hudi-hadoop-mr-bundle-0.6.0.jar
wget https://repository.apache.org/service/local/repositories/releases/content/org/apache/hudi/hudi-presto-bundle/0.6.0/hudi-presto-bundle-0.6.0.jar
  • Start Spark with the needed jars:
spark2-shell \
  --master yarn \
  --executor-memory 16G \
  --driver-memory 8G \
  --executor-cores 2 \
  --conf spark.executor.memoryOverhead=2048 \
  --conf spark.dynamicAllocation.maxExecutors=46 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar,/home/joal/hudi-spark-bundle_2.11-0.6.0.jar
  • Check primary-key uniqueness
spark.sql("SET spark.sql.shuffle.partitions = 2048")
spark.sql("""
SELECT wiki_db, COUNT(1) as nb_no_single
FROM (
  select
    wiki_db,
    event_entity,
    event_type,
    event_timestamp,
    event_user_id,
    event_user_text_historical,
    page_id,
    page_artificial_id,
    revision_id,
    user_id,
    COUNT(1) as c
  from wmf.mediawiki_history
  where snapshot='2020-07'
  group by
    wiki_db,
    event_entity,
    event_type,
    event_timestamp,
    event_user_id,
    event_user_text_historical,
    page_id,
    page_artificial_id,
    revision_id,
    user_id
  having count(1) > 1
) t
GROUP BY wiki_db
ORDER BY nb_no_single desc LIMIT 100
""").show(100, false)
+---------------+------------+
|wiki_db        |nb_no_single|
+---------------+------------+
|enwiki         |26308       |
|idwiki         |234         |
|commonswiki    |111         |
|enwiktionary   |77          |
|ruwiki         |74          |
|fawiki         |56          |
|chrwiktionary  |35          |
|zhwiki         |35          |
|kywiktionary   |28          |
|tpiwiktionary  |26          |
|thwikisource   |24          |
|eswiki         |20          |
|itwiki         |18          |
|dewiki         |15          |
|miwiktionary   |14          |
|arwiki         |13          |
|kuwikiquote    |12          |
|svwikiversity  |12          |
|mediawikiwiki  |11          |
|ptwiki         |10          |
|testwiki       |10          |
|bugwiki        |9           |
|gawiktionary   |9           |
|shwiki         |8           |
|glwiki         |8           |

--> Some keys are duplicated, but very few compared to the total number of rows. To be checked later.
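The uniqueness check above groups rows by the candidate key fields and counts, per wiki, the key tuples that occur more than once. A minimal pure-Scala sketch of the same logic on in-memory rows, which could be handy for unit-testing a key definition before running it on the cluster; `DuplicateKeys`, `Event` and `nonUniqueKeysPerWiki` are hypothetical names, not part of refinery or Hudi:

```scala
// Hypothetical in-memory model of the primary-key uniqueness check.
object DuplicateKeys {
  // A simplified event: the wiki plus the values of the candidate key fields.
  final case class Event(wikiDb: String, keyFields: Seq[String])

  // Mirrors: GROUP BY wiki_db, <key fields> HAVING count(1) > 1,
  // then GROUP BY wiki_db to count the non-unique key tuples per wiki.
  def nonUniqueKeysPerWiki(events: Seq[Event]): Map[String, Int] =
    events
      .groupBy(e => (e.wikiDb, e.keyFields))
      .collect { case ((wiki, _), group) if group.size > 1 => wiki }
      .groupBy(identity)
      .map { case (wiki, ws) => wiki -> ws.size }
}
```

A key tuple seen three times still counts once, exactly like the `nb_no_single` column above.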
  • Data preparation: all events of snapshots 2020-06 and 2020-07, for all wikis and for simplewiki only, written to Avro with the primary key
import org.apache.spark.sql.SaveMode

// Snapshot 2020-06
val df6 = spark.sql("""
SELECT
  md5(concat(
    wiki_db,                                         '|',
    event_entity,                                    '|',
    event_type,                                      '|',
    CAST(unix_timestamp(event_timestamp) AS STRING), '|',
    coalesce(event_user_id, 0),                      '|',
    coalesce(event_user_text_historical, ''),        '|',
    coalesce(page_id, 0),                            '|',
    coalesce(page_artificial_id, ''),                '|',
    coalesce(revision_id, 0),                        '|',
    coalesce(user_id, 0)
  )) as row_key_hash,
  *
from wmf.mediawiki_history
where snapshot  = '2020-06'
  and event_timestamp IS NOT NULL
""")

val baseAvroPath6 = "hdfs:///user/joal/test_hudi/mwh_avro_6"
df6.repartition(2048).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath6)
val df6r = spark.read.format("avro").load(baseAvroPath6)

val baseAvroPath6Simplewiki = "hdfs:///user/joal/test_hudi/mwh_avro_6_simplewiki"
df6r.where("wiki_db = 'simplewiki'").repartition(128).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath6Simplewiki)
val df6rs = spark.read.format("avro").load(baseAvroPath6Simplewiki)


// Snapshot 2020-07
val df7 = spark.sql("""
SELECT
  md5(concat(
    wiki_db,                                         '|',
    event_entity,                                    '|',
    event_type,                                      '|',
    CAST(unix_timestamp(event_timestamp) AS STRING), '|',
    coalesce(event_user_id, 0),                      '|',
    coalesce(event_user_text_historical, ''),        '|',
    coalesce(page_id, 0),                            '|',
    coalesce(page_artificial_id, ''),                '|',
    coalesce(revision_id, 0),                        '|',
    coalesce(user_id, 0)
  )) as row_key_hash,
  *
from wmf.mediawiki_history
where snapshot  = '2020-07'
  and event_timestamp IS NOT NULL
""")

val baseAvroPath7 = "hdfs:///user/joal/test_hudi/mwh_avro_7"
df7.repartition(2048).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7)
val df7r = spark.read.format("avro").load(baseAvroPath7)

val baseAvroPath7Simplewiki = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki"
df7r.where("wiki_db = 'simplewiki'").repartition(128).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7Simplewiki)
val df7rs = spark.read.format("avro").load(baseAvroPath7Simplewiki)
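The `row_key_hash` expression above concatenates the key fields with a `|` separator, replaces NULLs with defaults (Spark SQL's `concat` returns NULL as soon as one argument is NULL), and hashes the result with `md5`. A minimal pure-Scala sketch of that computation, assuming UTF-8 strings and the lowercase hex encoding that Spark's `md5` returns; `RowKey`, `md5Hex` and `rowKeyHash` are hypothetical helpers:

```scala
import java.security.MessageDigest

// Hypothetical helper mirroring the row_key_hash SQL expression.
object RowKey {
  // Lowercase hex MD5 of the UTF-8 bytes, like Spark SQL's md5().
  def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map(b => f"${b & 0xff}%02x")
      .mkString

  // Each field is (value, default): None plays the role of SQL NULL and is
  // replaced by its default, as coalesce() does, before joining with '|'.
  def rowKeyHash(fields: Seq[(Option[Any], String)]): String =
    md5Hex(fields.map { case (value, default) => value.map(_.toString).getOrElse(default) }.mkString("|"))
}
```

Modelling it this way makes two properties of the key visible: a NULL id and an id of 0 produce the same hash, and a `|` inside a text field (for example `event_user_text_historical`) can make two different rows concatenate to the same string. Either could account for some of the duplicated keys found above.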
  • Data preparation of changes between snapshots 2020-06 and 2020-07 (insert-only, then insert + update), for all wikis and for simplewiki only, written to Avro with the primary key
// Insert only between 2020-06 and 2020-07

val newIndf7 = df7r.join(df6r, df7r("row_key_hash") === df6r("row_key_hash"), "leftanti")
val baseAvroPath7New = "hdfs:///user/joal/test_hudi/mwh_avro_7_new"
newIndf7.repartition(512).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7New)
val newIndf7r = spark.read.format("avro").load(baseAvroPath7New)


val newIndf7s = df7rs.join(df6rs, df7rs("row_key_hash") === df6rs("row_key_hash"), "leftanti")
val baseAvroPath7SimplewikiNew = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki_new"
newIndf7s.repartition(16).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7SimplewikiNew)
val newIndf7rs = spark.read.format("avro").load(baseAvroPath7SimplewikiNew)
// ---> 1.5% of df7rs total rows



// Insert + update between 2020-06 and 2020-07

val upsertTodf7 = df7r.join(df6r, (
  df7r("row_key_hash") <=> df6r("row_key_hash") &&
  df7r("event_comment") <=> df6r("event_comment") &&
  df7r("event_user_text") <=> df6r("event_user_text") &&
  df7r("event_user_blocks_historical") <=> df6r("event_user_blocks_historical") &&
  df7r("event_user_blocks") <=> df6r("event_user_blocks") &&
  df7r("event_user_groups_historical") <=> df6r("event_user_groups_historical") &&
  df7r("event_user_groups") <=> df6r("event_user_groups") &&
  df7r("event_user_is_bot_by_historical") <=> df6r("event_user_is_bot_by_historical") &&
  df7r("event_user_is_bot_by") <=> df6r("event_user_is_bot_by") &&
  df7r("event_user_is_created_by_self") <=> df6r("event_user_is_created_by_self") &&
  df7r("event_user_is_created_by_system") <=> df6r("event_user_is_created_by_system") &&
  df7r("event_user_is_created_by_peer") <=> df6r("event_user_is_created_by_peer") &&
  df7r("event_user_is_anonymous") <=> df6r("event_user_is_anonymous") &&
  df7r("event_user_registration_timestamp") <=> df6r("event_user_registration_timestamp") &&
  df7r("event_user_creation_timestamp") <=> df6r("event_user_creation_timestamp") &&
  df7r("event_user_first_edit_timestamp") <=> df6r("event_user_first_edit_timestamp") &&
  df7r("event_user_revision_count") <=> df6r("event_user_revision_count") &&
  df7r("event_user_seconds_since_previous_revision") <=> df6r("event_user_seconds_since_previous_revision") &&
  df7r("page_title_historical") <=> df6r("page_title_historical") &&
  df7r("page_title") <=> df6r("page_title") &&
  df7r("page_namespace_historical") <=> df6r("page_namespace_historical") &&
  df7r("page_namespace_is_content_historical") <=> df6r("page_namespace_is_content_historical") &&
  df7r("page_namespace") <=> df6r("page_namespace") &&
  df7r("page_namespace_is_content") <=> df6r("page_namespace_is_content") &&
  df7r("page_is_redirect") <=> df6r("page_is_redirect") &&
  df7r("page_is_deleted") <=> df6r("page_is_deleted") &&
  df7r("page_creation_timestamp") <=> df6r("page_creation_timestamp") &&
  df7r("page_first_edit_timestamp") <=> df6r("page_first_edit_timestamp") &&
  df7r("page_revision_count") <=> df6r("page_revision_count") &&
  df7r("page_seconds_since_previous_revision") <=> df6r("page_seconds_since_previous_revision") &&
  df7r("user_text_historical") <=> df6r("user_text_historical") &&
  df7r("user_text") <=> df6r("user_text") &&
  df7r("user_blocks_historical") <=> df6r("user_blocks_historical") &&
  df7r("user_blocks") <=> df6r("user_blocks") &&
  df7r("user_groups_historical") <=> df6r("user_groups_historical") &&
  df7r("user_groups") <=> df6r("user_groups") &&
  df7r("user_is_bot_by_historical") <=> df6r("user_is_bot_by_historical") &&
  df7r("user_is_bot_by") <=> df6r("user_is_bot_by") &&
  df7r("user_is_created_by_self") <=> df6r("user_is_created_by_self") &&
  df7r("user_is_created_by_system") <=> df6r("user_is_created_by_system") &&
  df7r("user_is_created_by_peer") <=> df6r("user_is_created_by_peer") &&
  df7r("user_is_anonymous") <=> df6r("user_is_anonymous") &&
  df7r("user_registration_timestamp") <=> df6r("user_registration_timestamp") &&
  df7r("user_creation_timestamp") <=> df6r("user_creation_timestamp") &&
  df7r("user_first_edit_timestamp") <=> df6r("user_first_edit_timestamp") &&
  df7r("revision_parent_id") <=> df6r("revision_parent_id") &&
  df7r("revision_minor_edit") <=> df6r("revision_minor_edit") &&
  df7r("revision_deleted_parts") <=> df6r("revision_deleted_parts") &&
  df7r("revision_deleted_parts_are_suppressed") <=> df6r("revision_deleted_parts_are_suppressed") &&
  df7r("revision_text_bytes") <=> df6r("revision_text_bytes") &&
  df7r("revision_text_bytes_diff") <=> df6r("revision_text_bytes_diff") &&
  df7r("revision_text_sha1") <=> df6r("revision_text_sha1") &&
  df7r("revision_content_model") <=> df6r("revision_content_model") &&
  df7r("revision_content_format") <=> df6r("revision_content_format") &&
  df7r("revision_is_deleted_by_page_deletion") <=> df6r("revision_is_deleted_by_page_deletion") &&
  df7r("revision_deleted_by_page_deletion_timestamp") <=> df6r("revision_deleted_by_page_deletion_timestamp") &&
  df7r("revision_is_identity_reverted") <=> df6r("revision_is_identity_reverted") &&
  df7r("revision_first_identity_reverting_revision_id") <=> df6r("revision_first_identity_reverting_revision_id") &&
  df7r("revision_seconds_to_identity_revert") <=> df6r("revision_seconds_to_identity_revert") &&
  df7r("revision_is_identity_revert") <=> df6r("revision_is_identity_revert") &&
  df7r("revision_is_from_before_page_creation") <=> df6r("revision_is_from_before_page_creation") &&
  df7r("revision_tags") <=> df6r("revision_tags")), "leftanti")

val baseAvroPath7Upsert = "hdfs:///user/joal/test_hudi/mwh_avro_7_upsert"
upsertTodf7.repartition(512).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7Upsert)
val upsertTodf7r = spark.read.format("avro").load(baseAvroPath7Upsert)


val upsertTodf7s = df7rs.join(df6rs,(
  df7rs("row_key_hash") <=> df6rs("row_key_hash") &&
  df7rs("event_comment") <=> df6rs("event_comment") &&
  df7rs("event_user_text") <=> df6rs("event_user_text") &&
  df7rs("event_user_blocks_historical") <=> df6rs("event_user_blocks_historical") &&
  df7rs("event_user_blocks") <=> df6rs("event_user_blocks") &&
  df7rs("event_user_groups_historical") <=> df6rs("event_user_groups_historical") &&
  df7rs("event_user_groups") <=> df6rs("event_user_groups") &&
  df7rs("event_user_is_bot_by_historical") <=> df6rs("event_user_is_bot_by_historical") &&
  df7rs("event_user_is_bot_by") <=> df6rs("event_user_is_bot_by") &&
  df7rs("event_user_is_created_by_self") <=> df6rs("event_user_is_created_by_self") &&
  df7rs("event_user_is_created_by_system") <=> df6rs("event_user_is_created_by_system") &&
  df7rs("event_user_is_created_by_peer") <=> df6rs("event_user_is_created_by_peer") &&
  df7rs("event_user_is_anonymous") <=> df6rs("event_user_is_anonymous") &&
  df7rs("event_user_registration_timestamp") <=> df6rs("event_user_registration_timestamp") &&
  df7rs("event_user_creation_timestamp") <=> df6rs("event_user_creation_timestamp") &&
  df7rs("event_user_first_edit_timestamp") <=> df6rs("event_user_first_edit_timestamp") &&
  df7rs("event_user_revision_count") <=> df6rs("event_user_revision_count") &&
  df7rs("event_user_seconds_since_previous_revision") <=> df6rs("event_user_seconds_since_previous_revision") &&
  df7rs("page_title_historical") <=> df6rs("page_title_historical") &&
  df7rs("page_title") <=> df6rs("page_title") &&
  df7rs("page_namespace_historical") <=> df6rs("page_namespace_historical") &&
  df7rs("page_namespace_is_content_historical") <=> df6rs("page_namespace_is_content_historical") &&
  df7rs("page_namespace") <=> df6rs("page_namespace") &&
  df7rs("page_namespace_is_content") <=> df6rs("page_namespace_is_content") &&
  df7rs("page_is_redirect") <=> df6rs("page_is_redirect") &&
  df7rs("page_is_deleted") <=> df6rs("page_is_deleted") &&
  df7rs("page_creation_timestamp") <=> df6rs("page_creation_timestamp") &&
  df7rs("page_first_edit_timestamp") <=> df6rs("page_first_edit_timestamp") &&
  df7rs("page_revision_count") <=> df6rs("page_revision_count") &&
  df7rs("page_seconds_since_previous_revision") <=> df6rs("page_seconds_since_previous_revision") &&
  df7rs("user_text_historical") <=> df6rs("user_text_historical") &&
  df7rs("user_text") <=> df6rs("user_text") &&
  df7rs("user_blocks_historical") <=> df6rs("user_blocks_historical") &&
  df7rs("user_blocks") <=> df6rs("user_blocks") &&
  df7rs("user_groups_historical") <=> df6rs("user_groups_historical") &&
  df7rs("user_groups") <=> df6rs("user_groups") &&
  df7rs("user_is_bot_by_historical") <=> df6rs("user_is_bot_by_historical") &&
  df7rs("user_is_bot_by") <=> df6rs("user_is_bot_by") &&
  df7rs("user_is_created_by_self") <=> df6rs("user_is_created_by_self") &&
  df7rs("user_is_created_by_system") <=> df6rs("user_is_created_by_system") &&
  df7rs("user_is_created_by_peer") <=> df6rs("user_is_created_by_peer") &&
  df7rs("user_is_anonymous") <=> df6rs("user_is_anonymous") &&
  df7rs("user_registration_timestamp") <=> df6rs("user_registration_timestamp") &&
  df7rs("user_creation_timestamp") <=> df6rs("user_creation_timestamp") &&
  df7rs("user_first_edit_timestamp") <=> df6rs("user_first_edit_timestamp") &&
  df7rs("revision_parent_id") <=> df6rs("revision_parent_id") &&
  df7rs("revision_minor_edit") <=> df6rs("revision_minor_edit") &&
  df7rs("revision_deleted_parts") <=> df6rs("revision_deleted_parts") &&
  df7rs("revision_deleted_parts_are_suppressed") <=> df6rs("revision_deleted_parts_are_suppressed") &&
  df7rs("revision_text_bytes") <=> df6rs("revision_text_bytes") &&
  df7rs("revision_text_bytes_diff") <=> df6rs("revision_text_bytes_diff") &&
  df7rs("revision_text_sha1") <=> df6rs("revision_text_sha1") &&
  df7rs("revision_content_model") <=> df6rs("revision_content_model") &&
  df7rs("revision_content_format") <=> df6rs("revision_content_format") &&
  df7rs("revision_is_deleted_by_page_deletion") <=> df6rs("revision_is_deleted_by_page_deletion") &&
  df7rs("revision_deleted_by_page_deletion_timestamp") <=> df6rs("revision_deleted_by_page_deletion_timestamp") &&
  df7rs("revision_is_identity_reverted") <=> df6rs("revision_is_identity_reverted") &&
  df7rs("revision_first_identity_reverting_revision_id") <=> df6rs("revision_first_identity_reverting_revision_id") &&
  df7rs("revision_seconds_to_identity_revert") <=> df6rs("revision_seconds_to_identity_revert") &&
  df7rs("revision_is_identity_revert") <=> df6rs("revision_is_identity_revert") &&
  df7rs("revision_is_from_before_page_creation") <=> df6rs("revision_is_from_before_page_creation") &&
  df7rs("revision_tags") <=> df6rs("revision_tags")), "leftanti")

val baseAvroPath7SimplewikiUpsert = "hdfs:///user/joal/test_hudi/mwh_avro_7_simplewiki_upsert"
upsertTodf7s.repartition(16).write.format("avro").mode(SaveMode.Overwrite).save(baseAvroPath7SimplewikiUpsert)
val upsertIndf7rs = spark.read.format("avro").load(baseAvroPath7SimplewikiUpsert)
// ---> 4% of df7rs rows
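The two joins above select the 2020-07 rows differently. The insert-only set is a left anti join on `row_key_hash` alone. The insert + update set is a left anti join on the key plus every compared column, using the null-safe `<=>` so that two NULLs count as equal. A minimal pure-Scala sketch of both, on in-memory rows; `SnapshotDiff` and `Row` are hypothetical names, and `values` should hold only the compared columns (the Spark condition leaves out `snapshot`, which always differs between the two snapshots):

```scala
// Hypothetical in-memory model of the two snapshot diffs.
object SnapshotDiff {
  // A row: its key hash plus the compared column values (None = SQL NULL).
  final case class Row(key: String, values: Map[String, Option[Any]])

  // Insert-only: rows of the new snapshot whose key is absent from the old one
  // (left anti join on row_key_hash).
  def newRows(newSnap: Seq[Row], oldSnap: Seq[Row]): Seq[Row] = {
    val oldKeys = oldSnap.map(_.key).toSet
    newSnap.filterNot(r => oldKeys.contains(r.key))
  }

  // Insert + update: rows of the new snapshot with no fully identical row in the
  // old one. Option equality treats None == None as true, like Spark's <=>.
  def upsertRows(newSnap: Seq[Row], oldSnap: Seq[Row]): Seq[Row] = {
    val oldRows = oldSnap.toSet
    newSnap.filterNot(r => oldRows.contains(r))
  }
}
```

In Spark itself, the long `<=>` conjunction could likewise be generated from a list of column names rather than written out twice.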
  • Now write some Hudi!

Note: syncing the table to Hive fails because of a Hive version mismatch. We could try the same workaround we used for event-refine.

import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig

val tableName = "joal.test_hudi_mwh"
val basePath = "hdfs:///user/joal/test_hudi/mwh"

val tableNameSimplewiki = "joal.test_hudi_mwh_simplewiki"
val basePathSimplewiki = "hdfs:///user/joal/test_hudi/mwh_simplewiki"


// Simplewiki only
// Initial load of 2020-06 data
df6rs.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableNameSimplewiki).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableNameSimplewiki).
  // Use 'bulk_insert' as table operation to load initial values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Set parallelism to a smallish value as data is small
  option("hoodie.bulkinsert.shuffle.parallelism", "4").
  mode(SaveMode.Overwrite).
  save(basePathSimplewiki)

// check inserted data
val hf6s = spark.read.format("hudi").
     // Set table name
     option(HoodieWriteConfig.TABLE_NAME, tableNameSimplewiki).
     // Read snapshot
     option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
     // The number of wildcard asterisks must be one greater than the number of partition levels
     load(basePathSimplewiki + "/*/*")
hf6s.groupBy("snapshot").count.show(100, false)


// Insert new rows only
newIndf7rs.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableNameSimplewiki).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableNameSimplewiki).
  // Use 'insert' as table operation to add new rows
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  option("hoodie.insert.shuffle.parallelism", "4").
  mode(SaveMode.Append).
  save(basePathSimplewiki)


// Upsert (new + changed)

upsertIndf7rs.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableNameSimplewiki).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableNameSimplewiki).
  // Use 'upsert' as table operation to update values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  option("hoodie.upsert.shuffle.parallelism", "4").
  mode(SaveMode.Append).
  save(basePathSimplewiki)


// Upsert of the full 2020-07 snapshot (equivalent to a rewrite, except that rows deleted since 2020-06 are not removed)
df7rs.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableNameSimplewiki).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableNameSimplewiki).
  // Use 'upsert' as table operation to update values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  option("hoodie.upsert.shuffle.parallelism", "16").
  mode(SaveMode.Append).
  save(basePathSimplewiki)



// All wikis

// Initial load of 2020-06 data 
df6r.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableName).
  // Use 'bulk_insert' as table operation to load initial values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  mode(SaveMode.Overwrite).
  save(basePath)

// Check inserted data
val hf6 = spark.read.format("hudi").
     // Set table name
     option(HoodieWriteConfig.TABLE_NAME, tableName).
     // Read snapshot
     option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).
     // The number of wildcard asterisks must be one greater than the number of partition levels
     load(basePath + "/*/*")
hf6.groupBy("snapshot").count.show(100, false)


// Insert new rows only
newIndf7r.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableName).
  // Use 'insert' as table operation to add new rows
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  mode(SaveMode.Append).
  save(basePath)

// Upsert (new + changed)
upsertTodf7r.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableName).
  // Use 'upsert' as table operation to update values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  option("hoodie.upsert.shuffle.parallelism", "4096").
  mode(SaveMode.Append).
  save(basePath)


// Upsert of the full 2020-07 snapshot (equivalent to a rewrite, except that rows deleted since 2020-06 are not removed)
df7r.write.format("hudi").
  // Set table name
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  //option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY, tableName).
  // Use 'upsert' as table operation to update values
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  // Explicitly set the copy-on-write table type (the default, but good to know how)
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL).
  // row-key defined as a hash of fields
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "row_key_hash").
  // If two rows have the same key, keep the one with the largest snapshot value
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "snapshot").
  // Use wiki_db as the partition path
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "wiki_db").
  // Set hive-partition style to true
  option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
  // Add table to hive in addition to HDFS
  //option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
  // Hive Metastore url
  //option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://an-coord1001.eqiad.wmnet:10000/default;principal=hive/_HOST@WIKIMEDIA").
  option("hoodie.upsert.shuffle.parallelism", "8192").
  mode(SaveMode.Append).
  save(basePath)
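The writes above rely on Hudi merging records by record key (`row_key_hash`) within a partition path (`wiki_db`), with the precombine field (`snapshot`) choosing between records that share a key. A minimal in-memory model of an upsert, assuming the default non-global index and the default payload behaviour of this Hudi version, where the precombine field only deduplicates the incoming batch and an incoming record then replaces the stored one. This is a simplification for reasoning about results, not Hudi's implementation, and `UpsertSketch` and `Rec` are hypothetical names:

```scala
// Hypothetical in-memory model of a Hudi copy-on-write upsert.
object UpsertSketch {
  // A simplified record: partition path (wiki_db), record key (row_key_hash),
  // precombine field (snapshot) and a payload standing in for the other columns.
  final case class Rec(partition: String, key: String, snapshot: String, payload: String)

  // Records are unique per (partition path, record key).
  type Table = Map[(String, String), Rec]

  // Step 1: deduplicate the incoming batch, keeping for each (partition, key)
  // the record with the largest precombine value.
  def precombine(batch: Seq[Rec]): Seq[Rec] =
    batch.groupBy(r => (r.partition, r.key)).values.map(_.maxBy(_.snapshot)).toSeq

  // Step 2: merge into the table. An incoming record replaces the stored one
  // with the same (partition, key), otherwise it is added. Nothing is deleted.
  def upsert(table: Table, batch: Seq[Rec]): Table =
    precombine(batch).foldLeft(table)((t, r) => t.updated((r.partition, r.key), r))
}
```

Because nothing is ever removed in this model, it also shows why upserting the full 2020-07 snapshot is not equivalent to a rewrite: rows deleted between the two snapshots stay in the table.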
Milimetric moved this task from Incoming to Smart Tools for Better Data on the Analytics board.
JAllemandou renamed this task from Test hudi as an incremental update system using 2 mediawiki-history snapshots to Test hudi and Iceberg as an incremental update system using 2 mediawiki-history snapshots.Oct 26 2020, 4:38 PM
JAllemandou moved this task from In Progress to Paused on the Analytics-Kanban board.

Next up: test Apache Iceberg once its data-mutation feature (copy-on-write) is released.

Milimetric added a subscriber: Milimetric.

The initial test is done, so we are calling this task done. We could upgrade to Spark 3 and use Iceberg's snapshot feature, even though data mutation is not available yet.