In T369868#10142190, we ran a benchmark on top of wmf_dumps.wikitext_raw and observed the following query planning times:
```python
# With 8 driver cores, 66k files scanned, read 47M rows, 148k splits, 4.4GB
# Elapsed time: 303.5358393192291 seconds (120 query, 183 planning)
# Elapsed time: 301.08918023109436 seconds (102 query, 199 planning)
# Elapsed time: 286.9110209941864 seconds (90 query, 196 planning)
# Elapsed time: x seconds (x query, x planning)
import time

for i in range(0, 3):
    start_time = time.time()
    spark.sql("""
        SELECT count(1) as count
          FROM wmf_dumps.wikitext_raw_rc2
         WHERE (wiki_db = 'enwiki'
                AND page_id IN (12924534, 29687986, 35328557, 73692977, 74530252,
                                74530254, 75962364, 75962367, 75971447, 75971928,
                                75977325, 75985100, 75985872, 76310582, 76310588,
                                76310589, 77328875, 77478992, 77479936, 77480638,
                                77480639, 77486461, 77486512, 77488096, 77488322,
                                77488407, 77488486))
    """).show(20)
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time} seconds")
```
~190 seconds is a lot of time to spend on query planning, especially considering that the actual query execution takes only ~100 seconds.
A quick investigation shows that:
- The Iceberg Java thread pool is doing the right thing as per the code; I can confirm this from a thread dump.
- Almost all of the Iceberg thread pool threads are waiting on:
```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1600)
org.apache.hadoop.ipc.Client.call(Client.java:1558)
org.apache.hadoop.ipc.Client.call(Client.java:1455)
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
com.sun.proxy.$Proxy30.getBlockLocations(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:333)
sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95) => holding Monitor(org.apache.hadoop.io.retry.RetryInvocationHandler$Call@768716090})
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
com.sun.proxy.$Proxy31.getBlockLocations(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:900)
org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:889)
org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:946)
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:288)
org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:285)
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:295)
org.apache.iceberg.hadoop.HadoopInputFile.getBlockLocations(HadoopInputFile.java:210)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:111)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:84)
org.apache.iceberg.spark.source.SparkInputPartition.<init>(SparkInputPartition.java:62)
org.apache.iceberg.spark.source.SparkBatch.lambda$planInputPartitions$0(SparkBatch.java:90)
org.apache.iceberg.spark.source.SparkBatch$$Lambda$2759/410160600.run(Unknown Source)
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
```
This points to Iceberg's org.apache.iceberg.hadoop.HadoopInputFile.getBlockLocations(HadoopInputFile.java:210) being a bottleneck.
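One way to check whether the block-location lookups really dominate planning (the first item in the task list below) would be to rerun the benchmark with Iceberg's `locality` read option turned off, so that input partitions are built without calling getBlockLocations(). A minimal sketch, assuming the option behaves as documented and using a shortened page_id list for brevity:

```python
# Sketch only: rerun the count with HDFS locality lookups disabled and compare
# the planning portion of the elapsed time against the numbers above.
import time

start_time = time.time()
count = (
    spark.read
    .option("locality", "false")  # skip getBlockLocations() during input-partition planning
    .table("wmf_dumps.wikitext_raw_rc2")
    .where("wiki_db = 'enwiki' AND page_id IN (12924534, 29687986, 35328557)")  # shortened list
    .count()
)
print(f"count={count}, elapsed: {time.time() - start_time} seconds")
```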
In this task we should:
- Figure out if this speculation is correct.
- Figure out whether we can tune the HDFS client to utilize more network and/or more cores. We can see from the HDFS Namenode that there are no spikes coming from our query, so we should be able to squeeze more bytes out of it (see the first sketch after this list).
- Explore Iceberg's metadata caching mechanism, as suggested by @JAllemandou, and figure out whether it would be beneficial for us (see the second sketch after this list).
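For the HDFS client / parallelism question: the waiting threads in the dump above belong to Iceberg's shared worker pool, whose size is (if I read the code correctly) controlled by the JVM system property `iceberg.worker.num-threads` and defaults to the number of available cores. A minimal sketch, with an example value only; the property has to be set before the driver JVM starts:

```python
# Sketch only: the worker-pool size must be set at driver launch, e.g.
#   spark-submit --driver-java-options "-Diceberg.worker.num-threads=64" ...
# (64 is an example value, not a recommendation).
# Once a session is up, we can at least confirm what the driver JVM ended up with:
prop = spark._jvm.System.getProperty("iceberg.worker.num-threads")  # PySpark internal, fine for debugging
print(prop or "not set: Iceberg falls back to the number of available processors")
```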
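For the metadata caching item, the mechanism I would look at first is Iceberg's manifest-file cache (the `io.manifest.cache-*` catalog properties, available in Iceberg 1.1+). A minimal sketch, assuming our catalog is named `spark_catalog` and using placeholder sizes; whether it helps depends on whether manifest reads, rather than block-location lookups, are a meaningful share of the planning time:

```python
# Sketch only: enable Iceberg's manifest caching on the catalog. The catalog name
# and the sizes below are placeholders, not our actual configuration.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("wikitext_raw planning benchmark with manifest caching")
    .config("spark.sql.catalog.spark_catalog.io.manifest.cache-enabled", "true")
    .config("spark.sql.catalog.spark_catalog.io.manifest.cache.max-total-bytes", str(256 * 1024 * 1024))
    .config("spark.sql.catalog.spark_catalog.io.manifest.cache.expiration-interval-ms", str(10 * 60 * 1000))
    .getOrCreate()
)
```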