
Spark Jupyter Notebook integration
Closed, ResolvedPublic21 Estimated Story Points

Description

Apache Toree is working! However, there are still issues to solve:

Event Timeline

fdans moved this task from Incoming to Data Exploration Tools on the Analytics board.

Change 443658 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Set spark2 spark.sql.catalogImplementation=hive if hive enabled

https://gerrit.wikimedia.org/r/443658

Change 443658 merged by Ottomata:
[operations/puppet@production] Set spark2 spark.sql.catalogImplementation=hive if hive enabled

https://gerrit.wikimedia.org/r/443658

Change 443669 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Updating wheels with Apache toree 0.2.0 rc5, and jupyterlab 0.32.1

https://gerrit.wikimedia.org/r/443669

Change 443669 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Updating wheels with Apache toree 0.2.0 rc5, and jupyterlab 0.32.1

https://gerrit.wikimedia.org/r/443669

Change 443735 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Install new venv with cwd set to deploy path

https://gerrit.wikimedia.org/r/443735

Change 443736 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Install toree kernels for all users

https://gerrit.wikimedia.org/r/443736

Change 443736 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Install toree kernels for all users

https://gerrit.wikimedia.org/r/443736

Change 443735 merged by Ottomata:
[operations/puppet@production] Install new venv with cwd set to deploy path

https://gerrit.wikimedia.org/r/443735

Alright, to install this for existing users and installations, we first need to do two things. First, reinstall the jupyterhub instance venv at /srv/jupyterhub/venv:

cd /srv/jupyterhub/deploy
sudo git pull
sudo ./create_virtualenv.sh /srv/jupyterhub/venv

This will recreate the venv with updated dependencies (including Toree), and then install global Toree jupyter kernels for all users into /usr/local/share/jupyter/kernels.

Then, for each user that already has a venv, we need to update and install dependencies (including Toree) into it:

cd /srv/jupyterhub/deploy
# For each existing user venv, reinstall the frozen requirements (including Toree)
# from the local wheel artifacts, running as that user and in parallel:
for venv in $(ls -d /home/*/venv); do
    user=$(echo $venv | awk -F '/' '{print $3}')
    echo "Installing packages for $user"
    sudo -u $user $venv/bin/pip install \
        --upgrade \
        --no-index \
        --ignore-installed \
        --find-links=/srv/jupyterhub/deploy/artifacts/stretch/wheels \
        --requirement=/srv/jupyterhub/deploy/frozen-requirements.txt &
done

Change 444667 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Add note about PYSPARK_PYTHON

https://gerrit.wikimedia.org/r/444667

BTW, the PySpark on YARN notebook needs PYSPARK_PYTHON manually set to the same value as PYTHON_EXEC (e.g. /usr/bin/python3) to avoid Python version mismatch errors. Here's a working Toree Jupyter kernel.json for PySpark on YARN:

{
  "env": {
    "PYTHON_EXEC": "/usr/bin/python3",
    "PYSPARK_PYTHON": "/usr/bin/python3",
    "DEFAULT_INTERPRETER": "PySpark",
    "PYTHONPATH": "/usr/lib/spark2/python:/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip",
    "__TOREE_OPTS__": "",
    "__TOREE_SPARK_OPTS__": "--name='Jupyter PySpark' --master=yarn --conf spark.executorEnv.PYTHONPATH=/usr/lib/spark2/python:/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip",
    "SPARK_HOME": "/usr/lib/spark2"
  },
  "display_name": "Spark YARN - PySpark",
  "metadata": {},
  "language": "python",
  "interrupt_mode": "signal",
  "argv": [
    "/usr/local/share/jupyter/kernels/spark_yarn_pyspark/bin/run.sh",
    "--profile",
    "{connection_file}"
  ]
}
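
As a quick sanity check that PYSPARK_PYTHON took effect, here is a small snippet (a sketch, assuming the spark session the kernel provides) that compares the driver's Python version with the versions the executors actually run:

# Sketch: verify driver and executors use the same Python version, which is
# what setting PYSPARK_PYTHON alongside PYTHON_EXEC is meant to guarantee.
import sys

driver_version = sys.version_info[:2]
executor_versions = (
    spark.sparkContext
         .parallelize(range(4), 2)                           # tiny RDD, 2 partitions
         .map(lambda _: __import__("sys").version_info[:2])  # Python version on each executor
         .distinct()
         .collect()
)
print("driver:", driver_version, "executors:", executor_versions)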

Change 444735 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Install python(3)-tk so that Jupyter can render charts with matplotlib

https://gerrit.wikimedia.org/r/444735

Change 444735 merged by Ottomata:
[operations/puppet@production] Install python(3)-tk so that Jupyter can render charts with matplotlib

https://gerrit.wikimedia.org/r/444735

Regarding the visualization problems (autocomplete, but especially the inline plots), here is a possible approach to explore:

https://stackoverflow.com/questions/39570019/how-to-get-ipython-inbuild-magic-command-to-work-in-jupyter-notebook-pyspark-ker
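
For reference, once IPython magics are available in the PySpark kernel (e.g. with an ipython-based kernel like the one this task later switches to), inline plots are straightforward. A minimal sketch, assuming the kernel provides a spark SparkSession and matplotlib is installed in the notebook venv:

%matplotlib inline
import matplotlib.pyplot as plt

# Toy aggregation: bucket ids 0..999 into 10 groups and plot the counts inline.
counts = (
    spark.range(0, 1000)
         .selectExpr("id % 10 AS bucket")
         .groupBy("bucket")
         .count()
         .toPandas()          # small result, safe to collect to the driver
)
counts.plot.bar(x="bucket", y="count")
plt.show()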

Ottomata renamed this task from Spark notebook integration to Spark Jupyter Notebook integration.Jul 9 2018, 8:40 PM
Ottomata raised the priority of this task from Low to Medium.
Ottomata updated the task description.
Ottomata moved this task from In Code Review to In Progress on the Analytics-Kanban board.
Ottomata added a subscriber: JAllemandou.

Change 445002 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Use ipython for PySpark instead of Toree

https://gerrit.wikimedia.org/r/445002

Change 444667 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Add note about PYSPARK_PYTHON

https://gerrit.wikimedia.org/r/444667

Change 445002 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Use ipython for PySpark instead of Toree

https://gerrit.wikimedia.org/r/445002

Change 445015 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Fix path to brunel jar for spark scala jupyter kernels

https://gerrit.wikimedia.org/r/445015

Change 445015 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Fix path to brunel jar for spark scala jupyter kernels

https://gerrit.wikimedia.org/r/445015

@diego, joal and I talked today, and we indeed decided to ditch Toree for PySpark, and just go with the ipython kernel + spark integration.
I just installed this on both notebook1003 and 1004, replacing the Toree PySpark kernels. They seem to work better now. Try em out!

@JAllemandou, I modified the Scala kernels to pass --jars /srv/jupyterhub/deploy/spark-kernel-brunel-all-2.6.jar and also --conf spark.dynamicAllocation.maxExecutors=128.

I even added some sweet logo png files, so now JupyterLab Launcher icons look more distinct.

With pyspark I'm getting this error a lot (even when I'm working with small datasets, for example a list of 1 million integers):

Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Apparently, the Python operations within PySpark use this overhead. One option is to increase the memoryOverhead; another possible solution is this: https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic
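
As a sketch of the first option, the memory overhead can be raised from within a notebook by stopping the session the kernel created and building a new one; the property name below is the Spark 2.x one, and the values are illustrative rather than a recommendation:

from pyspark.sql import SparkSession

# Stop the kernel-provided session, then start a new YARN application with a
# larger executor memory overhead (values here are only examples).
spark.stop()

spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Jupyter PySpark (larger memoryOverhead)")
    .config("spark.executor.memory", "4g")
    .config("spark.yarn.executor.memoryOverhead", "2048")   # in MB
    .config("spark.dynamicAllocation.maxExecutors", "128")
    .getOrCreate()
)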

Change 451061 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Updating wheels with Apache toree 0.2.0 rc5, and jupyterlab 0.32.1

https://gerrit.wikimedia.org/r/451061

Change 451063 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Install toree kernels for all users

https://gerrit.wikimedia.org/r/451063

Change 451065 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Add note about PYSPARK_PYTHON

https://gerrit.wikimedia.org/r/451065

Change 451066 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Use ipython for PySpark instead of Toree

https://gerrit.wikimedia.org/r/451066

Change 451067 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Fix path to brunel jar for spark scala jupyter kernels

https://gerrit.wikimedia.org/r/451067

Change 451068 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/swap/deploy@master] Use versionless symlink for spark kernels that use py4j

https://gerrit.wikimedia.org/r/451068

Change 451069 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Use versionless symlink for spark kernels that use py4j

https://gerrit.wikimedia.org/r/451069

Change 451061 abandoned by Ottomata:
Updating wheels with Apache toree 0.2.0 rc5, and jupyterlab 0.32.1

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451061

Change 451063 abandoned by Ottomata:
Install toree kernels for all users

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451063

Change 451065 abandoned by Ottomata:
Add note about PYSPARK_PYTHON

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451065

Change 451066 abandoned by Ottomata:
Use ipython for PySpark instead of Toree

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451066

Change 451067 abandoned by Ottomata:
Fix path to brunel jar for spark scala jupyter kernels

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451067

Change 451068 abandoned by Ottomata:
Use versionless symlink for spark kernels that use py4j

Reason:
wrong repo

https://gerrit.wikimedia.org/r/451068

Change 451069 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Use versionless symlink for spark kernels that use py4j

https://gerrit.wikimedia.org/r/451069

Change 451103 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[analytics/jupyterhub/deploy@master] Fix pyspark kernels to properly apply dynamicAllocation.maxExecutors=128

https://gerrit.wikimedia.org/r/451103

Change 451103 merged by Ottomata:
[analytics/jupyterhub/deploy@master] Fix pyspark kernels to properly apply dynamicAllocation.maxExecutors=128

https://gerrit.wikimedia.org/r/451103

@diego to make it easier to troubleshoot, can you explain how to reproduce? Thanks!

Ottomata updated the task description.

@JAllemandou I think you were right about the magic jar. I think that will have to be done manually no matter what. At least we have it deployed with SWAP now, and can add it from the local filesystem:

https://wikitech.wikimedia.org/wiki/SWAP#Spark_with_Brunel

Many thanks @Ottomata ! Those notebooks are awesome :)

@Ottomata for example this:

# Counting Links in Wikipedias using a parquet DUMP


#Setup
import pandas as pd
sqlContext.sql('use wmf')


# Define UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re

def getWikilinksNumber(wikitext):  # UDF to count wikilinks in a revision's wikitext
    if wikitext is None:
        return -1
    links = re.findall(r"\[\[(.*?)\]\]", wikitext)  # get wikilinks
    return len(links)

udfGetWikilinksNumber = udf(getWikilinksNumber, IntegerType())

#Loading cawiki
df = spark.read.parquet('hdfs:///user/joal/wmf/data/wmf/mediawiki/wikitext/snapshot=2018-01/cawiki')
#Apply UDF
df2 = df.withColumn('numOfLinks',udfGetWikilinksNumber(df.revision_text))


df2.count()

#Get last revision ID until 2017-06-27 for all pages in namespace=0, not a redirect, in cawiki

pages = sqlContext.sql('''SELECT page_id,MAX(revision_id) as rev_id FROM mediawiki_history  WHERE snapshot="2018-01" 
                     AND page_namespace=0 AND page_is_redirect=0 AND page_creation_timestamp < "2017-06-27" AND event_timestamp < "2017-06-27" AND  wiki_db='cawiki' GROUP BY page_id''')
pages.show()


#Join last revision with udf results

df3 = df2.select('numOfLinks','revision_id').join(pages.select('rev_id'),pages.rev_id == df2.revision_id)



df3.schema
result = df3.select('numOfLinks').toPandas()


Py4JJavaError                             Traceback (most recent call last)
<ipython-input-55-875f80eba6b8> in <module>()
      1 import pandas as pd
----> 2 d = df3.select('numOfLinks').toPandas()

/usr/lib/spark2/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark2/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

/usr/lib/spark2/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o840.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 491 in stage 59.0 failed 4 times, most recent failure: Lost task 491.3 in stage 59.0 (TID 57404, analytics1051.eqiad.wmnet, executor 7274): ExecutorLostFailure (executor 7274 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 2.1 GB of 2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3195)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3192)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3225)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3192)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Ottomata updated the task description.

Diego's problem is a larger pyspark issue, not related to Jupyter Notebooks. I've created T201519: pyspark2 job killed by YARN for exceeding memory limits to track it.

Need to be able to set custom Spark settings before Kernel is launched

I'm not sure we can do this. :/ I'm trying to figure out some nice way to allow a user in the Jupyter interface to set some environment variables before the notebook kernel is launched, but I can't seem to do it. The only solution I can think of is adding documentation for users on how to create their own Jupyter kernels in their ~/.local/share/jupyter/kernels directory that specify the desired Spark settings.

@JAllemandou do you think this would be enough?
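
For illustration, a rough sketch of what such a per-user kernel could look like, built by copying one of the global kernel specs and tweaking its Spark settings. The source kernel path comes from the kernel.json shown earlier in this task; the use of PYSPARK_SUBMIT_ARGS is an assumption (which environment variable actually carries the Spark options depends on how the kernel launches Spark, e.g. Toree reads __TOREE_SPARK_OPTS__), and the memory values are only examples:

import json
import os
import shutil

# Copy a global kernel spec into the user's own kernels directory.
src = "/usr/local/share/jupyter/kernels/spark_yarn_pyspark"
dst = os.path.expanduser("~/.local/share/jupyter/kernels/spark_yarn_pyspark_large")
shutil.copytree(src, dst)

# Adjust the copied kernel.json: new display name and bigger executors.
spec_path = os.path.join(dst, "kernel.json")
with open(spec_path) as f:
    spec = json.load(f)

spec["display_name"] = "Spark YARN - PySpark (large)"
spec["env"]["PYSPARK_SUBMIT_ARGS"] = (
    "--master yarn "
    "--conf spark.executor.memory=4g "
    "--conf spark.yarn.executor.memoryOverhead=2048 "
    "pyspark-shell"
)

with open(spec_path, "w") as f:
    json.dump(spec, f, indent=2)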

Ottomata set the point value for this task to 21.Aug 9 2018, 3:02 PM

I merged https://gerrit.wikimedia.org/r/#/c/analytics/jupyterhub/deploy/+/451781/ which adds 2 new 'large' kernels that bump executor memory to 4g and memoryOverhead to 2g. These should be enough for Diego's job.

I also added https://wikitech.wikimedia.org/wiki/SWAP#Custom_Spark_Kernels to document how users can create their own custom kernels to set Spark options themselves.