
Make Airflow SparkSQL operator set fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues
Closed, ResolvedPublic

Description

See https://phabricator.wikimedia.org/T347076 for the whole context.

We wish to add a global Spark config:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2

This will prevent Spark from failing when concurrent jobs write to the same table.
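As an illustrative sketch only (the helper below is hypothetical, not the actual puppet change), the property could be supplied at submission time via a `--conf` argument; the `spark.hadoop.` prefix is the one confirmed later in this task:

```python
# Illustrative sketch: building the spark-submit flag for the committer
# setting. The key uses the spark.hadoop. prefix confirmed later in this
# task; the helper function itself is hypothetical.
COMMITTER_KEY = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"

def committer_conf_flag(version: int = 2) -> str:
    """Return a --conf argument selecting the file output committer algorithm."""
    return f"--conf {COMMITTER_KEY}={version}"

print(committer_conf_flag())
# --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
```

The same key/value pair could equally be set on a SparkSession builder or in spark-defaults.conf; the point is only that Spark forwards `spark.hadoop.*` properties to the underlying Hadoop configuration.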

Event Timeline

BTullis triaged this task as High priority.
BTullis moved this task from Incoming to In Progress on the Data-Platform-SRE board.

Change 975006 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

https://gerrit.wikimedia.org/r/975006

I have now implemented this via https://gerrit.wikimedia.org/r/975006

I used the parameter: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version which I believe is correct.
It's worth noting that there is a comment here: https://spark.apache.org/docs/3.1.2/configuration.html#execution-behavior
...which leads to this bug report about the v2 algorithm.

https://issues.apache.org/jira/browse/MAPREDUCE-7282

(Attached screenshot: image.png, 146×913 px, 20 KB)

Thanks a lot @BTullis - The problem you linked is indeed a known issue. We rely on hive-metastore and _SUCCESS files which should prevent the issue on prod jobs. For other jobs, this is indeed a problem.
I see two possible solutions here:

  • We keep the change you made and see if it causes problems.
  • We don't apply the change but provide a solution for people backfilling by telling them to use that parameter in their jobs.

I'm not sure which is best

After giving it a bit more thought, it seems that NOT changing the parameter globally, so as to enforce data correctness in folders, is the best idea. We would document the concurrency issue in wikitech and Airflow, and recommend that setting for backfilling periods.
I'm interested in feedback if anyone thinks this isn't the best idea :)

(IIRC, the change in the commit strategy proposed in v2 was made to better support object stores, given that a mv in HDFS is cheap and just metadata, while a mv in an object store typically maps to cp and rm.)

So I would be wary of letting go of correctness here to better support concurrent writes, which seems to not be common in our Hadoop instance after all?

If we have particular tables we'd like to write concurrently, couldn't we expedite their move to Iceberg, and thus kill two birds with one stone?

+1 for leaving writing to Hive tables alone (erring towards correctness, with jobs failing and hopefully leaving comments that we can find)
+1 to instead focusing on the Iceberg migration

Not changing the global config and instead just documenting the solution / how to use that setting temporarily for backfills sounds good to me. I think this is important when spinning up a new dataset, because if each run is very expensive, there should be a way to minimize the number of retries.

Alternatively what if we reconsider the decision made in T300870: Airflow concurrency limits and change max_active_runs_per_dag from 3 to 1. That decision was made to yield faster backfills and before the concurrency issue was known, so it makes sense to re-evaluate with the new knowledge.

https://gerrit.wikimedia.org/r/plugins/gitiles/operations/puppet/+/refs/heads/production/modules/profile/manifests/airflow.pp mentions

This is configurable at the DAG level with max_active_runs, which is defaulted as max_active_runs_per_dag.

Then we provide guidance to users for how to backfill efficiently and safely by making these two temporary changes for their DAGs:

  • Setting max_active_runs to 3 (overriding the global 1)
  • Enabling v2 algorithm

And if the user doesn't want to opt-in to that approach that's fine, they can just forget about it and Airflow will just steadily backfill one run at a time.
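The two temporary overrides above could be captured together; this is a hypothetical sketch (the `backfill_overrides` helper and the `spark_conf` key are illustrative, not the real airflow-dags API), assuming the DAG's Spark jobs accept a conf dict:

```python
# Hypothetical sketch of the two temporary backfill overrides described
# above. Names (backfill_overrides, spark_conf) are illustrative and do
# not reflect the real airflow-dags repository API.
def backfill_overrides(spark_conf=None):
    """Merge the v2 committer flag and max_active_runs=3 into DAG settings."""
    conf = dict(spark_conf or {})
    # Enable the v2 committer only for the backfill window.
    conf["spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"] = "2"
    return {
        "max_active_runs": 3,  # override the instance-wide default of 1
        "spark_conf": conf,
    }

overrides = backfill_overrides({"spark.executor.memory": "4g"})
```

Once the backfill is done, the user would simply drop these overrides and fall back to the defaults.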

Change 975006 abandoned by Btullis:

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

Reason:

Decided against modifying the default settings.

https://gerrit.wikimedia.org/r/975006


I'm happy to change the default max_active_runs back to 1, or leave it at 3, depending on what you all think.

I have added documentation here: https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Airflow/Developer_guide#Concurrent_DAG_Runs
I think we can go ahead and change our global config to use 1 as the default number of parallel dag_runs per DAG.

Change 976700 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] airflow: change max_active_runs_per_dag back to 1

https://gerrit.wikimedia.org/r/976700

Change 976700 merged by Btullis:

[operations/puppet@production] airflow: change max_active_runs_per_dag back to 1

https://gerrit.wikimedia.org/r/976700

BTullis moved this task from Needs Review to Done on the Data-Platform-SRE board.

Declining as per discussion. I have reset the default value of max_active_runs_per_dag back to 1 as suggested.

Thanks, Ben!

I'd like to expand the section Joseph added. (Thank you, Joseph!) I have some questions/suggestions in https://wikitech.wikimedia.org/wiki/Talk:Data_Engineering/Systems/Airflow/Developer_guide#Documenting_Spark_config_changes_for_concurrent_runs if @JAllemandou and others with more knowledge of these things want to chime in.

It's in my plan to update the docs @mpopov, it just takes longer than I would like (like everything else I do lately).

Updating paper trail.

max_active_runs_per_dag was reset back to 3 in this patch, after this slack conversation.

I'm coming across this now as I attempt to coordinate incident T376882: 2024-10-10 Data Loss Incident - webrequest Hive table.

cc @Antoine_Quhen and @mforns.

I'll paste the slack convo here for posterity:


Dan Andreescu:
Hi, we need a little debate on Airflow config. Our Airflow instance currently has the global max_active_runs_per_dag set to 1 as of this change. In that change, Ben said:

We recently increased the default value of max_active_runs_per_dag from 1 to 3 in order to facilitate faster backfill operations. However, we have now reviewed that decision in light of further experience and have decided that the default value of 1 is more sensible.

I looked back through history and the only other change I could see was Andrew changing the default of 16 to 3 here. So I don't think this was ever set to 1. I don't think 1 is a good idea as a default; Marcel pointed out the following problem:

  • dag X runs instance A, gets stuck processing but doesn't die
  • instances B, C, and so on are then also stuck
  • If this happens at midnight, by the time we find and fix the problem, we could have hours of backfilling to do, which will now also happen more slowly

If we have a couple of hours of bad data that causes a job to be stuck, setting max_active_runs_per_dag to 2 or 1 seems like it would run into this problem. I'm also curious about the potential corruption that values > 1 could run into.

Ben Tullis:
Whoops! Sorry for confusing things.
The potential concurrency issue was mentioned here: 

Dan Andreescu:
I see... hm, I still kind of lean towards 3 and wonder what yall think about the scenario above.

Marcel Ruiz Forns:
Do we know how frequently the Spark bug is triggering?

Joseph Allemandou:

  • dag X runs instance A, gets stuck processing but doesn't die
  • instances B, C, and so on are then also stuck
  • If this happens at midnight, by the time we find and fix the problem, we could have hours of backfilling to do, which will now also happen more slowly

This is actually already the case for all pageview_actor-dependent jobs (many!). The pageview_actor dataset depends on the webrequest_actor_X ones, and one of them, webrequest_actor_metrics_rollup, depends on the previous 24h of data being present. This means that when one hour of webrequest fails, the pageview_actor-dependent datasets are stuck at that point until that webrequest hour is processed, and when this happens, all hours are processed one by one due to the 24h dependency. I think it's worth keeping the default value of 1, which prevents errors during backfilling, and reviewing if/when issues arise.

Marcel Ruiz Forns:
Another drawback of max_dag_run=1 is that, if/when we want to re-run a past dag_run, we have to wait until the current dag_run is finished. Or else, clear the current dag_run, but that could generate confusing alerts, plus it's an extra manual step...

The 24h window of webrequest_actor_metrics_rollup is uncomfortable, but I think it should not be an argument here. There are other hourly pipelines that could be blocked unnecessarily by max_dag_run=1, no?

Unless ofc, the spark issue happens frequently...

Also, does the spark bug fail? Or is it generating corrupted data silently?

Joseph Allemandou:

Also, does the spark bug fail? Or is it generating corrupted data silently?

No data corruption, it just fails. The reason for setting max_dag_run was to prevent those failures, which seem to only happen when we backfill (I've not experienced any of those errors myself).
Similarly, the arguments about the current lack of concurrency are, from my point of view, not rooted in experience: I have not personally felt any lateness due to lack of concurrency (pageview_actor being a counter-example, but that's unrelated).
I'm OK with moving the per-DAG concurrency value back to 3 since, from my point of view, it would hardly change anything.

Ben Tullis:
Has the spark bug been addressed in any later versions, or is it specific to 3.1.2? We've been speaking about flicking the switch to upgrade production spark to 3.3.2 or 3.4.1 early in the new year, if that helps. I don't have any strong opinions on this setting or bug; I'm mainly deploying the settings requested.

Joseph Allemandou:
The spark behavior is not considered a bug - it's how spark-sql works. The solution for us is to start using Iceberg! :slightly_smiling_face:

Marcel Ruiz Forns:
So, what do we choose? I'm happy too to leave it at 1, and change it later if we feel it's necessary.

Ben Tullis:
If the chance of data corruption is vanishingly low, I would probably vote for setting it back to 3, to help with the speed of re-runs.

Dan Andreescu:
I vote 3

Marcel Ruiz Forns:
OK with 3 too

Joseph Allemandou:
Back to 3 it is then! :slightly_smiling_face: Let's see if anyone experiences issues with the next change.

Ben Tullis:
Ok, I can make a patch to change it tomorrow morning, or maybe it can wait until Monday.

Ben Tullis:
After a long delay, I have created the revert to the airflow configuration change here: https://gerrit.wikimedia.org/r/c/operations/puppet/+/1004184

In the discussion above I made a mistake: I stated that the jobs fail, while they don't. They generate corrupted data, in the sense that only a portion of the dataset gets written (there is no data mixing between jobs).
In this wikitech doc the problem is explained and it says that spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2 should be used if the job fails during backfill, but actually we should have set the parameter by default on every job.

actually we should have set the parameter by default on every job.

Do you mean always? Or just when backfilling or writing in parallel?

Let's reopen this then.

Ottomata renamed this task from Add a spark global config for better file commit strategy to Globally configure spark to use fileoutputcommitter.algorithm.version=1 to avoid concurrent write issues. Oct 11 2024, 4:06 PM
Ottomata renamed this task from Globally configure spark to use fileoutputcommitter.algorithm.version=1 to avoid concurrent write issues to Globally configure spark to use fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues. Oct 11 2024, 4:10 PM

actually we should have set the parameter by default on every job.

Do you mean always? Or just when backfilling or writing in parallel?

I think we should default to using version 2: it's too easy to forget to move to v2 when backfilling, and errors can occur anytime when 2 jobs run in parallel for whatever reason.

Change #975006 restored by Btullis:

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

https://gerrit.wikimedia.org/r/975006

Okay!

And, IIUC, the only potential downside is incomplete data in the output directory if a job fails. This should generally be fine for Hive tables.

Will enabling this setting always be safe for Iceberg writes?

Will enabling this setting always be safe for Iceberg writes?

Iceberg uses its own commit mechanism, unrelated to this flag.

News on this front: after talking with @Antoine_Quhen and @Ottomata, the preferred solution is to have this flag passed by our Airflow SparkSQL operator instead of being hard-coded in the Spark config.
The difference is for users: if we put it in the global spark-conf, users can end up with corrupted (partial) data in the result folder they're writing to, and we prefer to avoid this.
It's less of a problem for production jobs, as failed spark jobs will be automatically retried or raise an error.

JAllemandou renamed this task from Globally configure spark to use fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues to Make Airflow SaprkSQL operator set fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues. Oct 17 2024, 1:55 PM
JAllemandou updated the task description.

Change #975006 abandoned by Btullis:

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

Reason:

See: https://phabricator.wikimedia.org/T351388#10237936

https://gerrit.wikimedia.org/r/975006

BTullis renamed this task from Make Airflow SaprkSQL operator set fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues to Make Airflow SparkSQL operator set fileoutputcommitter.algorithm.version=2 to avoid concurrent write issues. Oct 18 2024, 2:47 PM

...
The difference is for users: if we put it in the global spark-conf, users can end up with corrupted (partial) data in the result folder they're writing to, and we prefer to avoid this.
It's less of a problem for production jobs, as failed spark jobs will be automatically retried or raise an error.

If this flag has been identified to help only one of our jobs, why not just set it for the job related to this ticket?

If this flag has been identified to help only one of our jobs, why not just set it for the job related to this ticket?

This bug affects anything that writes into a Hive table in parallel. The difference between the algorithms:

  • v1: parallel writes can result in incomplete partition data (missing data files) even if all writes say they succeed
  • v2: parallel writes can result in incomplete partition data in table when jobs fail. Parallel writes where all jobs succeed will not result in incomplete data.

v2 is preferable, especially for production / airflow-managed jobs, since Airflow will retry and report failures. We didn't think it was a good idea to set this globally in spark confs, because ad hoc parallel writes launched by users aren't always checked for failures. If their write failed, they might assume everything is fine and see incomplete data. We thought it preferable not to trick users into thinking their write succeeded just because they see some output data in the partition.
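A minimal sketch of the operator-level defaulting described here, assuming a helper that layers user conf over the default (the `build_sparksql_conf` name is hypothetical, not the actual operator code):

```python
# Hypothetical sketch of what the Airflow SparkSQL operator could do:
# default the v2 committer for operator-launched jobs, while still letting
# a caller's own conf win. Function name is illustrative only.
COMMITTER_KEY = "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version"

def build_sparksql_conf(user_conf=None):
    """Return Spark conf with the v2 committer defaulted; user conf overrides."""
    conf = {COMMITTER_KEY: "2"}
    conf.update(user_conf or {})
    return conf
```

The design point is that only jobs launched through the operator (which Airflow retries and alerts on) get v2 by default; ad hoc user jobs launched outside the operator keep the safer, failure-visible v1 behaviour.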

But the real, proper fix is to make Hadoop's FileOutputCommitter use distinct _temporary directories for different jobs.

This is now merged. I'm seeking clarification whether I should deploy the change to all instances, or whether I should leave it for all teams to deploy at their own convenience.

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:51:06Z] <btullis@deploy2002> Started deploy [airflow-dags/analytics_test@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:51:20Z] <btullis@deploy2002> Finished deploy [airflow-dags/analytics_test@ba61f77]: T351388 (duration: 00m 31s)

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:51:45Z] <btullis@deploy2002> Started deploy [airflow-dags/analytics@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:52:49Z] <btullis@deploy2002> Finished deploy [airflow-dags/analytics@ba61f77]: T351388 (duration: 01m 08s)

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:53:28Z] <btullis@deploy2002> Started deploy [airflow-dags/search@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:53:53Z] <btullis@deploy2002> Finished deploy [airflow-dags/search@ba61f77]: T351388 (duration: 00m 29s)

Mentioned in SAL (#wikimedia-analytics) [2024-10-23T15:54:04Z] <btullis> deploying all airflow instances to pick up changes in T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:54:20Z] <btullis@deploy2002> Started deploy [airflow-dags/research@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:55:03Z] <btullis@deploy2002> Finished deploy [airflow-dags/research@ba61f77]: T351388 (duration: 00m 45s)

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:55:15Z] <btullis@deploy2002> Started deploy [airflow-dags/platform_eng@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:55:44Z] <btullis@deploy2002> Finished deploy [airflow-dags/platform_eng@ba61f77]: T351388 (duration: 00m 31s)

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:55:59Z] <btullis@deploy2002> Started deploy [airflow-dags/analytics_product@ba61f77]: T351388

Mentioned in SAL (#wikimedia-operations) [2024-10-23T15:57:12Z] <btullis@deploy2002> Finished deploy [airflow-dags/analytics_product@ba61f77]: T351388 (duration: 01m 15s)