
Refine should report about malformed records and continue if possible
Closed, Resolved (Public)

Description

Motive

Re-running failed Refine jobs is one of the more time-consuming ops-week tasks. The way we do it leaves us open to discarding unknown amounts of data (when we use DROPMALFORMED, we don't check what percentage of the total number of records were dropped). Some of this can probably be automated.

Proposal

  1. Change refine to run in PERMISSIVE mode by default.
  2. Check the percentage of records dropped and, if it's above K percent (K = 1?), alert.

This should eliminate most of the day to day intervention into refine and alert us when something more actionable is going on.
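As a sketch of the threshold check in step 2 (the object and method names, and the K = 1 default, are assumptions for illustration, not the actual Refine code):

```scala
// Hypothetical helper illustrating the proposed alert check.
// totalRecords is the input record count, droppedRecords the number
// dropped as malformed; the threshold is K in percent (K = 1 here).
object DroppedRecordsCheck {
  val defaultThresholdPercent: Double = 1.0

  def droppedPercent(totalRecords: Long, droppedRecords: Long): Double = {
    if (totalRecords == 0L) 0.0
    else droppedRecords.toDouble / totalRecords.toDouble * 100.0
  }

  def shouldAlert(
    totalRecords: Long,
    droppedRecords: Long,
    thresholdPercent: Double = defaultThresholdPercent
  ): Boolean = droppedPercent(totalRecords, droppedRecords) > thresholdPercent
}
```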

Event Timeline

razzi moved this task from Incoming to Ops Week on the Analytics board.

On using count to check how many rows were dropped: we could forget that the dataset also goes through deduplication, and rows could be dropped for that reason as well.

Ottomata renamed this task from Improve Refine to Refine should always DROPMALFORMED but alert if records are dropped. (Dec 7 2020, 4:11 PM)
Ottomata added a project: Analytics-Kanban.
Ottomata moved this task from Next Up to In Progress on the Analytics-Kanban board.

If the job fails altogether, try it again in X minutes (X = 30?). If it fails again, alert.
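A minimal sketch of that retry idea (the function name and signature are assumptions; X = 30 minutes would be passed as the delay):

```scala
import scala.annotation.tailrec

// Hypothetical retry wrapper: run the job, retry after a delay if it
// fails, and report failure (so we can alert) once attempts run out.
def runWithRetry(job: () => Boolean, retryDelayMs: Long, maxAttempts: Int = 2): Boolean = {
  @tailrec
  def loop(attempt: Int): Boolean = {
    if (job()) true
    else if (attempt >= maxAttempts) false
    else {
      Thread.sleep(retryDelayMs)
      loop(attempt + 1)
    }
  }
  loop(1)
}
```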

Retrying after dropped records wouldn't change the result. If we want to run with DROPMALFORMED by default we should just alert on this fact, and make it an ops week task to respond and make sure everything is ok.

I'm not sure this is possible!

We'd need to do multiple passes on the data: once to read the data in PERMISSIVE mode, to know the number of records that exist in the input data, and then again to actually run with DROPMALFORMED. However, even if we did that, as Joseph says, we'd need to run the same transform & filter functions on the first pass as on the second to know the total number of expected records. And in PERMISSIVE mode, there's no guarantee that we could run the transform functions successfully or consistently on input data containing malformed records.

So, we could set DROPMALFORMED by default, but we'd have no way of knowing how much is dropped, and could silently lose data.

Oo, I just had another idea!

Set the mode to PERMISSIVE by default.

PERMISSIVE : sets other fields to null when it meets a corrupted record, and puts the malformed string into a new field configured by columnNameOfCorruptRecord. When a schema is set by user, it sets null for extra fields.

In PERMISSIVE mode, records with malformed data would end up having all fields set to NULL, plus an extra column (named by spark.sql.columnNameOfCorruptRecord, _corrupt_record by default) containing the original raw record as a string. E.g. something like

f1   | f2   | _corrupt_record
------------------------------------
NULL | NULL | <original raw record>
a    | b    | NULL

We could then add a step to Refine that checks for any records where _corrupt_record IS NOT NULL and report on them. Refine could then filter these records out itself, effectively resulting in the same DataFrame that DROPMALFORMED would have returned.

@JAllemandou, @Milimetric thoughts?

Ops-weekers would still have to manually check why the field was corrupt in the source data, but they wouldn't necessarily have to rerun the Refine job.

Huh, this is actually pretty cool. It works!

import spark.implicits._
import org.apache.spark.sql.types._

val schema = StructType(Seq(
    StructField("a", StructType(Seq(StructField("A", IntegerType)))),
    StructField("b", StructType(Seq(StructField("B", IntegerType)))),
    StructField("_corrupt_record", StringType) // default spark.sql.columnNameOfCorruptRecord
))

val records = Seq(
     // good
    """{"a": {"A": 123}, "b": {"B": 456}}""",
    // column a is an integer instead of a struct
    """{"a": 1, "b": {"B": 789}}""",
    // columns b and a are integers instead of structs
    """{"a": 1, "b": 2}"""
)

val inputDf = spark.read
    .options(Map("mode" -> "PERMISSIVE"))
    .schema(schema)
    .json(records.toDS)

// Records with any bad data have the full original JSON object saved
// as a string in the _corrupt_record column.  All other fields are set to null
inputDf.show
+-----+-----+--------------------+
|    a|    b|     _corrupt_record|
+-----+-----+--------------------+
|[123]|[456]|                null|
| null| null|{"a": 1, "b": {"B...|
| null| null|    {"a": 1, "b": 2}|
+-----+-----+--------------------+

// Get the bad data into its own DataFrame.  We can even save this as a side effect or side table somewhere!
// Maybe in a single table with some extra information about the refine target as columns?
// E.g. inputPath, outputPath, outputTable name, partition values, etc.
// We could then send an email report now, or generate an hourly alert summing up all refines that had malformed
// data in a given hour, based on data in this table.
val corruptDf = inputDf.select("_corrupt_record").where("_corrupt_record IS NOT NULL")
corruptDf.show
+--------------------+
|     _corrupt_record|
+--------------------+
|{"a": 1, "b": {"B...|
|    {"a": 1, "b": 2}|
+--------------------+

// But we'd only write the non-malformed data to the final refine target.
val outputDf = inputDf.where("_corrupt_record IS NULL").drop("_corrupt_record")
outputDf.show
+-----+-----+
|    a|    b|
+-----+-----+
|[123]|[456]|
+-----+-----+
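Tying the demo back to the proposal's alert check: in the Spark session above, the counts would come from inputDf.count (3) and corruptDf.count (2); they're written as plain Longs here so the arithmetic stands alone.

```scala
// Counts taken from the demo above: 3 input records, 2 corrupt.
// In the real job these would be inputDf.count and corruptDf.count.
val totalCount: Long = 3L
val corruptCount: Long = 2L

val corruptPercent = corruptCount.toDouble / totalCount.toDouble * 100.0

// Alert if more than K percent of records were malformed (K = 1 per the proposal).
val alert = corruptPercent > 1.0
```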

Change 647092 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/refinery/source@master] [WIP] Refine using PERMISSIVE mode and capture corrupt records

https://gerrit.wikimedia.org/r/647092

Ottomata renamed this task from Refine should always DROPMALFORMED but alert if records are dropped to Refine should report about malformed records and continue if possible. (Dec 8 2020, 10:11 PM)
Ottomata updated the task description.

Ok, in https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/647092 I've made logging and error reporting about corrupt records better.

The job won't (by default) 'continue if possible'; as is, it will still require manual intervention. However, the ops week-er should now have better information in the refine failure email report to decide whether the job should just be re-run.

I started doing some fancier stuff, trying to make a side output of corrupt records for later review, and/or making Refine write to a 'statistics' table, kind of like webrequest_sequence_stats. We can do that, but we should discuss together how that system should look.

Change 647092 merged by jenkins-bot:
[analytics/refinery/source@master] Refine using PERMISSIVE mode and log more info about corrupt records

https://gerrit.wikimedia.org/r/647092

Change 654308 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Bump refine jar version to refinery-job 0.0.143

https://gerrit.wikimedia.org/r/654308

Change 654308 merged by Razzi:
[operations/puppet@production] Bump refine jar version to refinery-job 0.0.143

https://gerrit.wikimedia.org/r/654308