
Propagate field descriptions from event schemas to Hive event tables and into DataHub
Closed, Declined | Public | 5 Estimated Story Points

Description

As a user of DataHub, I expect the columns of the tables in the event database to automatically have descriptions based on the corresponding schemas. @Milimetric suggested that since the Hive metastore is ingested into DataHub, what we actually need is to populate the metastore table metadata with field descriptions from the corresponding schemas.


Event Timeline

EChetty set the point value for this task to 2. (May 5 2022, 4:15 PM)
EChetty changed the point value for this task from 2 to 5. (Aug 17 2022, 2:17 PM)

Indeed! The JSONSchema -> Spark Schema code does use the descriptions, but I think we are losing them when we map to Hive DDL: https://github.com/wikimedia/analytics-refinery-source/blob/master/refinery-spark/src/main/scala/org/wikimedia/analytics/refinery/spark/sql/HiveExtensions.scala#L124-L134.

I think this would automatically just work if we could create/alter the tables through Spark directly, rather than through Hive.

Once we are fully on Spark 3, we should be able to do T209453: Refine: Use Spark SQL instead of Hive JDBC, which I think should address this problem.

For example, there are enriched fields in the Hive tables, like geocoded_data.

How would the DataHub description for geocoded_data (and any other fields that get added in refinement) get populated?

They'd have to be set when the field is added to the table the first time. If a new event stream is declared, Refine will pick it up and CREATE the table with the geocoded_data field. Since Refine is adding the field, it needs to set the description there.

I'm not exactly sure how that would work, but probably somewhere in here we would augment workingDf.schema to set the description for the newly added geocoded_data field.
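A minimal sketch of that augmentation step, assuming Refine has the enriched schema as a Spark StructType. Spark keeps a field's comment in the field's metadata, and StructField.withComment returns a copy with the comment set. The helper name withFieldComment is hypothetical, and this only changes the schema object. How Refine would then use it for the CREATE is left open.

```scala
import org.apache.spark.sql.types._

object FieldComments {
  /** Hypothetical helper: return a copy of `schema` in which the top-level
    * field called `name` carries `comment` (stored in the field's metadata).
    * All other fields are returned unchanged. */
  def withFieldComment(schema: StructType, name: String, comment: String): StructType =
    StructType(schema.fields.map { f =>
      if (f.name == name) f.withComment(comment) else f
    })
}
```

For example, FieldComments.withFieldComment(workingDf.schema, "geocoded_data", "...") would attach a description to the enrichment field. The actual description text would have to come from somewhere in Refine.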

For existing Hive event tables, we'd have to run a big batch of ALTER TABLE statements to set the column comments on all of them.
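A rough sketch of generating those statements from a commented Spark schema. It assumes Hive's ALTER TABLE ... CHANGE COLUMN syntax, which restates the column's name and type alongside the new comment. It also assumes partition columns are excluded from the schema passed in. The object and method names are hypothetical, and only top-level fields are handled.

```scala
import org.apache.spark.sql.types._

object HiveCommentAlters {
  // Quote a Hive string literal: escape backslashes first, then single quotes.
  private def literal(s: String): String =
    "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"

  /** One ALTER TABLE ... CHANGE COLUMN per top-level field that has a comment.
    * The column keeps its name and type; only the comment is (re)set. */
  def alterStatements(table: String, schema: StructType): Seq[String] =
    schema.fields.toSeq.flatMap { f =>
      f.getComment().map { c =>
        s"ALTER TABLE $table CHANGE COLUMN `${f.name}` `${f.name}` " +
          s"${f.dataType.catalogString} COMMENT ${literal(c)}"
      }
    }
}
```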

DataHub has an API that we can use to import the schema. That schema should ideally be tied to the Kafka topics, since they are the true source.

This task is about the event tables in Hive, for which most fields are indeed created from the event schemas, but not all. If we properly propagated the field descriptions from the event schemas to the Hive tables, then the descriptions would just be imported into Datahub from Hive via the regular import process.

T318863: [Event Platform] Event Platform and DataHub Integration is about cataloging the Event Platform streams in Kafka with their event schemas.

@Ottomata makes sense. Thanks for posting the ticket

Ottomata renamed this task from Propagate field descriptions from event schemas to metastore to Propagate field descriptions from event schemas to Hive event tables. (May 30 2023, 5:02 PM)
xcollazo subscribed.

Just passing by to say that it would be nice to see this ticket happen. CC @lbowmaker.

Change 987195 had a related patch set uploaded (by Ottomata; author: Ottomata):

[analytics/refinery/source@master] spark HiveExtensions now support column COMMENTs in DDL and merge helpers

https://gerrit.wikimedia.org/r/987195

> I think this would automatically just work if we could create/alter the tables through Spark directly, rather than through Hive.

So, while I think this is true, T209453: Refine: Use Spark SQL instead of Hive JDBC is proving difficult in Spark 3 as well. It looks like we'll need some changes in upstream Spark.

However, since I was poking around in the code, I think it is possible to solve this task without the Spark SQL ALTER DDL support we need. I just pushed a patch to refinery-source HiveExtensions that I think should do what we need. It will also have the really nice effect of adding the field descriptions to all of the Refined event tables! ...will test this!

Wow it...kinda...works~

CREATE TABLE otto.mw_page_change0 LIKE event.mediawiki_page_change_v1;

Then I ran our EvolveHiveTable tool to alter my table. Here's the output of the tool with all the generated ALTER statements.

The result!

hive (otto)> describe mw_page_change0;
OK
col_name	data_type	comment
_schema             	string              	A URI identifying the JSONSchema for this event. This should match an schema's $id in a schema repository. E.g. /schema/title/1.0.0
changelog_kind      	string              	The kind of this event in a changelog. This is used to map the event to an action in a data store.
comment             	string              	The comment left by the user that performed this change. Same as revision.comment on edits.
created_redirect_page	struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint>	Page entity that was created at the old title during a page move. This is only set for page move events. Note that the created_redirect_page will also have its own associated page create event.
dt                  	string              	ISO-8601 formatted timestamp of when the event occurred/was generated in UTC), AKA 'event time'. This is different than meta.dt, which is used as the time the system received this event.
meta                	struct<domain:string,dt:string,id:string,request_id:string,stream:string,uri:string>
page                	struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint,redirect_page_link:struct<interwiki_prefix:string,is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string>>	Fields for MediaWiki page entity.
page_change_kind    	string              	The origin kind of the change to this page as viewed by MediaWiki.
performer           	struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>	Represents the MediaWiki actor that made this change. If this change is an edit, this will be the same as revision.editor.
prior_state         	struct<page:struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint>,revision:struct<comment:string,content_slots:map<string,struct<content_body:string,content_format:string,content_model:string,content_sha1:string,content_size:bigint,origin_rev_id:bigint,slot_role:string>>,editor:struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>,is_comment_visible:boolean,is_content_visible:boolean,is_editor_visible:boolean,is_minor_edit:boolean,rev_dt:string,rev_id:bigint,rev_parent_id:bigint,rev_sha1:string,rev_size:bigint>>	Prior state of this page before this event. Fields are only present if their values have changed.
revision            	struct<comment:string,content_slots:map<string,struct<content_body:string,content_format:string,content_model:string,content_sha1:string,content_size:bigint,origin_rev_id:bigint,slot_role:string>>,editor:struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>,is_comment_visible:boolean,is_content_visible:boolean,is_editor_visible:boolean,is_minor_edit:boolean,rev_dt:string,rev_id:bigint,rev_parent_id:bigint,rev_sha1:string,rev_size:bigint>	Fields for MediaWiki revision entity.
wiki_id             	string              	The wiki ID, which is usually the same as the MediaWiki database name. E.g. enwiki, metawiki, etc.
is_wmf_domain       	boolean
normalized_host     	struct<project_class:string,project:string,qualifiers:array<string>,tld:string,project_family:string>
datacenter          	string
year                	bigint
month               	bigint
day                 	bigint
hour                	bigint

# Partition Information
# col_name            	data_type           	comment

datacenter          	string
year                	bigint
month               	bigint
day                 	bigint
hour                	bigint
Time taken: 0.313 seconds, Fetched: 28 row(s)
hive (otto)> describe mediawiki_page_change_v1;
FAILED: SemanticException [Error 10001]: Table not found mediawiki_page_change_v1
hive (otto)> describe event.mediawiki_page_change_v1;
OK
col_name	data_type	comment
_schema             	string
changelog_kind      	string
comment             	string
created_redirect_page	struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint>
dt                  	string
meta                	struct<domain:string,dt:string,id:string,request_id:string,stream:string,uri:string>
page                	struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint,redirect_page_link:struct<interwiki_prefix:string,is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string>>
page_change_kind    	string
performer           	struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>
prior_state         	struct<page:struct<is_redirect:boolean,namespace_id:bigint,page_id:bigint,page_title:string,revision_count:bigint>,revision:struct<comment:string,content_slots:map<string,struct<content_body:string,content_format:string,content_model:string,content_sha1:string,content_size:bigint,origin_rev_id:bigint,slot_role:string>>,editor:struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>,is_comment_visible:boolean,is_content_visible:boolean,is_editor_visible:boolean,is_minor_edit:boolean,rev_dt:string,rev_id:bigint,rev_parent_id:bigint,rev_sha1:string,rev_size:bigint>>
revision            	struct<comment:string,content_slots:map<string,struct<content_body:string,content_format:string,content_model:string,content_sha1:string,content_size:bigint,origin_rev_id:bigint,slot_role:string>>,editor:struct<edit_count:bigint,groups:array<string>,is_bot:boolean,is_system:boolean,is_temp:boolean,registration_dt:string,user_id:bigint,user_text:string>,is_comment_visible:boolean,is_content_visible:boolean,is_editor_visible:boolean,is_minor_edit:boolean,rev_dt:string,rev_id:bigint,rev_parent_id:bigint,rev_sha1:string,rev_size:bigint>
wiki_id             	string
is_wmf_domain       	boolean
normalized_host     	struct<project_class:string,project:string,qualifiers:array<string>,tld:string,project_family:string>

...

It looks like the comments were only added for top-level fields, even though the ALTER TABLE CHANGE COLUMN statements included them for nested fields too?

I just did a test in pure Hive, and indeed, nested struct field comments don't seem to be retained.

CREATE TABLE otto.nested1 (c1 struct<f1:int COMMENT "f1 comment"> COMMENT "c1 comment") STORED AS PARQUET;

hive (otto)> describe otto.nested1;
OK
col_name	data_type	comment
c1                  	struct<f1:int>      	c1 comment

As you can see, there is no comment for the nested field c1.f1.

DataHub allows you to add descriptions at the sub-field level. At some point we should reach consensus about where we want all this description stuff to live. We talked about:

  • table-creation code (direct HQL, or indirectly via schemas) being the initial source, but once it's in DataHub, DataHub being the authoritative source of truth. The two interplay nicely as long as changes are backwards compatible; manual intervention is needed in backwards-incompatible cases.
  • driving everything from the create script/schema and syncing the stuff people put in Datahub back to the code (I have doubts about this approach)

But we should decide sometime soon

> we should decide sometime soon

Aye, prob a different ticket.

> syncing the stuff people put in Datahub back to the code

Many of the field descriptions come from common/shared fragment schemas. If people want to change the documentation of a field, they should definitely update it in the source schemas. Otherwise, new schemas created from those fragments will keep using the 'old' description for the same field.

More generally, I think things should be synced back anyway. DataHub is not the only way people get info about e.g. Hive tables; I don't think DataHub will ever stop people from doing describe table.

Change #1016808 had a related patch set uploaded (by Aqu; author: Aqu):

[analytics/refinery/source@master] Add CLI to create or update Iceberg tables

https://gerrit.wikimedia.org/r/1016808

Change #1016808 merged by jenkins-bot:

[analytics/refinery/source@master] Refactor Refine to be triggered by Airflow

https://gerrit.wikimedia.org/r/1016808

@Antoine_Quhen I cannot remember, but I thought the new RefineHiveDataset stuff would have solved this task. Should it have?

We discussed this in today's Data Engineer Sync. The only reason this isn't done is that we didn't do it!

We updated the patch; it should work.

https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/987195

We should probably use this code to run EvolveHiveTable manually before deploying it, so we can manage the hundreds of ALTERs and make sure it works.

However, this patch as provided will only support adding and evolving COMMENTs on top level fields.

We could potentially support adding COMMENTs on nested fields when tables are first created if we were to modify TableSchemaManager so that CREATE statements were executed via Spark SQL directly. We can't support ALTERs via Spark SQL (because of annoying reasons, see details in T209453), so we wouldn't be able to evolve the comments of nested fields. But at least we'd pick up nested field comments when the tables are created. Comments are rarely changed anyway, so this would be better than nothing!
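A sketch of the CREATE-via-Spark-SQL idea, assuming a SparkSession with Hive support. Spark's StructType.toDDL renders comments on nested struct fields as well as top-level ones, as the SHOW CREATE TABLE output elsewhere in this task suggests. A CREATE built from it would therefore carry those comments. The helper name is hypothetical, and PARTITIONED BY and other table options are left out.

```scala
import org.apache.spark.sql.types._

object SparkSqlCreate {
  /** Hypothetical: build a Hive-format CREATE TABLE statement to run with
    * spark.sql(...). toDDL emits a COMMENT clause for every commented field,
    * including fields nested inside structs. Partitioning is omitted. */
  def createStatement(table: String, schema: StructType): String =
    s"CREATE TABLE IF NOT EXISTS $table (${schema.toDDL}) STORED AS PARQUET"
}
```

Only the CREATE would go through Spark SQL here. Later ALTERs would still go through Hive JDBC, so nested comments set this way could not be evolved.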

We'd have to add a big caveat/notice somewhere in the Event Platform docs saying that nested field comments cannot be evolved. Otherwise someone might update a comment and expect the Hive table to reflect the change.

We should still at least merge and use the patch as is to get top level field descriptions.

Hm, actually a little more is needed than just the patch as is. We currently clear schema metadata when evolving the table; we need to keep doing that, but still preserve comments, as the original (old) patch did.

However:

  • For each field, we simply call dataType.sql, which already includes nested field comments!
  • Hive JDBC DDL apparently does not even respect nested field comments, so any nested comments included in the DDL are ignored.
  • If we clear metadata but preserve nested field comments, Refine will try to run the ALTER to add nested comments on every run. Since they are ignored, they are never applied, and the ALTER will be attempted again on the next run.

So, we need to:

  • clear metadata and preserve comments only on top level fields, but clear all metadata including comments on nested fields.
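A minimal sketch of that normalization on a Spark StructType, assuming comments live only in StructField metadata (which is where Spark keeps them). It clears all metadata, recursing through structs, arrays and maps, and then puts back just the comment on each top-level field. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.types._

object TopLevelComments {
  /** Recursively drop metadata (including comments) from every field nested
    * inside `dt`, descending through structs, arrays and maps. */
  def stripNestedMetadata(dt: DataType): DataType = dt match {
    case s: StructType =>
      StructType(s.fields.map(f =>
        f.copy(dataType = stripNestedMetadata(f.dataType), metadata = Metadata.empty)))
    case a: ArrayType => a.copy(elementType = stripNestedMetadata(a.elementType))
    case m: MapType =>
      m.copy(keyType = stripNestedMetadata(m.keyType), valueType = stripNestedMetadata(m.valueType))
    case other => other
  }

  /** Clear all metadata, then restore only the comment on each top-level field. */
  def keepTopLevelCommentsOnly(schema: StructType): StructType =
    StructType(schema.fields.map { f =>
      val cleared = f.copy(dataType = stripNestedMetadata(f.dataType), metadata = Metadata.empty)
      f.getComment() match {
        case Some(c) => cleared.withComment(c)
        case None    => cleared
      }
    })
}
```

With nested comments gone, dataType.sql no longer contains nested COMMENT clauses. The generated ALTERs should then stop differing from what Hive actually stores.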

Growl.

Latest patch I think is on track to do what we need:

  • Let HiveExtensions just do the right thing and set (and merge) schemas to keep comments on all fields.
  • When normalizing, or perhaps better, when we are about to apply ALTERs, remove comments on nested fields by clearing metadata while retaining top-level comments.

WIP, but I have to go now!

I think we can get original nested field descriptions when we create tables if we use Spark SQL to create and only use Hive JDBC for alters. But that might be a riskier change. TBD.

Ottomata renamed this task from Propagate field descriptions from event schemas to Hive event tables to Propagate field descriptions from event schemas to Hive event tables and into DataHub. (Sep 29 2025, 4:40 PM)

Change #1199473 had a related patch set uploaded (by Aqu; author: Aqu):

[analytics/refinery/source@master] Maintain Spark schema after Hive DDL operations

https://gerrit.wikimedia.org/r/1199473

@aqu why mergeComments vs e.g. mergeFieldsMetadata like in https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/987195/10/refinery-spark/src/main/scala/org/wikimedia/analytics/refinery/spark/sql/HiveExtensions.scala? Since Spark treats comments as a kind of field metadata, shouldn't we make the SparkSqlExtension stuff do the same for all metadata?

Worked a bit on this today, needed to do some code reading and remember a lot.

Started a slack thread here.

  • why do we emptyMetadata in the normalizeFieldNamesAndWidenTypes TransformFunction?
  • DataFrameExtensions convertToSchema will preserve comments on top level fields that have not changed (no type casting, etc.)
    • Spark apparently magically keeps the underlying StructField schema on a result if you do a SQL like SELECT f1 from table1; if table1 schema f1 field had a comment, it will remain in the result DataFrame schema f1 field comment too!
    • The comment will not remain if any CASTING is involved, which is how we handle all other cases in buildSQLFieldsRec.
      df.schema.toDDL
      // res25: String = `a` STRUCT<`aa`: INT, `ab`: BIGINT, `ac`: BOOLEAN>,`b` BIGINT COMMENT 'field b comment v0',`c` BOOLEAN

      df.createOrReplaceTempView("t1")
      spark.sql("select b as b, cast(b as int) as b_int from t1").schema.toDDL
      // res28: String = `b` BIGINT COMMENT 'field b comment v0',`b_int` INT
  • I tried to see if I could make convertToSchema somehow preserve comments.
    • First by trying some CAST with COMMENT. AI told me this would work, but it does not.
    • We could try to do some magic at the end of convertToSchema like convertedDf.sqlContext.createDataFrame(convertedDf.rdd, schemaWithComments). This might work, but...
  • But why am I trying to preserve comments on a DataFrame? What I care about is the comments when the table is evolved.
  • Next, I want to try my suggestion here. Antoine added a 'schema' for TransformFunctions. So, there is no real reason to use convertToSchema on a DataFrame to get the final merged schema like is done in applyTransforms. I think we can just merge the schema together with all of the transform schemas and fully avoid using a DataFrame in EvolveHiveTable at all!
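A rough sketch of merging schemas directly, without a DataFrame, assuming each transform function exposes the StructType it adds. Fields are matched case-insensitively by name and new fields are appended. A comment is carried over when the base field lacks one. Type widening, which the real merge has to handle, is deliberately left out, and all names here are hypothetical.

```scala
import java.util.Locale
import org.apache.spark.sql.types._

object SchemaMerge {
  private def key(name: String): String = name.toLowerCase(Locale.ROOT)

  /** Hypothetical sketch: keep every field of `base` (in order), fill in a
    * comment from `extra` where the base field has none, then append the
    * fields of `extra` that `base` does not have. Names match
    * case-insensitively. Types are NOT widened or reconciled here. */
  def mergeKeepingComments(base: StructType, extra: StructType): StructType = {
    val extraByName = extra.fields.map(f => key(f.name) -> f).toMap
    val merged = base.fields.map { f =>
      (f.getComment(), extraByName.get(key(f.name)).flatMap(_.getComment())) match {
        case (None, Some(c)) => f.withComment(c)
        case _               => f
      }
    }
    val baseNames = base.fieldNames.map(key).toSet
    StructType(merged ++ extra.fields.filterNot(f => baseNames(key(f.name))))
  }
}
```

Because only StructType values are combined, no SELECT or CAST runs, so nothing can silently drop the comments the way convertToSchema does.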

Change #1239359 had a related patch set uploaded (by Ottomata; author: Ottomata):

[analytics/refinery/source@master] SparkSqlExtensions - noop rename and comments for DataFrame convertToSchema

https://gerrit.wikimedia.org/r/1239359

I'm trying some things, and I'm not sure updateSparkSchemaProperties is going to work. At least, it hasn't in my own adaptations. Here's why:

The schema defined in the Hive metastore will not have any comments for fields nested in structs. updateSparkSchemaProperties tries to work around this by relying on the fact that Spark maintains its own version of the schema in the table properties (TBLPROPERTIES), and then prefers(?) that over the Hive one.
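For context, a small sketch of what that Spark-side copy holds, assuming it is the JSON form of the StructType (StructType.json). In that form, comments sit in each field's "metadata" object at every nesting level. The sketch only round-trips the JSON and writes no table properties. As far as we know, Spark may also split a long schema string across several spark.sql.sources.schema.part.N properties. The object name is hypothetical.

```scala
import org.apache.spark.sql.types._

object SchemaJson {
  /** Serialize a schema to Spark's JSON representation; comments travel in
    * each field's "metadata" object, nested fields included. */
  def toJson(schema: StructType): String = schema.json

  /** Parse the JSON back into a StructType, comments included. */
  def fromJson(json: String): StructType =
    DataType.fromJson(json).asInstanceOf[StructType]
}
```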

updateSparkSchemaProperties does successfully add comments to nested fields in Spark's TBLPROPERTIES spark.sql.sources.schema.part.0 field. But when I access the table again to check its schema via Spark Scala, I get this exception:

org.apache.spark.sql.AnalysisException: Column in converted table has different data type with source Hive table's. Set spark.sql.hive.convertMetastoreParquet to false, or recreate table `otto`.`te1` to workaround.
  at org.apache.spark.sql.hive.HiveMetastoreCatalog.convertToLogicalRelation(HiveMetastoreCatalog.scala:269)
...

The offending line in HiveMetastoreCatalog is:

if (!result.output.zip(relation.output).forall {
      case (a1, a2) => a1.dataType == a2.dataType }) {

If the field is a struct, == compares the whole StructType, including the metadata (and therefore the comments) of every nested field; there is no comparison that ignores metadata. So if a1 has any nested field that differs at all from a2 (e.g. a nested field has a comment), this comparison will be false.

I.e.

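import org.apache.spark.sql.types._
import org.scalatest.flatspec.AnyFlatSpec

// Spec class name is illustrative (hypothetical); assumes ScalaTest 3.1+.
class StructFieldCommentEqualitySpec extends AnyFlatSpec {
    behavior of "StructField dataType equality"

    it should "compare dataType of struct fields with and without comments" in {
        val schema0 = StructType(Seq(
            StructField("F1", IntegerType),
            StructField("f2", StructType(Seq(
                StructField("S1", StringType)
            )))
        ))
        val schema1 = StructType(Seq(
            StructField("F1", IntegerType).withComment("F1 comment 1"),
            StructField("f2", StructType(Seq(
                StructField("S1", StringType).withComment("S1 comment 1")
            )))
        ))

        // Comments live in StructField metadata, not in the DataType itself,
        // so a top-level comment does not affect dataType equality.
        assert(
            schema0("F1").dataType == schema1("F1").dataType,
            "Primitive dataType should match even if comments do not."
        )
        // StructType equality includes each nested field's metadata, so the
        // comment on S1 makes the two f2 dataTypes unequal.
        assert(
            schema0("f2").dataType == schema1("f2").dataType,
            "This assertion will fail!  StructType dataTypes will not match if any of their nested field comments do not match."
        )
    }
}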

What is extra strange, though, is that spark3-sql does not complain!!! I ran updateSparkSchemaProperties on otto.te1 and look!

spark-sql (default)> show create table otto.te1;
createtab_stmt
CREATE TABLE `otto`.`te1` (
  `$schema` STRING COMMENT 'A URI identifying the JSONSchema for this event. This should match an schema\'s $id in a schema repository. E.g. /schema/title/1.0.0
',
  `meta` STRUCT<`domain`: STRING COMMENT 'Domain the event or entity pertains to', `dt`: STRING COMMENT 'UTC event datetime, in ISO-8601 format', `id`: STRING COMMENT 'Unique ID of this event', `request_id`: STRING COMMENT 'Unique ID of the request that caused the event', `stream`: STRING COMMENT 'Name of the stream/queue/dataset that this event belongs in', `uri`: STRING COMMENT 'Unique URI identifying the event or entity'>,
  `test` STRING,
  `test_map` MAP<STRING, STRING> COMMENT 'We want to support \'map\' types using additionalProperties to specify the value types.  (Keys are always strings.)
',
--- ...

The nested comments are there!

?????

Gotta stop for now!

^ Ah, spark-sql queries fail too: SHOW CREATE TABLE works, but SELECTs do not.

At this point, we could either:

  • update and use my old MR but it will only work for top level fields.
  • Just drop it and push for Refine to Iceberg where this is not a problem.

We should probably still finish and merge your work to add schemas for TransformFunctions though, that is quite nice and will help if/when we do T417176.

> Just drop it and push for Refine to Iceberg where this is not a problem.

Could you elaborate why this doesn't affect Iceberg?

For Hive tables, we still use Hive JDBC to evolve the tables (T209453).

Iceberg tables can be evolved through the Spark SQL API, and Spark SQL DDL preserves nested field comments.
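A sketch of what that allows. It assumes an Iceberg table evolved through Spark SQL's ALTER TABLE ... ALTER COLUMN ... COMMENT, which accepts a dotted path to a nested struct field. The code emits one statement per commented field, recursing into structs. The table and helper names are hypothetical, and fields nested inside arrays or maps are not handled.

```scala
import org.apache.spark.sql.types._

object NestedCommentDdl {
  // Backtick-quote one identifier part, doubling any embedded backticks.
  private def quote(part: String): String = "`" + part.replace("`", "``") + "`"

  // Single-quoted SQL string literal with backslashes and quotes escaped.
  private def literal(s: String): String =
    "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"

  /** One ALTER COLUMN ... COMMENT per commented field, recursing into structs
    * so that nested fields are addressed by their dotted path. */
  def commentStatements(table: String, schema: StructType, prefix: Seq[String] = Nil): Seq[String] =
    schema.fields.toSeq.flatMap { f =>
      val path = prefix :+ f.name
      val own = f.getComment().map { c =>
        s"ALTER TABLE $table ALTER COLUMN ${path.map(quote).mkString(".")} COMMENT ${literal(c)}"
      }
      val nested = f.dataType match {
        case st: StructType => commentStatements(table, st, path)
        case _              => Nil
      }
      own.toSeq ++ nested
    }
}
```

Each statement would then be passed to spark.sql(...). With Hive tables, these ALTERs have to go through Hive JDBC instead (T209453), which is where the nested comments get lost.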

Got it.

+1 to live with this for Hive, and to invest in making it nicer with Iceberg moving forward.

We are currently running Spark 3.1.2, which has a limitation: all Hive-Parquet tables are V1 tables.

V1 table reads go through HiveMetastoreCatalog.convertToLogicalRelation(), which converts a HiveTableRelation into a LogicalRelation backed by Spark's native Parquet reader. During this conversion, a strict datatype equality check is enforced, as @Ottomata pointed out previously. This comparison uses dataType == dataType, which for StructType (nested columns) checks the full structure, including subfield metadata such as nested comments. If we manually add comments to nested columns in the Spark schema definition, the converted schema will differ from the Hive metastore schema, triggering the AnalysisException.

This is a limitation of the table V1 path that we cannot work around on Spark 3.1.2.

I also checked whether Parquet V2 tables would help in Spark 3.5.8.

The two blocking issues cited in SPARK-30627, when parquet was added back to spark.sql.sources.useV1SourceList (January 2020), remain unresolved in Spark 3.5.8: missing scan metrics and no Dynamic Partition Pruning.

As a result, spark.sql.sources.useV1SourceList still defaults to "avro,csv,json,kafka,orc,parquet,text" in 3.5.8 (SQLConf.scala), and manually enabling Parquet V2 remains a non-trivial trade-off.

Given these constraints, the most straightforward path forward appears to be switching to Iceberg.

Closing the ticket. Redirecting to T333013.

In the meantime, what we could do is a custom DataHub import job that would read from ESC and override the table schemas before import. This would solve the DataHub part, but not the Spark console part (which we were also aiming to improve, for usability).
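A rough sketch of the overlay step such a job would need. The JSONSchema is reduced to a hypothetical SchemaNode tree, and there is no real JSON parsing and no DataHub API call. Descriptions are flattened into dotted paths and applied to the table's column paths. Where the schema has no description, such as for fields added during refinement, any existing description is kept.

```scala
object DescriptionOverlay {
  // Hypothetical, simplified stand-in for a JSONSchema node: an optional
  // description plus, for object types, its named properties.
  final case class SchemaNode(
      description: Option[String] = None,
      properties: Map[String, SchemaNode] = Map.empty)

  /** Flatten property descriptions into dotted paths such as "meta.dt". */
  def flatten(node: SchemaNode, prefix: String = ""): Map[String, String] =
    node.properties.flatMap { case (name, child) =>
      val path = if (prefix.isEmpty) name else s"$prefix.$name"
      val own = child.description.map(d => Map(path -> d)).getOrElse(Map.empty[String, String])
      own ++ flatten(child, path)
    }

  /** For each table column path, prefer the event schema's description and
    * fall back to the existing one (e.g. for fields added by Refine). */
  def overlay(columns: Map[String, Option[String]], schema: SchemaNode): Map[String, Option[String]] = {
    val fromSchema = flatten(schema)
    columns.map { case (path, existing) => path -> fromSchema.get(path).orElse(existing) }
  }
}
```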

I just want to point out that the Iceberg path is not without bugs either. AFAIK, the issue discussed in T209453#10634329 continues to be there in recent Iceberg versions. Just mentioning it so that we don't forget.

Change #1199473 abandoned by Aqu:

[analytics/refinery/source@master] Maintain Spark schema after Hive DDL operations

Reason:

https://phabricator.wikimedia.org/T307040#11626816

https://gerrit.wikimedia.org/r/1199473