To better handle backwards-incompatible data in EventLogging, Refine reads raw input JSON data using a schema made by merging the Hive table schema with the event JSONSchema. We assume that the Hive table has all fields it has ever seen, and that the JSONSchema might have new ones.
When we merge the schemas, we also normalize them to avoid casing (and other) differences between SQL and non-SQL systems. Our field normalization also converts SQL-incompatible characters in field names to '_', so $schema becomes _schema. When this merged schema is used to read the JSON data, it doesn't have a $schema field, and so that field in the JSON data is lost.
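As a rough illustration of the renaming described above (a hypothetical sketch, not the actual Refine implementation, which also handles other normalization details):

```python
def normalize(name):
    # Hypothetical sketch: lowercase and replace SQL-incompatible
    # characters with '_'. The real Refine normalization may differ.
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name).lower()

print(normalize("$schema"))  # -> _schema
```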
In MEP, we no longer really need to merge the JSONSchema with the Hive table schema in order to read the JSON data, so we should probably stop doing that eventually. However, we do need to merge and normalize the event schema with the Hive table schema in order to successfully write into it. If we did this now, we'd end up with a DataFrame that has two _schema fields in it: one all NULL from Hive, and one with real schema URIs from the raw data.
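A minimal sketch of how the collision happens, using plain Python lists of field names (hypothetical normalize function, not the real Refine code):

```python
def normalize(name):
    # hypothetical: replace SQL-incompatible characters with '_'
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name)

hive_fields = ["_schema", "meta"]   # table schema, already normalized; _schema is all NULL
raw_fields  = ["$schema", "meta"]   # read without normalization, keeps real $schema values

# merge by exact name: "$schema" != "_schema", so both survive
merged = list(dict.fromkeys(hive_fields + raw_fields))

# normalizing the merged schema before the write collapses the names
written = [normalize(f) for f in merged]
print(written)  # -> ['_schema', 'meta', '_schema']: two _schema columns
```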
We definitely need to stop normalizing the schema before reading. Once we do that, @joal and I came up with two possible solutions to the double _schema field problem.
- Easy fix: Drop all _schema columns from all Hive tables. The next time data is refined, the actual $schema will be read in with real values, and then the field name will be normalized to _schema before writing.
- Correct fix: When merging, keep track of what fields get name changes due to normalization, and drop any columns from the Hive side DataFrame that will be normalized. This effectively chooses the input side normalized column over the Hive side one.
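The correct fix could be sketched like this on flat lists of field names (hypothetical helper names; real Spark schemas are nested StructTypes, which is where it gets hard):

```python
def normalize(name):
    # hypothetical: replace SQL-incompatible characters with '_'
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name)

def merge_preferring_input(hive_fields, raw_fields):
    # Names that normalization will change on the input side...
    renamed = {normalize(f) for f in raw_fields if normalize(f) != f}
    # ...and the Hive-side columns they would collide with get dropped,
    # so the input-side normalized column wins.
    kept_hive = [f for f in hive_fields if f not in renamed]
    normalized_raw = [normalize(f) for f in raw_fields]
    return kept_hive + [f for f in normalized_raw if f not in kept_hive]

print(merge_preferring_input(["_schema", "meta", "legacy_field"],
                             ["$schema", "meta"]))
# -> ['meta', 'legacy_field', '_schema']: only one _schema, from the input side
```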
The correct fix sounds great, but will be difficult to implement for nested struct fields. We'd have to somehow recurse into a DataFrame and rebuild it using a new schema with a different number of fields, or figure out how to normalize by recursively renaming columns in a DataFrame, not just in a StructType schema as we do now.
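The recursion itself is straightforward when the schema is plain data; the hard part described above is doing the equivalent on an actual Spark DataFrame. For reference, a sketch of the recursive rename on a dict-based schema (a hypothetical representation, not Spark's StructType):

```python
def normalize(name):
    # hypothetical: replace SQL-incompatible characters with '_'
    return "".join(c if c.isalnum() or c == "_" else "_" for c in name)

def normalize_schema(schema):
    # Recursively rename fields; struct types are nested dicts here.
    return {
        normalize(name): normalize_schema(typ) if isinstance(typ, dict) else typ
        for name, typ in schema.items()
    }

schema = {"$schema": "string", "meta": {"$id": "string", "dt": "timestamp"}}
print(normalize_schema(schema))
# -> {'_schema': 'string', 'meta': {'_id': 'string', 'dt': 'timestamp'}}
```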