
Get 'sparklyr' working on stats1005
Closed, DeclinedPublic

Description

The folks at RStudio recently unveiled their new package sparklyr that allows data scientists to use dplyr verbs on remote data via Spark. It also exports the ML algorithms bundled with Spark and makes them available as functions in R. See Jeff Allen's talk at useR! 2016 Stanford: https://github.com/trestletech/user2016-sparklyr/blob/master/sparklyr-user2016.pdf
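For reference, here is a minimal sketch of the workflow the package advertises, assuming the standard {sparklyr} API (spark_connect(), copy_to(), dplyr verbs translated to Spark SQL, and the ml_* wrappers for Spark MLlib); exact ml_* signatures vary between sparklyr versions, so treat this as illustrative rather than something tested on our cluster:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")  # or a cluster master once that works

# dplyr verbs are translated to Spark SQL and executed by Spark
iris_tbl <- copy_to(sc, iris, "iris")  # column names like Petal.Length become Petal_Length
iris_tbl %>%
  group_by(Species) %>%
  summarise(mean_petal_length = mean(Petal_Length)) %>%
  collect()

# Spark MLlib algorithms exposed as R functions
fit <- ml_linear_regression(iris_tbl, Petal_Length ~ Petal_Width + Species)
summary(fit)

spark_disconnect(sc)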

We've long had an interest in using Spark with R, and this finally seems to be the solution we were waiting for. Let's see if we can get it to work!

P.S. Once PAWS Internal has Spark support, it would be great to be able to use this in Jupyter notebooks too.

Event Timeline

debt triaged this task as Low priority. (Jul 27 2016, 7:16 PM)
mpopov renamed this task from "10%: Get 'sparklyr' working" to "Get 'sparklyr' working on stats1002". (Feb 27 2017, 6:48 PM)
mpopov removed mpopov as the assignee of this task.
mpopov added a project: Analytics.
mpopov updated the task description. (Show Details)
mpopov removed the point value for this task.
mpopov edited subscribers, added: Nuria; removed: Zppix.

We are going to take a look at this; we can probably do it if it does not involve changing all the cluster configuration. The migration to Spark 2 needs to happen before we look at this item.

Thank you for the status update and looking into this! Good luck migrating to spark2 if you end up going forward with that!

I downloaded spark-1.6.3-bin-hadoop2.6 (http://spark.rstudio.com/#installation uses Spark 1.6.2), put it in my home directory on stat1002, and added the following to my .bashrc:

export HADOOP_HEAPSIZE=1024
export HADOOP_CONF_DIR=/etc/hadoop/conf
export PATH="/home/bearloga/spark-1.6.3-bin-hadoop2.6/bin:$PATH"
export SPARK_HOME="/home/bearloga/spark-1.6.3-bin-hadoop2.6"
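As a quick sanity check (illustrative only), the exported values can be confirmed from the R session before connecting:

# Should print the paths exported in .bashrc above
Sys.getenv(c("SPARK_HOME", "HADOOP_CONF_DIR"))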

Following the directions in http://spark.rstudio.com/#connecting-to-spark to install sparklyr (from CRAN):

# install.packages("sparklyr", repos = c(CRAN = "https://cran.rstudio.com"))
library(sparklyr)
sc <- spark_connect(master = "local")

When I print sc:

$master
[1] "local[16]"

$method
[1] "shell"

$app_name
[1] "sparklyr"

$config
$config$sparklyr.cores.local
[1] 16

$config$spark.sql.shuffle.partitions.local
[1] 16

$config$spark.env.SPARK_LOCAL_IP.local
[1] "127.0.0.1"

$config$sparklyr.csv.embedded
[1] "^1.*"

$config$`sparklyr.shell.driver-class-path`
[1] ""

attr(,"config")
[1] "default"
attr(,"file")
[1] "/home/bearloga/R/x86_64-pc-linux-gnu-library/3.2/sparklyr/conf/config-template.yml"

$spark_home
[1] "/home/bearloga/spark-1.6.3-bin-hadoop2.6"

$backend
        description               class                mode                text 
"->localhost:54863"          "sockconn"                "wb"            "binary" 
             opened            can read           can write 
           "opened"               "yes"               "yes" 

$monitor
       description              class               mode               text 
"->localhost:8880"         "sockconn"               "rb"           "binary" 
            opened           can read          can write 
          "opened"              "yes"              "yes" 

$output_file
[1] "/tmp/Rtmpt8UlNE/file3a6739ae784b_spark.log"

$spark_context
<jobj[5]>
  class org.apache.spark.SparkContext
  org.apache.spark.SparkContext@70f988aa

$java_context
<jobj[6]>
  class org.apache.spark.api.java.JavaSparkContext
  org.apache.spark.api.java.JavaSparkContext@e7b8ad8

$hive_context
<jobj[7]>
  class org.apache.spark.sql.hive.HiveContext
  org.apache.spark.sql.hive.HiveContext@50d87255

attr(,"class")
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"

So things seem to be OK? At least it seems to be getting the configuration right. But when I try to use one of the first commands on that page to get the small, built-in iris dataset into Spark from R:

iris_tbl <- copy_to(sc, iris)

I get the following error:

Error: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://analytics-hadoop/tmp/Rtmpt8UlNE/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70ef0b.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
	at com.databricks.spark.csv.CsvRelation.firstLine$lzycompute(CsvRelation.scala:249)
	at com.databricks.spark.csv.CsvRelation.firstLine(CsvRelation.scala:245)
	at com.databricks.spark.csv.CsvRelation.inferSchema(CsvRelation.scala:223)
	at com.databricks.spark.csv.CsvRelation.<init>(CsvRelation.scala:72)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:157)
	at com.databricks.spark.csv.DefaultSource.createRelation(DefaultSource.scala:44)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:109)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at sparklyr.Invoke$.invoke(invoke.scala:94)
	at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
	at sparklyr.StreamHandler$.read(stream.scala:55)
	at sparklyr.BackendHandler.channelRead0(handler.scala:49)
	at sparklyr.BackendHandler.channelRead0(handler.scala:14)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
	at java.lang.Thread.run(Thread.java:745)

It seems SparkR works:

# install.packages("spark-1.6.3-bin-hadoop2.6/R/lib/SparkR", type = "source", repos = NULL)
library(SparkR)
sc <- sparkR.init()
# Launching java with spark-submit command /home/bearloga/spark-1.6.3-bin-hadoop2.6/bin/spark-submit   sparkr-shell /tmp/Rtmp8RmFyc/backend_port631d498723e4 
sqlContext <- sparkRSQL.init(sc)

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a gaussian GLM model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# The model summary is returned in a similar format to R's native glm().
summary(model)
# $devianceResiduals
#  Min       Max     
#  -1.307112 1.412532
# ...

sparkR.stop()

Some people have posted the same problem at https://github.com/rstudio/sparklyr/issues/525 and one person has been able to figure out some kind of solution. Will try it and report after lunch :)

Sys.setenv(HADOOP_CONF_DIR = "")
Sys.setenv(HADOOP_HOME = "")
Sys.setenv(HADOOP_PREFIX = "")
Sys.setenv(YARN_CONF_DIR = "")

This fixed it for me when using the latest version of sparklyr (installed from RStudio's GitHub repo), but probably at the expense of being able to work with data that lives in Hadoop/Hive: judging by the stack trace above, with HADOOP_CONF_DIR set, Spark was apparently resolving sparklyr's temporary CSV under hdfs://analytics-hadoop instead of the local /tmp.

Will double check :) also will re-try using the CRAN version

TODO: SSH tunneling to get local RStudio working with remote Spark :)

Nuria renamed this task from "Get 'sparklyr' working on stats1002" to "Get 'sparklyr' working on stats1005". (Aug 14 2017, 3:49 PM)
Nuria moved this task from Dashiki to Backlog (Later) on the Analytics board.

@mpopov I've tried out two different approaches to connecting to Spark from {sparklyr} on stat1005, including yours, and failed. Please take a look and let me know if you have a clue about what's going on here. Thanks a lot.

APPROACH A: Install Spark locally on stat1005 from {sparklyr}

1. Installed {sparklyr} on stat1005

2. From R:

library(sparklyr)
spark_install(version = "1.6.3", hadoop_version = "2.6") # Same version as @mpopov tried to run

3. Try to connect:

sc <- spark_connect(master = "local")

RESULT:
`Using Spark: 1.6.3
Error in if (a[k] > b[k]) return(1) else if (a[k] < b[k]) return(-1L) : missing value where TRUE/FALSE needed
In addition: Warning message:
In compareVersion(parsedVersion, "1.7") : NAs introduced by coercion`

4. Following @mpopov:

Sys.setenv(HADOOP_CONF_DIR = "")
Sys.setenv(HADOOP_HOME = "")
Sys.setenv(HADOOP_PREFIX = "")
Sys.setenv(YARN_CONF_DIR = "")

but still

sc <- spark_connect(master = "local")

OUTPUT:
`Using Spark: 1.6.3
Error in if (a[k] > b[k]) return(1) else if (a[k] < b[k]) return(-1L) : missing value where TRUE/FALSE needed
In addition: Warning message:
In compareVersion(parsedVersion, "1.7") : NAs introduced by coercion`

5. Checked the Spark and Hadoop versions on stat1005; Spark is 1.6.0 not 1.6.3, Hadoop is 2.6

6. Install Spark 1.6.0 locally:

spark_uninstall(version = "1.6.3", hadoop_version = "2.6")
spark_install(version = "1.6.0", hadoop_version = "2.6")

RESULT:
`downloaded 0 bytes
Error in download.file(installInfo$packageRemotePath, destfile = installInfo$packageLocalPath, : cannot download all files`
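(The "downloaded 0 bytes" failure presumably means that this particular build is no longer offered by the mirror spark_install() queries; spark_available_versions() should list what can still be fetched. As an untested alternative, sparklyr's spark_install_tar() can install from a tarball that is already on the host, e.g. the one downloaded for Approach B below:)

library(sparklyr)
spark_available_versions()  # which Spark versions spark_install() can still download
# Untested sketch: install from a locally downloaded tarball instead of a remote mirror
spark_install_tar(tarfile = "/home/goransm/spark/spark-1.6.0-bin-hadoop2.6.tgz")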

APPROACH B: Follow @mpopov's method. (Note: I used spark-1.6.0, not spark-1.6.3, to match our cluster's Spark version; the {sparklyr} documentation is explicit that, when connecting to a cluster, your local Spark configuration must exactly match the cluster configuration.)

1. Downloaded Spark 1.6.0 pre-built for Hadoop 2.6 (spark-1.6.0-bin-hadoop2.6.tgz from http://spark.apache.org/downloads.html) to /home/goransm/spark on stat1005 and unpacked it with tar zxvf spark-1.6.0-bin-hadoop2.6.tgz

2. In my .bashrc (located in: /home/goransm) I've put:

export HADOOP_HEAPSIZE=1024
export HADOOP_CONF_DIR=/etc/hadoop/conf
export PATH="/home/goransm/spark/spark-1.6.0-bin-hadoop2.6/bin:$PATH"
export SPARK_HOME="/home/goransm/spark/spark-1.6.0-bin-hadoop2.6"

3. From R:

library(sparklyr)
sc <- spark_connect(master = "local")

RESULT:

`Using Spark: 1.6.0
Error in if (a[k] > b[k]) return(1) else if (a[k] < b[k]) return(-1L) : missing value where TRUE/FALSE needed
In addition: Warning message:
In compareVersion(parsedVersion, "1.7") : NAs introduced by coercion
`
4. Use the same Sys.setenv() calls that @mpopov did:

Sys.setenv(HADOOP_CONF_DIR = "")
Sys.setenv(HADOOP_HOME = "")
Sys.setenv(HADOOP_PREFIX = "")
Sys.setenv(YARN_CONF_DIR = "")
library(sparklyr)
sc <- spark_connect(master = "local")

RESULT:

`Using Spark: 1.6.0
Error in if (a[k] > b[k]) return(1) else if (a[k] < b[k]) return(-1L) : missing value where TRUE/FALSE needed
In addition: Warning message: In compareVersion(parsedVersion, "1.7") : NAs introduced by coercion`

  • Please advise. Thank you.
  • Tested SparkR with Spark 2.1.2:
library(SparkR)
sc <- sparkR.session()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, iris)
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
summary(model)

Deviance Residuals: 
(Note: These are approximate quantiles with relative error <= 0.01)
     Min        1Q    Median        3Q       Max  
-1.30711  -0.26011  -0.06189   0.19111   1.41253  

Coefficients:
                    Estimate  Std. Error  t value  Pr(>|t|)  
(Intercept)         2.2514    0.36975     6.0889   9.5681e-09
Sepal_Width         0.80356   0.10634     7.5566   4.1873e-12
Species_versicolor  1.4587    0.11211     13.012   0         
Species_virginica   1.9468    0.10001     19.465   0         

(Dispersion parameter for gaussian family taken to be 0.1918059)

    Null deviance: 102.168  on 149  degrees of freedom
Residual deviance:  28.004  on 146  degrees of freedom
AIC: 183.9

Number of Fisher Scoring iterations: 1

sparkR.stop()

Everything seems fine. However, still no luck with {sparklyr}.
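(For what it's worth, the repeated "missing value where TRUE/FALSE needed" error comes from R's utils::compareVersion(), which sparklyr calls on the version string it parses at connection time: compareVersion() splits the string on "." and "-", coerces the pieces to integer, and a leading non-numeric component becomes NA and breaks the if() comparison. Why the parsed version comes out non-numeric on stat1005 is not clear from the output alone. A minimal illustration with a made-up string:)

utils::compareVersion("not-a-version", "1.7")
# Error in if (a[k] > b[k]) return(1) else if (a[k] < b[k]) return(-1L) :
#   missing value where TRUE/FALSE needed
# In addition: Warning message: NAs introduced by coercion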

@Ottomata @mpopov

Good news: {sparklyr} can connect to YARN (tested from stat1005; documentation coming very soon on Analytics/Systems/Cluster/Spark).
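(A minimal sketch of the kind of connection that worked, ahead of the proper documentation; the resource settings below are illustrative placeholders, not the values actually used on stat1005:)

library(sparklyr)

conf <- spark_config()
conf$spark.executor.memory <- "2g"    # placeholder value
conf$spark.executor.instances <- 4    # placeholder value

# With HADOOP_CONF_DIR/YARN_CONF_DIR pointing at /etc/hadoop/conf,
# "yarn-client" submits to the cluster instead of running a local master.
sc <- spark_connect(master = "yarn-client", config = conf)

# The connection also implements DBI, so Hive databases are reachable:
DBI::dbGetQuery(sc, "SHOW DATABASES")

spark_disconnect(sc)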

Not so good news: while testing, I ended an R session with a {sparklyr} connection still open, and now I cannot access the Application Master for:

https://yarn.wikimedia.org/cluster/app/application_1523429574968_52985

So, whoever can kill it: please do it. @Ottomata? Thanks. I cannot initiate another connection to Spark from {sparklyr} until this one is gone. SparkR wouldn't behave in this way (tested).

Update 04/26/2018: either the sparklyr application died on its own, or someone killed it. I will not investigate further :) However, I will proceed with the {sparklyr} tests and report back here.

Anyone can kill YARN jobs that they own:

https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-site/YarnCommands.html#application

yarn kill -applicationId application_1523429574968_52985

@Ottomata Thank you - I didn't know about this one. I will need to check further why this happens with {sparklyr}.

@Ottomata In relation to T139487#4161142 ("Anyone can kill YARN jobs that they own"), from stat1005:

yarn kill -applicationId application_1523429574968_100322

results in

Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Error: Could not find or load main class kill

@GoranSMilovanovic This may work:
yarn application -kill application_1523429574968_100322

Hi, is this still relevant? Is SparkR enough?

@Ottomata No, as far as I am concerned (I switched to PySpark, and SparkR would do if I ever need it).