
Add a spark global config for better file commit strategy
Closed, Declined · Public

Description

See https://phabricator.wikimedia.org/T347076 for the whole context.

We wish to add a global Spark config:

mapreduce.fileoutputcommitter.algorithm.version = 2

This will allow Spark jobs not to fail when they write concurrently to the same table.
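
For illustration only, a sketch of how the same property could be applied per job (via the spark.hadoop. prefix) rather than as a global default; the app name and write target below are hypothetical:

```python
from pyspark.sql import SparkSession

# Sketch only: applies the v2 file output committer for this session,
# equivalent in effect to setting the property globally in spark-defaults.conf.
spark = (
    SparkSession.builder
    .appName("concurrent-write-example")  # hypothetical app name
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)

# With the v2 committer, task commits move files directly into the destination
# directory instead of waiting for a single job-level rename pass at job commit.
# df.write.mode("append").insertInto("my_db.my_table")  # hypothetical table
```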

Event Timeline

BTullis triaged this task as High priority.
BTullis moved this task from Incoming to In Progress on the Data-Platform-SRE board.

Change 975006 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

https://gerrit.wikimedia.org/r/975006

I have now implemented this via https://gerrit.wikimedia.org/r/975006

I used the parameter: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version which I believe is correct.
It's worth noting that there is a comment here: https://spark.apache.org/docs/3.1.2/configuration.html#execution-behavior
...which leads to this bug report about the v2 algorithm.

https://issues.apache.org/jira/browse/MAPREDUCE-7282


Thanks a lot @BTullis - The problem you linked is indeed a known issue. We rely on hive-metastore and _SUCCESS files, which should prevent the issue in prod jobs. For other jobs, this is indeed a problem.
I see 2 possible solutions here:

  • We keep the change you made and see if it causes problems.
  • We don't apply the change but provide a solution for people backfilling by telling them to use that parameter in their jobs.

I'm not sure which is best.

After giving it some more thought, it seems that NOT changing the parameter globally, in order to enforce data correctness in folders, is the best idea. We would document the concurrency issue in wikitech and Airflow, and offer that setting as the solution for backfilling periods.
I'm interested in feedback if anyone thinks it's not the best idea :)

(IIRC, the changes in the commit strategy proposed in v2 were made to better support object stores, given that a mv in HDFS is cheap and just a metadata operation, while a mv in an object store typically maps to a cp and an rm.)

So I would be wary of letting go of correctness here to better support concurrent writes, which do not seem to be common in our Hadoop instance after all?

If we have particular tables we'd like to write concurrently, couldn't we expedite their move to Iceberg, and thus kill two birds with one stone?

+1 for leaving writing to Hive tables alone (and erring towards correctness, with jobs failing and hopefully leaving comments that we can find)
+1 to instead focusing on the Iceberg migration

Not changing the global config and instead just documenting the solution / how to use that setting temporarily for backfills sounds good to me. I think this is important when spinning up a new dataset, because if each run is very expensive, there should be a way to minimize the number of retries.

Alternatively, what if we reconsider the decision made in T300870: Airflow concurrency limits and change max_active_runs_per_dag from 3 to 1? That decision was made to yield faster backfills, before the concurrency issue was known, so it makes sense to re-evaluate with the new knowledge.

https://gerrit.wikimedia.org/r/plugins/gitiles/operations/puppet/+/refs/heads/production/modules/profile/manifests/airflow.pp mentions

This is configurable at the DAG level with max_active_runs, which defaults to max_active_runs_per_dag.

Then we provide guidance to users for how to backfill efficiently and safely by making these two temporary changes for their DAGs:

  • Setting max_active_runs to 3 (overriding the global 1)
  • Enabling the v2 algorithm

And if the user doesn't want to opt in to that approach, that's fine: they can just forget about it and Airflow will steadily backfill one run at a time.
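
As a rough sketch of what that opt-in could look like for a DAG (the operator and conf-passing mechanism are assumptions for illustration, not the actual wrappers used in our dags repo):

```python
from datetime import datetime

from airflow import DAG
# Hypothetical operator choice; the real DAGs use their own Spark wrappers.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="example_backfill_dag",  # hypothetical DAG
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    max_active_runs=3,  # temporary override of the global default of 1
) as dag:
    compute = SparkSubmitOperator(
        task_id="compute",
        application="job.py",  # hypothetical Spark application
        conf={
            # Opt in to the v2 committer so concurrent runs writing to the same
            # table location don't fail; revert this once the backfill is done.
            "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
        },
    )
```

Once the backfill is complete, both overrides would be removed so the DAG falls back to the global max_active_runs_per_dag of 1 and the default v1 committer.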

Change 975006 abandoned by Btullis:

[operations/puppet@production] Set a non-default mapreduce file committer algorithm for spark

Reason:

Decided against modifying the default settings.

https://gerrit.wikimedia.org/r/975006


I'm happy to change the default max_active_runs back to 1, or leave it at 3, depending on what you all think.

I have added documentation here: https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Airflow/Developer_guide#Concurrent_DAG_Runs
I think we can go ahead and change our global config to use 1 as the default number of parallel dag_runs per DAG.

Change 976700 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] airflow: change max_active_runs_per_dag back to 1

https://gerrit.wikimedia.org/r/976700

Change 976700 merged by Btullis:

[operations/puppet@production] airflow: change max_active_runs_per_dag back to 1

https://gerrit.wikimedia.org/r/976700

BTullis moved this task from Needs Review to Done on the Data-Platform-SRE board.

Declining as per discussion. I have reset the default value of max_active_runs_per_dag back to 1 as suggested.

Thanks, Ben!

I'd like to expand the section Joseph added. (Thank you, Joseph!) I have some questions/suggestions in https://wikitech.wikimedia.org/wiki/Talk:Data_Engineering/Systems/Airflow/Developer_guide#Documenting_Spark_config_changes_for_concurrent_runs if @JAllemandou and others with more knowledge of these things want to chime in.

It's in my plan to update the docs, @mpopov; it just takes longer than I would like (like everything else I do lately).