
Unify all Product Analytics ETL jobs
Open, Medium, Public


At the moment, Product Analytics has about 10 active ETL jobs with widely differing setups (Oozie, shared systemd timer, individual cron jobs, etc.). We have decided to standardize on using Puppet-managed systemd timers, with the code stored in the analytics/wmf-product/jobs repo. This is the approach used by the movement_metrics job.

There are two main phases here (which will get subtasks soon):

  • Standardize the process. We will want to use shared code where possible (e.g. a shared shell script, a Puppet 'class' that streamlines the process of creating a systemd timer) and write good documentation.
  • Migrate all existing jobs. Once we have a standard process, we will need to change all the existing jobs (e.g. wikipediapreview_stats, Morten's cron jobs for Growth) to use it.


Event Timeline

I'm driving the planning and team formation right now.

Good luck on this perilous mission 🫡

mpopov triaged this task as Medium priority.Sep 13 2022, 5:06 PM
mpopov added a subscriber: EChetty.

Update: Here is an almost-complete table of our ETLs to understand all our requirements:

| Job | Schedule | cron/systemd specification | Software & Data Dependencies |
| --- | --- | --- | --- |
| Movement Metrics | Monthly – On the 7th of every month at 12AM UTC | `*-*-7 00:00:00` | Hive, Pageview Hourly, Virtual Pageview Hourly, MediaWiki History |
| IP Masking (5 ETLs in 1 notebook) | Monthly (but unscheduled) | Manual | Hive, MediaWiki db, MediaWiki History, MediaWiki Logging (Sqooped) |
| Media Search in VisualEditor | Daily at 8AM UTC | `0 8 * * *` | Hive/PySpark, event db |
| Media Search on Commons | Daily at 8AM UTC | `0 8 * * *` | Hive/PySpark, event db |
| Media Search - Filter usage | Daily at 8AM UTC | `0 8 * * *` | Hive/PySpark, event db |
| Media Search - other | Daily at 8AM UTC | `0 8 * * *` | Hive/PySpark, event db |
| SDAW Search Improvements | Weekly – Every Monday at 9AM UTC | `0 9 * * 1` | Hive/PySpark, event db |
| SDC - Captions | Monthly – On the 6th of every month at 9AM UTC | `0 9 6 * *` | Hive/PySpark, MediaWiki History |
| SDC - Edits | Monthly – On the 6th of every month at 9AM UTC | `0 9 6 * *` | Hive/PySpark, MediaWiki History |
| SDC - Overall | Monthly – On the 6th of every month at 9AM UTC | `0 9 6 * *` | Hive/PySpark, MediaWiki Page History, Structured Data db (DAG) |
| Search - Refine data into cchen_search.search_event | Monthly – On the 6th of every month at 9AM UTC | `0 9 6 * *` | Hive/PySpark, event db |
| Search - Metrics | Monthly – On the 6th of every month at 9AM UTC | `0 9 6 * *` | Hive/PySpark, cchen_search.search_event |
| WikipediaPreview usage | Daily at 12AM UTC (probably???) | Oozie | Hive, Webrequests |
| Welcome Survey responses | Monthly – On the 4th of every month at 6:45AM UTC | `45 6 4 * *` | PySpark (writes, coalesces), event db, MediaWiki db |
| Apple Relay usage & impact | Daily at 11:15AM UTC | `15 11 * * *` | PySpark (uses a UDF), Pageview Actor table |
| Computer Aided Tagging (report) | Weekly – Every Monday at 12:05AM UTC | `5 0 * * mon` | MariaDB, MediaWiki db, write access to /srv/published |
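To sanity-check the cron specifications in the table, here is a minimal pure-Python matcher. It is illustrative only: it supports just `*`, plain integers, and day-of-week names, which is enough for the specs above but far less than what cron or systemd's OnCalendar actually accept.

```python
from datetime import datetime

def cron_matches(spec: str, dt: datetime) -> bool:
    """Check a 5-field cron spec (minute hour day month weekday)
    against a datetime. Supports only '*', integers, and weekday
    names -- enough for the table above, not a full cron parser."""
    names = {"sun": 0, "mon": 1, "tue": 2, "wed": 3,
             "thu": 4, "fri": 5, "sat": 6}
    fields = spec.split()
    # cron uses 0=Sunday..6=Saturday; Python's weekday() uses 0=Monday
    values = [dt.minute, dt.hour, dt.day, dt.month, (dt.weekday() + 1) % 7]
    for field, value in zip(fields, values):
        if field == "*":
            continue
        want = names.get(field.lower())
        if want is None:
            want = int(field)
        if want != value:
            return False
    return True

# "0 9 6 * *": 09:00 UTC on the 6th of every month (the SDC jobs)
print(cron_matches("0 9 6 * *", datetime(2022, 10, 6, 9, 0)))   # True
print(cron_matches("0 9 6 * *", datetime(2022, 10, 7, 9, 0)))   # False
# "5 0 * * mon": 00:05 UTC every Monday (Computer Aided Tagging)
print(cron_matches("5 0 * * mon", datetime(2022, 10, 3, 0, 5)))  # True
```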

Before my leave/vacay, Emil and I discussed some options he put together based on those requirements.

Option 1

Data Engineering (DE) sets up a containerized system wherein Product Analytics (PA) can put notebooks / pure queries with optional requirements.txt files into a (possibly public) repo in GitLab with hourly/daily/weekly/monthly subdirectories which are executed at those cadences.

When a job is run, an environment is created with specific versions of packages as specified by requirements.txt (if there is one) or just the defaults. It is up to the notebook to check:

  • If it needs to be run at all. That is, if it's a daily job, it should check whether that day's data already exists in the target table.
  • If it does need to be run, whether it can run: it should check that its data dependencies are available.
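The two checks above can be sketched as a small decision function. This is only a sketch of the logic Option 1 would push into each notebook; the `target_has_partition` / `sources_have_partition` callables are hypothetical and would in practice query Hive or HDFS.

```python
from datetime import date

def run_decision(day, target_has_partition, sources_have_partition):
    """Decide whether a daily job should execute for `day`.
    Both arguments are callables taking a date -- hypothetical
    stand-ins for real Hive/HDFS partition checks."""
    if target_has_partition(day):
        return "skip: already processed"
    if not sources_have_partition(day):
        return "wait: source data not ready"
    return "run"

# Example with stubbed checks:
done = {date(2022, 9, 12)}
ready = {date(2022, 9, 12), date(2022, 9, 13)}
print(run_decision(date(2022, 9, 12), done.__contains__, ready.__contains__))  # skip: already processed
print(run_decision(date(2022, 9, 13), done.__contains__, ready.__contains__))  # run
print(run_decision(date(2022, 9, 14), done.__contains__, ready.__contains__))  # wait: source data not ready
```

As the later comments in this thread point out, this is exactly the logic that schedulers like Airflow already implement, which is the main argument against reimplementing it per notebook.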

PA can self-review to verify the jobs run as expected.

The executed notebooks with all output would be published to a definitely private repo in GitLab and can be reviewed for debugging, etc. – this is blocked by T305082.

Option 2

PA gets its own Airflow instance and writes all the Airflow jobs in Python, which involves a big time investment to learn how to write low-level jobs. Those jobs would have Sensors so that we could directly set up dependencies on specific MediaWiki History snapshots and event/webrequest partitions, for example.

DE would have to review all jobs, which is not sustainable. Somebody in PA would need to become an Airflow expert to take over code review duties.

DE also prefers limiting how many Airflow instances there are to lessen maintenance burden.

Option 3

We divert resources from finalizing Kubeflow on the DSE cluster to get Elyra, which would make for the easiest end-user experience but may not even work in the end at all.

I will follow-up with @nettrom_WMF & @cchen and discuss these options with them.

@EChetty: Are those accurate summaries of the options?

Also, I've made a note that @mforns is interested in being involved in this endeavor. (Yay! :D)

Perfect! This makes sense to me.

Just to further emphasise that Option 3 would be highly experimental - and so stability and availability might be an issue.

Thanks @mpopov for the summary!

A variant of option 2)
I think in a conversation with @EChetty we also contemplated a variant of option 2), which would be to write Airflow DAGs without sensors.
This would considerably reduce the complexity of the DAGs, since a good chunk of it is carried by the sensors.
The downside is that the DAGs would schedule jobs at a cron-like schedule, instead of waiting on source data to be present.
Once the PA team became more familiar with Airflow, we could always add sensors and gain data-dependency awareness.

Another idea
To ease the creation of simple DAGs, we could implement a wizard that would ask only the pertinent questions and generate the DAG code automatically, following all conventions (and using sensors).
I think this would be valuable, even for us in Data Engineering. However, the DAG would still need to be tested (with the dev instance), operated, troubleshot, etc. The wizard would not be magical enough to do those things :-(
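As a rough illustration of the wizard idea, a code generator could render a DAG skeleton from a few answers. Everything here is hypothetical: the operator, import paths, and layout follow generic Airflow 2 conventions, not the conventions of any particular WMF airflow-dags repo, and a real wizard would also emit sensors.

```python
from textwrap import dedent

def generate_dag(job_name: str, cron_schedule: str, hql_path: str) -> str:
    """Render a skeleton Airflow DAG from a few wizard answers.
    Purely illustrative: names, operators, and conventions are
    placeholders, and sensors are omitted for brevity."""
    return dedent(f'''\
        from airflow import DAG
        from airflow.providers.apache.hive.operators.hive import HiveOperator

        with DAG(
            dag_id="{job_name}",
            schedule_interval="{cron_schedule}",
            catchup=False,
        ) as dag:
            compute = HiveOperator(
                task_id="compute_{job_name}",
                hql="{hql_path}",
            )
        ''')

# Hypothetical answers for one of the SDC jobs:
dag_code = generate_dag("sdc_captions", "0 9 6 * *", "queries/sdc_captions.hql")
print(dag_code)
```

The generated string is ordinary Python source that the user would commit for review, so the wizard lowers the writing barrier without changing how DAGs are deployed.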

I've finally had a chance to connect with Connie and Morten about this. We're in agreement that option 1 is the best and easiest to maintain option.

Awesome! Thank you @mpopov! This work was scheduled to start with next week's sprint, but we may need more time than that to figure everything out.

@mforns -> let's go through epics and figure out the details here.

I'd like to advise against this decision a bit. Although solution 1 might seem simpler, I can see some problems that could hit us, for instance:

  • Leaving to the notebooks the responsibility of deciding whether a job can run (all the source data is there) or needs to run (the particular time interval has not yet been processed) is not simple. We have substantial, non-trivial code in Oozie, Refine, Reportupdater and now Airflow that takes care of that (this is one of the reasons why we want to get rid of all schedulers but Airflow). I don't think it would be good to have yet another version of this logic in the notebooks: not just because of the code duplication, but also because it would make the notebook code much more complex (even if we managed to reuse it across notebooks, we'd still need to write a common library, etc.). Also, this feature would only work for notebooks, not for plain queries, which would have no way to be triggered by the presence of source data.
  • I can see potential problems when we need to execute re-runs or back-fills of the jobs. If the containerized system just triggers jobs at a given schedule, we (DE or PA) would have to re-run/back-fill the jobs manually. Or else, if we implement support for parametrized re-runs/back-fills, we'd be duplicating code that already exists in Oozie, Refine, Reportupdater and Airflow. I think that would conflict with the effort we've been making to unify our scheduling systems.
  • The ability to troubleshoot the jobs also worries me. Having easy access to logs, a working alerting system, and a record of which jobs ran successfully vs. which ones failed is important. Again, if we implemented that, we'd be duplicating code/systems.
  • In general, I feel like we'd be implementing yet another scheduler system, when our plan has been the opposite for the last couple years. We could go for a very light-weight solution, but then we'd miss fundamental job management capabilities.

I think we can still find a solution that is simple enough for PA while fitting into DE's strategy. One option I think we should still discuss is the 'wizard idea' (see my comment above): a simple CLI tool that would ask the user some questions (with reduced options and validation) and then automatically generate (hopefully production-ready) Airflow DAG code. This would lower the barrier for people not familiar with Airflow while keeping our current tool of choice (Airflow), and thus give PA the full set of job management features. Let's discuss, please! :-)

To ease the creation of simple DAGs, we could implement a wizard

Instead of a wizard, perhaps we could just create an abstraction (task groups?) around simple input/output jobs? Parameterize input and output frequency and locations (hive table / hdfs path), and the job the user wants to run?

I guess this would require some standardization of how jobs are written too?
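To make the abstraction idea concrete: the standardization it requires might amount to describing each simple job with a small, uniform spec, from which a task group (or a wizard) could build the actual DAG. The class and field names below are hypothetical, chosen only to illustrate the shape of such a spec.

```python
from dataclasses import dataclass

@dataclass
class SimpleEtlJob:
    """Hypothetical standardized description of a simple
    input -> query -> output job, from which an abstraction
    (e.g. an Airflow task group) could be generated."""
    name: str
    frequency: str    # "hourly", "daily", "weekly", or "monthly"
    inputs: list      # source Hive tables / HDFS paths
    output: str       # destination Hive table
    query: str        # path to the HQL/SQL file

    def validate(self) -> bool:
        allowed = {"hourly", "daily", "weekly", "monthly"}
        if self.frequency not in allowed:
            raise ValueError(f"unsupported frequency: {self.frequency}")
        if not self.inputs:
            raise ValueError("at least one input is required")
        return True

# Hypothetical spec for one of the daily Media Search jobs:
job = SimpleEtlJob(
    name="media_search_ve",
    frequency="daily",
    inputs=["event.mediasearch_interaction"],
    output="some_db.media_search_ve_daily",
    query="queries/media_search_ve.hql",
)
print(job.validate())  # True
```

Constraining jobs to this input/output shape is what would let a single parameterized abstraction cover most of the table above.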

(Anyway, feel free to ignore me too, this is a drive by comment :) )