spark-dev mailing list archives

From Bobby Evans <>
Subject Re: [DISCUSS] Spark Columnar Processing
Date Tue, 02 Apr 2019 14:00:10 GMT
Thanks to everyone for the feedback.

Overall the feedback has been really positive for exposing columnar as a
processing option to users.  I'll write up a SPIP on the proposed changes
to support columnar processing (not necessarily implement it) and then ping
the list again for more feedback and discussion.

Thanks again,


On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin <> wrote:

> I just realized I didn't make it very clear my stance here ... here's
> another try:
> I think it's a no brainer to have a good columnar UDF interface. This
> would facilitate a lot of high performance applications, e.g. GPU-based
> accelerations for machine learning algorithms.
> On rewriting the entire internals of Spark SQL to leverage columnar
> processing, I don't see enough evidence to suggest that's a good idea yet.
> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans <> wrote:
>> Kazuaki Ishizaki,
>> Yes, ColumnarBatchScan does provide a framework for doing code generation
>> for the processing of columnar data.  I have to admit that I don't have a
>> deep understanding of the code generation piece, so if I get something
>> wrong please correct me.  From what I have seen, only input formats
>> currently inherit from ColumnarBatchScan, and from comments in the trait
>>   /**
>>    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>    * This is called once per [[ColumnarBatch]].
>>    */
>> It appears that ColumnarBatchScan is really only intended to pull the
>> data out of the batch, and not to process that data in a columnar fashion.
>> This corresponds to the Loading stage that you mentioned.
>> > The SIMDzation or GPUization capability depends on a compiler that
>> translates native code from the code generated by the whole-stage codegen.
>> To be able to support vectorized processing, Hive stayed with pure Java
>> and let the JVM detect and do the SIMDzation of the code.  To make that
>> happen they created loops to go through each element in a column and remove
>> all conditionals from the body of the loops.  To the best of my knowledge
>> that would still require a separate code path like I am proposing to make
>> the different processing phases generate code that the JVM can compile down
>> to SIMD instructions.  Spark's currently generated code is full of null
>> checks for each element, which would prevent the optimizations we want.
>> Also, the intermediate results are often stored in UnsafeRow instances.
>> This is really fast for row-based processing, but I believe the complexity
>> of how they work would prevent the JVM from vectorizing the processing.  If you have a
>> better way to take java code and vectorize it we should put it into OpenJDK
>> instead of spark so everyone can benefit from it.
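The loop style described above can be sketched in plain Java (a hypothetical illustration, not actual Hive or Spark code): null handling is hoisted out of the arithmetic so the hot loop has no per-element conditionals, which is what lets HotSpot's auto-vectorizer compile it down to SIMD instructions.

```java
// Hypothetical columnar add in the Hive VectorizedRowBatch style:
// values and a separate null mask, no branches in the hot loop.
public final class ColumnarAdd {
    public static void add(long[] out, long[] a, long[] b, int n) {
        for (int i = 0; i < n; i++) {
            out[i] = a[i] + b[i];   // branch-free: JIT can emit SIMD
        }
    }

    // Nulls are merged separately on the validity arrays, again
    // without conditionals inside the arithmetic loop.
    public static void mergeNulls(boolean[] outNull, boolean[] aNull,
                                  boolean[] bNull, int n) {
        for (int i = 0; i < n; i++) {
            outNull[i] = aNull[i] | bNull[i];
        }
    }

    public static void main(String[] args) {
        long[] a = {1, 2, 3, 4};
        long[] b = {10, 20, 30, 40};
        long[] out = new long[4];
        add(out, a, b, 4);
        System.out.println(java.util.Arrays.toString(out)); // prints [11, 22, 33, 44]
    }
}
```

The contrast is with generated row-at-a-time code, where a null check sits between every pair of arithmetic operations and defeats the vectorizer.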
>> Trying to compile directly from generated java code to something a GPU
>> can process is something we are tackling but we decided to go a different
>> route from what you proposed.  From talking with several compiler experts
>> here at NVIDIA my understanding is that IBM in partnership with NVIDIA
>> attempted in the past to extend the JVM to run at least partially on GPUs,
>> but it was really difficult to get right, especially with how java does
>> memory management and memory layout.
>> To avoid that complexity we decided to split the JITing up into two
>> separate pieces.  I didn't mention any of this before because this
>> discussion was intended to just be around the memory layout support, and
>> not GPU processing.  The first part would be to take the Catalyst AST and
>> produce CUDA code directly from it.  If properly done we should be able to
>> do the selection and projection phases within a single kernel.  The biggest
>> issue comes with UDFs as they cannot easily be vectorized for the CPU or
>> GPU.  So to deal with that we have a prototype written by the compiler team
>> that is trying to tackle SPARK-14083, which can translate basic UDFs into
>> catalyst expressions.  If the UDF is too complicated or covers operations
>> not yet supported, it will fall back to the original UDF processing.  I
>> don't know how close the team is to submitting a SPIP or a patch for it, but I
>> do know that they have some very basic operations working.  The big issue
>> is that it requires java 11+ so it can use standard APIs to get the byte
>> code of scala UDFs.
>> We split it this way because we thought it would be simplest to
>> implement, and because it would provide a benefit to more than just GPU
>> accelerated queries.
>> Thanks,
>> Bobby
>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki <>
>> wrote:
>> Looks interesting discussion.
>> Let me describe the current structure and remaining issues. This is
>> orthogonal to cost-benefit trade-off discussion.
>> The code generation basically consists of three parts.
>> 1. Loading
>> 2. Selection (map, filter, ...)
>> 3. Projection
>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and table cache) is well
>> abstracted by using the ColumnVector class. By combining it with
>> ColumnarBatchScan, the whole-stage code generation generates code to
>> directly get values from the columnar storage if there is no row-based
>> operation.
>> Note: The current master does not support Arrow as a data source.
>> However, I think it is not technically hard to support Arrow.
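As a rough illustration of that abstraction (a simplified stand-in, not Spark's actual ColumnVector API), the generated Loading code reads values through typed getters on a column handle, so the same loop works whether the backing store is Parquet, ORC, Arrow, or the table cache:

```java
// Simplified stand-in for the ColumnVector abstraction.
interface SimpleColumnVector {
    boolean isNullAt(int rowId);
    int getInt(int rowId);
}

// One possible backing store: plain on-heap arrays.
final class IntArrayVector implements SimpleColumnVector {
    private final int[] data;
    private final boolean[] nulls;
    IntArrayVector(int[] data, boolean[] nulls) {
        this.data = data;
        this.nulls = nulls;
    }
    public boolean isNullAt(int rowId) { return nulls[rowId]; }
    public int getInt(int rowId) { return data[rowId]; }
}

public final class ScanDemo {
    // In spirit, what the generated Loading code does: walk the batch
    // and hand each value up to the parent operator as a row.
    static long sumNonNull(SimpleColumnVector col, int numRows) {
        long sum = 0;
        for (int i = 0; i < numRows; i++) {
            if (!col.isNullAt(i)) sum += col.getInt(i);
        }
        return sum;
    }

    public static void main(String[] args) {
        SimpleColumnVector col = new IntArrayVector(
            new int[]{1, 2, 3, 4}, new boolean[]{false, true, false, false});
        System.out.println(sumNonNull(col, 4)); // prints 8
    }
}
```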
>> 2. The current whole-stage codegen generates code for element-wise
>> selection (excluding sort and join). The SIMDzation or GPUization
>> capability depends on a compiler that translates native code from the code
>> generated by the whole-stage codegen.
>> 3. The current Projection assumes row-oriented data storage; I think that
>> is the part that Wenchen pointed out.
>> My slides <> may help explain the above issues and a possible
>> implementation.
>> FYI. NVIDIA will present an approach to exploit GPU with Arrow thru
>> Python at SAIS 2019
>> I think that it uses Python UDF support with Arrow in Spark.
>> P.S. I will give a presentation about in-memory data storages for Spark
>> at SAIS 2019
>> :)
>> Kazuaki Ishizaki
>> From:        Wenchen Fan <>
>> To:        Bobby Evans <>
>> Cc:        Spark dev list <>
>> Date:        2019/03/26 13:53
>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>> ------------------------------
>> Do you have some initial perf numbers? It seems fine to me to remain
>> row-based inside Spark with whole-stage-codegen, and convert rows to
>> columnar batches when communicating with external systems.
>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <> wrote:
>> This thread is to discuss adding in support for data frame processing
>> using an in-memory columnar format compatible with Apache Arrow.  My main
>> goal in this is to lay the groundwork so we can add in support for GPU
>> accelerated processing of data frames, but this feature has a number of
>> other benefits.  Spark currently supports Apache Arrow formatted data as an
>> option to exchange data with python for pandas UDF processing. There has
>> also been discussion around extending this to allow for exchanging data
>> with other tools like pytorch, tensorflow, xgboost,... If Spark supports
>> processing on Arrow compatible data it could eliminate the
>> serialization/deserialization overhead when going between these systems.
>> It also would allow for doing optimizations on a CPU with SIMD instructions
>> similar to what Hive currently supports. Accelerated processing using a GPU
>> is something that we will start a separate discussion thread on, but I
>> wanted to set the context a bit.
>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>> months to try and understand how to make this work.  What we are proposing
>> is based off of lessons learned when building this prototype, but we really
>> wanted to get feedback early on from the community. We will file a SPIP
>> once we can get agreement that this is a good direction to go in.
>> The current support for columnar processing lets a Parquet or Orc file
>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s type
>> erasure. The code generation is aware that the RDD actually holds
>> ColumnarBatches and generates code to loop through the data in each batch
>> as InternalRows.
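The type-erasure trick being replaced can be sketched in plain Java generics (hypothetical stand-in classes, not Spark's): a container declared to hold rows actually carries batches, and the consumer casts back based on out-of-band knowledge.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for InternalRow and ColumnarBatch.
class Row { }
class Batch {
    final int[] values;
    Batch(int[] v) { values = v; }
}

public final class ErasureDemo {
    // Declared to produce rows, but actually produces batches: erasure
    // means the JVM never checks the element type at this boundary.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Row> scan() {
        List<Batch> batches = new ArrayList<>();
        batches.add(new Batch(new int[]{1, 2, 3}));
        return (List<Row>) (List) batches;
    }

    public static void main(String[] args) {
        // The consumer relies on out-of-band knowledge that the "rows"
        // are really batches, mirroring how generated code knows an
        // RDD[InternalRow] actually holds ColumnarBatch objects.
        List<?> data = scan();
        Batch b = (Batch) data.get(0);
        int sum = 0;
        for (int v : b.values) sum += v;
        System.out.println(sum); // prints 6
    }
}
```

The proposal's RDD-of-batches API would make this implicit contract explicit in the types.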
>> Instead, we propose a new set of APIs to work on an
>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this we
>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>> Each part of the physical SparkPlan would expose columnar support through a
>> combination of traits and method calls. The rule would then decide when
>> columnar processing would start and when it would end. Switching between
>> columnar and row based processing is not free, so the rule would make a
>> decision based off of an estimate of the cost to do the transformation and
>> the estimated speedup in processing time.
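The cost decision described above could be sketched like this (purely illustrative numbers and names, not a proposed API): a plan segment runs columnar only if the estimated columnar cost plus the two boundary conversions beats the row-based cost.

```java
// Illustrative cost model for the rule: the row<->columnar conversions
// at the segment boundaries must be paid for by the estimated speedup.
public final class ColumnarCostRule {
    static boolean preferColumnar(double rowCost,
                                  double columnarCost,
                                  double conversionCost) {
        // Two transitions: rows -> batches on entry, batches -> rows on exit.
        return columnarCost + 2 * conversionCost < rowCost;
    }

    public static void main(String[] args) {
        // A long columnar segment amortizes the conversions...
        System.out.println(preferColumnar(100.0, 40.0, 10.0)); // prints true
        // ...a short one does not.
        System.out.println(preferColumnar(12.0, 5.0, 10.0));   // prints false
    }
}
```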
>> This should allow us to disable columnar support by simply disabling the
>> rule that modifies the physical SparkPlan.  It should pose minimal risk to
>> the existing row-based code path, as that code should not be touched, and
>> in many cases could be reused to implement the columnar version.  This also
>> allows for small, easily manageable patches rather than huge ones that no
>> one wants to review.
>> As far as the memory layout is concerned OnHeapColumnVector and
>> OffHeapColumnVector are already really close to being Apache Arrow
>> compatible so shifting them over would be a relatively simple change.
>> Alternatively we could add in a new implementation that is Arrow compatible
>> if there are reasons to keep the old ones.
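One concrete difference behind that "relatively simple change": null encoding. Arrow uses a one-bit-per-value validity bitmap (set bit = valid), whereas a byte-per-value null array is a different layout. A small, illustrative sketch of packing one into the other:

```java
public final class ValidityBitmap {
    // Pack a byte-per-value null array (true = null) into an
    // Arrow-style validity bitmap, where a SET bit means valid.
    static byte[] toValidityBitmap(boolean[] isNull) {
        byte[] bitmap = new byte[(isNull.length + 7) / 8];
        for (int i = 0; i < isNull.length; i++) {
            if (!isNull[i]) {
                bitmap[i / 8] |= (byte) (1 << (i % 8));
            }
        }
        return bitmap;
    }

    public static void main(String[] args) {
        // Value 1 is null; everything else (9 values total) is valid.
        boolean[] nulls = {false, true, false, false, false,
                           false, false, false, false};
        byte[] bm = toValidityBitmap(nulls);
        // First byte: bits 0 and 2-7 set; second byte: bit 0 set.
        System.out.printf("%02x %02x%n", bm[0] & 0xff, bm[1] & 0xff); // prints fd 01
    }
}
```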
>> Again this is just to get the discussion started, any feedback is
>> welcome, and we will file a SPIP on it once we feel like the major changes
>> we are proposing are acceptable.
>> Thanks,
>> Bobby Evans
