
pyspark2 job killed by YARN for exceeding memory limits
Closed, Resolved · Public · 5 Estimated Story Points

Description

Diego reports:

With pyspark I'm getting this error a lot (even when I'm working with small datasets, for example a list of 1 million integers):

Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Apparently, the Python operations within PySpark use this overhead. One option is to increase the memoryOverhead; another possible solution is described here: https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic
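
For illustration, here is a minimal sketch of those two workarounds; the memory value and partition count below are only placeholders, not tuned settings:

# Option 1: boost the off-heap overhead when launching the shell, e.g.
#   pyspark2 --master yarn --executor-memory 2g --conf spark.yarn.executor.memoryOverhead=2g
# Option 2 (one of the suggestions in the linked thread): spread the data over
# more, smaller tasks so each executor holds less at a time
df = df.repartition(1000)  # placeholder partition count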

To reproduce:

# Counting links in Wikipedias using a Parquet dump


#Setup
import pandas as pd
sqlContext.sql('use wmf')


# Define UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re

def getWikilinksNumber(wikitext):  # UDF to count [[wikilinks]] in a revision's wikitext
    if wikitext is None:  # guard against null revision text
        return -1
    return len(re.findall(r"\[\[(.*?)\]\]", wikitext))  # count wikilinks

udfGetWikilinksNumber = udf(getWikilinksNumber, IntegerType())

#Loading cawiki
df = spark.read.parquet('hdfs:///user/joal/wmf/data/wmf/mediawiki/wikitext/snapshot=2018-01/cawiki')
#Apply UDF
df2 = df.withColumn('numOfLinks', udfGetWikilinksNumber(df.revision_text))


df2.count()

#Get the last revision ID up to 2017-06-27 for all pages in namespace 0 (no redirects) in cawiki

pages = sqlContext.sql('''
SELECT page_id, MAX(revision_id) AS rev_id
FROM mediawiki_history
WHERE snapshot="2018-01"
  AND page_namespace=0
  AND page_is_redirect=0
  AND page_creation_timestamp < "2017-06-27"
  AND event_timestamp < "2017-06-27"
  AND wiki_db='cawiki'
GROUP BY page_id''')
pages.show()


#Join last revision with udf results

df3 = df2.select('numOfLinks', 'revision_id').join(pages.select('rev_id'), pages.rev_id == df2.revision_id)



df3.schema
result = df3.select('numOfLinks').toPandas()


Py4JJavaError                             Traceback (most recent call last)
<ipython-input-55-875f80eba6b8> in <module>()
      1 import pandas as pd
----> 2 d = df3.select('numOfLinks').toPandas()

/usr/lib/spark2/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark2/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

/usr/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o840.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 491 in stage 59.0 failed 4 times, most recent failure: Lost task 491.3 in stage 59.0 (TID 57404, analytics1051.eqiad.wmnet, executor 7274): ExecutorLostFailure (executor 7274 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)

Event Timeline

Ottomata triaged this task as Medium priority. Aug 8 2018, 3:00 PM
Ottomata created this task.

We did some tests in the PySpark CLI with @Ottomata this evening and found memory settings that work (with some minor changes to the code).

The job succeeded in both the PySpark and Scala shells with as little as 1G per executor and 2G of memory overhead:

pyspark2 --master yarn --conf spark.dynamicAllocation.maxExecutors=128 --executor-memory 1g --conf spark.executor.memoryOverhead=2g

I think that big Parquet values (a wikitext field can be a few MB) take RAM to be deserialized from Parquet into Spark's internal representation, and that this RAM is allocated by Spark in the overhead portion of the container.
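
As a rough illustration of why those settings help (a sketch assuming Spark-on-YARN defaults; the helper function below is hypothetical, not from this task):

# YARN kills a container once (executor heap + overhead) exceeds the container size.
def yarn_container_mb(executor_memory_mb, memory_overhead_mb=None):
    # Spark's default overhead is max(384 MiB, 10% of executor memory)
    if memory_overhead_mb is None:
        memory_overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + memory_overhead_mb

yarn_container_mb(2048)        # 2g heap + default overhead -> 2432 MiB, little room off-heap
yarn_container_mb(1024, 2048)  # 1g heap + 2g overhead      -> 3072 MiB, mostly off-heap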

For the record, here is the pyspark code we used:

pages = sqlContext.sql('''
SELECT
  page_id,
  MAX(revision_id) as rev_id
FROM wmf.mediawiki_history
WHERE snapshot="2018-01"
  AND page_namespace=0
  AND page_is_redirect=0
  AND page_creation_timestamp < "2017-06-27 00:00:00"
  AND event_timestamp < "2017-06-27 00:00:00"
  AND wiki_db='cawiki'
  GROUP BY page_id''')

#Loading cawiki
df = spark.read.parquet('hdfs:///user/joal/wmf/data/wmf/mediawiki/wikitext/snapshot=2018-01/cawiki')
df1 = df.join(pages, df.revision_id == pages.rev_id)

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re

def getWikilinksNumber(wikitext):  # UDF to count [[wikilinks]] in a revision's wikitext
    if wikitext is None:  # guard against null revision text
        return -1
    return len(re.findall(r"\[\[(.*?)\]\]", wikitext))  # count wikilinks

udfGetWikilinksNumber = udf(getWikilinksNumber, IntegerType())
#Apply UDF
df2 = df1.withColumn('numOfLinks', udfGetWikilinksNumber(df.revision_text))
df2.show()

Please note that we don't read the text until after the join, and then only to process it with the regex (no .show() before the last line).
Also, we join before applying the function (filter before map) - Spark is good at optimizing its inner code.
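
A further (untested) sketch along the same lines, not from the task itself: the counting can also be done with built-in Spark SQL functions instead of a Python UDF, so the wikitext never has to be shipped to the Python workers (whose memory also comes out of the overhead). Counting "[[" openings only approximates the regex above:

from pyspark.sql import functions as F

# count occurrences of "[[" as (number of split segments - 1); -1 for null text
df2 = df1.withColumn(
    'numOfLinks',
    F.when(df1.revision_text.isNull(), -1)
     .otherwise(F.size(F.split(df1.revision_text, r'\[\[')) - 1)
)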

@JAllemandou, I've done a copy/paste of your code in the notebook and got the same error as before:

Py4JJavaError: An error occurred while calling o60.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 1.0 failed 4 times, most recent failure: Lost task 36.3 in stage 1.0 (TID 815, analytics1046.eqiad.wmnet, executor 80): ExecutorLostFailure (executor 80 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 2.0 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

@JAllemandou, I've done a copy/paste of your code in the notebook and got the same error as before:

This is expected, since there has been no change in memory settings for workers in notebooks.
@Ottomata and I are trying to find a solution to the memory-parameterization problem.

Milimetric raised the priority of this task from Medium to High.
Milimetric lowered the priority of this task from High to Medium.
Milimetric moved this task from Incoming to Operational Excellence on the Analytics board.
Milimetric added a project: Analytics-Kanban.

Change 451781 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Add spark yarn scala and pyspark 'large' kernels

https://gerrit.wikimedia.org/r/451781

Change 451781 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Add spark yarn scala and pyspark 'large' kernels

https://gerrit.wikimedia.org/r/451781

@diego, I've installed a PySpark YARN (large) notebook kernel that has 4g executors with 2g memoryOverhead. This should be sufficient for your job. I've also added https://wikitech.wikimedia.org/wiki/SWAP#Custom_Spark_Kernels in case you need to install a custom kernel to pass more options to the pyspark shell.

@Ottomata, thanks for the update. I've tried the same code (the version posted above by @JAllemandou) and got the same error.

Py4JJavaError: An error occurred while calling o60.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 3.0 failed 4 times, most recent failure: Lost task 36.3 in stage 3.0 (TID 4214, analytics1050.eqiad.wmnet, executor 61): ExecutorLostFailure (executor 61 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I'll try with some "tweaks" and let you know.

My mistake, I was not using the "large" option. This works smoothly!
Thank you @Ottomata and @JAllemandou, you're amazing!

Ottomata set the point value for this task to 5.