
[airflow] Normalize the use of timeouts in Airflow DAGs
Closed, ResolvedPublic3 Estimated Story Points

Description

In Airflow you can define timeouts for Sensors, for Operators, and at the DAG level.
A timeout defines how long a Sensor/Operator has to do its job.
If the timeout threshold is exceeded, the corresponding Task fails.
Timeouts are useful to prevent missed DAG runs from going unnoticed, and Sensors from continuing to poke their targets indefinitely.

Let's discuss the best practice/strategy regarding timeouts:
whether to use them or not, which ones to use, and at what level.
Then apply the changes to all DAGs so that they all follow the same policy.
Finally, add the decisions to the Airflow developer guide.

Event Timeline

After some reading, here's what I learned:

There are 3 parameters that we can use related to timeouts:

  • dagrun_timeout (DAG parameter) -- "specify how long a DagRun should be up before timing out / failing, so that new DagRuns can be created. The timeout is only enforced for scheduled DagRuns. [datetime.timedelta]" I think this one is quite clear. The timer starts counting when the DAG is triggered for execution, and if the run takes more time than specified in dagrun_timeout, the whole DAG will fail, and hopefully we'll receive an alert.
  • execution_timeout (Operator parameter) -- "max time allowed for the execution of this task instance, if it goes beyond it will raise and fail. [timedelta | None]" This one is also pretty clear. When the Operator starts executing, the timer starts; if the task takes more than the specified execution_timeout, it will fail, mark subsequent tasks as upstream_failed, and send an alert. The only open question is retries: do retries reset the timer?
  • timeout (Sensor parameter) -- "Time, in seconds before the task times out and fails." In this case, it's not super clear what the scope of the timeout is: Does it start counting at each poke of the sensor (in mode=poke)? At each retry of the sensor? At each re-spawn of the sensor after a reschedule? Just once, the first time it runs? The documentation is unclear and I could not find any other sources that explained it, but after looking at the code I think the timer starts the first time the sensor runs, and it keeps running across all pokes, re-schedules and retries, until it times out. Meaning, time between pokes / re-schedules / retries counts!
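To make that last point concrete, here is a minimal pure-Python sketch of how I understand the sensor timeout semantics (an assumption based on reading the code, not on the documentation): the clock starts at the first poke, and everything after that, including waiting time between pokes, reschedules and retries, counts toward the timeout.

```python
from datetime import datetime, timedelta

# Simplified model of (what I believe is) the sensor timeout semantics:
# elapsed time is measured from the sensor's first poke, so time spent
# waiting between pokes / reschedules / retries all counts.
def sensor_times_out(first_poke, now, timeout_seconds):
    return (now - first_poke).total_seconds() > timeout_seconds

first_poke = datetime(2024, 1, 1, 0, 0)
# After three 10-minute reschedules, 30 minutes have elapsed in total:
now = first_poke + timedelta(minutes=30)

assert sensor_times_out(first_poke, now, timeout_seconds=20 * 60)      # 20 min budget: times out
assert not sensor_times_out(first_poke, now, timeout_seconds=60 * 60)  # 60 min budget: still poking
```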

Some thoughts to start a conversation:

Task-specific vs DAG-wide timeouts

I believe the advantage of task-specific (operator, sensor) timeouts is that it's easier to define accurate timeout intervals, given that we know more or less how much time the task should take.
In case the task takes too much time, Airflow would identify this earlier and release the resources earlier.
DAG-wide timeouts, by contrast, are more difficult to define, because the timeout interval has to cover several sensors and operators. It won't be as accurate,
and it may release resources and raise alerts later than a task-specific timeout would. In addition, we already have a DAG-wide SLA, which will alert us when the dag_run takes too long to finish.
A DAG-wide timeout would also be more prone to false failures, when the DAG has a legitimate reason to take more time than expected.
That said, DAG-wide timeouts would be a simpler convention to make, since they only have to be defined at DAG creation (top of the file).

Timeouts for Sensors

Since all our sensors use mode=reschedule (and have sensible poke_intervals), they will not consume resources (slots) even if they are on for days.
So I'm not sure of the advantage of setting timeouts for them, since we already have an SLA that will alert us.
And if the Sensors are late because the source data is not present, we should already receive an SLA alert from the DAG generating the source data.
So specifying timeouts for sensors with mode=reschedule might be a bit of overkill, I believe.
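As an illustrative sketch of this convention (the sensor class is real, but the task id, DAG id and interval are made up, not taken from our repos):

```python
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

# Hypothetical sensor following the proposal above: mode="reschedule"
# with a sensible poke_interval, and no `timeout` at all -- we rely on
# SLA alerts instead. Ids are invented for illustration.
wait_for_source = ExternalTaskSensor(
    task_id="wait_for_source_data",
    external_dag_id="source_data_dag",
    poke_interval=15 * 60,   # re-check every 15 minutes
    mode="reschedule",       # frees the worker slot between pokes
    # no `timeout`: the DAG-level SLA alerts us if the data is late
)
```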

Timeouts for hourly/daily vs. weekly/monthly jobs

I think timeouts for hourly jobs make a lot of sense, since we know they have to finish in less than 1 hour, otherwise they will pile up.
And releasing the consumed resources is extremely important in this case. This might apply to daily jobs as well, although maybe not so critically.
Now, when it comes to weekly/monthly jobs, I don't think timeouts bring that much value. We will receive SLA alerts and act upon them
long before the weekly/monthly jobs pile up and block resources (or at least we should! :-)). Plus, specifying timeouts for e.g. monthly jobs
would have to include a lot of waiting time for the Sensors, sometimes more than 1 week! So that is hardly useful, no?


My initial conclusions would be:

  • Do not use DAG-wide dagrun_timeout, we already have SLAs for that.
  • Do not use timeout for mode=reschedule Sensors, instead carefully specify the poke_interval.
  • Always use Operator-specific execution_timeout.
  • We can default execution_timeout in default_args to, say, 6 hours, and override it per task: smaller for hourly jobs, or bigger for tasks that take more than 6 hours.
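As a sketch of that last bullet (the 6-hour figure and the per-task overrides are illustrative, nothing is decided yet):

```python
from datetime import timedelta

# Proposed convention sketch: a 6-hour default execution_timeout in
# default_args, overridden per task where the default does not fit.
default_args = {
    "execution_timeout": timedelta(hours=6),
}

# An hourly job tightens the timeout; a known-heavy job relaxes it.
hourly_task_args = {**default_args, "execution_timeout": timedelta(minutes=50)}
heavy_task_args = {**default_args, "execution_timeout": timedelta(hours=12)}

assert hourly_task_args["execution_timeout"] < timedelta(hours=1)
assert heavy_task_args["execution_timeout"] > default_args["execution_timeout"]
```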

My initial conclusions would be:

  • Do not use DAG-wide dagrun_timeout, we already have SLAs for that.

I disagree on this one. To me, the only benefit of using timeouts is to make sure there are enough running slots for new DagRuns showing up. Therefore the timeout, similarly to the SLA, would be at the DagRun level. Happy to talk more about that.

  • Do not use timeout for mode=reschedule Sensors, instead carefully specify the poke_interval.

+1

  • Always use Operator-specific execution_timeout.
  • We can default execution_timeout in default_args to, say, 6 hours, and override it per task: smaller for hourly jobs, or bigger for tasks that take more than 6 hours.

I'm not sure about this one; as explained above, I'd go with a DagRun timeout if we wish to standardize.

We had a discussion with the team and here's a summary:

tl;dr
We won't use timeouts for now.


Use cases of timeouts

We identified 2 use cases for timeouts in Airflow:

  1. Preventing "dag_run slot famine". In Airflow we set the parameter max_active_runs_per_dag to 3, which means a given DAG can only execute 3 dag_runs at the same time. This is necessary and purposefully chosen to prevent backfills of DAGs from launching too many tasks concurrently and overloading the cluster. So we could say we have 3 dag_run slots per DAG. Airflow can not launch a dag_run until one of those slots is free. But sometimes dag_runs get stuck (i.e. the source data is not present yet, or a Spark job takes way longer than it should). If this happens enough times, the 3 dag_run slots will be blocked by those stuck dag_runs, and Airflow will not launch any new dag_run for that DAG. This is what we called "dag_run slot famine". Defining timeouts for dag_runs would prevent this from happening, since the timeout would kill stuck dag_runs, freeing their dag_run slot for further dag_runs to be scheduled.
  2. Preventing Spark jobs from running unnoticed for way longer than they should and potentially overloading the cluster. A Spark job could get stuck for any reason, block cluster resources, and thus overload the cluster. Defining a timeout for an operator's execution would prevent this from happening, since the Spark job would be killed after the specified amount of time, releasing the cluster resources.
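For reference, guarding against use case 1 would look roughly like this (the DAG id and the 3-hour figure are illustrative, and as recorded in the Decision below, we ultimately chose not to do this):

```python
from datetime import timedelta
from airflow import DAG

# Hypothetical hourly DAG guarding against "dag_run slot famine":
# with max_active_runs=3, a stuck run would be failed after 3 hours,
# freeing its slot so new dag_runs can be scheduled.
dag = DAG(
    dag_id="example_hourly_dag",
    schedule_interval="@hourly",
    max_active_runs=3,
    dagrun_timeout=timedelta(hours=3),
)
```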

We didn't consider the emails sent by timeouts to be a use case of timeouts, since we already have SLA alerts, which would cover both of the cases presented above.

Likeliness of those cases

  1. "dag_run slot famine" could be likely in hourly DAGs, since they only need 3 blocked dag_runs (and thus 3 hours only) for the DAG to become stale. On the other hand, it would be very rare for us not to notice and address such an issue in a daily/weekly/monthly DAG before it times out; since it would need to be failing for more than 3 days (in daily DAGs) or more than 3 weeks (for weekly DAGs). That said, we did not remember any case where an Airflow job (hourly or other) entered a "dag_run slot famine" state.
  2. While we think it's possible and potentially likely that a Spark job gets stuck, we don't think this would affect the performance of the cluster that much. And we also could not recall a case where this happened.

Decision

  • We considered using the DAG-level dagrun_timeout (use case 1) for hourly DAGs only. However, that would add cognitive load to Airflow DAG configuration. And since we don't think this has been a problem yet, we decided not to use this kind of timeout for now, until we see the issue happening. With this, we hope to keep the creation of DAGs as simple as possible.
  • We discarded using the operator execution_timeout (use case 2), since we don't recall any incident where that was the culprit.
  • We will rely on SLA alerts to detect issues with delays in DAG execution. We should treat SLA alerts as incidents to be attended to in a timely manner.

Action items

  • Push a change to airflow-dags, removing all existing timeout specifications.
  • Add this decision to the Airflow developer guide docs in Wikitech.

@mforns Can you please let me know when you add the decision to the Airflow developer guide? So I can close the ticket. Thanks!

For clarification: Is this decision just for the analytics instance, or for all instances?

@xcollazo This would be for the analytics and analytics_test instances.
Although, if other teams follow our developer guide docs, they might adopt it as well, but that is their decision to make, I think!