ORES change propagation is configured to mostly go through one datacenter, but that might change during a cutover or at some point in the future. We rely on hourly directories being created in each datacenter as a precondition to the ORES import recent scores job, so we'll need to tickle the streams, perhaps with a synthetic event emitted on the *:30, every hour.
|Open||None||T209611 [Epic] Make ORES scores for wikidata available as a dump|
|Resolved||None||T209731 Choose HDFS paths and partitioning for ORES scores|
|Open||None||T209732 Wire ORES recent_score events into Hadoop|
|Open||None||T214545 Emit synthetic mediawiki.revision-score events for both datacenters|
Ya not specific to ORES. This would affect anyone trying to schedule Oozie jobs on data imported periodically from Kafka.
@Pchelolo I wonder what you think about this.
The problem is that we can't differentiate between producers being broken and not sending any data, and there just being no data at all e.g. in an inactive datacenter. Even if we send a dummy event per hour, we still can't really tell. I suppose a dummy event would allow the Refine pipeline to progress (as it would detect data and generate partitions), but if producers were broken, this would result in data being 'readied' and jobs started before all the data is actually in place. Hm. This would mostly be fine for Refine, as new data (even in old hours...as long as it isn't TOO old) is detected and partitions are re-refined. But completed dependent jobs wouldn't automatically detect the new data, and would have to be manually re-run.
I'm not sure what is best to do here. Writing something that sends one dummy event per hour for every topic would solve this immediate problem (and might be worth doing), but could bite us later. @JAllemandou any thoughts?
ORES change propagation is configured to mostly go through one datacenter
Which of the ORES-related CP rules are you referring to? The precaching is hitting both DCs (something I would really really love to change) while the 'revision-score' event is indeed emitted only in the active DC. The events you see coming from codfw are 'renegade' events, probably due to conditions described in T207994. These get mirrored into eqiad, so if refinery (running in eqiad only afaik) takes both prefixed revision-score topics, we should be fine.
@Ottomata I'm failing to understand how this situation is any different from any other event. Maybe I need more coffee...
It isn't different from any other event. The issue is that Adam is building an Oozie job that gets launched based on full existence of an hourly Hive partition. Since this data comes from multiple topics, he needs to wait until all the data in each topic for a given hour has been imported into Hadoop. If there is no data in a given topic, no hourly directory will be created, and his jobs will stall.
So in this case we have all the events in eqiad, and some rogue events in codfw. Is it possible to consume from 2 topics in the job and rely on mirror maker to deliver codfw events? We could quite safely ignore the codfw events, there should be a negligible number of them.
Ah, no sorry, let me try to be clearer. The problem here is that there are NO events in codfw, but the job needs to handle the case where there are events in both (during a switchover). A 'stream' is made up of multiple topics, in this case one per DC. We need to detect when data for a full hour has been read for an entire stream. If all topics in a stream have data, this is easy: once all topics have a message with a timestamp greater than the current hour, then we can consider the stream fully imported for that hour. However, if one of the topics in a stream has no data for a given hour, we can't tell if that is because of an upstream (producer related) error/delay, or if there is simply no data in that topic for that hour.
Adam's Oozie job is set up to trigger once each hour for the revision-score stream (including both DC topics) has been imported into Hadoop. When there's no data in the codfw topic, the job will stall. Right now he works around this by only relying on the eqiad topic, but this means his job will break and need to be manually fixed during a datacenter switchover.
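To make the completeness rule above concrete, here is a minimal Python sketch. It is not how Oozie or Refine actually implement the check; the function name, the topic names in the usage note, and the reading of "greater than the current hour" as "at or past the end of the hour being imported" are all assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional


def hour_is_complete(
    latest_by_topic: Mapping[str, Optional[datetime]],
    hour_start: datetime,
) -> bool:
    """Return True if every topic of a stream has moved past the given hour.

    latest_by_topic maps each topic name to the timestamp of the newest
    message imported so far, or None if nothing has been seen at all.
    """
    hour_end = hour_start + timedelta(hours=1)
    # An empty mapping means we know nothing about the stream: not complete.
    if not latest_by_topic:
        return False
    # A topic with no data (None) can never satisfy the rule, which is
    # exactly why a silent codfw topic stalls the dependent job.
    return all(
        ts is not None and ts >= hour_end
        for ts in latest_by_topic.values()
    )
```

For example, with hypothetical topic names `eqiad.mediawiki.revision-score` and `codfw.mediawiki.revision-score`, the check stays false for as long as the codfw entry is `None`, no matter how far the eqiad topic has advanced.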
If one of the topics in a stream has no data for a given hour, we can't tell if that is because of an upstream (producer related) error/delay, or if there is simply no data in that topic for that hour.
I believe that producer errors should be tracked/monitored upstream. Can it be 'import if all topics have a message with timestamp >= current hour or < previous hour or empty'?
< previous hour
Maybe, we'd have to find some way to do that in Oozie, but it might be possible. Even if it is, this would be error prone too! What happens if data is delayed by more than 1 hour? Maybe if the launch condition is all (latest topic message timestamp >= current hour OR latest topic message timestamp < X hours ago), but now we're relying on time delays to launch jobs, which kind of defeats the purpose. Since there's no data in codfw, this means that most of the time the jobs will always be delayed by X hours, so we might as well just remove the first condition that checks for the latest message timestamp. Years ago, this is how webrequest jobs worked, and it was really error prone. Relying on time periods instead of data existence to launch never works well.
I can't think of a better solution though...
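A rough Python sketch of the time-capped launch condition discussed above, under one possible reading: a topic unblocks the job either because it has a message past the hour, or because X hours have elapsed since the end of that hour. The function and parameter names (`may_launch`, `max_wait`) are hypothetical, and this is not an Oozie configuration.

```python
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional


def may_launch(
    latest_by_topic: Mapping[str, Optional[datetime]],
    hour_start: datetime,
    now: datetime,
    max_wait: timedelta,
) -> bool:
    """Launch when each topic is past the hour, or the wait cap has expired."""
    hour_end = hour_start + timedelta(hours=1)
    # Once the cap expires we stop waiting for silent topics, whether they
    # are genuinely empty, delayed, or broken upstream: we cannot tell which.
    cap_reached = now >= hour_end + max_wait
    return all(
        (ts is not None and ts >= hour_end) or cap_reached
        for ts in latest_by_topic.values()
    )
```

With a topic that never receives data, the first clause is never true for it, so every hour launches exactly `max_wait` after the hour ends. That is the point made above: the timestamp check stops mattering and the job is effectively time-triggered.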
This is a larger problem with handling late events and will bite us elsewhere in the future (stream processing, etc.).
Hmm.... Just brainstorming here...
This doesn't seem solvable even with the artificial event. If we emit the artificial event via the same machinery as we emit normal events (which we should do), it will get delayed by the same amount as normal events, or lost if the producer has issues. So in case of a producer breakage, we would never know if the data was absent (kick an empty import), lost (kick an empty import), or delayed (wait more for the data). If we wait indefinitely for the data to show up in case of a producer breakage and data loss, we will just get stuck indefinitely, so we need a cap on the time to wait for the data, and to abandon the idea of importing it if the time cap has been reached, right?
Is Oozie stateful? Can we know what time period was the previous import for?
If we emit the artificial event via the same machinery as we emit normal events (which we should do), it will get delayed by the same amount as normal events, or lost if the producer has issues.
I was actually thinking we wouldn't use the normal machinery...if we were to do this I'd make a script that directly produced 1 event per hour per topic, not via EventBus HTTP etc.
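As a rough illustration of such a script, the Python sketch below builds one synthetic event per topic and hands it to an injected `send` callable, which stands in for a direct Kafka producer call. Everything here is an assumption for illustration: the event fields (`meta.topic`, `meta.dt`, `synthetic`) are not the real mediawiki.revision-score schema, and the function names are hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Callable, Iterable, Optional


def build_heartbeat(topic: str, now: datetime) -> dict:
    """Build a synthetic marker event (hypothetical fields, not a real schema)."""
    return {
        "meta": {"topic": topic, "dt": now.isoformat()},
        # Flag so that consumers can filter these events out.
        "synthetic": True,
    }


def emit_heartbeats(
    topics: Iterable[str],
    send: Callable[[str, bytes], None],
    now: Optional[datetime] = None,
) -> int:
    """Send one heartbeat per topic via `send`; return how many were sent."""
    now = now or datetime.now(timezone.utc)
    count = 0
    for topic in topics:
        payload = json.dumps(build_heartbeat(topic, now)).encode("utf-8")
        send(topic, payload)  # placeholder for a direct producer call
        count += 1
    return count
```

Run once an hour (for instance at minute 30, as suggested in the task description), this would guarantee that each topic has at least one message per hour. As noted earlier in the thread, it would not distinguish broken producers from a genuinely idle datacenter.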
Is Oozie stateful? Can we know what time period was the previous import for?
Yes, and sort of! You can make a dependency on a previous import and know the time period for it.
abandon the idea of importing it if the time cap has been reached, right?
I'm not sure if we'd abandon importing it, but perhaps abandon the dependency for it.
(My brain is too far in other lands to think too much about how this would be done in Oozie, or if we should do it right now... let's see if Joal has any ideas...)
After talking to @joal at All Hands, I'm now thinking that T211069 should be the first priority for integrating the scoring pipeline into Hadoop. We should design a stream transformer to calculate features for each revision, and store the resulting values partitioned by feature name. This involves very different challenges than what's being discussed here, and in the end we might be able to entirely circumvent this minor glitch about merging the data centers.