
Wire ORES recent_score events into Hadoop
Open, Lowest, Public

Description

We'll import the mediawiki.revision-score stream in two steps:

Daily job to archive recent scores

The mediawiki.revision-score events will often include scores for several models. These scores are normalized into a minimal table, ores.revision_score, partitioned by wiki, model, and model_version so that scores can be efficiently purged as new models are released.
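
For illustration, a rough sketch of what the DDL for this table could look like (column names and types are assumptions based on the event schema; the authoritative DDL is in the refinery patch linked later in this task):

-- Sketch only: illustrative, not the final refinery DDL.
CREATE TABLE IF NOT EXISTS ores.revision_score (
    rev_id        BIGINT COMMENT 'Revision that was scored',
    prediction    STRING COMMENT 'Predicted class for the model',
    probability   ARRAY<STRUCT<name: STRING, value: DOUBLE>>
                  COMMENT 'Probability per class, as name/value pairs'
)
PARTITIONED BY (
    wiki          STRING,
    model         STRING,
    model_version STRING
)
STORED AS PARQUET;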

There are streams coming from both the eqiad and codfw datacenters, and it's possible for scores to be redundant (and even slightly different!) between the two, though mostly in unusual situations like a datacenter switchover. In normal operation, we've seen 16.5M scores from eqiad and only 500 from codfw so far in 2019. We're going to accept this small potential for duplication in the normalized scores table, but will select distinct rows when building the monthly snapshots in the next step.
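
As a sketch of that deduplication (which would happen in the monthly snapshot step below), shown with a window function rather than a literal SELECT DISTINCT since the probability column is a complex type; table and column names follow the layout sketched above:

-- Sketch only: keep one row per (wiki, model, model_version, rev_id),
-- even when both datacenters emitted a (possibly slightly different) score.
SELECT wiki, model, model_version, rev_id, prediction, probability
FROM (
    SELECT wiki, model, model_version, rev_id, prediction, probability,
           row_number() OVER (PARTITION BY wiki, model, model_version, rev_id
                              ORDER BY rev_id) AS rn
    FROM ores.revision_score
) deduped
WHERE rn = 1;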

Monthly job to produce dump-ready snapshots

The normalized tables will be joined with mediawiki_history in order to produce monthly snapshots of the scores along with the context, wiki page, revision, and user metadata. These will only contain metadata available to an anonymous user, and are suitable for making public dump files.
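
A hedged sketch of that join, assuming the normalized table above and the standard revision-create rows in mediawiki_history (exact field names may differ slightly):

-- Sketch only: denormalize scores with anonymous-safe revision metadata.
SELECT
    s.wiki, s.model, s.model_version,
    s.rev_id, s.prediction, s.probability,
    h.event_timestamp   AS rev_timestamp,
    h.page_id, h.page_namespace, h.page_title,
    h.event_user_id     AS user_id,
    h.event_user_text   AS user_text
FROM ores.revision_score s
JOIN wmf.mediawiki_history h
  ON  h.wiki_db = s.wiki
  AND h.revision_id = s.rev_id
WHERE h.event_entity = 'revision'
  AND h.event_type = 'create'
  AND h.snapshot = '2018-12';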


Document the new tables on wikitech:
https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/ORES

Event Timeline

awight created this task. Nov 16 2018, 11:24 PM
Nuria added a subscriber: Nuria. Nov 16 2018, 11:47 PM

@awight FYI that events need to abide by a schema that can be persisted to SQL (see https://wikitech.wikimedia.org/wiki/Analytics/Systems/EventLogging/Schema_Guidelines), and that schema changes should be backwards compatible. Once those requirements are taken care of, you can take advantage of the current systems and dump data from Kafka into Hadoop.

Sounds good! Looks like T197000: Modify revision-score schema so that model probabilities won't conflict is on track to be resolved soon.

fdans triaged this task as High priority.
Nuria raised the priority of this task from High to Needs Triage.

I'd love some advice on how to proceed with this. The new stream's structure will be slightly different from mediawiki.revision-score; the biggest change is that we want to split scores by model. My understanding is that I should write a new stream transformer in the change-propagation repo. It would be easy to modify ores_updates.js to write each revision's scores to the mediawiki.revision-score stream, then write separate messages for each model to a new long-term score storage stream. To store to HDFS, we simply enable a Kafka Connect flag (in which repo?) to produce a table for the new stream. Is this correct?

@awight Kafka topics (sometimes called streams) are set by schema, and each schema is propagated to a different table in Hive. Hopefully this makes sense. This means that events that abide by two different schemas (I think this is your case, as one schema has events per model and the other does not) will not be in the same topic (mediawiki.revision-score) but rather in different topics, something like mediawiki.revision-score-by-model and mediawiki.revision-score.

Once events are in the appropriate topic they will be persisted to Hive without you having to do anything. Kafka Connect is not used at this time. We use this bit of code, which is what the puppet you linked to executes: https://github.com/wikimedia/analytics-refinery-source/tree/master/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/refine

@awight, I think it would be better to transform the data in event.mediawiki_revision_score to your format in Hadoop. You can do this via Hive or Spark, possibly even reusing the Refine stuff (that puppet link you found I think is very old, but we have something more generic these days).

BTW, I believe that the revision-score stream doesn't contain all models. See https://phabricator.wikimedia.org/T201868#4824198

@Nuria, we talked a lot about making a new stream for each model in T197000: Modify revision-score schema so that model probabilities won't conflict, but ultimately decided against it. @awight should split the existing data in Hive into new tables by model.

@awight should split the existing data in Hive into new tables by model.

Thanks for the suggestions! Would this be a Refinery job or an Oozie job?

I think it depends on what you want to do.

Hm, actually, this is not a 'Refine' job. (We use the term 'refine' to mean a 1-to-1 dataset job: take one dataset in, enhance it, and output one dataset.)

The tricky part with ORES models is that they don't share a common schema. This means that for every model, you either have to use the schema format that we settled on for Hive (with the array of probability objects with name and value keys), OR you have to manually generate a new Hive schema for every model.

I think if you are ok with the Hive schema we have, and you just want to split to a table partitioned by model_name, then some kind of INSERT INTO ... SELECT FROM ... EXPLODE ... partition by model_name type query might work.

It's hard to say, though; can you describe better what you want to do?

The tricky part with ORES models is that they don't share a common schema.

Good point, I guess the key-value array for probabilities will serve us fine.

It's hard to say, though; can you describe better what you want to do?

The big picture is T209611: [Epic] Make ORES scores for wikidata available as a dump, with emphasis on the "dumps of ORES scores" use cases. One tricky detail is that we'll need to backfill quite a bit in order to produce complete dumps with a score for every revision. See the conclusion of T209731 for the desired partitioning. This should make it trivial to produce a dump of all scores for a given model, on a given wiki. It should also make it easy to purge and regenerate.
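
For example, with that partitioning, pulling everything needed for one dump file should prune down to a single (wiki, model) partition subtree; a sketch against the proposed table:

-- Sketch: partition pruning means only the wikidatawiki/damaging files are read.
SELECT rev_id, model_version, prediction, probability
FROM ores.revision_score
WHERE wiki = 'wikidatawiki'
  AND model = 'damaging';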

On a tangent, we're also considering storing raw feature values in HDFS in future work, which should let us run backfill jobs in Spark.

Ah ok, cool! Good to know @JAllemandou is involved there. I think he can help you figure out how this job/query would look. That partitioning scheme makes sense.

If this can't be written as a Hive query (it might be a bit difficult?), then you should probably write a Spark job to do this. It will let you programmatically iterate over the source data and generate new partitions. You can do this in pyspark2 if you like :) Once a Hive query or Spark job is written, we would probably schedule it with Oozie.

I see, rather than a new stream of data this is a pivot of data we already have, so a Spark job makes sense.

If I understand correctly, you want to process the ORES events coming from EventLogging and explode them by model - am I roughly correct, @awight?

Bookmark to self—this query splits out the mediawiki_revision_score event by model:

select 
    `database`,
    page_id, page_namespace, page_title,
    rev_id, rev_parent_id, rev_timestamp,
    score
from mediawiki_revision_score
lateral view explode(scores) split_scores as score
where
    -- Constraints are only for testing.
    year=2018 and month=12 and day=10 and hour=1
    and `database`='wikidatawiki'
limit 100;

If I understand correctly, you want to process the ORES events coming from EventLogging and explode them by model - am I roughly correct, @awight?

Yes, that's pretty much all I'm trying to accomplish in this phase of work. The HDFS partitioning is important as well, because the goal is to produce dump files for each (wiki, model) pair.

The query you bookmarked is doing what you expect I guess :)
The next steps are to create the associated table and create new partitions based on the query.
For this you'll need to use dynamic partitioning in hive.
Let me know if you want help @awight :)
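
For reference, dynamic partition inserts in Hive generally need these session settings before the INSERT statement; nonstrict mode is what allows every partition column to be dynamic:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;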

Change 481025 had a related patch set uploaded (by Awight; owner: Awight):
[analytics/refinery@master] Schema for ORES scores

https://gerrit.wikimedia.org/r/481025

With the schema above, I was able to insert rows using this query:

set hive.exec.dynamic.partition.mode = 'nonstrict';

insert into table score
partition (wiki, model, model_version)
select
    errors,
    page_id, page_namespace, page_title,
    rev_id, rev_parent_id, rev_timestamp,
    model_score.probability as score,
    -- Dynamic partition columns must appear in this order.
    `database` as wiki,
    model_score.model_name as model,
    model_score.model_version as model_version
from event.mediawiki_revision_score
lateral view explode(scores) split_scores as model_score
where
    -- Constraints are only for testing.
    year=2018 and month=12 and day=10 and hour=1
    and `database`='wikidatawiki'
limit 100;

I think the next step is to write a batch job which runs this query over new event.mediawiki_revision_score partitions as they arrive. I'd love to see an example to model this on, because I'm not clear on how to resume processing from the latest event parsed by previous runs.
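
For illustration, one way to scope each run is to parameterize the query by source partition, so the scheduler (e.g. an Oozie coordinator) hands each run exactly one hour to process and there is no need to track a resume point. A sketch using Hive variable substitution (script and variable names are illustrative):

-- Sketch only: the scheduler supplies the source hour, e.g.
--   hive -f insert_scores.hql --hivevar year=2018 --hivevar month=12 \
--        --hivevar day=10 --hivevar hour=1
insert into table score
partition (wiki, model, model_version)
select
    errors,
    page_id, page_namespace, page_title,
    rev_id, rev_parent_id, rev_timestamp,
    model_score.probability as score,
    -- Dynamic partition columns must appear in this order.
    `database` as wiki,
    model_score.model_name as model,
    model_score.model_version as model_version
from event.mediawiki_revision_score
lateral view explode(scores) split_scores as model_score
where
    year=${hivevar:year} and month=${hivevar:month}
    and day=${hivevar:day} and hour=${hivevar:hour};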

(Also, I see that I have a bug in the "errors" column, will fix it now.)

awight added a subscriber: Halfak. Dec 20 2018, 7:39 PM

@Halfak In CR, we were discussing the prediction vs. probability fields. I'm currently planning to only include the probabilities, since the 50% threshold thing makes prediction such an "attractive nuisance" for damaging and good faith. Do you have any strong opinions about that?

I think the "attractive nuisance" of the "prediction" field is debatable for editquality and probably not applicable to other models. It feels arbitrary to exclude it from some models and not others. If we really want to exclude the "prediction" field, we probably would want to do it at the source rather than downstream.

Also, it's worth noting that most of our performance metrics stored in models reflect the "prediction" field, so while it may be an attractive nuisance to a naive developer who is playing fast and loose, its expected behavior is still clearly documented.

awight updated the task description. Dec 20 2018, 8:34 PM

@JAllemandou I need another clue here: this query works when I run each insert separately, but when they're run together I get an error about 'No partition predicate found for Alias "mediawiki_revision_score"'...

from event.mediawiki_revision_score
insert into table revision_score
partition (wiki, model, model_version)
select
    page_id, page_namespace, page_title,
    rev_id, rev_parent_id, rev_timestamp,
    model_score.prediction[0] as prediction,
    model_score.probability as probability,
    -- Dynamic partition columns must appear in this order.
    `database` as wiki,
    model_score.model_name as model,
    model_score.model_version as model_version
lateral view explode(scores) split_scores as model_score
where
    -- Constraints are only for testing.
    year=2018 and month=12 and day=10 and hour=1
    and `database`='wikidatawiki'
limit 100

insert into table revision_error
partition (wiki, model, model_version)
select
    page_id, page_namespace, page_title,
    rev_id, rev_parent_id, rev_timestamp,
    model_error.message as error_message,
    model_error.type as error_type,
    -- Dynamic partition columns must appear in this order.
    `database` as wiki,
    model_error.model_name as model,
    model_error.model_version as model_version
lateral view explode(errors) split_errors as model_error
where
    -- Constraints are only for testing.
    year=2018 and month=12 and day=10 and hour=1
    and `database`='wikidatawiki'
limit 100;

Nuria added a comment. Dec 20 2018, 8:53 PM

@awight Maybe moving to IRC for this might help; I think your query is missing a ";"

@JAllemandou I didn't have time to chase down the responsible code, but wanted to let you know that the user redactions look good empirically.

For example, revision 17390095 on fiwiki was made by a user with ipb_deleted=1, and when I run the following query I get NULL in all the event_user and user fields except user_id, which is perfect IMO.

select * from mediawiki_history where wiki_db='fiwiki' and revision_id=17390095 and event_entity='revision' and event_type='create' and snapshot='2018-11';

Since ORES scores are expensive to recalculate en masse, we only want to refresh scores when a new ORES model or model_version is released. The page title and username redaction happens on a completely different cadence: once per month, right after mediawiki_history is rebuilt. I think the raw scores should be stored separately from the redacted data, so I propose the following (a rough DDL sketch follows the list). Please let me know if this kind of normalization and materialized view is the wrong approach.

  • Table ores/revision/score will be partitioned on (wiki, model, model_version) as before, but its only columns are (rev_id, prediction, probability). No other data is copied from mediawiki_revision_score. Only non-erroring rows appear here. This data is only regenerated for new models; otherwise it's appended to by a daily job reading mediawiki_revision_score.
  • Table ores/revision/error has error details for all revisions that cannot be scored, but similar to the score table we only store (rev_id, error_message, error_type). Only regenerated when a new model is released, and appended daily as above.
  • Provide a view ores_revision_scores_public (name to follow TBD precedent) which denormalizes ORES scores, errors, and the mediawiki_history revision creation metadata including up-to-date redactions of page_title and user_text. Dump files are produced from this view. The view could be materialized. This data is regenerated monthly.
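
As the rough sketch promised above, the error table from the second bullet could look something like this (table name and column types are assumptions based on the errors array in the event; the score table would be the same shape, with prediction and probability columns instead):

-- Sketch only: narrow error table, partitioned like the score table.
CREATE TABLE IF NOT EXISTS ores.revision_error (
    rev_id        BIGINT COMMENT 'Revision that could not be scored',
    error_message STRING COMMENT 'Error message reported by ORES',
    error_type    STRING COMMENT 'Error class reported by ORES'
)
PARTITIONED BY (
    wiki          STRING,
    model         STRING,
    model_version STRING
)
STORED AS PARQUET;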

@JAllemandou I didn't have time to chase down the responsible code, but wanted to let you know that the user redactions look good empirically.

Thanks @awight for the above.

I also like the plan about data storage and recomputation. Thanks for that as well :)

Nuria added a comment. Jan 3 2019, 2:11 PM

Let's please make sure this data and these tables are documented on wikitech.

awight added a comment. Jan 8 2019, 3:47 AM

This is the idea, https://gerrit.wikimedia.org/r/#/c/analytics/refinery/+/481025/

Should I avoid creating a view, though? I don't see any precedent for it, although the access pattern seems like a good fit in my case. The view would be queried once a month per wiki, to produce a dump of scoring events and their freshly redacted wiki context.

awight added a comment. Jan 8 2019, 4:22 AM

Should I avoid creating a view, though? I don't see any precedent for it, although the access pattern seems like a good fit in my case. The view would be queried once a month per wiki, to produce a dump of scoring events and their freshly redacted wiki context.

Looking at other examples, now I'm thinking I should have two transformations instead of a view: the first is a reduction of mediawiki_revision_score into ores_revision_score{,_error} which happens whenever new mediawiki_revision_score datasets appear. The second is a monthly denormalization of ores_revision_score into ores_revision_scores_public/wiki={wiki}/snapshot={snapshot}, and a dump file job is triggered when each of these datasets completes.
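
To illustrate the last step, the dump job could in principle be as simple as an extraction per (wiki, snapshot) partition, though the actual dump tooling may write files differently; the output path below is hypothetical:

-- Sketch only: hypothetical output path; real dumps may be produced by
-- separate tooling rather than a Hive extract.
INSERT OVERWRITE DIRECTORY '/tmp/ores_dumps/wikidatawiki/2018-12'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
SELECT *
FROM ores_revision_scores_public
WHERE wiki = 'wikidatawiki'
  AND snapshot = '2018-12';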

awight updated the task description. Jan 8 2019, 6:56 PM
awight added a comment. Jan 9 2019, 1:04 AM

Updated patches should have working DDL and HQL scripts, but I still need to refine and smoke test the job definitions.

Denormalized output looks like this:

rev_id  rev_timestamp   revision_parent_id      user_id user_text       prediction      probability     page_id page_namespace     page_title      wiki    model   model_version   snapshot
32018047        2018-12-11 13:07:27.0   32000758        0       NULL    false   [{"name":"false","value":0.6055215987898361},{"name":"true","value":0.3944784012101639}]   841607  0       الاحتلال_الإيراني_للجزر_الإماراتية      arwiki     damaging        0.4.0   2018-12
32018076        2018-12-11 13:13:34.0   31977033        1580054 Yara.Ash        false   [{"name":"false","value":0.7623528076180552},{"name":"true","value":0.23764719238194476}]  5126102 0       عبد_الحكيم_أجهر arwiki  damaging  0.4.0    2018-12

Change 482753 had a related patch set uploaded (by Awight; owner: Awight):
[analytics/refinery@master] [WIP] Oozie jobs to produce ORES data

https://gerrit.wikimedia.org/r/482753

I've left the working draft of the two coordinator jobs running in my user database. The hardcoded datacenter is probably a blocker; any suggestions would be appreciated. I tried "datacenter=*" in the URI template, but that failed to match existing directories, so I'm thinking wildcards don't work that way.

Burn-in jobs:
https://hue.wikimedia.org/oozie/list_oozie_coordinator/0073727-181112144035577-oozie-oozi-C/
https://hue.wikimedia.org/oozie/list_oozie_coordinator/0073793-181112144035577-oozie-oozi-C/

awight renamed this task from Wire ORES scoring events into Hadoop to Wire ORES recent_score events into Hadoop. Jan 15 2019, 2:45 AM
awight updated the task description.

Something tricky I ran into: success files aren't written for hours where there are zero changeprop events through codfw. Maybe we have to change that job to write success files even for empty hours?

hdfs dfs -ls hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Found 9 items
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 04:21 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=1
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 19:21 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=16
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 20:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=17
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 21:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=18
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 22:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=19
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 05:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=2
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 23:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=20
drwxr-xr-x   - hdfs hadoop          0 2019-01-13 00:21 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=21
drwxr-xr-x   - hdfs hadoop          0 2019-01-12 06:20 hdfs://analytics-hadoop/wmf/data/event/mediawiki_revision_score/datacenter=codfw/year=2019/month=1/day=12/hour=3
Ottomata added a comment (edited). Jan 15 2019, 2:44 PM

Yeah, this isn't the first time we've had this problem. It isn't actually that easy to solve, because the Kafka consumer doesn't advance if there are no messages in the topic, so we don't have a way of knowing if there are just no messages, or if the Kafka consumer is stalled/broken.

We could emit a single test event per hour into the topic in each dc... :)

Nuria added a comment. Jan 15 2019, 3:00 PM

So I understand: the expectation here is that files will be written for all hours, but will be empty for hours in which there was no data?

We could emit a single test event per hour into the topic in each dc... :)

That works for me. Emitting the event sounds like an annoying problem though, so feel free to share any mechanisms you've already built. I'd suggest a wiki=testwiki model=damaging event, as an example stream that nobody should be taking seriously and that we don't plan to dump.

So I understand: the expectation here is that files will be written for all hours, but will be empty for hours in which there was no data?

As a consumer of mediawiki_recent_score, I think that would solve my immediate problem. Not sure if it introduces any other complications at the Hive level, though.

awight updated the task description. Jan 24 2019, 12:45 AM

There's one more issue that's not being addressed here. The precaching config does not have all the models available for the wiki, so the revision-score event only contains a subset of everything ORES can calculate. There are actually 3 separate sets of models that ORES calculates for different purposes (T201868#4802296). I guess you'd want to archive all the models? Can we, in that case, finally merge all the configs and merge the changeprop and job queue ORES pre-calculations?

I'm trying to get alternative solutions because I would honestly hate emitting a heartbeat event.

Nuria added a comment. Feb 12 2019, 8:43 PM

I'm trying to get alternative solutions because I would honestly hate emitting a heartbeat event.

I gotta say that solutions that come at a different layer than where the problems are end up being more of a problem than a solution. The issue here is at the processing layer rather than the event-emitting layer; ideally we would come up with a solution that addresses the processing issue without having to tap into the publishing workflow.

Generally agree with both of you.

Harej triaged this task as Lowest priority. Mon, Mar 25, 4:45 PM
Harej removed awight as the assignee of this task.