
SparkR on Spark 2.3.0 - Testing on Large Data Sets
Closed, Resolved · Public · 2 Story Points

Description

Upon connecting to the cluster with SparkR from stat1005 the following behavior - possibly indicating that R is not installed on all workers - is observed:

1. Connect (note: library(SparkR) is already loaded)

sparkR.session(master = "yarn", appName = "SparkR", sparkHome = "/usr/lib/spark2/", sparkConfig = list(spark.driver.memory = "4g", spark.driver.cores = "1", spark.executor.memory = "2g"))

It seems Ok:

Spark package found in SPARK_HOME: /usr/lib/spark2/
Launching java with spark-submit command /usr/lib/spark2//bin/spark-submit   --driver-memory "4g" sparkr-shell /tmp/RtmpPd5D4Z/backend_portb1d9abf347 
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
18/04/17 10:24:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Java ref type org.apache.spark.sql.SparkSession id 1

2. Do something (anything, in fact, will result in the same problem):

df <- createDataFrame(iris)
Warning messages:
1: In FUN(X[[i]], ...) :
  Use Sepal_Length instead of Sepal.Length  as column name
2: In FUN(X[[i]], ...) :
  Use Sepal_Width instead of Sepal.Width  as column name
3: In FUN(X[[i]], ...) :
  Use Petal_Length instead of Petal.Length  as column name
4: In FUN(X[[i]], ...) :
  Use Petal_Width instead of Petal.Width  as column name

and here we go then:

head(filter(df, df$Sepal_length > 0))

This results in what is essentially reported as:

18/04/17 10:28:25 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, analytics1045.eqiad.wmnet, executor 2): java.io.IOException: Cannot run program "Rscript": error=2, No such file or directory

(verbose output is suppressed here).

After searching for a possible cause for some time, I have noted that the first piece of advice is typically to check whether R is installed on all worker nodes.
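A quick local check for the same condition can be sketched in base R. This only inspects the machine where the R process runs (for example the driver host); the workers would each have to be checked separately, and the helper name `has_rscript` is hypothetical.

```r
# Hypothetical helper: TRUE if an executable called `cmd` is found on the PATH
# of the machine where this R process runs (not on the Spark workers).
has_rscript <- function(cmd = "Rscript") {
  path <- Sys.which(cmd)   # returns "" when the command is not found
  nzchar(unname(path))
}

if (!has_rscript()) {
  message("Rscript not found on PATH; a Spark worker in this state fails with error=2")
}
```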

NOTE. The same happens upon starting a SparkR session as documented on Analytics/Systems/Cluster/Spark:

spark2R --master yarn --executor-memory 2G --executor-cores 1 --driver-memory 4G

Please advise.

NOTE: Since the problem described above was resolved, this ticket will now be used to report the results of SparkR tests.

Event Timeline

Restricted Application added a subscriber: Aklapper. · Apr 17 2018, 10:33 AM
GoranSMilovanovic updated the task description.

Change 427159 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Install R on Hadoop workers

https://gerrit.wikimedia.org/r/427159

Change 427159 merged by Ottomata:
[operations/puppet@production] Install R on Hadoop workers

https://gerrit.wikimedia.org/r/427159

@Ottomata Andrew, seems like we have R on workers, thank you.
However, any operation, for example:

head(filter(df, df$Sepal_length > 0))

now results in:

java.net.SocketTimeoutException: Accept timed out

@Ottomata Fixed with the arguments to sparkR.session() and the preceding Sys.setenv() R call as in the following:

library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sparkR.session(master = "yarn", appName = "SparkR", sparkHome = "/usr/lib/spark2/", sparkConfig = list(
    spark.driver.memory = "4g", spark.driver.cores = "1", spark.executor.memory = "2g", 
    spark.shuffle.service.enabled=TRUE, spark.dynamicAllocation.enabled=TRUE))
df <- createDataFrame(iris)
model <- spark.glm(data = df, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
summary(model)
sparkR.stop()

Reference: Not able to retrieve data from SparkR created DataFrame, Stack Overflow.

I will be documenting this at Analytics/Systems/Cluster/Spark on Wikitech.

Thanks for support. Closing the ticket.

GoranSMilovanovic closed this task as Resolved. Apr 17 2018, 4:39 PM
GoranSMilovanovic claimed this task.

Change 427170 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Don't need to pin backports for R on jessie anymore

https://gerrit.wikimedia.org/r/427170

Change 427170 merged by Ottomata:
[operations/puppet@production] Don't need to pin backports for R on jessie anymore

https://gerrit.wikimedia.org/r/427170

Still working on this; I am having versioning problems on different nodes...

GoranSMilovanovic reopened this task as Open. Apr 17 2018, 5:04 PM

Ok! Got it. Should be good now.

Ottomata claimed this task. Apr 17 2018, 5:54 PM
Ottomata triaged this task as Normal priority.
Ottomata set the point value for this task to 2.
Ottomata edited projects, added Analytics-Kanban; removed Analytics.
Ottomata moved this task from Next Up to Done on the Analytics-Kanban board.
GoranSMilovanovic added a comment. Edited Apr 17 2018, 6:05 PM
  • Elementary tests with {SparkR} conducted: works.
  • In the following hours I will be testing MLlib procedures with larger data sets.
  • Reporting back here and documenting on Analytics/Systems/Cluster/Spark on Wikitech.
  • Finally, I will try to make {sparklyr} connect (see T139487) too; we will probably learn something even if it fails to connect again.

I am closing the ticket as soon as the tests are completed.

GoranSMilovanovic renamed this task from SparkR on Spark 2.3.0 - Cannot run program "Rscript" to SparkR on Spark 2.3.0 - Testing on Large Data Sets. Apr 18 2018, 12:55 AM
GoranSMilovanovic updated the task description.

Mentioned in SAL (#wikimedia-operations) [2018-04-18T08:44:35Z] <elukey> execute cumin 'analytics10[28-69]*' 'rm /etc/apt/preferences.d/r_* && apt-get update' to clear jessie backports apt config - T192348

elukey added a subscriber: elukey. Edited Apr 18 2018, 9:10 AM

This morning I removed the old apt config for jessie backports (since after https://gerrit.wikimedia.org/r/427170 it seemed not needed and puppet was broken on Jessie hosts) but now this is the situation for the Hadoop workers:

Jessie hosts (analytics1028->69)

elukey@analytics1030:~$ dpkg -l | egrep 'r-(base|cran|rec)' | awk '{print $1" "$2" "$3}'
ii r-base 3.1.1-1+deb8u1
ii r-base-core 3.1.1-1+deb8u1
ii r-base-dev 3.1.1-1+deb8u1
ii r-cran-boot 1.3-13-1
ii r-cran-class 7.3-11-1
ii r-cran-cluster 1.15.3-1
ii r-cran-codetools 0.2-9-1
ii r-cran-dbi 0.3.1-1
ii r-cran-foreign 0.8.61-1
ii r-cran-kernsmooth 2.23-13-1
ii r-cran-lattice 0.20-29-1
ii r-cran-mass 7.3-34-1
ii r-cran-matrix 1.1-4-1
ii r-cran-mgcv 1.8-3-1
ii r-cran-nlme 3.1.117-1
ii r-cran-nnet 7.3-8-1
ii r-cran-rmysql 0.9-3-2
ii r-cran-rpart 4.1-8-1
ii r-cran-spatial 7.3-8-1
ii r-cran-survival 2.37-7-1
ii r-recommended 3.1.1-1+deb8u1

Stretch hosts:

elukey@analytics1070:~$ dpkg -l | egrep 'r-(base|cran|rec)' | awk '{print $1" "$2" "$3}'
ii r-base 3.3.3-1
ii r-base-core 3.3.3-1
ii r-base-dev 3.3.3-1
ii r-cran-boot 1.3-18-2
ii r-cran-class 7.3-14-1
ii r-cran-cluster 2.0.5-1
ii r-cran-codetools 0.2-15-1
ii r-cran-dbi 0.5-1-1
ii r-cran-foreign 0.8.67-1
ii r-cran-kernsmooth 2.23-15-2
ii r-cran-lattice 0.20-34-1
ii r-cran-mass 7.3-45-1
ii r-cran-matrix 1.2-7.1-1
ii r-cran-mgcv 1.8-16-1
ii r-cran-nlme 3.1.129-1
ii r-cran-nnet 7.3-12-1
ii r-cran-rmysql 0.10.11-1
ii r-cran-rpart 4.1-10-2
ii r-cran-spatial 7.3-11-1
ii r-cran-survival 2.40-1-1
ii r-recommended 3.3.3-1

If this is fine we can leave things this way and wait for the complete upgrade of the Hadoop cluster to Debian Stretch (probably not happening very soon, next quarter is my best bet), but if the different versions are a problem we'd need to re-add the jessie backports config.

@Ottomata I hope I didn't get your last patch wrong; puppet now runs, but let's chat about what the ideal final state is (also @GoranSMilovanovic, let us know your thoughts).

@elukey The ideal situation would be to have the same version of R installed across all worker nodes. At this point, we have R 3.1.1 (2014) on Jessie hosts and R 3.3.3 (2017) on Stretch hosts. However, there have been no backward-compatibility issues in R for years, so let's not worry about this now. If it turns out that the discrepancy causes any trouble upon testing, I will report back, but I don't think it will. Thanks for the support!
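The version gap described here can be expressed with base R's version classes. This is only a sketch: the two version strings come from the package listings above, and the helper name `same_r_everywhere` is hypothetical.

```r
# R versions reported on the ticket for Jessie and Stretch workers.
jessie_r  <- numeric_version("3.1.1")
stretch_r <- numeric_version("3.3.3")

# Hypothetical helper: TRUE when every version in `versions` is identical.
same_r_everywhere <- function(versions) {
  length(unique(as.character(versions))) == 1
}

same_r_everywhere(c(jessie_r, stretch_r))   # FALSE for the mix described above
jessie_r < stretch_r                        # TRUE: Jessie hosts run the older R

# Version of the R process running this code, e.g. on the driver host.
getRversion()
```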

Ok! I am going to chat with Andrew about https://gerrit.wikimedia.org/r/427159, since adding r-base-core to the packages pinned to jessie-backports seems to work (tested this morning with `sudo apt-get install r-base-core -t jessie-backports` on an1028 before cleaning up).

Whaa? I did the very same cumin rm / apt-get update thing you did yesterday! Puppet ran everywhere fine then... or at least I thought it did.

I think we should just leave this as is (unless it causes problems for Goran etc.) and wait until we update to Stretch.

@Ottomata Everything seems to be fine thus far. However, I am not testing any really large datasets yet (nycflights13, which is some 336776 rows, 16 columns).

Q: Should I set spark.executor.instances and spark.executor.cores myself, or should I let Spark do it for me? I am getting the impression that it is running a Classification Decision Tree (MLlib) on the above-mentioned data set on one worker node only, utilizing one core only, while at the same time complaining at every stage that the task is "large" (e.g. ...contains a task of very large size (73531 KB). The maximum recommended task size is 100 KB.).
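The "very large size" warning quoted above is plausibly a side effect of building the SparkDataFrame from a local R object, since that data has to travel from the driver along with the tasks. A rough way to gauge how much data that is can be sketched in base R; the helper name `serialized_kb` is hypothetical, and the size of the serialized R object is only a proxy for what Spark actually ships.

```r
# Hypothetical helper: size in KB of an R object once serialized, used as a
# rough proxy for how much data a local data.frame adds to the driver's load.
serialized_kb <- function(x) {
  length(serialize(x, connection = NULL)) / 1024
}

# The built-in iris data set is tiny; a data set of several hundred thousand
# rows will be orders of magnitude larger.
serialized_kb(iris)
```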

Gonna ping @JAllemandou on this one ^ :)

Hi @GoranSMilovanovic.
I am not fluent in sparkR but here are a few thoughts:

GoranSMilovanovic added a comment. Edited Apr 20 2018, 4:09 AM

@JAllemandou

  • Spark launches in the Dynamic Allocation mode - checked;
  • With different settings (e.g. spark.executor.instances, spark.dynamicAllocation.minExecutors), it will use several executors, but from what I see from the Spark UI on yarn.wikimedia.org it always uses only one executor at a time; however
  • that was a test on a rather small data set (~ 330K rows, 5 columns or so).

Now, testing on a somewhat larger data set (>5M rows, more columns, approx. 0.5 GB):

# - flights dataset: > 5M rows, ~0.5Gb

# - R packages
library(SparkR)
library(data.table)

# - set environmental variables
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

# - Start SparkR session
# - First, needed to increase the spark.rpc.message.maxSize in the sparkR.session() call

sparkR.session(master = "yarn", 
               appName = "SparkR", 
               sparkHome = "/usr/lib/spark2/", 
               sparkConfig = list(spark.driver.memory = "4g",
                                  spark.driver.cores = "2",
                                  spark.shuffle.service.enabled = TRUE,
                                  spark.dynamicAllocation.enabled = TRUE,
                                  spark.executor.instances = "8",
                                  spark.dynamicAllocation.minExecutors = "4",
                                  spark.executor.cores  = "2",
                                  spark.executor.memory = "4g",
                                  spark.rpc.message.maxSize = "512"
               )
)

# - use fread() from {data.table} to load the flights data set
dFrame <- fread("/srv/home/goransm/testData/flights.csv", header = TRUE)

# - create SparkDataFrame from dFrame:
df <- createDataFrame(dFrame)

produces the following problem, even after increasing spark.rpc.message.maxSize to 512 (the default should be 128):

[Stage 0:> (0 + 0) / 1]
Exception in thread "dispatcher-event-loop-1" java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:481)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:462)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:286)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:281)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:366)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$9.apply(TaskSchedulerImpl.scala:364)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:364)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:361)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:361)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:247)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:136)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)

Interestingly enough, when the session was started with Hive support I was able to grab a large portion of a Hive table and make it a SparkDataFrame with no problems at all:

### --- Start SparkR session
sparkR.session(master = "yarn", 
               appName = "SparkR", 
               sparkHome = "/usr/lib/spark2/", 
               sparkConfig = list(spark.driver.memory = "4g",
                                  spark.driver.cores = "1",
                                  spark.shuffle.service.enabled = TRUE,
                                  spark.dynamicAllocation.enabled = TRUE,
                                  spark.enableHiveSupport = TRUE,
                                  spark.executor.instances = "4",
                                  spark.dynamicAllocation.minExecutors = "4",
                                  spark.executor.cores  = "4",
                                  spark.executor.memory = "2g"
               )
)

# - a portion of the goransm.wdcm_maintable as a SparkDataFrame
results <- sql('USE goransm')
results <- sql('SELECT * FROM wdcm_maintable WHERE category="Human"')
> head(results)
  eu_entity_id   eu_project eu_count category
1     Q1000002       dewiki        1    Human
2     Q1000005  commonswiki        1    Human
3     Q1000005       cswiki        1    Human
4     Q1000005  cswikiquote        1    Human
5     Q1000005 cswikisource        1    Human
6     Q1000005       dewiki        1    Human

Hi @GoranSMilovanovic ,
The problem I see in your code is that you instantiate the dataframe as an R structure, and then convert it to Spark.
The first step involves creating and loading the dataframe in R, which involves only the driver. Since your driver has 4g of RAM, you get a memory error.
When dealing with big datasets, you should use Spark's reading functions (they don't load the full dataset in the driver).
I found

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

in https://spark.apache.org/docs/latest/sparkr.html
Maybe you could try that?
Cheers
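The choice between the two loading paths could be made explicit with a size check before reading. The following is a minimal sketch in base R, assuming the file is on the local filesystem; the helper name `choose_loader` and the 100 MB cut-off are hypothetical and purely illustrative.

```r
# Hypothetical threshold: files above this size are left to Spark's own
# reader instead of being loaded into the driver's R process first.
max_local_bytes <- 100 * 1024^2   # 100 MB, an arbitrary illustrative cut-off

# Returns "local" when the file is small enough to load in R on the driver,
# "spark" when it should be read with a Spark reader such as read.df().
choose_loader <- function(path, limit = max_local_bytes) {
  size <- file.size(path)         # NA if the file does not exist
  if (is.na(size)) stop("file not found: ", path)
  if (size <= limit) "local" else "spark"
}
```

For a file of roughly 0.5 GB, as discussed above, `choose_loader()` would return "spark" with this cut-off.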

GoranSMilovanovic added a comment. Edited Apr 25 2018, 12:56 AM

Hi @JAllemandou
I think it all needs to go to HDFS first. After starting a SparkR session with Hive support enabled:

# - copy flights.csv to HDFS
system('hdfs dfs -put /home/goransm/testData/flights.csv hdfs://analytics-hadoop/user/goransm/flights.csv', 
       wait = TRUE)

# - load flights
df <- read.df("flights.csv", 
              "csv", 
              header = "true", 
              inferSchema = "true", 
              na.strings = "NA")

and all is fine. I've tried to pull some tricks with spark.addFile() but with no success whatsoever. If I understand things correctly, a local file must be present on each node of the cluster while running Spark in cluster mode on Yarn, and spark.addFile() should be able to do that (again: if I understand the relevant discussions correctly); however, it kept looking for files in the HDFS paths and complained whenever it couldn't find the desired file there. I have simply accepted the fact.
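The copy-to-HDFS step could also be wrapped so that a failed copy is noticed before read.df() is called. This is a sketch only: the helper names `hdfs_put_args` and `hdfs_put` are hypothetical, and it assumes the `hdfs` command is on the PATH of the host running R.

```r
# Hypothetical helper: builds the argument vector for `hdfs dfs -put`,
# quoting both paths for the shell.
hdfs_put_args <- function(local_path, hdfs_path) {
  c("dfs", "-put", shQuote(local_path), shQuote(hdfs_path))
}

# Hypothetical wrapper: runs the copy and stops if the command reports failure.
hdfs_put <- function(local_path, hdfs_path) {
  status <- system2("hdfs", hdfs_put_args(local_path, hdfs_path))
  if (status != 0) stop("hdfs dfs -put failed with exit status ", status)
  invisible(hdfs_path)
}
```

With the paths used above, the call would be `hdfs_put("/home/goransm/testData/flights.csv", "hdfs://analytics-hadoop/user/goransm/flights.csv")`.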

I was able to perform a (trivial) Multinomial Logistic Regression test on the flights.csv dataset with more than 5M rows, now documented in the Large(er)_file_from_HDFS section of our Spark documentation; very nice performance, indeed.

I think we can close this now. In the meantime, I'm on T139487 - getting {sparklyr} to work, if possible.

As ever, thanks for all the support.

GoranSMilovanovic closed this task as Resolved. Apr 25 2018, 1:00 AM

@elukey The ideal situation would be to have the same version of R installed across all worker nodes.

FYI: T192557 tracks the reimage of all the Hadoop worker nodes to Debian Stretch (loooong way to go, but we should get there in a couple of months, tops).