
Figure whether we can lower the query planning time for wmf_dumps.wikitext_raw
Closed, ResolvedPublic

Description

In T369868#10142190, we ran a benchmark on top of wmf_dumps.wikitext_raw and found the following query planning times:

# With 8 driver cores, 66k files scanned, read 47M rows, 148k splits,  4.4GB
# Elapsed time: 303.5358393192291 seconds  (120 query, 183 planning)
# Elapsed time: 301.08918023109436 seconds (102 query, 199 planning)
# Elapsed time: 286.9110209941864 seconds  (90 query, 196 planning)
# Elapsed time: x seconds (x query, x planning)
import time  # assumes `spark` is an existing SparkSession (e.g., in a notebook)

for i in range(0, 3):
    start_time = time.time()
    spark.sql("""
    SELECT count(1) as count
    FROM wmf_dumps.wikitext_raw_rc2
    WHERE

    (wiki_db = 'enwiki' AND page_id IN (12924534, 29687986, 35328557, 73692977, 74530252, 74530254, 75962364, 75962367, 75971447, 75971928, 75977325, 75985100, 75985872, 76310582, 76310588, 76310589, 77328875, 77478992, 77479936, 77480638, 77480639, 77486461, 77486512, 77488096, 77488322, 77488407, 77488486))

    """).show(20)
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time} seconds")

~190 seconds is a lot of time to spend on query planning, especially considering the actual query time is ~100 seconds.

A quick investigation shows that:

  1. The Iceberg Java threadpool is doing the right thing, per the code; I can confirm this from a thread dump.
  2. Almost all of the Iceberg threadpool threads are waiting on:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1600)
org.apache.hadoop.ipc.Client.call(Client.java:1558)
org.apache.hadoop.ipc.Client.call(Client.java:1455)
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
com.sun.proxy.$Proxy30.getBlockLocations(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:333)
sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) => holding Monitor(org.apache.hadoop.io.retry.RetryInvocationHandler$Call@768716090})
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
com.sun.proxy.$Proxy31.getBlockLocations(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:900)
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:889)
org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:946)
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:288)
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:285)
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:295)
org.apache.iceberg.hadoop.HadoopInputFile.getBlockLocations(HadoopInputFile.java:210)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:111)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:84)
org.apache.iceberg.spark.source.SparkInputPartition.<init>(SparkInputPartition.java:62)
org.apache.iceberg.spark.source.SparkBatch.lambda$planInputPartitions$0(SparkBatch.java:90)
org.apache.iceberg.spark.source.SparkBatch$$Lambda$2759/410160600.run(Unknown Source)
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)

This points to Iceberg's org.apache.iceberg.hadoop.HadoopInputFile.getBlockLocations(HadoopInputFile.java:210) being a bottleneck.

In this task we should figure out whether we can lower this query planning time.

Event Timeline

Reedy renamed this task from Figure whether we can lower the query planing time for wmf_dumps.wikitext_raw to Figure whether we can lower the query planning time for wmf_dumps.wikitext_raw.Sep 13 2024, 1:44 AM

Found the root cause: on every query planning, we fetch HDFS BlockLocation objects for all blocks of all of the ~66K files that compose the enwiki partition of wmf_dumps.wikitext_raw_rc2. This simply takes time, and we get very little benefit out of it since:

  • The query planning time overwhelms the actual query time (see the back-of-envelope sketch below).
  • Even though we do open all files, as discussed in T369868#10142190, for most of them we only read a couple of bytes from the file and row-group footers.
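
As a back-of-envelope check on where that time goes, here is a rough sketch. The one-round-trip-per-split and 8-thread worker pool figures are assumptions (the pool is presumed to be sized to the 8 driver cores); the split count and planning time come from the benchmark above.

# Rough estimate of the NameNode round-trips behind ~190s of planning.
splits = 148_000            # splits reported by the benchmark above
pool_threads = 8            # assumed: Iceberg worker pool sized to the 8 driver cores
observed_planning_s = 190   # planning time observed above

rpcs_per_thread = splits / pool_threads
implied_rpc_ms = observed_planning_s / rpcs_per_thread * 1000
print(f"~{rpcs_per_thread:,.0f} sequential RPCs per thread, "
      f"~{implied_rpc_ms:.0f} ms per round-trip")
# => ~18,500 RPCs per thread at ~10 ms each: plausible NameNode latency,
#    so block-location lookups alone can account for the planning time.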

Although not documented, Iceberg's Spark support does have a flag to disable locality calculations for Spark 3.3+: spark.sql.iceberg.locality.enabled.
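
For reference, here is a minimal sketch of how the flag can be set when building a PySpark session (the app name is illustrative; everything else is just the flag above):

from pyspark.sql import SparkSession

# Disable Iceberg's block-location (locality) lookups during scan planning.
spark = (
    SparkSession.builder
    .appName("wikitext_raw_planning_benchmark")  # illustrative name
    .config("spark.sql.iceberg.locality.enabled", "false")
    .getOrCreate()
)

# Sanity check that the session picked up the setting.
print(spark.conf.get("spark.sql.iceberg.locality.enabled"))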

Let's rerun the query from the description above.

Before setting this flag:

# With 8 driver cores, 66k files scanned, read 47M rows, 148k splits,  4.4GB
# Elapsed time: 303.5358393192291 seconds  (120 query, 183 planning)
# Elapsed time: 301.08918023109436 seconds (102 query, 199 planning)
# Elapsed time: 286.9110209941864 seconds  (90 query, 196 planning)
# Elapsed time: x seconds (x query, x planning)

After:

# With 4 driver cores, 66k files scanned, read 48M rows, 149k splits,  4.4GB
# Elapsed time: 101.09043502807617 seconds (96 query, 5 planning)
# Elapsed time: 50.458165884017944 seconds (49 query, 1 planning)
# Elapsed time: 35.843684673309326 seconds (35 query, .5 planning)

Bingo!

xcollazo moved this task from Incoming to Kanban Board on the Dumps 2.0 board.
xcollazo edited projects, added Dumps 2.0 (Kanban Board); removed Dumps 2.0.
xcollazo moved this task from Sprint Backlog to In Process on the Dumps 2.0 (Kanban Board) board.

TL;DR: spark.sql.iceberg.locality.enabled is all we need, no need for fancy caching.

Turns out that we have had a little RPC storm since we set spark.sql.iceberg.locality.enabled = false for our dumps_merge_events_to_wikitext_raw DAG.

Here is the behavior we have seen since we set this in production, with the red arrow pointing to when I turned the DAG off:

Screenshot 2024-10-10 at 12.18.20 PM.png (1×1 px, 131 KB)

In hindsight, this behavior makes sense: we used to have the default of spark.sql.iceberg.locality.enabled = true, which means that, for this use case, the vast majority of HDFS reads would be local, since the tasks and the files would be colocated. Thus, HDFS would short-circuit the reads.
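
For context, short-circuit local reads are governed by client-side HDFS settings. A quick diagnostic sketch to see what the driver's Hadoop configuration says (this goes through PySpark's JVM gateway; the property names are standard HDFS ones, and the values depend on our cluster config):

# Inspect the Hadoop configuration visible to the Spark driver.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print(hconf.get("dfs.client.read.shortcircuit"))  # "true" when short-circuit local reads are enabled
print(hconf.get("dfs.domain.socket.path"))        # domain socket used for those local reads, if set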

Now, with spark.sql.iceberg.locality.enabled = false, that is no longer the case, and the job requires many more RPCs. Additionally, that job reads many, many files now that it consists of 3 separate Spark jobs.

In conclusion, we need to be more careful in the future about use cases for spark.sql.iceberg.locality.enabled = false.

For this particular issue though, we are trying to revamp this whole DAG anyway via T375402#10203228, so this storm should not continue for long.

About the RPC storm: I have found some documentation about new settings we should use for our HDFS namenode: https://community.cloudera.com/t5/Community-Articles/Scaling-the-HDFS-NameNode-part-3-RPC-scalability-features/ta-p/246719

@BTullis will the SRE team have time to try to implement this?

@JAllemandou - Sorry I just saw this ping. We can definitely look into this. Shall we create a new ticket?

Yes please, let's do that :)