We just closed T197000: Modify revision-score schema so that model probabilities won't conflict and started refining revision-score events. The new revision-score schema has an array of scores, each element of which is a struct.
Currently, our schema-merging code only merges nested struct schemas, NOT arrays whose elements are StructTypes. We need to detect array fields with StructType elements, and then merge those element StructTypes as well.
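The missing recursion could look something like the sketch below. This is illustrative only: the case classes here are simplified stand-ins for Spark's `org.apache.spark.sql.types` classes (the real fix would live in the refinery's schema-merging code in HiveExtensions), and the function names `merge`/`mergeStructs` are hypothetical.

```scala
// Simplified stand-ins for Spark's DataType hierarchy, for illustration only.
sealed trait DataType
case class AtomicType(name: String) extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

// Merge an existing (table) type with an incoming (event data) type.
def merge(existing: DataType, incoming: DataType): DataType =
  (existing, incoming) match {
    // Recurse into nested struct schemas (what the current code already does).
    case (sa: StructType, sb: StructType) => mergeStructs(sa, sb)
    // NEW: also recurse into arrays whose elements are (possibly) structs.
    case (ArrayType(ea), ArrayType(eb))   => ArrayType(merge(ea, eb))
    // Otherwise keep the existing type.
    case _                                => existing
  }

def mergeStructs(existing: StructType, incoming: StructType): StructType = {
  val existingNames = existing.fields.map(_.name).toSet
  // Merge types of fields present in both schemas, keeping existing order.
  val merged = existing.fields.map { fe =>
    incoming.fields.find(_.name == fe.name) match {
      case Some(fi) => StructField(fe.name, merge(fe.dataType, fi.dataType))
      case None     => fe
    }
  }
  // Append fields that only exist in the incoming schema.
  StructType(merged ++ incoming.fields.filterNot(f => existingNames.contains(f.name)))
}
```

With this, merging the table's `scores` element struct with an incoming element struct that additionally carries an `error` sub-struct would yield an element struct containing both, instead of failing the cast.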
This was detected via the error below; you can also reproduce it with the following command:
```
sudo -u hdfs /usr/bin/spark2-submit \
    --name refine_eventlogging_eventbus_otto_revision_score \
    --driver-java-options='-Drefine.log.level=DEBUG' \
    --class org.wikimedia.analytics.refinery.job.refine.Refine \
    --files /etc/hive/conf/hive-site.xml,/etc/refinery/refine/refine_eventlogging_eventbus.properties,/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar,/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
    --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:hive-jdbc-1.1.0-cdh5.10.0.jar:hive-service-1.1.0-cdh5.10.0.jar \
    --conf spark.dynamicAllocation.maxExecutors=64 \
    /srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.0.75.jar \
    --config_file /etc/refinery/refine/refine_eventlogging_eventbus.properties \
    --since 2018-11-26T18:00:00 --until 2018-11-26T19:00:00 \
    --table_whitelist_regex='^mediawiki_revision_score$' --ignore_failure_flag=true
```

```
...
18/11/26 22:54:57 DEBUG Refine: Applying org.wikimedia.analytics.refinery.job.refine.deduplicate_eventbus$@157856e0 to `event`.`mediawiki_revision_score` (datacenter="eqiad",year=2018,month=11,day=26,hour=18)
18/11/26 22:54:57 DEBUG deduplicate_eventbus: Dropping duplicates based on `meta.id` field in `event`.`mediawiki_revision_score` (datacenter="eqiad",year=2018,month=11,day=26,hour=18)
18/11/26 22:55:02 DEBUG HiveExtensions: Converting DataFrame using SQL query: SELECT database AS database, NAMED_STRUCT('domain', meta.domain, 'dt', meta.dt, 'id', meta.id, 'request_id', meta.request_id, 'schema_uri', meta.schema_uri, 'topic', meta.topic, 'uri', meta.uri) AS meta, page_id AS page_id, page_namespace AS page_namespace, page_title AS page_title, rev_id AS rev_id, rev_parent_id AS rev_parent_id, rev_timestamp AS rev_timestamp, CAST(scores AS array<struct<model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>>) AS scores, datacenter AS datacenter, year AS year, month AS month, day AS day, hour AS hour FROM t_fd3875eb220e422793b7373f1c7a1718
18/11/26 22:55:02 ERROR Refine: Failed refinement of dataset hdfs://analytics-hadoop/wmf/data/raw/event/eqiad_mediawiki_revision-score/hourly/2018/11/26/18 -> `event`.`mediawiki_revision_score` (datacenter="eqiad",year=2018,month=11,day=26,hour=18).
org.apache.spark.sql.AnalysisException: cannot resolve 't_fd3875eb220e422793b7373f1c7a1718.`scores`' due to data type mismatch: cannot cast array<struct<error:struct<message:string,type:string>,model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>> to array<struct<model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>>; line 1 pos 366;
'Project [database#186 AS database#315, named_struct(domain, meta#187.domain, dt, meta#187.dt, id, meta#187.id, request_id, meta#187.request_id, schema_uri, meta#187.schema_uri, topic, meta#187.topic, uri, meta#187.uri) AS meta#316, page_id#188L AS page_id#317L, page_namespace#189L AS page_namespace#318L, page_title#190 AS page_title#319, rev_id#191L AS rev_id#320L, rev_parent_id#192L AS rev_parent_id#321L, rev_timestamp#193 AS rev_timestamp#322, cast(scores#194 as array<struct<model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>>) AS scores#323, datacenter#195 AS datacenter#324, year#196L AS year#325L, month#197L AS month#326L, day#198L AS day#327L, hour#199L AS hour#328L]
+- SubqueryAlias t_fd3875eb220e422793b7373f1c7a1718
   +- Repartition 3, true
      +- LogicalRDD [database#186, meta#187, page_id#188L, page_namespace#189L, page_title#190, rev_id#191L, rev_parent_id#192L, rev_timestamp#193, scores#194, datacenter#195, year#196L, month#197L, day#198L, hour#199L], false
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:93)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
	at org.wikimedia.analytics.refinery.spark.sql.HiveExtensions$DataFrameExtensions.convertToSchema(HiveExtensions.scala:631)
	at org.wikimedia.analytics.refinery.spark.connectors.DataFrameToHive$.apply(DataFrameToHive.scala:155)
	at org.wikimedia.analytics.refinery.job.refine.Refine$$anonfun$refineTargets$1.apply(Refine.scala:441)
	at org.wikimedia.analytics.refinery.job.refine.Refine$$anonfun$refineTargets$1.apply(Refine.scala:436)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.wikimedia.analytics.refinery.job.refine.Refine$.refineTargets(Refine.scala:436)
	at org.wikimedia.analytics.refinery.job.refine.Refine$$anonfun$8.apply(Refine.scala:347)
	at org.wikimedia.analytics.refinery.job.refine.Refine$$anonfun$8.apply(Refine.scala:341)
	at scala.collection.parallel.AugmentedIterableIterator$class.map2combiner(RemainsIterator.scala:115)
	at scala.collection.parallel.immutable.ParHashMap$ParHashMapIterator.map2combiner(ParHashMap.scala:76)
	at scala.collection.parallel.ParIterableLike$Map.leaf(ParIterableLike.scala:1054)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$Map.tryLeaf(ParIterableLike.scala:1051)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
	at scala.collection.parallel.ExecutionContextTasks$class.executeAndWaitResult(Tasks.scala:558)
	at scala.collection.parallel.ExecutionContextTaskSupport.executeAndWaitResult(TaskSupport.scala:80)
	at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
18/11/26 22:55:02 ERROR Refine: The following 1 of 1 dataset partitions for table `event`.`mediawiki_revision_score` failed refinement: Failure(org.wikimedia.analytics.refinery.job.refine.RefineTargetException: Failed refinement of hdfs://analytics-hadoop/wmf/data/raw/event/eqiad_mediawiki_revision-score/hourly/2018/11/26/18 -> `event`.`mediawiki_revision_score` (datacenter="eqiad",year=2018,month=11,day=26,hour=18).
Original exception: org.apache.spark.sql.AnalysisException: cannot resolve 't_fd3875eb220e422793b7373f1c7a1718.`scores`' due to data type mismatch: cannot cast array<struct<error:struct<message:string,type:string>,model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>> to array<struct<model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>>; line 1 pos 366;
'Project [database#186 AS database#315, named_struct(domain, meta#187.domain, dt, meta#187.dt, id, meta#187.id, request_id, meta#187.request_id, schema_uri, meta#187.schema_uri, topic, meta#187.topic, uri, meta#187.uri) AS meta#316, page_id#188L AS page_id#317L, page_namespace#189L AS page_namespace#318L, page_title#190 AS page_title#319, rev_id#191L AS rev_id#320L, rev_parent_id#192L AS rev_parent_id#321L, rev_timestamp#193 AS rev_timestamp#322, cast(scores#194 as array<struct<model_name:string,model_version:string,prediction:array<string>,probability:array<struct<name:string,value:double>>>>) AS scores#323, datacenter#195 AS datacenter#324, year#196L AS year#325L, month#197L AS month#326L, day#198L AS day#327L, hour#199L AS hour#328L]
+- SubqueryAlias t_fd3875eb220e422793b7373f1c7a1718
   +- Repartition 3, true
      +- LogicalRDD [database#186, meta#187, page_id#188L, page_namespace#189L, page_title#190, rev_id#191L, rev_parent_id#192L, rev_timestamp#193, scores#194, datacenter#195, year#196L, month#197L, day#198L, hour#199L], false
```