
[M] Upgrade code base to Spark 3
Closed, ResolvedPublic

Description

Data-Engineering upgraded the analytics cluster to Spark 3, see T295072: Install spark3 in analytics clusters.

NOTE: Spark 2 support will be dropped after March 31st, 2023.

Tasks

  • on stat machines, switch to the latest conda environment setup
  • run the pipeline and check whether changes to the code base are needed
  • update the production conda-environment.yaml
  • add the pipeline to the list of jobs to migrate

Event Timeline

MarkTraceur renamed this task from Upgrade code base to Spark 3 to [M] Upgrade code base to Spark 3. Dec 1 2022, 5:57 PM
xcollazo changed the task status from Open to In Progress. Dec 6 2022, 4:12 PM

@xcollazo should I move this to the Code Review column on the workboard?

@CBogen the MR above is one part of the migration to Spark 3. I still have to produce the changes to the Airflow DAG. To reflect that, I moved it to "Doing".

The changes needed on the Airflow side have been incorporated into @Cparle's draft MR at https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/136#note_14898. See the link for details of the changes.

This migration is now under review.

I have set up a meeting with @mfossati to discuss the perf degradation.

Below are my notes regarding the performance hit after migrating to Spark 3. The TL;DR is that the query plan has changed and Spark is unfortunately now doing extra work that seems unnecessary, but this is difficult to override. Given our time constraints, my recommendation is to live with the perf hit.

(1) To repro the issue, I ran the pipeline.py script as follows:

time python section_topics/pipeline.py 2022-12-05 --filter 2022-12-05_badly_parsed_revision_ids

(2) For Spark 3.1.2, I got the following runtimes:

~90 mins
https://yarn.wikimedia.org/proxy/application_1663082229270_706297
~66 mins
https://yarn.wikimedia.org/cluster/app/application_1663082229270_710061
~76 mins
https://yarn.wikimedia.org/cluster/app/application_1663082229270_710369

For an average of: ~77 mins.

(3) For Spark 2.4.4, I got the following runtimes:

~53 mins
https://yarn.wikimedia.org/cluster/app/application_1663082229270_711086
~53 mins
https://yarn.wikimedia.org/cluster/app/application_1663082229270_730355
~52 mins
https://yarn.wikimedia.org/cluster/app/application_1663082229270_730480

For an average of: ~53 mins.

(4) A cursory check of the query plans for Spark 2 and Spark 3 did not at first reveal any significant changes. I looked for the usual culprits: more Exchange nodes, more Sort nodes. They all matched.
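
One way to dump the query plans for such a comparison (a minimal sketch; df stands for the job's final DataFrame and is an assumed name, not something from pipeline.py):

# Spark 3: the "formatted" mode produces the numbered-node layout quoted further below.
df.explain(mode="formatted")

# Spark 2.4: only a boolean flag is available; True prints the extended plans.
df.explain(True)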

(5) However, after deeper inspection, I found that a BatchEvalPython node for running the parse() UDF is added twice for the Spark 2 run, while it is added 4 times on Spark 3. The relevant query plan sections are below. Notice how Spark 3 pushed the BatchEvalPython node down into the join.

(Note that Spark 3 changed the query plan layout, so the two plans are difficult to compare visually.)

Spark2:

...
+- SortMergeJoin [wiki_db#18, lower(link#107)], [wiki_db#59, lower(page_title#43)], LeftOuter
   :- *(9) Sort [wiki_db#18 ASC NULLS FIRST, lower(link#107) ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(wiki_db#18, lower(link#107), 2048)
   :     +- *(8) Project [wiki_db#18, page_qid#84, section#93.index AS section_index#101, link#107]
   :        +- Generate explode(section#93.links), [wiki_db#18, page_qid#84, section#93], true, [link#107]
   :           +- *(7) Project [wiki_db#18, page_qid#84, section#93]
   :              +- Generate explode(pythonUDF0#602), [wiki_db#18, page_qid#84, pythonUDF0#602], false, [section#93]
   :                 +- BatchEvalPython [parse(wiki_db#18, revision_text#14)], [wiki_db#18, page_qid#84, revision_text#14, pythonUDF0#602]
   :                    +- *(6) Project [wiki_db#18, item_id#24 AS page_qid#84, revision_text#14]
   :                       +- *(6) SortMergeJoin [page_id#0L, wiki_db#18], [page_id#26L, wiki_db#25], Inner
   :                          :- *(3) Sort [page_id#0L ASC NULLS FIRST, wiki_db#18 ASC NULLS FIRST], false, 0
   :                          :  +- Exchange hashpartitioning(page_id#0L, wiki_db#18, 2048)
   :                          :     +- *(2) Project [wiki_db#18, page_id#0L, revision_text#14]
   :                          :        +- *(2) BroadcastHashJoin [wiki_db#18, revision_id#7L], [wiki_db#63, revision_id#64L], LeftAnti, BuildRight
   :                          :           :- *(2) Project [wiki_db#18, revision_id#7L, page_id#0L, revision_text#14]
   :                          :           :  +- *(2) Filter ((((isnotnull(page_namespace#1) && isnotnull(page_redirect_title#3)) && (page_namespace#1 = 0)) && (page_redirect_title#3 = )) && isnotnull(page_id#0L))
   :                          :           :     +- Scan hive wmf.mediawiki_wikitext_current [page_id#0L, page_namespace#1, page_redirect_title#3, revision_id#7L, revision_text#14, wiki_db#18], HiveTableRelation `wmf`.`mediawiki_wikitext_current`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [page_id#0L, page_namespace#1, page_title#2, page_redirect_title#3, page_restrictions#4, user_id#5L, user_text#6, revision_id#7L, revision_parent_id#8L, revision_timestamp#9, revision_minor_edit#10, revision_comment#11, revision_text_bytes#12L, revision_text_sha1#13, revision_text#14, revision_content_model#15, revision_content_format#16], [snapshot#17, wiki_db#18], [isnotnull(snapshot#17), (snapshot#17 = 2022-11), wiki_db#18 INSET ( ... REDACTED ... ), isnotnull(wiki_db#18)]
   :                          :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, bigint, true]))
   :                          :              +- *(1) Project [wiki_db#63, revision_id#64L]
   :                          :                 +- *(1) Filter ((wiki_db#63 INSET ( ... REDACTED ... ) && isnotnull(wiki_db#63)) && isnotnull(revision_id#64L))
   :                          :                    +- *(1) FileScan parquet [wiki_db#63,revision_id#64L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://analytics-hadoop/user/xcollazo/2022-12-05_badly_parsed_revision_ids], PartitionFilters: [], PushedFilters: [In(wiki_db, [tcywiki,pwnwiki,roa_rupwiki,fjwiki,tenwiki,gvwiki,mywiki,kuwiki,gdwiki,tlwiki,stqwi..., ReadSchema: struct<wiki_db:string,revision_id:bigint>
   :                          +- *(5) Sort [page_id#26L ASC NULLS FIRST, wiki_db#25 ASC NULLS FIRST], false, 0
   :                             +- Exchange hashpartitioning(page_id#26L, wiki_db#25, 2048)
   :                                +- *(4) Project [wiki_db#25, item_id#24, page_id#26L]
   :                                   +- *(4) Filter ((((isnotnull(page_namespace#28L) && wiki_db#25 INSET ( ... REDACTED ... )) && (page_namespace#28L = 0)) && isnotnull(page_id#26L)) && isnotnull(wiki_db#25))
   :                                      +- *(4) FileScan parquet wmf.wikidata_item_page_link[item_id#24,wiki_db#25,page_id#26L,page_namespace#28L,snapshot#30] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://analytics-hadoop/wmf/data/wmf/wikidata/item_page_link/snapshot=202..., PartitionCount: 1, PartitionFilters: [isnotnull(snapshot#30), (snapshot#30 = 2022-12-05)], PushedFilters: [IsNotNull(page_namespace), In(wiki_db, [tcywiki,pwnwiki,roa_rupwiki,fjwiki,tenwiki,gvwiki,mywiki..., ReadSchema: struct<item_id:string,wiki_db:string,page_id:bigint,page_namespace:bigint>
   +- *(11) Sort [wiki_db#59 ASC NULLS FIRST, lower(page_title#43) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(wiki_db#59, lower(page_title#43), 2048)
         +- *(10) Project [wiki_db#59, page_title#43, page_redirect_title#44]
            +- *(10) Filter (((isnotnull(page_redirect_title#44) && isnotnull(page_namespace#42)) && (page_namespace#42 = 0)) && NOT (page_redirect_title#44 = ))
               +- Scan hive wmf.mediawiki_wikitext_current [page_namespace#42, page_redirect_title#44, page_title#43, wiki_db#59], HiveTableRelation `wmf`.`mediawiki_wikitext_current`, org.apache.hadoop.hive.serde2.avro.AvroSerDe, [page_id#41L, page_namespace#42, page_title#43, page_redirect_title#44, page_restrictions#45, user_id#46L, user_text#47, revision_id#48L, revision_parent_id#49L, revision_timestamp#50, revision_minor_edit#51, revision_comment#52, revision_text_bytes#53L, revision_text_sha1#54, revision_text#55, revision_content_model#56, revision_content_format#57], [snapshot#58, wiki_db#59], [isnotnull(snapshot#58), (snapshot#58 = 2022-11), wiki_db#59 INSET ( ... REDACTED ... ), isnotnull(wiki_db#59)]
...

Spark3:

...
   +- SortMergeJoin LeftOuter (35)
      :- * Sort (29)
      :  +- Exchange (28)
      :     +- * Project (27)
      :        +- Generate (26)
      :           +- * Project (25)
      :              +- Generate (24)
      :                 +- * Project (23)
      :                    +- BatchEvalPython (22)
      :                       +- * Project (21)
      :                          +- * SortMergeJoin Inner (20)
      :                             :- * Sort (13)
      :                             :  +- Exchange (12)
      :                             :     +- * Project (11)
      :                             :        +- * BroadcastHashJoin LeftAnti BuildRight (10)
      :                             :           :- * Project (5)
      :                             :           :  +- * Filter (4)
      :                             :           :     +- BatchEvalPython (3)
      :                             :           :        +- * Filter (2)
      :                             :           :           +- Scan hive wmf.mediawiki_wikitext_current (1)
      :                             :           +- BroadcastExchange (9)
      :                             :              +- * Filter (8)
      :                             :                 +- * ColumnarToRow (7)
      :                             :                    +- Scan parquet  (6)
      :                             +- * Sort (19)
      :                                +- Exchange (18)
      :                                   +- * Project (17)
      :                                      +- * Filter (16)
      :                                         +- * ColumnarToRow (15)
      :                                            +- Scan parquet wmf.wikidata_item_page_link (14)
      +- * Sort (34)
         +- Exchange (33)
            +- * Project (32)
               +- * Filter (31)
                  +- Scan hive wmf.mediawiki_wikitext_current (30)
...



(3) BatchEvalPython
Input [7]: [page_id#0L, page_namespace#1, page_redirect_title#3, revision_id#7L, revision_text#14, snapshot#17, wiki_db#18]
Arguments: [parse(wiki_db#18, revision_text#14)], [pythonUDF0#525]

(22) BatchEvalPython
Input [3]: [wiki_db#18, page_qid#84, revision_text#14]
Arguments: [parse(wiki_db#18, revision_text#14)], [pythonUDF0#526]

(150) BatchEvalPython
Input [8]: [page_id#394L, page_namespace#395, page_title#396, page_redirect_title#397, revision_id#401L, revision_text#408, snapshot#411, wiki_db#412]
Arguments: [parse(wiki_db#412, revision_text#408)], [pythonUDF0#541]

(161) BatchEvalPython
Input [6]: [wiki_db#412, revision_id#401L, page_qid#84, page_id#394L, page_title#396, revision_text#408]
Arguments: [parse(wiki_db#412, revision_text#408)], [pythonUDF0#542]

(6) Since the code in pipeline.py uses 'currying' to pass runtime parameters to a PySpark UDF, I tried removing this in favor of a global variable to see whether that was the cause of the Spark 3 plan change. This was not successful.
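
For context, a minimal sketch of the two styles (make_parse_udf, parse_udf, and RUNTIME_PARAMS are illustrative names, not the actual pipeline code):

from pyspark.sql import functions as F, types as T

# 'Currying' style: a factory closes over the runtime parameters and returns the UDF.
def make_parse_udf(runtime_params):
    def parse(wiki_db, revision_text):
        # ... the real parsing logic lives in pipeline.py ...
        return []
    return F.udf(parse, T.ArrayType(T.StringType()))

# Global-variable style tried as the alternative: the parameter is read from
# module scope instead of being captured in a closure.
RUNTIME_PARAMS = None

@F.udf(returnType=T.ArrayType(T.StringType()))
def parse_udf(wiki_db, revision_text):
    # ... same parsing logic, reading RUNTIME_PARAMS from module scope ...
    return []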

Conclusions:
I looked for flags and/or special cases in Spark 3 regarding Python UDFs that could explain the behavior, but found none. At this point, the only way to change this behavior would be to implement our own optimizer rule that blocks the pushdown, which in my opinion is overkill. Since we want to hit a deadline, and since we only run this pipeline once per week, I think the best course of action is to just live with the performance hit.

Thank you so much for the extensive digging, @xcollazo! That's really helpful to better understand what's under Spark's hood.
The parse UDF is a vital part of the pipeline: it does the main heavy lifting and will take on even more responsibilities with T323505: [L] Exclude sections-tables from having section topics, T323597: [M] Exclude date format topics from section topics pipeline, and T323036: [L] Exclude media topics from section topics dataset, so it's very unfortunate that Spark 3 handles it worse than before.

I'll merge https://gitlab.wikimedia.org/repos/structured-data/section-topics/-/merge_requests/8 right after https://gitlab.wikimedia.org/repos/structured-data/section-topics/-/merge_requests/9.

The parse UDF is a vital part of the pipeline: it does the main heavy lifting and will take on even more responsibilities

Ok, let's keep an eye on that query plan if the runtime becomes excessive.

mfossati updated the task description.

Code merged and list of jobs updated. Closing.

I looked into this a little because I wasn't sure why Spark was pushing down the parse UDF. Looking at the query plan for Spark 3, I think what's happening is that because the job does an explode on the UDF's result, Spark pushes down the UDF to filter out any rows with nulls or empty arrays early:

(3) BatchEvalPython
Input [7]: [page_id#0L, page_namespace#1, page_redirect_title#3, revision_id#7L, revision_text#14, snapshot#17, wiki_db#18]
Arguments: [parse(wiki_db#18, revision_text#14)], [pythonUDF0#552]

(4) Filter [codegen id : 3]
Input [8]: [page_id#0L, page_namespace#1, page_redirect_title#3, revision_id#7L, revision_text#14, snapshot#17, wiki_db#18, pythonUDF0#552]
Condition : ((size(pythonUDF0#552, true) > 0) AND isnotnull(pythonUDF0#552))

and then when it actually executes the generator (explode), it just reruns the UDF on all rows without filtering:

(22) BatchEvalPython
Input [3]: [wiki_db#18, page_qid#118, revision_text#14]
Arguments: [parse(wiki_db#18, revision_text#14)], [pythonUDF0#553]

(23) Project [codegen id : 8]
Output [3]: [wiki_db#18, page_qid#118, pythonUDF0#553]
Input [4]: [wiki_db#18, page_qid#118, revision_text#14, pythonUDF0#553]

(24) Generate
Input [3]: [wiki_db#18, page_qid#118, pythonUDF0#553]
Arguments: explode(pythonUDF0#553), [wiki_db#18, page_qid#118], false, [section#146]

If I replace the explode with explode_outer (which, unlike explode, does not filter out empty arrays or nulls and instead produces null) and filter afterwards for nulls, the UDF is no longer pushed down and the job goes from 4 BatchEvalPython nodes to 2. So if, in the future, this job needs to be optimized, this could be one thing to try.
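
A minimal sketch of that suggested change (parsed and the sections/section column names are illustrative, not the actual pipeline code):

from pyspark.sql import functions as F

# parsed: a DataFrame with an array column "sections" produced by the parse UDF.
# Current shape (roughly): explode() drops rows whose array is null or empty, which
# is what lets Spark 3 infer a filter on the UDF output and push the UDF down.
exploded = parsed.withColumn("section", F.explode("sections"))

# Alternative: explode_outer() keeps those rows with a null section, so no filter
# is inferred; drop the nulls explicitly afterwards.
exploded = (
    parsed
    .withColumn("section", F.explode_outer("sections"))
    .where(F.col("section").isNotNull())
)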

Many thanks for spotting this, @MunizaA! I'll make sure your feedback is tracked in our backlog.

Nice find, @MunizaA!

Looking at the query plan for Spark 3, I think what's happening is that because the job does an explode on the UDF's result, Spark pushes down the UDF to filter out any rows with nulls or empty arrays early:

So this gets me thinking: if Spark 3's plan is smart enough to recognize that it has to push down, is the plan from Spark 2 semantically incorrect?

@xcollazo I think the reason Spark 2 does not push this filter down is that it does not infer filters from generators: the optimizer rule InferFiltersFromGenerate in Spark 3.1.2 does not seem to exist in Spark 2.4.4.

@xcollazo I think the reason Spark 2 does not push this filter down is that it does not infer filters from generators: the optimizer rule InferFiltersFromGenerate in Spark 3.1.2 does not seem to exist in Spark 2.4.4.

Ah, there you go. Mystery solved.
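
For future reference, should anyone want to test that hypothesis, Spark can be asked to skip individual optimizer rules via spark.sql.optimizer.excludedRules (exclusion is not guaranteed for every rule, and the fully qualified class name below is my best guess); a minimal sketch:

# Hypothetical experiment: exclude the rule that infers the extra filter from the
# generator, then re-check the plan with df.explain().
spark.conf.set(
    "spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromGenerate",
)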