
[metrics platform] Event sanitization fails when processing monoschema events
Closed, Resolved · Public · 5 Estimated Story Points

Description

Recently, we added the first monoschema stream to the event_sanitized_analytics_allowlist. See change.
We did that to keep the non-privacy-sensitive wikifunctions_ui data for longer than 3 months.
However, with the new addition, the sanitization process fails with:

org.wikimedia.analytics.refinery.job.refine.RefineTargetException:
Failed refinement of hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=10/day=17/hour=9 -> `event_sanitized`.`wikifunctions_ui` /wmf/data/event_sanitized/wikifunctions_ui/datacenter=eqiad/year=2023/month=10/day=17/hour=9.
Original exception: java.lang.IllegalArgumentException: Invalid salt or allowlist value for map key 'resulthaserror'.

This error is thrown by SanitizeTransformation.scala when trying to compile the allow-list into a sanitization mask.

/**
  * Returns a sanitization mask (compiled allowlist) for a given MapType and allowlist.
  * As opposed to the StructMask (that uses implicit indexes), this mask uses lookups
  * to determine which fields to keep or purge. The reason being that Maps do not
  * guarantee the order their elements are iterated.
  * Thus, Maps are less performant than Structs in this case.
  *
  * This function also validates that the given allowlist is correctly defined.
  */
def getMapMask(
    map: MapType,
    allowlist: Allowlist,
    salt: Option[String] = None
): MaskNode = {
    MapMaskNode(
        map.valueType match {
            case MapType(_, _, _) => allowlist.map { case (key, value) =>
                // The allowlist for this field indicates the field is nested.
                // Build the MaskNode accordingly. If necessary, call recursively.
                value match {
                    case `keepAllTag` => key -> Identity()
                    case childAllowlist: Allowlist =>
                        key -> getMapMask(map.valueType.asInstanceOf[MapType], childAllowlist, salt)
                    case _ => throw new IllegalArgumentException(
                        s"Invalid allowlist value for map key '${key}'."
                    )
                }
            }
            case _ => allowlist.map { case (key, value) =>
                // The allowlist for this field indicates the field is simple (not nested).
                // Build the MaskNode accordingly.
                value match {
                    case `keepTag` => key -> Identity()
                    case `hashTag` if map.valueType == StringType && salt.isDefined => key -> Hash(salt.get)
                    case _ => throw new IllegalArgumentException(
                        s"Invalid salt or allowlist value for map key '${key}'."
                    )
                }
            }
        }
    )
}

It tries to match the sanitization allow-list spec with the data types coming from the Spark DataFrame schema. And it seems that Spark does not give us the expected type for nested maps, like custom_data.
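
For illustration, here is a standalone sketch of the shape of the mismatch. It uses only Spark's type API and is not refinery code:

import org.apache.spark.sql.types._

// The Hive/Spark type of custom_data is map<string,struct<data_type:string,value:string>>,
// i.e. the map's valueType is a StructType, not another MapType.
val customDataType = MapType(
    StringType,
    StructType(Seq(
        StructField("data_type", StringType),
        StructField("value", StringType)
    ))
)

// getMapMask only recurses when valueType is itself a MapType, so a struct-valued entry
// falls into the "simple field" branch, where a nested allowlist value (like the one for
// 'resulthaserror') is neither `keep` nor `hash`, hence the IllegalArgumentException.
customDataType.valueType match {
    case MapType(_, _, _) => println("nested-map branch: recursion works")
    case _: StructType    => println("struct value: falls through to the simple-field branch")
    case _                => println("simple value: keep/hash handled normally")
}

Running it prints the struct-value line, which is the path that ends in the 'Invalid salt or allowlist value' exception above.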


The change has been reverted, the alerts have ceased, and sanitization is running OK in production.
However, we need to fix this issue; otherwise monoschema streams cannot use sanitization to keep data for longer than 3 months.

This change involves:

  • Reproducing the error by running wikifunctions_ui sanitization in a test setup
  • Finding out which type the Spark DataFrame schema gives to custom_data['resulthaserror']
  • Fixing that or working around it
  • Adding unit tests for the changes
  • Testing that the whole process works in a test setup

Event Timeline

And it seems that Spark does not give us the expected type for nested maps, like custom_data.

That's strange. What's the DataType in the source event table? For the source table, Refine gets the DataType from the event schema via the Spark JsonSchemaConverter. (BTW, we'd like to move this implementation into eventutilities-spark soon: T321854: [Event Platform] Move Spark JsonSchemaConverter out of analytics/refinery/source and into wikimedia-event-utilities)
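
For reference, one way to check that directly from a spark-shell on an analytics client (assuming read access to the event database):

// In spark-shell the `spark` session already exists; in a standalone app you would need:
// val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
val customDataType = spark.table("event.wikifunctions_ui").schema("custom_data").dataType
println(customDataType.simpleString)
// The describe output later in this task shows map<string,struct<data_type:string,value:string>>.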

Putting this 🍪 back in the jar while I work on releasing the latest version of the JS client library for T350495: [EPIC] Deploy latest version of Metrics Platform client libraries.

xcollazo subscribed.

Just discussed this ticket with @mforns, and decided I could help out. Thus reassigning to myself.

Change 979406 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[analytics/refinery/source@master] Fix recursion for Maps with Structs on SanitizeTransformation

https://gerrit.wikimedia.org/r/979406

Ok, this one was a bit painful to test, but I think it is now solved. Here are my notes:

  1. This is how the allowlist looks:
wikifunctions_ui:
    agent:
        client_platform_family: keep
...
    custom_data:
        edit:
            data_type: keep
            value: keep
        haserrors:
            data_type: keep
            value: keep
...
  2. Since this error happens during sanitization, the incoming schema is not from our JSON schemas, but from the schema of the table event.wikifunctions_ui, which looks like this:
spark-sql (default)> describe wikifunctions_ui;
col_name	data_type	comment
_schema	string	NULL
...
custom_data	map<string,struct<data_type:string,value:string>>	NULL
...
  3. From the stack trace shared in the description, it was then clear that the sanitization mask created in SanitizeTransformation was not able to recurse when the data at hand includes maps with struct values.
  4. In https://gerrit.wikimedia.org/r/979406, we modify the code so that this case is now supported (a sketch of the idea follows the test notes below).

Additionally, we refactor the test harness so that tree comparisons use a tree comparator, for correctness. This was previously alluded to here, but it got lost in translation.

  5. I tested this with production data as follows:

a) Copied code to a stat machine, then mvn package.
b) Ran the code with the following config:

/usr/bin/spark3-submit \
--name xcollazo_test_refine_wikifunctions_ui_0 \
--class org.wikimedia.analytics.refinery.job.refine.RefineSanitize \
--master local \
--deploy-mode client \
--conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
--driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
/home/xcollazo/wmf/gerrit/source/refinery-job/target/refinery-job-0.2.27-SNAPSHOT-shaded.jar \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--allowlist_path=/user/xcollazo/artifacts/wikifunctions_ui_allowlist.yaml \
--input_database=event \
--table_include_regex='wikifunctions_ui' \
--output_database=xcollazo_refine_test \
--output_path=/user/xcollazo/refine_test_output \
--since=10 --until=2

c) For the allowlist at /user/xcollazo/artifacts/wikifunctions_ui_allowlist.yaml, I removed any hash fields, as they were giving me permission errors when reading the (secure) salts, and those fields are unrelated to the original error.

d) Sanitize was happy with the manual run:

...
23/12/04 19:52:56 INFO DataFrameToHive: Wrote DataFrame to /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=4/hour=9 and added partition `xcollazo_refine_test`.`wikifunctions_ui` /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=4/hour=9
23/12/04 19:52:56 INFO Refine: Finished refinement of dataset hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=4/hour=9 -> `xcollazo_refine_test`.`wikifunctions_ui` /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=4/hour=9. (# refined records: 1)
23/12/04 19:52:56 INFO Refine: Successfully refined 16 of 16 dataset partitions into table `xcollazo_refine_test`.`wikifunctions_ui` (total # refined records: 73)

e) A couple of sample queries confirm the data looks as expected:

spark-sql (default)> select custom_data from wikifunctions_ui limit 2;
custom_data
{"isnewzobject":{"data_type":"boolean","value":"false"},"loadtime":{"data_type":"number","value":"253"},"viewname":{"data_type":"string","value":"default-view"}}
{"isnewzobject":{"data_type":"boolean","value":"false"},"loadtime":{"data_type":"number","value":"527"},"viewname":{"data_type":"string","value":"default-view"}}
Time taken: 0.585 seconds, Fetched 2 row(s)


spark-sql (default)> select key, count(1) as count
                   > from (
                   >   select explode(custom_data)
                   >   from wikifunctions_ui
                   >   where year = 2023
                   > ) group by key;
key	count
isnewzobject	61
zobjectid	30
resulthaserror	12
zobjecttype	30
edit	18
loadtime	43
viewname	43
selectedfunctionzid	12
Time taken: 17.873 seconds, Fetched 8 row(s)
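
For reference, here is a minimal sketch of the idea behind the patch (not the actual diff from https://gerrit.wikimedia.org/r/979406): add a StructType case to getMapMask so that struct-valued map entries recurse instead of falling into the simple-field branch. It assumes a getStructMask helper analogous to getMapMask, per the StructMask mentioned in the code comment quoted in the description:

// Sketch only. Inside MapMaskNode(...), match on the map's valueType:
map.valueType match {
    // Existing case: map values that are themselves maps.
    case childMap: MapType => allowlist.map { case (key, value) =>
        value match {
            case `keepAllTag` => key -> Identity()
            case childAllowlist: Allowlist => key -> getMapMask(childMap, childAllowlist, salt)
            case _ => throw new IllegalArgumentException(
                s"Invalid allowlist value for map key '${key}'."
            )
        }
    }
    // New case: map values that are structs (e.g. custom_data's entries), delegating
    // to the struct mask so nested allowlists can be applied to each entry.
    case childStruct: StructType => allowlist.map { case (key, value) =>
        value match {
            case `keepAllTag` => key -> Identity()
            case childAllowlist: Allowlist => key -> getStructMask(childStruct, childAllowlist, salt)
            case _ => throw new IllegalArgumentException(
                s"Invalid allowlist value for map key '${key}'."
            )
        }
    }
    // Simple (leaf) values keep the original keep/hash handling.
    case _ => allowlist.map { case (key, value) =>
        value match {
            case `keepTag` => key -> Identity()
            case `hashTag` if map.valueType == StringType && salt.isDefined => key -> Hash(salt.get)
            case _ => throw new IllegalArgumentException(
                s"Invalid salt or allowlist value for map key '${key}'."
            )
        }
    }
}

With the extra StructType case, the nested allowlist entries under custom_data (data_type, value) can be applied to each map entry's struct.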

Change 979406 merged by jenkins-bot:

[analytics/refinery/source@master] Fix recursion for Maps with Structs on SanitizeTransformation

https://gerrit.wikimedia.org/r/979406

Change 980445 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Update the refinery version used by the refine jobs

https://gerrit.wikimedia.org/r/980445

Change 980923 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Update the refinery version used by the refine production jobs

https://gerrit.wikimedia.org/r/980923

Change 980445 merged by Btullis:

[operations/puppet@production] Update the refinery version used by the refine test jobs

https://gerrit.wikimedia.org/r/980445

@BTullis deployed the changes to the test cluster, and the airflow dag refine_webrequest_hourly_test_text looks healthy.

Change 980923 merged by Btullis:

[operations/puppet@production] Update the refinery version used by the refine production jobs

https://gerrit.wikimedia.org/r/980923

Mentioned in SAL (#wikimedia-analytics) [2023-12-18T09:22:57Z] <btullis> deploying refinery version 0.02.27 to production refinery jobs with https://gerrit.wikimedia.org/r/c/operations/puppet/+/980923 for T349121

Change 983740 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[analytics/refinery@master] Revert "Revert "Add the wikifunctions_ui metrics platform schema to the allowlist""

https://gerrit.wikimedia.org/r/983740

Change 983740 merged by Xcollazo:

[analytics/refinery@master] Revert "Revert "Add the wikifunctions_ui metrics platform schema to the allowlist""

https://gerrit.wikimedia.org/r/983740

@mforns helped to do an expedited deploy of static_data/sanitization/event_sanitized_analytics_allowlist.yaml into HDFS. (Thanks @mforns!)

Next time sanitization kicks in, we should pick this up in prod.

Will keep this ticket open until a couple of successful ingests.

Change 983946 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[operations/puppet@production] Bump refine_sanitize refinery version to pickup fix for T349121

https://gerrit.wikimedia.org/r/983946

Change 983946 merged by Btullis:

[operations/puppet@production] Bump refine_sanitize refinery version to pickup fix for T349121

https://gerrit.wikimedia.org/r/983946

Change 984247 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[analytics/refinery@master] For wikifunctions_ui sanitization, keep performer.name instead of performer.id

https://gerrit.wikimedia.org/r/984247

After the allowlist deployment mentioned above, sanitization started failing on another part of the code:

org.wikimedia.analytics.refinery.job.refine.RefineTargetException: Failed refinement of hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9 -> `event_sanitized`.`wikifunctions_ui` /wmf/data/event_sanitized/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9. Original exception: java.lang.IllegalArgumentException: Invalid allowlist value hash for field `id` BIGINT

I investigated, and our sanitization process only allows STRINGs to be hashed. So this configuration is, as of today, user error:

performer:
    edit_count_bucket: keep
    id: hash          <<<<<<<<<< fails since BIGINT
    is_bot: keep
    is_logged_in: keep
    pageview_id: hash
    session_id: hash
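
For context, the same restriction is visible in the map branch of getMapMask quoted in the description: `hash` is only accepted when the value type is StringType and a salt is present, and the struct path evidently enforces the same rule, which is why the BIGINT id fails. A hypothetical standalone illustration of that guard (not refinery code):

import org.apache.spark.sql.types._

// `hash` only applies to string fields, and only when a salt is available.
def canHash(fieldType: DataType, salt: Option[String]): Boolean =
    fieldType == StringType && salt.isDefined

println(canHash(StringType, Some("some-salt")))  // true  -> e.g. performer.name
println(canHash(LongType, Some("some-salt")))    // false -> performer.id is a BIGINT, hence the error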

@mforns suggested we could use the performer.name field instead:

Just for easier reading, the proposed (quick) solution is to keep a hashed version of the username (field: name) instead of a hashed version of the user id (field: id). This would avoid the error and would be a one-line change.

performer.name does have NULLs, so we will test this approach before prod.
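
For example, a quick spark-shell check of how often performer.name is NULL for this stream could look like this (illustrative query only):

// Count events with and without a performer.name over a recent slice of the source table.
spark.sql("""
    SELECT performer.name IS NULL AS name_is_null, count(1) AS events
    FROM event.wikifunctions_ui
    WHERE year = 2023 AND month = 12
    GROUP BY performer.name IS NULL
""").show()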

Note that we missed this whole thing since we had not tested the hashed fields in T349121#9380867. That is my bad!

(Slack thread for completeness: https://wikimedia.slack.com/archives/CSV483812/p1702998045787309)

Ok, similarly to T349121#9380867, here is a test with the new changes, but this time I do test the hash:

a) Copied code to a stat machine, then mvn package.

b) Generated a fake salt:

hdfs dfs -mkdir /user/xcollazo/artifacts/fake_salt_folder
hdfs dfs -put ~/artifacts/fake_salt/2023100100_2024010100_2024022000 /user/xcollazo/artifacts/fake_salt_folder
xcollazo@stat1007:~/artifacts/fake_salt$ hdfs dfs -cat /user/xcollazo/artifacts/fake_salt_folder/2023100100_2024010100_2024022000
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa

c) Ran the code with the following config:

/usr/bin/spark3-submit \
--name xcollazo_test_refine_wikifunctions_ui_1 \
--class org.wikimedia.analytics.refinery.job.refine.RefineSanitize \
--master local \
--deploy-mode client \
--conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
--driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
/home/xcollazo/wmf/gerrit/source/refinery-job/target/refinery-job-0.2.27-SNAPSHOT-shaded.jar \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--allowlist_path=/user/xcollazo/artifacts/wikifunctions_ui_allowlist.yaml \
--input_database=event \
--table_include_regex='wikifunctions_ui' \
--output_database=xcollazo_refine_test \
--output_path=/user/xcollazo/refine_test_output \
--salts_path=/user/xcollazo/artifacts/fake_salt_folder \
--ignore_failure_flag=true \
--since=10 --until=2

d) Sanitize was happy with the manual run:

...
23/12/19 18:01:06 INFO DataFrameToHive: Wrote DataFrame to /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9 and added partition `xcollazo_refine_test`.`wikifunctions_ui` /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9
23/12/19 18:01:06 INFO Refine: Finished refinement of dataset hdfs://analytics-hadoop/wmf/data/event/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9 -> `xcollazo_refine_test`.`wikifunctions_ui` /user/xcollazo/refine_test_output/wikifunctions_ui/datacenter=eqiad/year=2023/month=12/day=19/hour=9. (# refined records: 11)
23/12/19 18:01:06 INFO Refine: Successfully refined 16 of 16 dataset partitions into table `xcollazo_refine_test`.`wikifunctions_ui` (total # refined records: 131)

e) A couple of sample queries confirm the data looks as expected, and that hashing handles NULLs fine:

spark-sql (default)> show tables;
database	tableName	isTemporary
xcollazo_refine_test	wikifunctions_ui	false
Time taken: 0.081 seconds, Fetched 1 row(s)

spark-sql (default)> select count(1) from wikifunctions_ui;
count(1)
131

spark-sql (default)> select substring(performer.name, 0, 5), count(1) as count from wikifunctions_ui group by performer.name order by count DESC;
substring(performer.name, 0, 5)	count
NULL	70
49E90	34
C98DC	9
5B363	8
8B19C	8
6B335	2

I reviewed the test, and it looks great to me!

Change 984247 merged by Mforns:

[analytics/refinery@master] For wikifunctions_ui sanitization, keep performer.name instead of performer.id

https://gerrit.wikimedia.org/r/984247

All right, we stopped getting failure emails, and data started flowing to event_sanitized.wikifunctions_ui.

Did a backfill of the last 90 days of data via:

sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_event_sanitized_analytics_immediate \
  --ignore_failure_flag=true \
  --table_include_regex='wikifunctions_ui' \
  --since='2023-09-20T19:00:00.000Z' \
  --until='2023-12-19T19:00:00.000Z' \
  --verbose

It completed successfully in about 3 hours.

Example available partitions:

spark-sql (default)> show partitions event_sanitized.wikifunctions_ui;
...
datacenter=eqiad/year=2023/month=12/day=9/hour=6
datacenter=eqiad/year=2023/month=12/day=9/hour=7
datacenter=eqiad/year=2023/month=12/day=9/hour=8
datacenter=eqiad/year=2023/month=12/day=9/hour=9
Time taken: 0.411 seconds, Fetched 3825 row(s)

I think we are finally done here! 🎉