From Wenchen Fan <>
Subject Re: [DISCUSS] Spark Columnar Processing
Date Tue, 26 Mar 2019 04:46:25 GMT
Do you have some initial perf numbers? It seems fine to me to remain
row-based inside Spark with whole-stage-codegen, and convert rows to
columnar batches when communicating with external systems.

On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <> wrote:

> This thread is to discuss adding support for data frame processing
> using an in-memory columnar format compatible with Apache Arrow. My main
> goal is to lay the groundwork for GPU-accelerated processing of data
> frames, but this feature has a number of other benefits. Spark currently
> supports Apache Arrow formatted data as an option for exchanging data with
> Python for pandas UDF processing. There has also been discussion about
> extending this to exchange data with other tools like PyTorch, TensorFlow,
> XGBoost, etc. If Spark supported processing on Arrow-compatible data, it
> could eliminate the serialization/deserialization overhead when moving
> data between these systems. It would also allow CPU optimizations using
> SIMD instructions, similar to what Hive currently supports. GPU-accelerated
> processing is something we will start a separate discussion thread on, but
> I wanted to set the context a bit.
>
> Jason Lowe, Tom Graves, and I created a prototype over the past few months
> to understand how to make this work. What we are proposing is based on
> lessons learned while building this prototype, but we wanted to get
> feedback from the community early. We will file a SPIP once we reach
> agreement that this is a good direction to go in.
>
> The current support for columnar processing lets a Parquet or ORC file
> format return ColumnarBatches inside an RDD[InternalRow] by relying on
> Scala's type erasure. Code generation knows that the RDD actually holds
> ColumnarBatches and generates code to loop through the data in each batch
> as InternalRows.
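A rough picture of the current trick follows. It is written in Python rather than Scala, so it only models the runtime behaviour. `typing.cast` is a no-op at runtime and stands in for the JVM's type erasure. `Row`, `Batch`, `scan`, and `consume` are hypothetical stand-ins for `InternalRow`, `ColumnarBatch`, a columnar file scan, and the generated consumer code. None of these are Spark APIs.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple, cast


@dataclass
class Row:
    # Hypothetical stand-in for Spark's InternalRow.
    values: Tuple[int, ...]


@dataclass
class Batch:
    # Hypothetical stand-in for ColumnarBatch: one list per column.
    columns: List[List[int]]

    def rows(self) -> Iterator[Row]:
        # Walk the batch row by row, like the generated loop over a batch.
        for values in zip(*self.columns):
            yield Row(values)


def scan() -> Iterator[Row]:
    # Declared to yield rows, but actually yields batches. cast() does
    # nothing at runtime, much as erasure hides the real element type
    # of an RDD[InternalRow] on the JVM.
    batches = [Batch([[1, 2], [10, 20]]), Batch([[3], [30]])]
    return cast(Iterator[Row], iter(batches))


def consume(it: Iterator[Row]) -> List[Row]:
    # The consumer must "know" the elements are really batches and unpack
    # each one; treating them as Row directly would be wrong.
    out: List[Row] = []
    for item in it:
        batch = cast(Batch, item)
        out.extend(batch.rows())
    return out


print(consume(scan()))
```

The fragility is the point. Nothing in the declared type tells a new operator that it is receiving batches, and a dedicated columnar RDD type would make that explicit.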
>
> Instead, we propose a new set of APIs that work on an
> RDD[InternalColumnarBatch] rather than abusing type erasure. Along with
> this, we propose adding a Rule similar to how WholeStageCodeGen currently
> works. Each part of the physical SparkPlan would expose columnar support
> through a combination of traits and method calls. The rule would then
> decide where columnar processing starts and where it ends. Switching
> between columnar and row-based processing is not free, so the rule would
> base its decision on the estimated cost of the transformation and the
> estimated speedup in processing time.
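The proposal does not say how the rule would weigh these costs. One simple way to picture the decision is a small dynamic program over a linear chain of operators. This is an illustrative technique, not the proposed rule. The sketch assumes made-up per-operator costs, treats "no columnar cost" as "no columnar implementation", and assumes data enters and leaves as rows. Real physical plans are trees, not chains. `Op` and `choose_modes` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

ROW, COLUMNAR = "row", "columnar"


@dataclass(frozen=True)
class Op:
    # Hypothetical operator description with estimated costs.
    name: str
    row_cost: float
    columnar_cost: Optional[float]  # None: no columnar implementation


def choose_modes(ops: List[Op], transition_cost: float) -> Tuple[float, List[str]]:
    """Pick row or columnar execution per operator in a linear pipeline.

    Data enters as rows and must leave as rows; every switch between the
    two formats costs `transition_cost`. Returns (total_cost, modes).
    """
    inf = float("inf")
    # best[mode] = (cost so far, modes so far) for plans whose data is
    # currently in `mode`.
    best: Dict[str, Tuple[float, List[str]]] = {ROW: (0.0, []), COLUMNAR: (inf, [])}
    for op in ops:
        step = {ROW: op.row_cost, COLUMNAR: op.columnar_cost}
        new_best: Dict[str, Tuple[float, List[str]]] = {}
        for mode in (ROW, COLUMNAR):
            op_cost = step[mode]
            if op_cost is None:
                new_best[mode] = (inf, [])
                continue
            # Cheapest way to arrive in `mode`, paying for a switch if needed.
            candidates = [
                (cost + (0.0 if prev == mode else transition_cost), modes)
                for prev, (cost, modes) in best.items()
            ]
            cost, modes = min(candidates, key=lambda c: c[0])
            new_best[mode] = (cost + op_cost, modes + [mode])
        best = new_best
    row_total, row_modes = best[ROW]
    col_total, col_modes = best[COLUMNAR]
    # A pipeline that ends columnar must convert back to rows.
    if col_total + transition_cost < row_total:
        return col_total + transition_cost, col_modes
    return row_total, row_modes


pipeline = [Op("scan", 10, 2), Op("filter", 5, 1), Op("udf", 3, None), Op("project", 4, 1)]
print(choose_modes(pipeline, transition_cost=3))
```

With a cheap transition, the scan and filter run columnar and the plan switches to rows before the row-only UDF. With a very expensive transition, everything stays row-based. This is the kind of trade-off the rule would have to make.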
>
> This should allow us to disable columnar support simply by disabling the
> rule that modifies the physical SparkPlan. It should pose minimal risk to
> the existing row-based code path, as that code would not be touched, and
> in many cases it could be reused to implement the columnar version. This
> approach also allows for small, easily manageable patches rather than huge
> patches that no one wants to review.
>
> As far as the memory layout is concerned, OnHeapColumnVector and
> OffHeapColumnVector are already very close to being Apache Arrow
> compatible, so shifting them over would be a relatively simple change.
> Alternatively, we could add a new implementation that is Arrow compatible
> if there are reasons to keep the old ones.
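One concrete layout detail is how nulls are tracked. Arrow uses a validity bitmap: one bit per value, least-significant bit first, with a set bit meaning the value is present. The sketch below assumes a vector that keeps one null-flag byte per row, where nonzero means null. It shows the packing such a conversion would need. The function name is hypothetical.

```python
def null_flags_to_validity_bitmap(null_flags: bytes) -> bytes:
    """Pack one-byte-per-row null flags (nonzero = null) into an
    Arrow-style validity bitmap (bit set = value present, LSB first)."""
    bitmap = bytearray((len(null_flags) + 7) // 8)
    for i, flag in enumerate(null_flags):
        if flag == 0:
            # Row i is valid: set bit (i % 8) of byte (i // 8).
            bitmap[i >> 3] |= 1 << (i & 7)
    return bytes(bitmap)


flags = bytes([0, 1, 0, 0, 1, 0, 0, 0, 0, 1])  # rows 1, 4 and 9 are null
print(null_flags_to_validity_bitmap(flags).hex())
```

The Arrow format also recommends padding buffers to 8- or 64-byte multiples. This sketch omits that.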
>
> Again, this is just to get the discussion started. Any feedback is welcome,
> and we will file a SPIP once we feel that the major changes we are
> proposing are acceptable.
> Thanks,
> Bobby Evans

