HiveExtensions.convertToSchema does not properly convert arrays of structs
Open, High, Public

Description

StructTypes need to be recursively merged and reordered via SQL. This is done for StructType fields, but it is not done for ArrayTypes with elementType == StructType.

Since no checking is done for ArrayType, the SQL for an array-of-structs field will look like:

CAST(d AS ARRAY<STRUCT<`db_string`: STRING, `da_long`: BIGINT>>) AS d

This will not work if the source schema has the struct fields in a different order, e.g. da_long, db_string.

This is causing bad refined data for mediawiki_cirrussearch_request, as reported in https://phabricator.wikimedia.org/P12200.

We need to make convertToSchema smarter about converting an array of structs.
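
For illustration, a minimal spark-shell sketch of the problem (column and field names taken from the SQL above; the data values are made up): CAST matches struct fields by position, not by name, so a source struct in (da_long, db_string) order gets its values shifted into the wrong target fields.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Source data: struct fields arrive in (da_long, db_string) order,
// while the target Hive schema expects (db_string, da_long).
val df = spark.sql(
  "SELECT ARRAY(NAMED_STRUCT('da_long', 1L, 'db_string', 'x')) AS d")

// CAST pairs struct fields positionally: da_long's value lands in
// db_string, and 'x' becomes a NULL bigint in da_long.
df.selectExpr(
  "CAST(d AS ARRAY<STRUCT<`db_string`: STRING, `da_long`: BIGINT>>) AS d"
).show(false)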

Event Timeline

Change 619034 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/refinery/source@master] [WIP] Fix for convertToSchema with array of structs

https://gerrit.wikimedia.org/r/619034

Attempt at https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/619034, but it doesn't quite work.

That changes the SQL to:

ARRAY(NAMED_STRUCT('db_string', d.db_string, 'da_long', d.da_long)) AS d

Which seems correct to me, but I am getting:

cannot resolve 'CAST(t_2c1aea2d83d849f5a74834554454c39d.`d` AS BIGINT)' due to data type mismatch: cannot cast array<struct<da_long:bigint,db_string:string>> to bigint; line 1 pos 15;
'Project [b#5L AS b#13L, cast(d#7 as bigint) AS d#14, named_struct(aa, cast(a#4.aa as array<int>), ab, cast(a#4.ab as map<string,string>), ad, cast(null as bigint), ac, a#4.ac, ae, array(named_struct(aea, cast(a#4.ae.aea as string))), af, cast(a#4.af as map<struct<afa:bigint>,struct<afb:bigint,afc:string>>)) AS a#15, c#6 AS c#16, array(named_struct(db_string, d#7.db_string, da_long, d#7.da_long)) AS d#17]

which I don't yet understand. Why does it think I want to cast d to a BIGINT? I see in the project: array(named_struct(db_string, d#7.db_string, da_long, d#7.da_long)) AS d#17, so I dunno.

This was noticed by @EBernhardson this week, after I merged the fix for T255818: Refine drops $schema field values on Monday. I'm no longer merging (and properly reordering?) the struct fields at read time with the Hive schema, so the bug manifests now that I'm calling convertToSchema on a DataFrame whose struct fields are out of order within the array.

I think I'm going to have to revert and backfill mediawiki_cirrussearch_request since Monday. I don't know if this is affecting other data.

Change 618825 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Revert "Bump refine job refinery version to 0.0.132 to fix $schema field bug"

https://gerrit.wikimedia.org/r/618825

Change 618825 merged by Ottomata:
[operations/puppet@production] Revert "Bump refine job refinery version to 0.0.132 to fix $schema field bug"

https://gerrit.wikimedia.org/r/618825

Launching backfill:

sudo -u analytics /usr/bin/spark2-submit \
    --name refine_event_backfill_cirrussearch_request \
    --class org.wikimedia.analytics.refinery.job.refine.Refine \
    --files /etc/hive/conf/hive-site.xml,/etc/refinery/refine/refine_event.properties,/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar,/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
    --master yarn \
    --deploy-mode cluster \
    --queue production \
    --driver-memory 8G \
    --executor-memory 4G \
    --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:hive-jdbc-1.1.0-cdh5.10.0.jar:hive-service-1.1.0-cdh5.10.0.jar \
    --conf spark.dynamicAllocation.maxExecutors=64 \
    --principal analytics/an-launcher1002.eqiad.wmnet@WIKIMEDIA \
    --keytab /etc/security/keytabs/analytics/analytics.keytab \
    /srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.0.129.jar \
    --config_file refine_event.properties \
    --table_whitelist_regex=mediawiki_cirrussearch_request \
    --since=2020-08-03T13:00:00 \
    --until=2020-08-07T20:00:00 \
    --ignore_done_flag=true \
    --ignore_failure_flag=true

application_1596639839773_18212

> I don't know if this is affecting other data.

Is the hits field the one affected? https://schema.wikimedia.org/repositories//primary/jsonschema/mediawiki/cirrussearch/request/0.0.1.yaml

hits:
   description: Final set of result pages returned for the CirrusSearch request
   type: array
   items:
     type: object
     additionalProperties: false
     properties:
       page_title:
         description: MediaWiki page title of the result
         type: string
       page_id:
         description: MediaWiki page id of the result. May be -1 for interwiki results
         type: integer
       index:
         description: ElasticSearch index this result came from
         type: string
       score:
         description: Score from ElasticSearch for this result
         type: number
       profile_name:
         description: The profile name for comp_suggest queries
         type: string

> which I don't yet understand. Why does it think I want to cast d to a BIGINT?

Ah, because I had a field in the test already called d, doh.

Ok getting closer, but something is still not quite right with my fix. Calling it a day.

I'll check on the backfill on Monday.

> Is the hits field the one affected?

In this case it is the elasticsearch_requests[].hits_returned vs elasticsearch_requests[].query (and also other fields out of order). The value of hits_returned (an int) is being stuck in the query field and cast to a string.

20/08/08 02:30:22 INFO Refine: Successfully refined 103 of 103 dataset partitions into table `event`.`mediawiki_cirrussearch_request` (total # refined records: 1129822940)

Looks better:

select elasticsearch_requests from mediawiki_cirrussearch_request where year=2020 and month=8 and day=3 and hour=17 limit 10\G
...
elasticsearch_requests | [{query=The plot against , query_type=comp_suggest, indices=[enwiki_titlesuggest], namespaces=[0], request_time_ms=27, search_time_ms=0, limit=null, hits_total=14, hits_returned=6, hits_offset=0, ...
mforns triaged this task as High priority.
mforns moved this task from Incoming to Smart Tools for Better Data on the Analytics board.
mforns added a project: Analytics-Kanban.

AHHHH RATS. I don't think Refine can do this, at least not while reading the incoming data with the merged Hive schema. ARRAY doesn't do what I had hoped, of course: it just makes an array of its arguments; it isn't a NAMED_STRUCT-style function that rebuilds each element (see the sketch after this comment).

I've tried many many different ways of using SQL to select from one array of structs into another ordered by field name, but I have not succeeded.

I think we need to find another solution. I'll follow up with Joseph here when he gets back.
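
Continuing the sketch above, here is what the patched expression actually produces. On an array-of-structs column, d.db_string extracts an array of that field's values, so ARRAY(NAMED_STRUCT(...)) builds a single-element array containing one struct of arrays, not an array of reordered structs:

// Field access on an array of structs yields an array of that field,
// so this builds ONE struct of arrays wrapped in a one-element array.
df.selectExpr(
  "ARRAY(NAMED_STRUCT('db_string', d.db_string, 'da_long', d.da_long)) AS d"
).printSchema()
// d: array<struct<db_string: array<string>, da_long: array<bigint>>>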

Can you use Spark higher-order functions, particularly transform(array<T>, function<T, U>): array<U>? This effectively maps a function over the array, and the mapped function could build a named struct.
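
For illustration, applying that suggestion to the sketch above (assumes Spark 2.4+, where SQL higher-order functions are available; this is a sketch, not the eventual fix):

// TRANSFORM maps the lambda over each array element, rebuilding every
// struct with its fields selected by NAME in the target order.
df.selectExpr(
  "TRANSFORM(d, e -> NAMED_STRUCT('db_string', e.db_string, 'da_long', e.da_long)) AS d"
).printSchema()
// d: array<struct<db_string: string, da_long: bigint>>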

Change 619496 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Refine - bump version to 0.0.132, but default to not merging Hive schemas

https://gerrit.wikimedia.org/r/619496

Change 619496 merged by Ottomata:
[operations/puppet@production] Refine - bump version to 0.0.132, but default to not merging Hive schemas

https://gerrit.wikimedia.org/r/619496

> Can you use Spark higher-order functions, particularly transform(array<T>, function<T, U>): array<U>? This effectively maps a function over the array, and the mapped function could build a named struct.

Good call @EBernhardson :)

From what I gathered in the code reviews, this is easier in Spark 3 and we're waiting for that, correct?

I can't 100% recall but I believe that is correct.

Indeed, we're waiting for Spark 3 - we'll be able to make this happen soon!
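
For reference, a sketch of the same reordering through the Spark 3 Scala DataFrame API, where transform is exposed directly in org.apache.spark.sql.functions instead of only through SQL text (df as in the sketches above; again an illustration, not the merged change):

import org.apache.spark.sql.functions.{col, struct, transform}

// Rebuild each array element as a struct whose fields are selected by
// name, in the order the target Hive schema expects.
val reordered = df.withColumn("d",
  transform(col("d"), e =>
    struct(
      e.getField("db_string").as("db_string"),
      e.getField("da_long").as("da_long"))))

reordered.printSchema()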

JAllemandou claimed this task.