
Research Spike: Better support for Eventlogging data on hive
Closed, Resolved · Public · 13 Estimated Story Points

Description

Research Spike: Better support for Eventlogging data on hive

The biggest issue with EL data on Hive is that, unlike MySQL, we cannot automatically create tables on top of new schemas, which means the data is harder to query than it could be.

Event Timeline

Kite (+Morphlines?) seems to make this way easier. I haven't gotten it working with partitions yet, but the approach in http://kitesdk.org/docs/1.1.0/Using-the-Kite-CLI-to-Create-a-Dataset.html (with JSON instead of CSV) totally worked! :)

Just brainbounced with @JAllemandou. We have some ideas; I'm going to write them here.

First of all, this works! (Tested with Spark 2.1.)

// One hour of raw EventLogging data for one schema, stored as sequence files of JSON strings.
val path = "/wmf/data/raw/eventlogging/eventlogging_PageContentSaveComplete/hourly/2017/03/14/00"
// Keep only the values (the JSON strings); the sequence file keys are not needed.
val rdd = sc.sequenceFile[Long, String](path).map(t => t._2)
// Let Spark infer the schema from the JSON records.
val ds = spark.read.json(rdd)
ds.write.saveAsTable("otto.PageContentSaveComplete1")

That will actually create a Parquet-backed table in Hive with the proper schema inferred from the JSON field names. Pretty cool!
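The new table can then be queried directly, e.g. with something like this (just an illustrative query; the wiki field comes from the standard EventLogging capsule):

spark.sql("SELECT wiki, count(*) AS events FROM otto.PageContentSaveComplete1 GROUP BY wiki ORDER BY events DESC").show()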

But, in order to handle schema changes properly, we think table creation should be done separately from data refinement. So, this would be a 2 job process:

EventLogging Hive Table Creation Job

Filters mediawiki.revision-create (or perhaps a new stream that contains meta.wikimedia.org schema changes only) for meta.wikimedia.org schema changes. Anytime a new schema revision is created, this job would get the content of all of that schema's revisions from the API, and then union the resulting objects together to create one mega schema. If a table for this schema does not yet exist, we will convert the JSONSchema object into a Hive CREATE TABLE statement and create a new table. Else, if a table for this schema does exist, we will diff the field names in the JSONSchema with the ones in the table, and generate an ALTER TABLE statement to add any missing fields to the Hive table.
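As a rough sketch of the ALTER TABLE part (helper and names below are hypothetical, and only field names are compared, never types):

// Sketch: generate an ALTER TABLE statement that adds fields present in the
// merged ("mega") JSONSchema but missing from the existing Hive table.
def alterTableAddColumns(
    table: String,
    schemaFields: Map[String, String],   // JSONSchema field name -> Hive type
    existingColumns: Set[String]         // lowercased column names already in the Hive table
): Option[String] = {
  val missing = schemaFields.filter { case (name, _) => !existingColumns.contains(name.toLowerCase) }
  if (missing.isEmpty) None
  else Some(
    s"ALTER TABLE $table ADD COLUMNS (" +
      missing.map { case (name, hiveType) => s"`$name` $hiveType" }.mkString(", ") + ")"
  )
}

// e.g. alterTableAddColumns("otto.PageContentSaveComplete1",
//        Map("uuid" -> "string", "newfield" -> "bigint"), Set("uuid", "wiki"))
// => Some(ALTER TABLE otto.PageContentSaveComplete1 ADD COLUMNS (`newfield` bigint))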

This does not support type changes in the JSONschema. We will not examine types. If someone changes types in a JSONSchema, their Hive queries will not work. We'll have to do some education about this.

We had considered dropping and recreating the Hive table when a JSONschema changes, but were worried that this might interfere with live running Hive queries. If the ALTER TABLE route doesn't work for us, then we can revisit the drop/create idea.

This job could be a streaming job that listens to mediawiki.revision-create, or a batch job that filters the data after it is imported in Hadoop.

EventLogging Refinement Job

EventLogging data is considerably smaller than webrequest, so we'd like to refine it only on a daily basis. Each schema would get its own coordinator, and new partitions would be inserted into the refined EventLogging tables with something like a Spark SELECT * from a temp table inserted into the Hive table. We may also want to provide a custom transformation function that would do some extra refinement steps.

Each refined EventLogging table would then be partitioned by revision, year, month, day.
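A rough sketch of what one day's refinement could look like (the refined table name is hypothetical, the revision partition is left out for brevity, and this assumes the refined table was pre-created with matching columns and partition columns):

import org.apache.spark.sql.functions.lit

// Dynamic partitioning may need to be enabled first:
// spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

val rawPath = "/wmf/data/raw/eventlogging/eventlogging_PageContentSaveComplete/hourly/2017/03/14/*"
val jsonRdd = sc.sequenceFile[Long, String](rawPath).map(_._2)

spark.read.json(jsonRdd)
  .withColumn("year", lit(2017)).withColumn("month", lit(3)).withColumn("day", lit(14))
  .write.mode("append")
  .insertInto("otto.PageContentSaveComplete_refined")   // hypothetical refined table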

For more information about how Hive and Parquet reconcile schema differences, see: http://spark.apache.org/docs/latest/sql-programming-guide.html#hiveparquet-schema-reconciliation

To figure out...

We should better understand how often schema changes happen, and how badly they break things when they happen (type changes are no good!). If type changes happen really often, then we might not be able to support multiple schema revisions in the same table, as I'd like to.

If we do end up making revision just a partition, then we need to educate EventLogging schema designers so that they know not to ever do type changes.

Ottomata set the point value for this task to 12.

Filters mediawiki.revision-create

On second thought, this seems a little crazy to me. Perhaps we can just always import with revision as a partition in the raw EventLogging data, and then during refinement: get the distinct schema revisions for a schema, look up each schema, do the field comparison to generate an uber schema, and then recreate the refined table based on the uber schema if there are new revisions.
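A very rough sketch of that refinement-time approach (fetchSchemaFields below is a hypothetical helper that would look up a JSONSchema revision on meta.wikimedia.org and return its field names):

// Sketch: find which revisions are actually present in a day's raw data,
// then build the "uber" field set from them.
val rawJson = sc.sequenceFile[Long, String](
  "/wmf/data/raw/eventlogging/eventlogging_PageContentSaveComplete/hourly/2017/03/14/*"
).map(_._2)
val raw = spark.read.json(rawJson)

val revisions: Array[Long] = raw.select("revision").distinct.collect.map(_.getLong(0))

// Hypothetical: fetch one revision's field names from meta.wikimedia.org.
def fetchSchemaFields(schema: String, revision: Long): Set[String] = ???

val uberFields: Set[String] =
  revisions.flatMap(r => fetchSchemaFields("PageContentSaveComplete", r)).toSet
// uberFields would then drive (re)creating the refined table if new revisions appeared.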

spark -> hive partitions

Been doing a little experimenting. From https://issues.apache.org/jira/browse/SPARK-14927 and http://stackoverflow.com/questions/31341498/save-spark-dataframe-as-dynamic-partitioned-table-in-hive, I gather that while proper Hive-style HDFS partitions (e.g. revision=5588433) can be added when using saveAsTable, the resulting Hive table does not actually have partitions in its schema. One would have to create the Hive table beforehand with partition columns for this to work.
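For example, pre-creating the refined table with partition columns (table and column names here are purely illustrative) would look something like:

// Sketch: create the Hive table up front with partition columns, so that a
// later insert can actually register Hive partitions. Names are illustrative only.
spark.sql("""
  CREATE TABLE IF NOT EXISTS otto.PageContentSaveComplete_refined (
    uuid  STRING,
    wiki  STRING,
    event STRUCT<revisionId:BIGINT>
  )
  PARTITIONED BY (revision BIGINT, year INT, month INT, day INT)
  STORED AS PARQUET
""")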

Just parking this here:

I tried to unionAll two EventLogging DataFrames, and got: org.apache.spark.sql.AnalysisException: unresolved operator 'Union;. But unioning the 2 EventLogging RDDs worked fine via val rddUnion = rdd1 ++ rdd2. I was then able to:

// Infer a merged schema from the unioned raw JSON records of both schemas.
val datasetUnion = sqlContext.read.json(rddUnion)
// Keep the merged schema but drop all rows.
val emptyDatasetUnion = datasetUnion.filter("1 = 0")
emptyDatasetUnion.write.saveAsTable("otto.UnionSchema1")

This got me a partitionless Hive table with the union of all fields from both schemas.

Filters mediawiki.revision-create

On second thought, this seems a little crazy to me.

Yup, we should be aware that not every edit to a schema page on Meta corresponds to the creation of a new revision table for that schema. AIUI, the latter only happens once the new code (corresponding to a new revision) is deployed and starts sending data tagged with the new revision number. In this case, for example, there were 13 edits corresponding to only 3 table revisions in the database (16364296, 16208085, 15777589). So it seems indeed preferable to base this only on the revision numbers that are actually being received.

In any case, I think there are cases where this kind of automatic UNION would be quite useful (with a prime example being this week's breaking change of the user agent field + renaming across all schemas).

It looks like this is on a good path, but as always, please don't make final decisions on these things without a stakeholder consultation about any user-facing breaking changes - in particular considering there has been a desire to ultimately make this new Hive-based solution the default and end support for the current setup.

@Tbayer don't worry! Right now we are just figuring out how to make this work. We know we want EventLogging data more easily queryable in Hive, but that doesn't mean we will turn off MySQL just because we figure it out.

Ok, after much spiking, here's the current route we'd like to go down.

We are pretty sure we can infer a good enough schema for each imported dataset using Spark's DataFrame JSON integration. We can translate from the inferred Spark schema to a Hive schema, and then execute CREATE statements when a table doesn't exist yet, or ALTER statements to add newly discovered fields to an existing table. We are pretty sure this works, because Spark will actually scan through the entire dataset it is reading, examine every JSON record, infer a schema for each, and then merge the resulting schemas together. Once we create or update the Hive schema, we can use DataFrame insertInto with partitions like:

// Add literal partition value columns to the DataFrame...
val dfWithPartitions = df.withColumn("year", lit(year)).withColumn("month", lit(month)).withColumn("day", lit(day))
// ...and append into the pre-created partitioned Hive table.
dfWithPartitions.write.mode("append").partitionBy("year", "month", "day").insertInto(tableName)

This works! As long as the Hive table schema has the proper partition column information, the partitionBy clause seems to properly write Parquet data into hive partitioned directories AND add the partitions to Hive.
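For the create/update part of that approach, translating the Spark-inferred schema into Hive DDL could look roughly like this (a sketch only; catalogString should give Hive-compatible type names for the common cases, and the table name is hypothetical):

// Sketch: build Hive column DDL from a Spark-inferred schema (simplified; the
// real job would also need to diff against an existing table for ALTERs).
import org.apache.spark.sql.types.StructType

def hiveColumnDdl(schema: StructType): String =
  schema.fields.map(f => s"`${f.name}` ${f.dataType.catalogString}").mkString(",\n  ")

val df = spark.read.json(sc.sequenceFile[Long, String](path).map(_._2))   // path as in the first snippet above
println(
  s"""CREATE TABLE IF NOT EXISTS otto.PageContentSaveComplete_refined (
     |  ${hiveColumnDdl(df.schema)}
     |)
     |PARTITIONED BY (year INT, month INT, day INT)
     |STORED AS PARQUET""".stripMargin
)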

The downside of this approach is that it will only ever add fields to the schema that are actually present in the data. If a revision of a JSONSchema has a bunch of optional fields that are never populated in data records, those fields will never be added to the Hive schema. This should be ok though, since if there are no records with these fields populated, they would all be NULL anyway.

The upside is that we don't need to be tied to a schema repository. If (this might be a BIG IF) the JSON inferred schemas are relatively similar (never changing types!) then the schemas we create and maintain in Hive should be sufficient to query any Parquet file created from the raw JSON data.

We can either call this spike task complete, because we are pretty sure we have a way forward, OR we can get around to actually implementing a hacky prototype. I'm inclined to make another task for the implementation. We might run into issues along the way and have to go back to the drawing board, but right now I think this should work!

Awesome ! I'm in favor of creating a new task for prototyping.
I'm also gently waving to @Ottomata to provide help if needed :)

Nuria changed the point value for this task from 12 to 13. Apr 11 2017, 3:07 PM

@Tbayer don't worry! Right now we are just figuring out how to make this work. We know we want EventLogging data more easily queryable in Hive, but that doesn't mean we will turn off MySQL just because we figure it out.

Thanks @Ottomata, but it wasn't just about giving a heads-up before turning off MySQL. It's about consulting stakeholders about any breaking changes that will affect them in the new setup on Hive that is being developed here, before final decisions are made about what it will look like - it looks like we are already in the implementation stage now (T162610)?

In general, thinking about the following questions may be useful:

  • Are there any things that developers will have to change about the way events are being sent, compared to current practices?
  • Are there any queries that analysts use which would be more difficult (or impossible) in the new setup?

More specific examples (after skimming the above discussion):

  • Will it remain possible to add / remove fields when updating a schema to a new revision?
  • Will it cause any problems if several different revisions of the same schema are active at the same time (i.e. are sending events during the same year+month+day+hour)?

Separately, regarding the performance advantages that this will hopefully bring: Will it be possible to have hourly partitions too, instead of daily ones? (That seems to be the standard for our current Hive tables, and on the other hand there were cases in the past where querying a full day's worth of EL data in MySQL was already causing performance issues.)