
Implement a backfill job for the dumps hourly table
Closed, ResolvedPublic

Description

In T335860, we implemented a PySpark job that runs a MERGE INTO to transform event data into a table that will eventually contain the full MediaWiki revision history.

This process currently ingests only recent events, and so we need a mechanism for backfilling it.

@Ottomata points to an existing Spike effort to do this in Flink:

But now that we have a bit more experience with Iceberg and Spark MERGE INTO, I speculate that a simple Spark job reading wmf.mediawiki_history could do this without great effort. The table docs suggest we have everything we need there, so this would just reuse the same MERGE INTO pattern.

So in this spike we should:

  • Play with wmf.mediawiki_history in a Notebook and confirm that it indeed has what we need.
  • Prototype a MERGE INTO that would ingest wmf.mediawiki_history into our hourly table (a rough sketch of the pattern follows this list). Also: figure out a better name for the target table than 'hourly table'!
  • Do a run and see how long it takes. (Backfill should only be run sporadically, but we should tune it anyway for when it is needed.)
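
For reference, a minimal sketch of the MERGE INTO pattern I have in mind, assuming the target is the same Iceberg table the event-based job writes to and that wiki_db + revision_id serve as the merge keys. The table name and column list here are illustrative placeholders, not the final schema:

import wmfdata

spark = wmfdata.spark.create_custom_session(master='yarn')

# Sketch only: upsert one wiki's revisions from wmf.mediawiki_history into the target table.
# 'wmf_dumps.wikitext_raw' and the selected columns are placeholders for whatever we settle on.
spark.sql("""
    MERGE INTO wmf_dumps.wikitext_raw t
    USING (
        SELECT wiki_db,
               revision_id,
               to_timestamp(event_timestamp) AS revision_timestamp
        FROM wmf.mediawiki_history
        WHERE snapshot = '2023-06'
          AND event_entity = 'revision'
          AND wiki_db = 'simplewiki'
    ) s
    ON  t.wiki_db = s.wiki_db
    AND t.revision_id = s.revision_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")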

Details

Reference | Source Branch | Dest Branch | Author | Title
repos/data-engineering/airflow-dags!495 | fix-ivy-issue | main | xcollazo | Fix ivy issue on mediawiki dumps, and some cosmetic fixes.
repos/data-engineering/airflow-dags!484 | add-mediawiki-dumps-backfill | main | xcollazo | Add DAG to backfill wmf_dumps.wikitext_raw.
repos/data-engineering/dumps/mediawiki-content-dump!4 | optimize-writes | main | xcollazo | Refactor `wikitext_raw` table to support backfilling


Event Timeline

xcollazo changed the task status from Open to In Progress.Jul 11 2023, 3:27 PM
xcollazo claimed this task.
xcollazo triaged this task as High priority.
xcollazo removed a project: Data Pipelines.

An issue I have bumped into while working on this:

While iterating on the MERGE INTO, we hit T342587, in which the MERGE job generates ~55,000 small files.

The old trick of adding a COALESCE hint did not fix the small files generated by the MERGE INTO. This is because MERGE generates a custom query plan specific to Iceberg: the COALESCE is added, but not at the right node. Example query plan:

...
== Physical Plan ==
ReplaceData IcebergBatchWrite(table=spark_catalog.xcollazo_iceberg.wikitext_raw_rc1, format=AVRO), org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy$$Lambda$2463/2129057277@2ea82cef
+- Sort [wiki_db#40 ASC NULLS FIRST, icebergdaytransform(revision_timestamp#34) ASC NULLS FIRST], false, 0
   +- MergeInto MergeIntoParams(isnotnull(_row_from_source_#102),isnotnull(_row_from_target_#127),ArrayBuffer((s_changelog_kind#16 IN (insert,update) AND (s_meta_dt#17 >= row_last_update#41)), ((s_changelog_kind#16 = delete) ...
      +- SortMergeJoin [s_wiki_db#15, s_revision_id#7L], [wiki_db#40, revision_id#32L], FullOuter, (((revision_timestamp#34 >= 1682812800000000) AND (revision_timestamp#34 <= 1682899200000000)) OR ((revision_timestamp#34 >= 1682726400000000) AND (revision_timestamp#34 <= 1682812800000000)))
         :- *(5) Sort [s_wiki_db#15 ASC NULLS FIRST, s_revision_id#7L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(s_wiki_db#15, s_revision_id#7L, 1024), ENSURE_REQUIREMENTS, [id=#166]
         :     +- *(4) Project [s_page_id#0L, s_page_namespace#1L, s_page_title#2, s_page_redirect_title#3, s_user_id#4L, s_user_text#5, s_user_is_visible#6, s_revision_id#7L, s_revision_parent_id#8L, s_revision_timestamp#9, s_revision_is_minor_edit#10, s_revision_comment#11, s_revision_comment_is_visible#12, s_revision_content_slots#13, s_revision_content_is_visible#14, s_wiki_db#15, s_changelog_kind#16, s_meta_dt#17, true AS _row_from_source_#102]
         :        +- Coalesce 2                                                                 <<<<<<<<<<<<<<<<<<<<<<<< COALESCE added as hint
         :           +- Project [_gen_alias_158#158L AS s_page_id#0L, _gen_alias_159#159L AS ...
         :              +- *(3) Filter (isnotnull(row_num#19) AND (row_num#19 = 1))
         :                 +- Window [row_number() windowspecdefinition(wiki_id#54, _gen_alias_154#154L, _w0#64 DESC NULLS LAST, changelog_priority#18 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_num#19], [wiki_id#54, _gen_alias_154#154L], [_w0#64 DESC NULLS LAST, changelog_priority#18 DESC NULLS LAST]
         :                    +- *(2) Sort [wiki_id#54 ASC NULLS FIRST, _gen_alias_154#154L ASC NULLS FIRST, _w0#64 DESC NULLS LAST, changelog_priority#18 DESC NULLS LAST], false, 0
         :                       +- Exchange hashpartitioning(wiki_id#54, _gen_alias_154#154L, 1024), ENSURE_REQUIREMENTS, [id=#153]
         :                          +- *(1) Project [changelog_kind#44, created_redirect_page#46.page_title AS ...
         :                             +- *(1) Filter (isnotnull(meta#48) AND NOT (meta#48.domain = canary))
         :                                +- FileScan parquet event.rc1_mediawiki_page_content_change[changelog_kind#44,created_redirect_page#46,dt#47,meta#48,page#49,performer#51,revision#53,wiki_id#54,datacenter#57,year#58L,month#59L,day#60L,hour#61L] Batched: false, DataFilters: [isnotnull(meta#48), NOT (meta#48.domain = canary)], Format: Parquet, Location:...
         +- *(17) Sort [wiki_db#40 ASC NULLS FIRST, revision_id#32L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(wiki_db#40, revision_id#32L, 1024), ENSURE_REQUIREMENTS, [id=#216]
...

We can work around this with rewrite_data_files(), but it is annoying. It turns out this is a known issue with Iceberg's MERGE INTO that has already been solved, but only for Spark 3.2+. See https://github.com/apache/iceberg/pull/6828. TL;DR: this is fixed in Iceberg's Spark 3.3 support and backported to the Spark 3.2 support, but not to Spark 3.1.
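
For the record, the compaction workaround is Iceberg's rewrite_data_files stored procedure, roughly like this (a sketch: the table is the test table from above, and the target file size is just an example value):

# Compact the small files written by the MERGE INTO after the fact (sketch).
spark.sql("""
    CALL spark_catalog.system.rewrite_data_files(
        table => 'xcollazo_iceberg.wikitext_raw_rc1',
        options => map('target-file-size-bytes', '268435456')
    )
""").show(truncate=False)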

Although this is not a blocker, I believe it is a compelling reason to upgrade Spark.

We can pick up Spark 3.3.x, which is stable. Or we could go for Spark 3.4.1, which gives us the longest runway but is presumably not as stable as the 3.3 branch, since it was only released in the last couple of months.

Will open a separate ticket for this.

More fun:

I have a Spark configuration that seems to hit the sweet spot for backfilling enwiki via MERGE INTO. Executor- and memory-wise, it currently uses ~20% of cluster resources:

import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "32g",
        "spark.dynamicAllocation.maxExecutors": 64,
        "spark.executor.memory": "26g",
        "spark.executor.memoryOverhead": "6g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 8192,
        "spark.shuffle.io.retryWait": "10s",

        # maxResultSize default is 1g, and it's giving us problems with MERGE INTO tasks
        "spark.driver.maxResultSize": "8g",

        # extra settings as per https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Cluster/Spark#Executors
        "spark.shuffle.file.buffer": "1m",
        "spark.shuffle.service.index.cache.size": "256m",
        "spark.io.compression.lz4.blockSize": "512KB"
    }
)

These settings make the 2nd shuffle from the MERGE INTO (the one that sorts and writes files) work well, with minimal spilling and reasonable progress. The shuffle was holding steady at 1253/8192 (15%) done, with an ETA of ~13 hours to complete. Unfortunately, I calculated that the MERGE INTO would have generated ~35M files when done. This is untenable for our current NameNode heap limitations.

So I killed the job. As shared separately on T338057#9096980, I will now attempt to run Spark 3.3.2 on our cluster to see whether indeed this problem is solved with a more recent Spark version.

After 7 takes, I managed to spawn a successful JupyterLab instance with Spark 3.3.2 via:

export http_proxy=http://webproxy:8080
export https_proxy=http://webproxy:8080
no_proxy=127.0.0.1,::1,localhost,.wmnet,.wikimedia.org,.wikipedia.org,.wikibooks.org,.wikiquote.org,.wiktionary.org,.wikisource.org,.wikispecies.org,.wikiversity.org,.wikidata.org,.mediawiki.org,.wikinews.org,.wikivoyage.org
export HTTP_PROXY=$http_proxy
export HTTPS_PROXY=$https_proxy
export NO_PROXY=$no_proxy

# installing pyspark=3.3.2 on top of conda-analytics was giving me all kinds of trouble while resolving deps. We should really move forward with the mamba solver.
# anyhow, we can cheat by creating a brand new conda env:
# the below copies the needed bits from https://gitlab.wikimedia.org/repos/data-engineering/conda-analytics/-/blob/main/conda-environment.yml as verbatim as possible so that we don't make jupyterhub unhappy
source /opt/conda-analytics/etc/profile.d/conda.sh
conda create -n spark33-take-7 python=3.10.8 pyspark=3.3.2 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.0

However, wmfdata was not happy:

File ~/.conda/envs/spark33-take-7/lib/python3.10/site-packages/wmfdata/presto.py:15
      8 from wmfdata.utils import (
      9     check_kerberos_auth,
     10     ensure_list
     11 )
     13 # Disable a warning issued by urllib3 because the certificate for an-coord1001.eqiad.wmnet
     14 # does not specify the hostname in the subjectAltName field (T158757)
---> 15 SubjectAltNameWarning = urllib3.exceptions.SubjectAltNameWarning
     16 urllib3.disable_warnings(SubjectAltNameWarning)
     18 def run(commands, catalog="analytics_hive"):

AttributeError: module 'urllib3.exceptions' has no attribute 'SubjectAltNameWarning'

Looks like urllib3 got bumped:

(spark33-take-7) xcollazo@stat1007:~$ conda list | grep urllib
urllib3                   2.0.4              pyhd8ed1ab_0    conda-forge

original from https://gitlab.wikimedia.org/repos/data-engineering/conda-analytics/-/blob/main/conda-environment.lock.yml#L193:

- urllib3=1.26.11

So I pinned urllib3=1.26.11. Take 8:

conda create -n spark33-take-8 python=3.10.8 pyspark=3.3.2 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark33-take-8
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.0.0

Now Spark was unhappy because there were no confs:

(spark33-take-8) xcollazo@stat1007:~/.conda/envs/spark33-take-8/lib/python3.10/site-packages/pyspark$ pwd
/home/xcollazo/.conda/envs/spark33-take-8/lib/python3.10/site-packages/pyspark
ln -s /etc/spark3/conf conf

Now Iceberg is not happy:

...
23/08/16 21:44:52 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
...

So let's pull the Iceberg runtime jar in via spark.jars.packages:

# extras to make Iceberg work on 3.3.2:
"spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1"

Progress. log4j seems unhappy, but the Spark instance is running, so I'm just documenting it here so that we can pursue it later:

yarn logs -appOwner xcollazo -applicationId application_1688722260742_237626
...
log4j:ERROR Could not instantiate appender named "RFA".
log4j:ERROR Could not instantiate class [org.apache.log4j.rolling.RollingFileAppender].
java.lang.ClassNotFoundException: org.apache.log4j.rolling.RollingFileAppender
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
...
# replace the conf symlink with a writable copy so that spark-defaults.conf can be edited
mkdir newconf
cp ./conf/* newconf
rm conf
mv newconf/ conf

I then commented out spark.yarn.archive from spark-defaults.conf. Got desired behavior:

...
23/08/17 19:05:09 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
...

This gave me a Yarn Spark app with no ERRORs and no significant WARNs other than the ones noted above. For the record, here is the winning PySpark session definition:

import wmfdata

# this seems like a winning config!
# The 2nd shuffle is holding steady at 1253/8192 (15%) done.
# Unfortunately, I calculated the MERGE INTO would generate ~35M files when done.
# so I will kill it, and retry with Spark 3.3.2
spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "32g",
        "spark.dynamicAllocation.maxExecutors": 64,
        "spark.executor.memory": "26g",
        "spark.executor.memoryOverhead": "6g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 8192,
        "spark.shuffle.io.retryWait": "10s",

        # maxResultSize default is 1g, and it's giving us problems with MERGE INTO tasks
        "spark.driver.maxResultSize": "8g",

        # extra settings as per https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Cluster/Spark#Executors
        "spark.shuffle.file.buffer": "1m",
        "spark.shuffle.service.index.cache.size": "256m",
        "spark.io.compression.lz4.blockSize": "512KB",

        # extras to make Iceberg work on 3.3.2:
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        # we need to clear spark.yarn.archive so that we don't pick up old 3.1.2 one
        # for this I commented out spark.yarn.archive from spark-defaults.conf
    }
)

Attempted to run the MERGE INTO, but then found out that the external shuffle server is not happy with my Spark 3.3 executors:

yarn logs -appOwner xcollazo -applicationId application_1688722260742_237781

...
23/08/17 19:25:19 ERROR YarnCoarseGrainedExecutorBackend: Executor self-exiting due to : Unable to create executor due to Unable to register with external shuffle server due to : java.io.IOException: Connection from analytics1077.eqiad.wmnet/10.64.53.33:7337 closed
org.apache.spark.SparkException: Unable to register with external shuffle server due to : java.io.IOException: Connection from analytics1077.eqiad.wmnet/10.64.53.33:7337 closed
...

The job cannot secure executors, so it does not progress. However, though I cannot confirm with hard data yet, the query plan does show that we get exactly what we wanted to see in terms of data compaction:

== Physical Plan ==
ReplaceData (33)
+- AdaptiveSparkPlan (32)                     <<<<<<<<<< This is good!
   +- == Current Plan ==
      Sort (17)
      +- Exchange (16)
         +- Project (15)
            +- MergeRows (14)                 <<<<<<<<<< This is good!
               +- Sort (13)
                  +- SortMergeJoin FullOuter (12)
                     :- Sort (6)
                     :  +- ShuffleQueryStage (5)
                     :     +- Exchange (4)
                     :        +- * Project (3)
                     :           +- * Project (2)
                     :              +- BatchScan (1)
                     +- Sort (11)
                        +- ShuffleQueryStage (10)
                           +- Exchange (9)
                              +- * Project (8)
                                 +- Scan hive wmf.mediawiki_wikitext_history (7)

Ok then, let's disable the external shuffle service, hard-code the executors to 64, and retry via:

import wmfdata

# this seems like a winning config!
# The 2nd shuffle is holding steady at 1253/8192 (15%) done.
# Unfortunately, I calculated the MERGE INTO would generate ~35M files when done.
# so I will kill it, and retry with Spark 3.3.2
spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "32g",
        "spark.dynamicAllocation.maxExecutors": 64,
        "spark.executor.memory": "26g",
        "spark.executor.memoryOverhead": "6g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 8192,
        "spark.shuffle.io.retryWait": "10s",

        # maxResultSize default is 1g, and it's giving us problems with MERGE INTO tasks
        "spark.driver.maxResultSize": "8g",

        # extra settings as per https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Cluster/Spark#Executors
        "spark.shuffle.file.buffer": "1m",
        "spark.shuffle.service.index.cache.size": "256m",
        "spark.io.compression.lz4.blockSize": "512KB",

        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        # we need to clear spark.yarn.archive so that we don't pick up old 3.1.2 one
        # for this I commented out spark.yarn.archive from spark-defaults.conf
        # 3.1 Shuffle service is not happy with Spark 3.3, so have to disable it (and dynamic allocation)
        "spark.shuffle.service.enabled": False,
        "spark.dynamicAllocation.enabled": False,
        "spark.executor.instances": 64,
    }
)

The MERGE INTO is now happily running! Finally! It will take long hours to run, so will report whether the small files problem is solved in a separate update.

> The MERGE INTO is now happily running! Finally! It will take long hours to run, so will report whether the small files problem is solved in a separate update.

Outstanding work! And such a useful update, containing all of your troubleshooting steps along the way. Thanks @xcollazo.

Is there a reason why you selected 3.3.2 over 3.4.1 for this testing, or is it still an open question?

Awesome testing @xcollazo :) Thank you so much for the effort!

> Outstanding work! And such a useful update, containing all of your troubleshooting steps along the way. Thanks @xcollazo.

Making it work was surprisingly fun! Let's sync up later to make the upgrade smooth for other folks as well.

> Is there a reason why you selected 3.3.2 over 3.4.1 for this testing, or is it still an open question?

In my first iteration I was not only bumping to Spark 3.4, but also bumping a bunch of other conda dependencies, and I got bitten by dependency conflicts. I omitted all those details above to keep the summary readable. I don't think Spark 3.4 would have particularly given me more trouble, but given the conflicts, I decided to conservatively bump to just Spark 3.3 so that I could confirm the fix for my issue rather than get myself into more trouble.

If you think there is value in confirming on 3.4, I can do that as well?

Ok, so my backfill MERGE INTO failed due to a bunch of:

FetchFailed(null, shuffleId=1, mapIndex=-1, mapId=-1, reduceId=-1, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Unable to deserialize broadcasted map statuses for shuffle 1: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_20_piece20 of broadcast_20
	at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$7(MapOutputTracker.scala:1432)
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
	at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:1420)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1286)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1256)
...

This was on the 2nd shuffle, where it had failed before, but which I thought I had tuned properly. Presumably this happens because spark.shuffle.service.enabled = False? Not sure, and I will debug separately, but for the purpose of validating whether Spark 3.3 solves the small files issue, I am going to pivot to observing the behavior of the streaming MERGE INTO.

Here is an example file count after running 48 hours of streaming MERGE INTOs:

$ hdfs dfs -count -v /user/hive/warehouse/xcollazo_iceberg.db/wikitext_raw_rc1/data
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
   DIR_COUNT   FILE_COUNT       CONTENT_SIZE PATHNAME
        1844         6598         8370525858 /user/hive/warehouse/xcollazo_iceberg.db/wikitext_raw_rc1/data

Here is an example of how files look:

$ pwd
/mnt/hdfs/user/hive/warehouse/xcollazo_iceberg.db/wikitext_raw_rc1/data/wiki_db=enwiki/revision_timestamp_day=2023-08-18
$ ls -lsha
total 1.7G
4.0K drwxrwx--- 13 xcollazo analytics-privatedata-users 4.0K Aug 18 12:46 .
4.0K drwxrwx--- 10 xcollazo analytics-privatedata-users 4.0K Aug 18 12:46 ..
 73M -rw-r-----  1 xcollazo analytics-privatedata-users  73M Aug 18 12:37 07845-152782-92a6b8e0-9a27-4dab-9f14-cf052414f8d1-00001.avro
137M -rw-r-----  1 xcollazo analytics-privatedata-users 137M Aug 18 12:38 07845-161818-c4a01106-86a4-4bbd-8955-742755dae24e-00001.avro
193M -rw-r-----  1 xcollazo analytics-privatedata-users 193M Aug 18 12:39 07845-171035-21e06efa-503d-4dfb-b1c3-5eeb4269c10d-00001.avro
 50M -rw-r-----  1 xcollazo analytics-privatedata-users  50M Aug 18 12:40 07845-180020-926ec518-4d57-4740-867e-70cdbcb9db72-00001.avro
103M -rw-r-----  1 xcollazo analytics-privatedata-users 103M Aug 18 12:41 07845-189018-fdf9246f-9f46-422a-8ec4-83040c02051f-00001.avro
 51M -rw-r-----  1 xcollazo analytics-privatedata-users  51M Aug 18 12:41 07845-197973-f80d9412-7a5c-4003-959e-70785fd0a77d-00001.avro
 49M -rw-r-----  1 xcollazo analytics-privatedata-users  49M Aug 18 12:42 07845-206995-09c9190f-ec61-477b-baa9-ef885c5b5efb-00001.avro
461M -rw-r-----  1 xcollazo analytics-privatedata-users 461M Aug 18 12:44 07845-216060-df5b8777-d34e-4d1d-b688-cb7939d439e5-00001.avro
513M -rw-r-----  1 xcollazo analytics-privatedata-users 513M Aug 18 12:46 07845-225163-ef528990-69c2-44ee-92bd-11e192373f9c-00001.avro
6.2M -rw-r-----  1 xcollazo analytics-privatedata-users 6.2M Aug 18 12:46 07845-225163-ef528990-69c2-44ee-92bd-11e192373f9c-00002.avro
 69M -rw-r-----  1 xcollazo analytics-privatedata-users  69M Aug 18 12:47 07845-234345-8db75db9-de0a-4405-ad95-841b7a97eeb3-00001.avro

This is exactly what we were hoping for! Instead of thousands of ~50KB files, we now have 11 files ranging from ~6MB to ~500MB. Note that the files are not a uniform 256MB as if we had called rewrite_data_files(), but we no longer have the small files problem, which is what we were looking for.
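
As a complement to eyeballing the HDFS listing, the file size distribution can also be pulled straight from Iceberg's files metadata table; a quick sketch against the same test table:

# Largest data files currently tracked by the table, in MB (sketch).
spark.sql("""
    SELECT file_path,
           round(file_size_in_bytes / 1024 / 1024, 1) AS size_mb
    FROM xcollazo_iceberg.wikitext_raw_rc1.files
    ORDER BY file_size_in_bytes DESC
    LIMIT 20
""").show(truncate=False)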

Anecdotally, because the query planning is much simpler now (far fewer files to consider), we can also query the table faster. The MERGE INTOs also anecdotally ran in just a couple of minutes each, but I believe that is unrelated to Spark 3.3 and more to do with the fact that we now have the fix for T341134 in.

xcollazo renamed this task from [Spike] Implement a backfill job for the hourly table in Spark to Implement a backfill job for the hourly table in Spark.Aug 21 2023, 4:41 PM
xcollazo renamed this task from Implement a backfill job for the hourly table in Spark to Implement a backfill job for the dumps hourly table that can handle enwiki.Aug 22 2023, 2:23 PM
xcollazo moved this task from Sprint Backlog to BLOCKED on the Data Products (Sprint 00) board.
xcollazo renamed this task from Implement a backfill job for the dumps hourly table that can handle enwiki to Implement a backfill job for the dumps hourly table.Aug 22 2023, 2:30 PM

I have progressed in tuning the MERGE INTO for enwiki, but I continue to face FetchFailed exceptions, even with heavy retrying and with bumping executor memory to the maximum possible in our Yarn config:

...
        "spark.executor.memory": "42g", # let's try another combination since I'm running into trouble on Spark 3.3.2
        "spark.executor.memoryOverhead": "6g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 65536,
        "spark.shuffle.io.retryWait": "15s",
        "spark.shuffle.io.maxRetries": "15",
...

I discussed this with @Milimetric and @Antoine_Quhen and we came up with the following alternatives:

  • Consider if we can run the enwiki backfill per year.
  • Given there doesn't seem to be heap memory pressure, consider if a different ratio of spark.executor.memory and spark.executor.memoryOverhead is more appropriate.

I'm interested to have a quick chat about this.
My gut feeling tells me it could be skew related :)

> I'm interested to have a quick chat about this.
> My gut feeling tells me it could be skew related :)

AFAICT, there is no skew and the partitions look very even. But happy to review all this! Will grab some time on your calendar.

Right now you can get a glimpse of a successful MERGE INTO for enwiki for 2006-01 at https://yarn.wikimedia.org/proxy/application_1688722260742_266963/SQL/execution/?id=0, but we have no history service so that will be gone soon :(

> Consider if we can run the enwiki backfill per year.

I am actually making good progress on this one. This is the revision count split on enwiki:

+--------+--------------------------+
|count   |year(s_revision_timestamp)|
+--------+--------------------------+
|74164   |2001                      |
|615317  |2002                      |
|1585332 |2003                      |
|6887962 |2004                      |
|20290268|2005                      |
|55832475|2006                      |
|72988546|2007                      |
|72387449|2008                      |
|67698352|2009                      |
|64458748|2010                      |
|57944246|2011                      |
|56965999|2012                      |
|53527702|2013                      |
|47910730|2014                      |
|53495988|2015                      |
|56389372|2016                      |
|56987104|2017                      |
|54870638|2018                      |
|54645614|2019                      |
|61123653|2020                      |
|62769112|2021                      |
|65737033|2022                      |
|31541448|2023                      |
+--------+--------------------------+

I've backfilled from 2001 to 2005 with no issues, but 2006 failed (see revision count above).

I am now trying backfills for enwiki on a per-month basis, and so far so good, as the revision counts are much more manageable:

SELECT count(1) AS count, substring(revision_timestamp, 1, 7) AS year_month FROM (
    SELECT
        to_timestamp(revision_timestamp) AS revision_timestamp
    FROM wmf.mediawiki_wikitext_history s
    WHERE snapshot = '2023-06'
      AND wiki_db IN ('enwiki')
      AND year(revision_timestamp) IN (2006, 2007, 2008)
)
GROUP BY 2
ORDER BY 2 ASC


count	year_month
3660740	2006-01
3547601	2006-02
4093508	2006-03
3971815	2006-04
4561751	2006-05
4543033	2006-06
4746652	2006-07
5353728	2006-08
4911169	2006-09
5378453	2006-10
5513727	2006-11
5550298	2006-12
6181999	2007-01
6099719	2007-02
6711879	2007-03
6706054	2007-04
6569806	2007-05
5849414	2007-06
5819057	2007-07
5725497	2007-08
5749013	2007-09
6156859	2007-10
5772493	2007-11
5646756	2007-12
6230637	2008-01
6038910	2008-02
6684857	2008-03
6218755	2008-04
6239168	2008-05
5846449	2008-06
5784692	2008-07
5816747	2008-08
6095753	2008-09
6203417	2008-10
5663360	2008-11
5564704	2008-12
Time taken: 847.588 seconds, Fetched 36 row(s)

The caveat here is that we have to do a full table scan of the source table wmf.mediawiki_wikitext_history for each month of each year. This is incredibly wasteful, but until I can work around the FetchFailed exception (maybe via T338057#9114257?), this will have to do.
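
One way around the repeated scans would be to materialize the filtered and casted source once into an Iceberg intermediate table partitioned by month, and run the per-month MERGE INTOs against that instead. A rough sketch, with a hypothetical table name and an abbreviated column list:

# Sketch: materialize the enwiki slice of the Hive source once, partitioned by month,
# so that each per-month MERGE INTO only reads the matching partition.
# 'wmf_dumps.wikitext_backfill_intermediate' and the column list are illustrative.
spark.sql("""
    CREATE TABLE wmf_dumps.wikitext_backfill_intermediate
    USING iceberg
    PARTITIONED BY (wiki_db, months(revision_timestamp))
    AS SELECT wiki_db,
              revision_id,
              to_timestamp(revision_timestamp) AS revision_timestamp,
              revision_text
       FROM wmf.mediawiki_wikitext_history
       WHERE snapshot = '2023-06'
         AND wiki_db = 'enwiki'
""")

This is just one idea; it trades an up-front full scan plus extra storage for much cheaper per-month reads.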

(Further work on this task is blocked until we have T344910. In the meantime we will focus on getting simplewiki working end to end via T344709.)

Even though we still don't have T344910, we have made significant progress on this task.

After a conversation about the FetchFailed exceptions with @JAllemandou, he recommended I try lowering per-executor resources, the rationale being that perhaps the cluster hard disks and/or network were being saturated and lowering the load could help. Well, that worked! I've just submitted for review a configuration that works for ingesting on a yearly cadence. Details at https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/484. In summary, the winning combination is as follows:

import wmfdata

# this seems like a winning config!
spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.driver.memory": "32g",
        "spark.executor.memory": "20g",
        "spark.executor.memoryOverhead": "4g",
        "spark.executor.cores": 2,
        "spark.sql.shuffle.partitions": 65536,
        "spark.shuffle.io.retryWait": "15s",
        "spark.shuffle.io.maxRetries": "15",
        "spark.network.timeout": "600s",

        # maxResultSize default is 1g, and it's giving us problems with MERGE INTO tasks
        "spark.driver.maxResultSize": "8g",

        # extra settings as per https://wikitech.wikimedia.org/wiki/Data_Engineering/Systems/Cluster/Spark#Executors
        "spark.shuffle.file.buffer": "1m",
        "spark.shuffle.service.index.cache.size": "256m",
        "spark.io.compression.lz4.blockSize": "512KB",

        ##
        # extras to make Iceberg work on 3.3.2:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
        # we need to clear spark.yarn.archive so that we don't pick up old 3.1.2 one
        # for this I commented out spark.yarn.archive from spark-defaults.conf
        # 3.1 Shuffle service is not happy with Spark 3.3, so have to disable it (and dynamic allocation)
        "spark.shuffle.service.enabled": False,
        "spark.dynamicAllocation.enabled": False,
        "spark.executor.instances": 105,

        # extra settings to make adaptive query execution work in our favor
        "spark.sql.adaptive.coalescePartitions.enabled" : False,
    }
)

The way we currently run is in 4 groups, as follows:

groups = [
    {"name": "group0", "exclusive": False, "members": ["enwiki"]},  # (size is 100% of revisions of enwiki)
    {"name": "group1", "exclusive": False, "members": ["wikidatawiki"]},  # (size is 71%  of revisions of enwiki)
    {"name": "group2", "exclusive": False, "members": ["commonswiki"]},  # (size is 42%  of revisions of enwiki)
    {
        "name": "group3",
        "exclusive": True,
        "members": ["enwiki", "wikidatawiki", "commonswiki"],
    },  # (size is 97%  of revisions of enwiki)
]
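
For clarity, here is a small illustrative helper showing how I read these group definitions: non-exclusive groups cover exactly their members, while an exclusive group covers everything except its members. This is a sketch of the intent, not the actual DAG code:

def group_predicate(group: dict) -> str:
    """Translate a backfill group into a wiki_db predicate (illustrative only)."""
    wikis = ", ".join(f"'{w}'" for w in group["members"])
    # exclusive groups take every wiki *except* the listed members
    operator = "NOT IN" if group["exclusive"] else "IN"
    return f"wiki_db {operator} ({wikis})"

print(group_predicate(groups[0]))  # wiki_db IN ('enwiki')
print(group_predicate(groups[3]))  # wiki_db NOT IN ('enwiki', 'wikidatawiki', 'commonswiki')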

The per-year jobs still struggle depending on the year and group, but ultimately they succeed. A couple of example runs:

# commonswiki 2022 - successful in ~8mins, no intermediate failures
# all wikis but "enwiki", "wikidatawiki", "commonswiki", "simplewiki" 2022 - successful in ~2.5h, many intermediate FetchFailed exceptions
# enwiki 2006 - successful in ~2.5h, many intermediate FetchFailed exceptions
# wikidatawiki 2022 - successful in ~2.7h, no intermediate failures, lots of spilling on the last sort, looks like a skew issue

Adding some further notes on the Spark assembly fix for completeness:

As we found over at T340861#9100284, the Spark 3.1 assembly is incompatible with Spark 3.3. The incompatibility that triggers a stack trace is log4j, which was upgraded from the 1.x line to 2.x between those two releases. The easiest fix is to just not set spark.yarn.archive, so that it gets generated automatically. However, in production right now we can only pick up confs at /etc/spark3/conf/spark-defaults.conf, and they specify the Spark 3.1 assembly:

xcollazo@stat1007:~/artifacts$ cat /etc/spark3/conf/spark-defaults.conf | grep yarn.archive
spark.yarn.archive                                  hdfs:///user/spark/share/lib/spark-3.1.2-assembly.jar

Thus, we have to roll our own:

tar -xzvf mediawiki-content-dump-0.1.0.dev0-change-strategy-to-years.conda.tgz 
cd lib/python3.10/site-packages/pyspark/jars
zip -r ~/artifacts/spark-3.3.2-assembly.zip .
hdfs dfs -copyFromLocal ~/artifacts/spark-3.3.2-assembly.zip /user/xcollazo/artifacts
hdfs dfs -chmod +r /user/xcollazo/artifacts/spark-3.3.2-assembly.zip

And then make it part of the Spark job conf:

conf={
    ...
    "spark.yarn.archive": "hdfs:///user/xcollazo/artifacts/spark-3.3.2-assembly.zip",  # override 3.1's assembly
},

I've updated T344910 so that as part of that work we also make 'official' assemblies available for Spark 3.3.2.

Mentioned in SAL (#wikimedia-operations) [2023-09-14T18:27:04Z] <xcollazo@deploy1002> Started deploy [airflow-dags/analytics@7160e27]: Deploy latest DAGs to analytics Airflow instance T340861

Mentioned in SAL (#wikimedia-operations) [2023-09-14T18:27:45Z] <xcollazo@deploy1002> Finished deploy [airflow-dags/analytics@7160e27]: Deploy latest DAGs to analytics Airflow instance T340861 (duration: 00m 40s)

Mentioned in SAL (#wikimedia-analytics) [2023-09-14T18:28:54Z] <xcollazo> Deployed latest DAGs to analytics Airflow instance T340861

The results we got with the intermediate table are great, giving us a fully backfilled table in ~48 hours:

Ok folks we have a fully backfilled wmf_dumps.wikitext_raw_rc1 table!

Here is the runtime data, not counting the failures that we fixed along the way:

Task | Time
wait_for_data_in_mw_wikitext_history | 0:00:03
wait_for_data_in_raw_mediawiki_revision | 0:00:04
spark_create_intermediate_table | 18:05:18
spark_backfill_merge_into_2001 | 0:01:23
spark_backfill_merge_into_2002 | 0:03:25
spark_backfill_merge_into_2003 | 0:06:47
spark_backfill_merge_into_2004 | 0:11:37
spark_backfill_merge_into_2005 | 0:31:24
spark_backfill_merge_into_2006 | 0:55:03
spark_backfill_merge_into_2007 | 1:16:07
spark_backfill_merge_into_2008 | 1:24:49
spark_backfill_merge_into_2009 | 0:46:01
spark_backfill_merge_into_2010 | 0:43:58
spark_backfill_merge_into_2011 | 1:36:13
spark_backfill_merge_into_2012 | 0:49:18
spark_backfill_merge_into_2013 | 0:48:45
spark_backfill_merge_into_2014 | 0:46:30
spark_backfill_merge_into_2015 | 2:58:49
spark_backfill_merge_into_2016 | 1:15:22
spark_backfill_merge_into_2017 | 1:31:38
spark_backfill_merge_into_2018 | 1:41:19
spark_backfill_merge_into_2019 | 4:08:39
spark_backfill_merge_into_2020 | 2:04:40
spark_backfill_merge_into_2021 | 2:22:15
spark_backfill_merge_into_2022 | 2:16:11
spark_backfill_merge_into_2023 | 1:47:22
drop_intermediate_table | 0:00:25

Total time: 48:13:25

It would be great to also have T344910, but given the results so far without the Spark external shuffle service, we cannot call this task blocked anymore. So I will close this as done, and once we have T344910 we will come back and clean things up, especially around having elastic resource use instead of fixed allocation.