spark-issues mailing list archives

From "Hyukjin Kwon (JIRA)" <>
Subject [jira] [Commented] (SPARK-18924) Improve collect/createDataFrame performance in SparkR
Date Thu, 14 Feb 2019 10:13:01 GMT


Hyukjin Kwon commented on SPARK-18924:

Oops, sorry guys. I just found this. I made a PR via SPARK-26762. Let me resolve this one
as a duplicate of that.

> Improve collect/createDataFrame performance in SparkR
> -----------------------------------------------------
>                 Key: SPARK-18924
>                 URL:
>             Project: Spark
>          Issue Type: Improvement
>          Components: SparkR
>            Reporter: Xiangrui Meng
>            Priority: Critical
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * [SerDe.scala|]
> * [SQLUtils.scala|]
> The SerDe on the R side is implemented in:
> * [deserialize.R|]
> * [serialize.R|]
> The serialization between JVM and R suffers from huge storage and computation overhead.
> For example, a short round trip of 1 million doubles surprisingly took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
>    user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame locally and continuing with a local R workflow is
> a use case we should pay attention to. SparkR will never be able to cover all existing features
> from CRAN packages. It is also unnecessary for Spark to do so, because not all features need
> to scale.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns in particular.
> 3. There is some overhead in the serialization protocol/implementation.
> 1) might have been discussed before, because R packages like rJava existed before SparkR. I'm
> not sure whether we would have a license issue in depending on those libraries. Another option
> is to switch to R's low-level C interface or Rcpp, which again might have license issues. I'm
> not an expert here. If we have to implement our own, there is still much room for improvement,
> as discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, which collects
> rows to the driver and then constructs columns. However,
> * it ignores column types and results in boxing/unboxing overhead
> * it collects all objects to the driver and results in high GC pressure
> A relatively simple change is to implement a specialized column builder based on the column
> type, primitive types in particular. We need to handle null/NA values properly. A simple
> data structure we can use is
> {code}
> // one buffer per column (the class name is just illustrative):
> case class ColumnBuffer[T](
>   size: Int,
>   nullIndexes: Array[Int],
>   notNullValues: Array[T]) // specialized for primitive types
> {code}
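> As an illustration only (not existing SQLUtils code), a double column could be packed into
> that structure while iterating over the already-collected rows; `buildDoubleColumn` below is a
> hypothetical helper:
> {code}
> import org.apache.spark.sql.Row
> import scala.collection.mutable.ArrayBuffer
>
> // Hypothetical sketch: split one double column of collected rows into
> // (size, nullIndexes, notNullValues) as in the structure above.
> def buildDoubleColumn(rows: Seq[Row], colIndex: Int): (Int, Array[Int], Array[Double]) = {
>   val nullIndexes = ArrayBuffer.empty[Int]
>   val values = ArrayBuffer.empty[Double]
>   rows.zipWithIndex.foreach { case (row, i) =>
>     if (row.isNullAt(colIndex)) nullIndexes += i
>     else values += row.getDouble(colIndex)
>   }
>   // values.toArray is a primitive double[] on the JVM, so it can later be
>   // sent to the R process in one bulk write instead of value by value.
>   (rows.size, nullIndexes.toArray, values.toArray)
> }
> {code}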
> On the R side, we can use `readBin` and `writeBin` to read or write an entire vector in a
> single call. The speed seems reasonable (on the order of GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> {code}
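> The matching bulk read on the JVM side could look like the sketch below. The real wire format
> is defined in SerDe.scala; this assumes the R side sent the raw bytes of n doubles in
> little-endian order (`writeBin` uses the platform's native byte order by default, which is
> little-endian on most machines), and `readDoubleVector` is a hypothetical helper:
> {code}
> import java.nio.{ByteBuffer, ByteOrder}
>
> // Hypothetical sketch: turn the raw bytes of n doubles into a primitive
> // double[] with one bulk copy instead of n individual reads.
> def readDoubleVector(bytes: Array[Byte], n: Int): Array[Double] = {
>   val out = new Array[Double](n)
>   ByteBuffer.wrap(bytes)
>     .order(ByteOrder.LITTLE_ENDIAN)
>     .asDoubleBuffer()
>     .get(out)
>   out
> }
> {code}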
> This is just a proposal that needs to be discussed and formalized. But in general, it should
> be feasible to obtain a 20x or greater performance gain.

This message was sent by Atlassian JIRA
