Recently, we added the first monoschema stream to the event_sanitized_analytics_allowlist. See change.
We did that to keep the non-privacy-sensitive wikifunctions_ui data for longer than 3 months.
However, with the new addition, the sanitization process fails with:
```
org.wikimedia.analytics.refinery.job.refine.RefineTargetException: Failed refinement of hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=10/day=17/hour=9 -> `event_sanitized`.`wikifunctions_ui` /wmf/data/event_sanitized/wikifunctions_ui/datacenter=eqiad/year=2023/month=10/day=17/hour=9. Original exception: java.lang.IllegalArgumentException: Invalid salt or allowlist value for map key 'resulthaserror'.
```
This error is thrown by SanitizeTransformation.scala when trying to compile the allowlist into a sanitization mask:
```scala
/**
 * Returns a sanitization mask (compiled allowlist) for a given MapType and allowlist.
 * As opposed to the StructMask (that uses implicit indexes), this mask uses lookups
 * to determine which fields to keep or purge. The reason being that Maps do not
 * guarantee the order their elements are iterated.
 * Thus, Maps are less performant than Structs in this case.
 *
 * This function also validates that the given allowlist is correctly defined.
 */
def getMapMask(
    map: MapType,
    allowlist: Allowlist,
    salt: Option[String] = None
): MaskNode = {
    MapMaskNode(
        map.valueType match {
            case MapType(_, _, _) => allowlist.map { case (key, value) =>
                // The allowlist for this field indicates the field is nested.
                // Build the MaskNode accordingly. If necessary, call recursively.
                value match {
                    case `keepAllTag` => key -> Identity()
                    case childAllowlist: Allowlist =>
                        key -> getMapMask(map.valueType.asInstanceOf[MapType], childAllowlist, salt)
                    case _ => throw new IllegalArgumentException(
                        s"Invalid allowlist value for map key '${key}'."
                    )
                }
            }
            case _ => allowlist.map { case (key, value) =>
                // The allowlist for this field indicates the field is simple (not nested).
                // Build the MaskNode accordingly.
                value match {
                    case `keepTag` => key -> Identity()
                    case `hashTag` if map.valueType == StringType && salt.isDefined =>
                        key -> Hash(salt.get)
                    case _ => throw new IllegalArgumentException(
                        s"Invalid salt or allowlist value for map key '${key}'."
                    )
                }
            }
        }
    )
}
```
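For context, here is a rough sketch of how this function compiles a well-formed allowlist for a simple (non-nested) map field. The Allowlist alias, the tag strings, and the field name are illustrative assumptions based on refinery conventions, not copied from the actual allowlist:

```scala
import org.apache.spark.sql.types.{MapType, StringType}

// Illustrative only: refinery defines Allowlist roughly as a map from
// field names to tags ("keep", "keep_all", "hash") or to nested allowlists.
type Allowlist = Map[String, Any]

// A map<string, string> field with one allowlisted (hypothetical) key.
// getMapMask is assumed to be in scope, e.g. from SanitizeTransformation.
val fieldType = MapType(StringType, StringType)
val allowlist: Allowlist = Map("session_id" -> "keep")

// Compiles into a MapMaskNode that keeps 'session_id' and purges other keys.
val mask = getMapMask(fieldType, allowlist)
```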
It tries to match the sanitization allowlist spec against the data types coming from the Spark DataFrame schema, and it seems that Spark does not give us the expected type for nested maps such as custom_data.
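One way to check is to inspect the schema Spark reports for one of the failing hours from a spark-shell. A sketch (the path comes from the error above; reading it directly as Parquet is an assumption):

```scala
import org.apache.spark.sql.types.MapType

// Read one failing hour (path taken from the error message above).
val df = spark.read.parquet(
  "hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=10/day=17/hour=9"
)

// Show what Spark thinks custom_data is.
df.printSchema()
df.schema("custom_data").dataType match {
  case MapType(_, valueType, _) =>
    // getMapMask recurses only if valueType is itself a MapType; anything
    // else (e.g. a StructType) falls through to the simple-field branch
    // and can trigger the 'Invalid salt or allowlist value' error.
    println(s"custom_data value type: $valueType")
  case other =>
    println(s"custom_data is not a map: $other")
}
```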
The change has been reverted, the alerts have ceased, and the sanitization is running OK in production.
However, we need to fix this issue; otherwise, monoschema streams cannot use sanitization to keep data for longer than 3 months.
This task involves:
- Reproducing the error by running wikifunctions_ui sanitization in a test setup
- Finding out which type the Spark DataFrame schema gives to custom_data['resulthaserror']
- Trying to fix that, or working around it
- Adding unit tests for the changes (see the sketch after this list)
- Testing that the whole process works in a test setup
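As a starting point for the unit tests, a minimal ScalaTest sketch that documents the current behaviour. The Allowlist alias and the struct shape of custom_data are assumptions; only the error scenario is taken from the failure above:

```scala
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class GetMapMaskSpec extends AnyFlatSpec with Matchers {

  // Assumed alias, mirroring refinery conventions. getMapMask is assumed
  // to be in scope, e.g. imported from SanitizeTransformation.
  type Allowlist = Map[String, Any]

  "getMapMask" should "handle a map whose values Spark types as structs" in {
    // Guess at custom_data's shape: Spark may report its values as a struct
    // rather than a map, so a nested allowlist entry falls through to the
    // simple-field branch of getMapMask and throws.
    val customDataType = MapType(
      StringType,
      StructType(Seq(
        StructField("data_type", StringType),
        StructField("value", StringType)
      ))
    )
    val allowlist: Allowlist = Map("resulthaserror" -> Map("value" -> "keep"))

    // Documents today's failure; once fixed, this assertion should be
    // replaced with one checking that a mask keeping the field is produced.
    an[IllegalArgumentException] should be thrownBy getMapMask(customDataType, allowlist)
  }
}
```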