Diego reports:
With pyspark I'm getting this error a lot (even when I'm working with small datasets, for example a list of 1 million integers):
Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Apparently, the Python operations within PySpark use this overhead. One option is to increase the memoryOverhead; another solution might be this: https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic
To reproduce:
# # Counting Links in Wikipedias using a parquet DUMP #Setup import pandas as pd sqlContext.sql('use wmf') # Define UDF from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType import re def getWikilinksNumber(wikitext): #UDF to get wikipedia pages titles links = re.findall("\[\[(.*?)\]\]",wikitext) #get wikilinks if len(links) >=0: return len(links) else: return -1 udfGetWikilinksNumber = udf(getWikilinksNumber,IntegerType()) #Loading cawiki df = spark.read.parquet('hdfs:///user/joal/wmf/data/wmf/mediawiki/wikitext/snapshot=2018-01/cawiki') #Apply UDF df2 = df.withColumn('numOfLinks',udfGetWikilinksNumber(df.revision_text)) df2.count() #Get last revision ID until 2017/06/01 for all pages in namespace =0 , noredirect in cawiki pages = sqlContext.sql('''SELECT page_id,MAX(revision_id) as rev_id FROM mediawiki_history WHERE snapshot="2018-01" AND page_namespace=0 AND page_is_redirect=0 AND page_creation_timestamp < "2017-06-27" AND event_timestamp < "2017-06-27" AND wiki_db='cawiki' GROUP BY page_id''') pages.show() #Join last revision with udf results df3 = df2.select('numOfLinks','revision_id').join(pages.select('rev_id'),pages.rev_id == df2.revision_id) df3.schema result = df3.select('numOfLinks').toPandas() Py4JJavaError Traceback (most recent call last) <ipython-input-55-875f80eba6b8> in <module>() 1 import pandas as pd ----> 2 d = df3.select('numOfLinks').toPandas() /usr/lib/spark2/python/pyspark/sql/dataframe.py in toPandas(self) 1964 raise RuntimeError("%s\n%s" % (_exception_message(e), msg)) 1965 else: -> 1966 pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) 1967 1968 dtype = {} /usr/lib/spark2/python/pyspark/sql/dataframe.py in collect(self) 464 """ 465 with SCCallSiteSync(self._sc) as css: --> 466 port = self._jdf.collectToPython() 467 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer()))) 468 /usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args) 1158 
answer = self.gateway_client.send_command(command) 1159 return_value = get_return_value( -> 1160 answer, self.gateway_client, self.target_id, self.name) 1161 1162 for temp_arg in temp_args: /usr/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() /usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 318 raise Py4JJavaError( 319 "An error occurred while calling {0}{1}{2}.\n". --> 320 format(target_id, ".", name), value) 321 else: 322 raise Py4JError( Py4JJavaError: An error occurred while calling o840.collectToPython. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 491 in stage 59.0 failed 4 times, most recent failure: Lost task 491.3 in stage 59.0 (TID 57404, analytics1051.eqiad.wmnet, executor 7274): ExecutorLostFailure (executor 7274 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.collect(RDD.scala:938) at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192) at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225) at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)