
Investigate why a SELECT count(1) takes 1.4 hours to plan for wikitext_raw_rc1
Closed, Resolved · Public

Description

While running a couple of example queries for @JEbe-WMF, we found that with the following resources:

spark3-sql   \
    --master yarn \
    --executor-memory 16G \
    --executor-cores 4 \
    --driver-memory 32G \
    --driver-cores 4 \
    --conf spark.dynamicAllocation.maxExecutors=256 \
    --conf spark.sql.shuffle.partitions=10048 \
    --conf spark.driver.maxResultSize=8G

It takes 2.6 hours to compute a count! Only 1.2 hours are spent computing the result. Thus, 1.4 hours are being spent in query planning! (It generated 298056 Spark tasks):

...
spark-sql (default)> select count(1) as count from wmf_dumps.wikitext_raw_rc1;
count
6324909273
Time taken: 9369.366 seconds, Fetched 1 row(s)

Other, simpler counts that do not touch all the metadata finish successfully in a reasonable time (~25 min of processing, a few seconds of query planning, 7717 tasks):

spark-sql (default)> select count(1) as count from wmf_dumps.wikitext_raw_rc1 where wiki_db = 'eswiki';
count
142466014
Time taken: 1553.839 seconds, Fetched 1 row(s)

I speculate this is a limitation of doing query planning in the driver, as Iceberg does.

In this task we should:

  • Figure out whether the speculation is correct, perhaps by doing some profiling. Or perhaps some compaction is needed? (See the sketch after this list.)
  • Figure out whether the recent Iceberg 1.4.0 release solves this issue, as it now allows distributed query planning. (No need: the issue is not Iceberg.)
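
For the compaction question, a minimal sketch of a check, assuming Iceberg's files metadata table and its standard columns are available through spark-sql: many small data files would both call for compaction and make planning expensive.

-- Sketch: count the data files and compute their average size.
select count(1)                                          as data_files,
       sum(file_size_in_bytes) / count(1) / 1024 / 1024  as avg_file_mb
from wmf_dumps.wikitext_raw_rc1.files;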

Event Timeline

I recently deleted wmf_dumps.wikitext_raw_rc1 in favor of wmf_dumps.wikitext_raw_rc2. I cannot repro this issue anymore, thus closing.

xcollazo claimed this task.

While working on T354761 I hit this issue again. Query planning takes a long while. I don't even want to attempt a count(1).

Thus reopening this for investigating root cause.

xcollazo removed xcollazo as the assignee of this task.
xcollazo removed a subscriber: Aklapper.

Looked at this a bit today. Running a count(1) is currently not possible on stat1007, as my process is killed due to memory usage. Here are some rough notes:

Ran the following:

spark3-sql   \
--master yarn   \
--executor-memory 16G   \
--executor-cores 4   \
--driver-memory 32G   \
--driver-cores 8   \
--conf spark.dynamicAllocation.maxExecutors=64   \
--conf spark.sql.shuffle.partitions=1024   \
--conf spark.driver.maxResultSize=32G \
--conf spark.sql.autoBroadcastJoinThreshold=-1

In a thread dump, I can see the Spark UI hoarding threads:

12 threads with this stack:
"SparkUI-78": running, holding [0x00007f6eb9b8e6c0, 0x00007f6eb9b8e6d8, 0x00007f6eb9b8e648]
"SparkUI-79": running, holding [0x00007f6eb9bd0ae8, 0x00007f6eb9bd0b00, 0x00007f6eb9bd0a70]
"SparkUI-80": running, holding [0x00007f6eb9c833d0, 0x00007f6eb9c833e8, 0x00007f6eb9c83358]
"SparkUI-81": running, holding [0x00007f6eb9bf6cf8, 0x00007f6eb9bf6d10, 0x00007f6eb9bf6c80]
"SparkUI-82": running, holding [0x00007f6eb9b9d468, 0x00007f6eb9b9d450, 0x00007f6eb9b9d2a0]
"SparkUI-83": running, holding [0x00007f6eb9bf6448, 0x00007f6eb9bf6460, 0x00007f6eb9bf63d0]
"SparkUI-84": running, holding [0x00007f6eb9bcfa20, 0x00007f6eb9bcfa38, 0x00007f6eb9bcf9a8]
"SparkUI-85": running, holding [0x00007f6eb9bc8618, 0x00007f6eb9bc8630, 0x00007f6eb9bc85a0]
"SparkUI-86": running, holding [0x00007f6eb9bcf3b0, 0x00007f6eb9bcf3c8, 0x00007f6eb9bcf338]
"SparkUI-87": running, holding [0x00007f6eb9b8ddb8, 0x00007f6eb9b8ddd0, 0x00007f6eb9b8dd40]
"SparkUI-88": running, holding [0x00007f6eb9bf6170, 0x00007f6eb9bf6188, 0x00007f6eb9bf60f8]
"SparkUI-89": running, holding [0x00007f6eb9bcf0d8, 0x00007f6eb9bcf0f0, 0x00007f6eb9bcf060]
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
	at org.sparkproject.jetty.io.ManagedSelector.nioSelect(ManagedSelector.java:183)
	at org.sparkproject.jetty.io.ManagedSelector.select(ManagedSelector.java:190)
	at org.sparkproject.jetty.io.ManagedSelector$SelectorProducer.select(ManagedSelector.java:606)
	at org.sparkproject.jetty.io.ManagedSelector$SelectorProducer.produce(ManagedSelector.java:543)
	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.produceTask(EatWhatYouKill.java:360)
	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:184)
	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
	at org.sparkproject.jetty.io.ManagedSelector$$Lambda$621/1903406683.run(Unknown Source)
	at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
	at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
	at java.lang.Thread.run(Thread.java:750)

I had a suspicion, as the Spark UI has given me memory issues before when there are many, many tasks, so I added --conf spark.ui.enabled=false:

spark3-sql   \
--master yarn   \
--executor-memory 16G   \
--executor-cores 4   \
--driver-memory 32G   \
--driver-cores 8   \
--conf spark.dynamicAllocation.maxExecutors=64   \
--conf spark.sql.shuffle.partitions=1024   \
--conf spark.driver.maxResultSize=32G \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
--conf spark.ui.enabled=false

The process still uses a lot of memory, but it has now passed the ~5 min mark at which it died before. Current memory usage:

31151 xcollazo  20   0   44.2g   9.0g  26628 S 256.6  14.3  41:38.89 java

After about 25 minutes it got killed by the stat1007 resource manager. Note that it was killed; it had not OOMed. A heap dump shows no clear offender.

Note further that no computation other than query planning was done here.

Will continue investigation.

Reran the query, but this time on the new stat1011:

Ran the following:

spark3-sql   \
--master yarn   \
--executor-memory 16G   \
--executor-cores 4   \
--driver-memory 32G   \
--driver-cores 8   \
--conf spark.dynamicAllocation.maxExecutors=64   \
--conf spark.sql.shuffle.partitions=1024   \
--conf spark.driver.maxResultSize=32G \
--conf spark.sql.autoBroadcastJoinThreshold=-1


spark-sql (default)> select count(1) as count from wmf_dumps.wikitext_raw_rc2;
count
6582354679
Time taken: 738.202 seconds, Fetched 1 row(s)

Exited spark and did Run #2:

spark-sql (default)> select count(1) as count from wmf_dumps.wikitext_raw_rc2;
count
6582600783
Time taken: 760.844 seconds, Fetched 1 row(s)

So the issue is unrelated to Iceberg and more related to memory contention on stat1007. On the second run, out of the 760.844 seconds, 246 were the query itself, leaving 514 (8.6 min) of query planning. That sounds reasonable. Thread dumps show a lot of waiting on HDFS, so most of the time cost is reading all the HDFS block metadata. Many threads were waiting on:

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.hadoop.util.concurrent.AsyncGet$Util.wait(AsyncGet.java:59)
org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1499)
org.apache.hadoop.ipc.Client.call(Client.java:1457)
org.apache.hadoop.ipc.Client.call(Client.java:1367)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
...
org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:280)
org.apache.iceberg.hadoop.HadoopInputFile.getBlockLocations(HadoopInputFile.java:210)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:111)
org.apache.iceberg.hadoop.Util.blockLocations(Util.java:84)
org.apache.iceberg.spark.source.SparkBatchScan$ReadTask.<init>(SparkBatchScan.java:333)
org.apache.iceberg.spark.source.SparkBatchScan.lambda$planInputPartitions$0(SparkBatchScan.java:154)
org.apache.iceberg.spark.source.SparkBatchScan$$Lambda$2138/772661773.run(Unknown Source)
org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:69)
org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)

That is reasonable as well.
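
To put a number on that: each ReadTask above fetches HDFS block locations for the data files it will read, so planning issues roughly one namenode RPC per data file. A quick way to see how many files that is (a sketch, again relying on Iceberg's files metadata table):

select count(1) as data_files from wmf_dumps.wikitext_raw_rc2.files;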

Rerunning with --driver-memory 16G is also successful, albeit slower:

spark-sql (default)> select count(1) as count from wmf_dumps.wikitext_raw_rc2;
count
6582600783
Time taken: 1182.185 seconds, Fetched 1 row(s)

Out of the 1182 seconds, 222 (3.7 min) were the query itself, while 960 (16 min) were query planning. Presumably heap memory contention is to blame for the ~2x query planning time. However, the query was successful. This strengthens the evidence that the issue is related to memory contention, and that stat1007 is, these days, too loaded to run such a memory-intensive job.

Conclusion: Iceberg is not at fault here. We need at least 16GB of driver memory to successfully build the query plan for a SELECT over the whole table. Given the driver requirements, we should document this behavior as part of T347611.
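
As a side note for that documentation: if only an approximate row count is needed, Iceberg's snapshot summary already carries one, and reading it avoids scan planning entirely. A sketch, assuming the standard snapshots metadata table and its 'total-records' summary key (this figure does not account for row-level deletes):

select summary['total-records'] as total_records
from wmf_dumps.wikitext_raw_rc2.snapshots
order by committed_at desc
limit 1;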