Do you have some initial perf numbers? It seems fine to me to remain row-based inside Spark with whole-stage-codegen, and convert rows to columnar batches when communicating with external systems.

On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <firstname.lastname@example.org> wrote:
This thread is to discuss adding support for data frame processing using an in-memory columnar format compatible with Apache Arrow. My main goal is to lay the groundwork so we can add support for GPU-accelerated processing of data frames, but this feature has a number of other benefits. Spark currently supports Apache Arrow-formatted data as an option for exchanging data with Python for pandas UDF processing. There has also been discussion around extending this to allow for exchanging data with other tools like PyTorch, TensorFlow, XGBoost, etc. If Spark supports processing on Arrow-compatible data, it could eliminate the serialization/deserialization overhead when going between these systems. It would also allow for optimizations on a CPU with SIMD instructions, similar to what Hive currently supports. Accelerated processing using a GPU is something that we will start a separate discussion thread on, but I wanted to set the context a bit.
Jason Lowe, Tom Graves, and I created a prototype over the past few months to try and understand how to make this work. What we are proposing is based off of lessons learned when building this prototype, but we really wanted to get feedback early on from the community. We will file a SPIP once we can get agreement that this is a good direction to go in.
The current support for columnar processing lets a Parquet or ORC file format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type erasure. The code generation is aware that the RDD actually holds ColumnarBatch objects and generates code to loop through the data in each batch as InternalRows.
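To make the "loop through each batch as rows" idea concrete, here is a rough toy model in Python. It is not Spark code: `ToyBatch` and `iterate_as_rows` are hypothetical names made up for illustration, and the real generated code works on InternalRow views rather than tuples.

```python
class ToyBatch:
    """Hypothetical stand-in for a columnar batch: one list per column."""

    def __init__(self, columns):
        lengths = {len(col) for col in columns.values()}
        if len(lengths) > 1:
            raise ValueError("all columns in a batch must have the same length")
        self.columns = columns
        self.num_rows = lengths.pop() if lengths else 0

    def rows(self):
        # Present the column-oriented data one row at a time.
        names = list(self.columns)
        for i in range(self.num_rows):
            yield tuple(self.columns[name][i] for name in names)


def iterate_as_rows(batches):
    """Flatten a stream of batches into a stream of rows."""
    for batch in batches:
        yield from batch.rows()


batches = [
    ToyBatch({"id": [1, 2], "name": ["a", "b"]}),
    ToyBatch({"id": [3], "name": ["c"]}),
]
all_rows = list(iterate_as_rows(batches))
```

The consumer only ever sees rows, which is roughly why a downstream row-based operator does not need to know the data arrived in batches.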
Instead, we propose a new set of APIs that work on an RDD[InternalColumnarBatch] rather than abusing type erasure. With this we propose adding a Rule similar to how WholeStageCodeGen currently works. Each part of the physical SparkPlan would expose columnar support through a combination of traits and method calls. The rule would then decide where columnar processing starts and where it ends. Switching between columnar and row-based processing is not free, so the rule would base its decision on the estimated cost of the transformation and the estimated speedup in processing time.
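As a rough illustration of the kind of decision such a rule would make, the sketch below picks row or columnar mode for each operator in a linear chain so that the total estimated cost, including transitions, is minimized. Everything in it is an assumption made for illustration: the `Op` class, the cost numbers, the single flat `transition_cost`, and the requirement that the chain both starts and ends with rows. It is a toy model in Python, not the prototype's implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Op:
    """Hypothetical operator with estimated costs for each mode."""
    name: str
    row_cost: float
    columnar_cost: Optional[float] = None  # None: no columnar version exists


def plan_modes(ops, transition_cost=5.0):
    """Return (total_cost, modes) for a linear chain of operators.

    Assumes the input arrives as rows and the output must be rows.
    """
    inf = float("inf")
    # best[mode] = (cheapest cost so far, modes chosen) when ending in `mode`
    best = {"row": (0.0, []), "columnar": (inf, [])}
    for op in ops:
        nxt = {}
        for mode in ("row", "columnar"):
            op_cost = op.row_cost if mode == "row" else op.columnar_cost
            if op_cost is None:
                nxt[mode] = (inf, [])
                continue
            candidates = []
            for prev, (prev_cost, prev_modes) in best.items():
                switch = 0.0 if prev == mode else transition_cost
                candidates.append((prev_cost + switch + op_cost, prev_modes + [mode]))
            nxt[mode] = min(candidates, key=lambda c: c[0])
        best = nxt
    row_total, row_modes = best["row"]
    col_total, col_modes = best["columnar"]
    # Ending in columnar mode needs one more transition back to rows.
    if col_total + transition_cost < row_total:
        return col_total + transition_cost, col_modes
    return row_total, row_modes


chain = [
    Op("scan", row_cost=10, columnar_cost=4),
    Op("filter", row_cost=6, columnar_cost=2),
    Op("project", row_cost=4, columnar_cost=1),
    Op("udf", row_cost=3),  # row-only operator
]
total, modes = plan_modes(chain)
# total == 20.0, modes == ["columnar", "columnar", "columnar", "row"]
```

With these made-up numbers, paying for two transitions is still cheaper than staying row-based throughout (20 versus 23). For a single cheap operator the transitions would dominate and the plan stays in row mode.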
This should allow us to disable columnar support by simply disabling the rule that modifies the physical SparkPlan. It should pose minimal risk to the existing row-based code path, as that code should not be touched, and in many cases it could be reused to implement the columnar version. This also allows for small, easily manageable patches rather than huge patches that no one wants to review.
As far as the memory layout is concerned, OnHeapColumnVector and OffHeapColumnVector are already really close to being Apache Arrow compatible, so shifting them over would be a relatively simple change. Alternatively, we could add a new implementation that is Arrow compatible if there are reasons to keep the old ones.
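For anyone unfamiliar with what "Arrow compatible" means at the buffer level, here is a small pure-Python sketch of the layout Arrow uses for a nullable fixed-width column: a validity bitmap (one bit per slot, least-significant bit first, 1 meaning non-null) plus a contiguous little-endian values buffer. The function names are hypothetical, the value written into null slots is arbitrary (0 here), and the buffer padding Arrow recommends is left out. It says nothing about how the existing ColumnVector classes are laid out internally.

```python
import struct


def to_arrow_like_int32(values):
    """Pack a list of optional ints into (validity bitmap, values buffer)."""
    validity = bytearray((len(values) + 7) // 8)
    data = bytearray()
    for i, v in enumerate(values):
        if v is not None:
            validity[i // 8] |= 1 << (i % 8)  # LSB-first bit numbering
        # Null slots still occupy 4 bytes; their content is arbitrary.
        data += struct.pack("<i", 0 if v is None else v)
    return bytes(validity), bytes(data)


def read_slot(validity, data, i):
    """Read slot i back, returning None when the validity bit is unset."""
    if not (validity[i // 8] >> (i % 8)) & 1:
        return None
    return struct.unpack_from("<i", data, 4 * i)[0]


validity, data = to_arrow_like_int32([1, None, 3])
decoded = [read_slot(validity, data, i) for i in range(3)]
```

Because the values sit in one contiguous buffer, the same bytes can be handed to another Arrow-aware system without a per-row conversion, which is where the savings in serialization overhead would come from.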
Again, this is just to get the discussion started. Any feedback is welcome, and we will file a SPIP on it once we feel like the major changes we are proposing are acceptable.

Thanks,
Bobby Evans