
Implement EventLogging Hive refinement
Closed, ResolvedPublic21 Story Points

Description

Now that we've got a general plan from T153328, we should implement the thang! This ticket will track that work.

Related Objects

Event Timeline

Ottomata created this task.Apr 10 2017, 2:55 PM
Restricted Application added a subscriber: Aklapper.Apr 10 2017, 2:55 PM
Ottomata moved this task from Next Up to In Progress on the Analytics-Kanban board.
Ottomata claimed this task.
Ottomata added a comment.EditedApr 18 2017, 7:52 PM

Working on this. It's getting funky! Talked with Joseph for an hour today about an issue, and I've got things to write down before they fall out of my brain.

I've solved our new-fields-in-struct-types problem by doing another pass over the data. DataFrameReader allows you to specify a schema to use while reading. So, the steps are as follows (a rough code sketch follows the list):

  1. inputSchema = add_partitions(sql.read.json(path), partitions).schema This reads all records at this path and infers a schema.
  2. hiveTable = prepareHiveTable(tableName, partitions, inputSchema) This will either CREATE a new table or ALTER the existing one to add fields. This will fail if there is a change in the event (struct) schema. We'll have to be smart here: either we DROP and re-CREATE the table (and REPAIR partitions) on struct changes, or we figure out how to run CHANGE COLUMN as part of the ALTER. I think this is possible.
  3. uberSchema = hiveTable.schema
  4. inputDF = sql.read.schema(uberSchema).json(path) // This re-reads the input data, this time with our Hive table's uberSchema
  5. outputDF = add_partitions(inputDF, partitions)
  6. outputDF.write.mode("overwrite").partitionBy(partitionName:_*).insertInto(tableName)
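Roughly, in Spark Scala terms (a sketch only: addPartitions and prepareHiveTable are hypothetical helper names standing in for steps 1 and 2, and the HiveContext is called sql as above):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

def refine(sql: SQLContext, path: String, tableName: String,
           partitions: Map[String, String]): Unit = {

  // Hypothetical: append the partition columns (year, month, day, hour, ...) to a DataFrame.
  def addPartitions(df: DataFrame, partitions: Map[String, String]): DataFrame = ???
  // Hypothetical: CREATE the Hive table if missing, or ALTER it to add new fields,
  // and return the table's (merged) schema.
  def prepareHiveTable(table: String, partitions: Map[String, String],
                       schema: StructType): StructType = ???

  // 1. First pass: read all records at this path and infer a schema.
  val inputSchema = addPartitions(sql.read.json(path), partitions).schema

  // 2-3. Merge that schema into the Hive table; the table's schema becomes the uberSchema.
  val uberSchema = prepareHiveTable(tableName, partitions, inputSchema)

  // 4. Second pass: re-read the input, this time forcing the Hive table's uberSchema.
  val inputDF = sql.read.schema(uberSchema).json(path)

  // 5. Append the partition columns so the DataFrame matches the table layout.
  val outputDF = addPartitions(inputDF, partitions)

  // 6. Overwrite the target partitions in the Hive table. (Newer Spark versions
  //    disallow combining partitionBy() with insertInto(); there, the table's own
  //    partitioning is used and partitionBy() should be dropped.)
  outputDF.write
    .mode("overwrite")
    .partitionBy(partitions.keys.toSeq: _*)
    .insertInto(tableName)
}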

This mostly works! It seems to solve the problem of partition fields being placed incorrectly if field counts don't match between partitions. However, there is this problem: https://issues.apache.org/jira/browse/HIVE-10086

In my testing, I can select new fields out of struct fields IFF I don't try to access the newer field in older data that doesn't have it. Say I have the following:

day=1 schema: event struct<a:string>
day=2 schema: event struct<a:string,b:string>

With the above algorithm (assuming we can CHANGE COLUMN to add column b to the Hive table), we get a fully specified Hive table schema, and the data for day=2 writes successfully. I can do queries like select event.a from table where day > 0, but if I want to access column event.b, I have to make sure I only select partitions in which column event.b exists in the Parquet file schema. This includes select *.

So, this means that for any evolved EventLogging schema in which a field has been added (to the event schema, not the capsule), select * will not work, nor will select event.newfield, at least not across multiple partitions with different structs in the Parquet schemas.
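For concreteness, the queries described above, expressed through the same HiveContext `sql` against a hypothetical refined table SomeSchema (whose day=1 Parquet files lack event.b); per the behavior reported here, the last two fail on Hive 1.1.0:

sql.sql("SELECT event.a FROM SomeSchema WHERE day > 0")  // works: event.a exists in every partition
sql.sql("SELECT event.b FROM SomeSchema WHERE day = 2")  // works: restricted to partitions whose
                                                         // Parquet schema contains event.b
sql.sql("SELECT event.b FROM SomeSchema WHERE day > 0")  // fails once it hits the day=1 files
sql.sql("SELECT *       FROM SomeSchema WHERE day > 0")  // fails for the same reason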

HIVE-10086 is supposed to fix this bug, but it was only resolved in Hive 1.2.0, and we are running Hive 1.1.0. :(

Or, we could flatten :(

Oof, actually:

  1. inputDF = sql.read.schema(uberSchema).json(path) // This re-reads the input data, this time with our Hive table's uberSchema

This doesn't work as stated, because at this point we've 'normalized' uberSchema so that all fields are lowercased. This means that when reading, the field names don't match up with the camel-cased JSON fields. AGHHHHH.
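To illustrate the mismatch (assuming the JSON reader matches field names against the supplied schema case-sensitively; `userAgent` is just a hypothetical camel-cased field, and `sql`, `uberSchema`, and `path` are the values from the steps above):

//   JSON on disk:                      {"userAgent": "...", "event": {...}}
//   uberSchema (normalized from Hive):  useragent: string, event: struct<...>
//
// Re-reading with the normalized schema finds no JSON field literally named
// "useragent", so that column (and any camel-cased struct field) comes back null:
val inputDF = sql.read.schema(uberSchema).json(path)
inputDF.select("useragent").show()   // all nulls, even though userAgent is in the raw JSON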

Thanks for the report, @Ottomata!
About the normalization and camel-cased fields, we could do it in 2 passes (again). It's not clean, but might work (rough sketch after the list):

  1. Merge schemas without normalization: if normalized fields are equal, keep the non-normalized one
  2. Re-read the data with this schema
  3. Convert the data to the normalized schema (exact same number of fields, but normalized)
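For illustration, a rough sketch of this idea in Spark Scala. mergeUnnormalized is a hypothetical helper that only handles the top level of the schema (a real version would recurse into struct fields and append Hive-only fields the input lacks), and default case-insensitive column resolution is assumed:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructField, StructType}

// Placeholders for values from the earlier steps:
val sql: SQLContext         = ???  // the HiveContext used above
val hiveSchema: StructType  = ???  // the Hive table's (normalized) schema
val inputSchema: StructType = ???  // schema inferred from the raw JSON (camel-cased)
val path: String            = ???  // raw JSON input path

// 1. Merge WITHOUT normalizing: when two fields are equal after lowercasing,
//    keep the input's non-normalized (camel-cased) name but the Hive type.
def mergeUnnormalized(hiveSchema: StructType, inputSchema: StructType): StructType = {
  val hiveByLower = hiveSchema.fields.map(f => f.name.toLowerCase -> f).toMap
  StructType(inputSchema.fields.map { f =>
    hiveByLower.get(f.name.toLowerCase)
      .map(h => StructField(f.name, h.dataType, nullable = true))
      .getOrElse(f)
  })
}

// 2. Re-read the raw JSON with this camelCase-preserving merged schema.
val readSchema = mergeUnnormalized(hiveSchema, inputSchema)
val rawDF = sql.read.schema(readSchema).json(path)

// 3. Convert to the normalized Hive schema: the field count and order are identical,
//    so casting each column to its Hive type (struct casts rename nested fields
//    positionally) and renaming it is enough.
val normalizedDF = rawDF.select(
  hiveSchema.fields.map(f => rawDF(f.name).cast(f.dataType).as(f.name)): _*
)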
gerritbot added a subscriber: gerritbot.

Change 201009 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/refinery@master] Add oozie util workflow to launch spark jobs

https://gerrit.wikimedia.org/r/201009

Nuria moved this task from In Progress to Paused on the Analytics-Kanban board.May 25 2017, 3:52 PM
Ottomata moved this task from Paused to In Progress on the Analytics-Kanban board.Jul 26 2017, 3:03 PM

@joal, FYI: I've been testing with the following commands:

eventlogging:
spark-submit \
--master yarn \
--deploy-mode client \
--executor-cores 6 \
--name JsonRefineEventloggingTest \
--class org.wikimedia.analytics.refinery.job.JsonRefine \
~/refinery-source/refinery-job/target/refinery-job-0.0.49-SNAPSHOT.jar \
--input-base-path /wmf/data/raw/eventlogging \
--database otto_json_refine_test \
--output-base-path /user/otto/external/event00 \
--done-flag _REFINED \
--input-regex '.*eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input-capture 'table,year,month,day,hour' \
--table-blacklist '^Edit$' \
--since 32 \
--send-email-report \
--to-emails otto@wikimedia.org

eventbus:
spark-submit \
--master yarn \
--deploy-mode client \
--name JsonRefineEventBusTest \
--class org.wikimedia.analytics.refinery.job.JsonRefine \
~/refinery-source/refinery-job/target/refinery-job-0.0.49-SNAPSHOT.jar \
--input-base-path /wmf/data/raw/event \
--database otto_json_refine_test \
--output-base-path /user/otto/external/event00 \
--done-flag _REFINED \
--input-regex '.*(eqiad|codfw)_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input-capture 'datacenter,table,year,month,day,hour' \
--since 4 \
--send-email-report \
--to-emails otto@wikimedia.org
Nuria moved this task from In Progress to Paused on the Analytics-Kanban board.Sep 14 2017, 4:19 PM

Change 201009 merged by Ottomata:
[analytics/refinery@master] Add oozie util workflow to launch spark jobs

https://gerrit.wikimedia.org/r/201009

Change 387838 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Refine Eventlogging analytics and eventbus data into Hive tables

https://gerrit.wikimedia.org/r/387838

Mentioned in SAL (#wikimedia-operations) [2017-11-01T19:05:06Z] <otto@tin> Started deploy [analytics/refinery@6d11d67]: Deploying refinery-source artifacts for 0.0.54 for JsonRefine job, T162610

Mentioned in SAL (#wikimedia-analytics) [2017-11-01T19:05:33Z] <ottomata> deploying refinery with refinery/source 0.0.54 for JsonRefine job T162610

Mentioned in SAL (#wikimedia-operations) [2017-11-01T19:08:51Z] <otto@tin> Finished deploy [analytics/refinery@6d11d67]: Deploying refinery-source artifacts for 0.0.54 for JsonRefine job, T162610 (duration: 03m 45s)

Change 387838 merged by Ottomata:
[operations/puppet@production] Refine Eventlogging analytics and eventbus data into Hive tables

https://gerrit.wikimedia.org/r/387838

Change 387860 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Run JsonRefine job in yarn deploy mode cluster and provide hive-site.xml

https://gerrit.wikimedia.org/r/387860

Change 387860 merged by Ottomata:
[operations/puppet@production] Run JsonRefine job in yarn deploy mode cluster and provide hive-site.xml

https://gerrit.wikimedia.org/r/387860

Looking goooood!

Remaining tasks:

  • Purging in Hadoop after 90 days. We'll want to get smart purging in as a new task.
  • Send an announcement. Let's let this go until next week and announce then.
Nuria closed this task as Resolved.Dec 12 2017, 5:12 PM