spark-dev mailing list archives

From "Reynold Xin" <r...@databricks.com>
Subject Re: [DISCUSS] Spark Columnar Processing
Date Thu, 11 Apr 2019 17:34:35 GMT
I just realized we had an earlier SPIP on a similar topic: https://issues.apache.org/jira/browse/SPARK-24579

Perhaps we should tie the two together. IIUC, you'd want to expose the existing ColumnarBatch
API, but also provide utilities to directly convert from/to Arrow.

On Thu, Apr 11, 2019 at 7:13 AM, Bobby Evans < bobby@apache.org > wrote:

> 
> The SPIP has been up for almost 6 days now with really no discussion on
> it.  I am hopeful that means it's okay and we are good to call a vote on
> it, but I want to give everyone one last chance to take a look and
> comment.  If there are no comments by tomorrow I think we will start a vote
> for this.
> 
> 
> Thanks,
> 
> 
> Bobby
> 
> On Fri, Apr 5, 2019 at 2:24 PM Bobby Evans < bobby@apache.org > wrote:
> 
> 
>> I just filed SPARK-27396 as the SPIP for this proposal.  Please use that
>> JIRA for further discussions.
>> 
>> 
>> Thanks for all of the feedback,
>> 
>> 
>> Bobby
>> 
>> On Wed, Apr 3, 2019 at 7:15 PM Bobby Evans < bobby@apache.org > wrote:
>> 
>> 
>>> I am still working on the SPIP and should get it up in the next few days.
>>> I have the basic text more or less ready, but I want to get a high-level
>>> API concept ready too, just to have something more concrete.  I have not
>>> really contributed new features to Spark before, so I am not sure where a
>>> design document fits in here, because neither
>>> http://spark.apache.org/improvement-proposals.html nor
>>> http://spark.apache.org/contributing.html mentions a design anywhere.  I
>>> am happy to put one up, but I was hoping the API concept would cover most
>>> of that.
>>> 
>>> 
>>> Thanks,
>>> 
>>> 
>>> Bobby
>>> 
>>> On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu < liurenjie2008@gmail.com > wrote:
>>> 
>>> 
>>>> Hi, Bobby:
>>>> Do you have a design doc? I'm also interested in this topic and want to
>>>> help contribute.
>>>> 
>>>> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans < bobby@apache.org > wrote:
>>>> 
>>>> 
>>>>> Thanks to everyone for the feedback.
>>>>> 
>>>>> 
>>>>> Overall the feedback has been really positive for exposing columnar as a
>>>>> processing option to users.  I'll write up a SPIP on the proposed changes
>>>>> to support columnar processing (not necessarily implement it) and then
>>>>> ping the list again for more feedback and discussion.
>>>>> 
>>>>> 
>>>>> Thanks again,
>>>>> 
>>>>> 
>>>>> Bobby
>>>>> 
>>>>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin < rxin@databricks.com > wrote:
>>>>> 
>>>>> 
>>>>>> I just realized I didn't make my stance very clear here ... here's
>>>>>> another try:
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> I think it's a no-brainer to have a good columnar UDF interface.  This
>>>>>> would facilitate a lot of high performance applications, e.g. GPU-based
>>>>>> accelerations for machine learning algorithms.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On rewriting the entire internals of Spark SQL to leverage columnar
>>>>>> processing, I don't see enough evidence to suggest that's a good idea yet.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans < bobby@apache.org > wrote:
>>>>>> 
>>>>>>> Kazuaki Ishizaki,
>>>>>>> 
>>>>>>> 
>>>>>>> Yes, ColumnarBatchScan does provide a framework for doing code generation
>>>>>>> for the processing of columnar data.  I have to admit that I don't have a
>>>>>>> deep understanding of the code generation piece, so if I get something
>>>>>>> wrong please correct me.  From what I have seen, only input formats
>>>>>>> currently inherit from ColumnarBatchScan, and from the comments in the trait
>>>>>>> 
>>>>>>> 
>>>>>>>   /**
>>>>>>>    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
>>>>>>>    * This is called once per [[ColumnarBatch]].
>>>>>>>    */
>>>>>>> https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> It appears that ColumnarBatchScan is really only intended to pull out the
>>>>>>> data from the batch, and not to process that data in a columnar fashion.
>>>>>>> That is the Loading stage that you mentioned.
>>>>>>> 
>>>>>>> 
>>>>>>> > The SIMDzation or GPUization capability depends on a compiler that
>>>>>>> > translates native code from the code generated by the whole-stage codegen.
>>>>>>> 
>>>>>>> To be able to support vectorized processing Hive stayed with pure Java and
>>>>>>> let the JVM detect and do the SIMDzation of the code.  To make that happen
>>>>>>> they created loops to go through each element in a column and removed all
>>>>>>> conditionals from the body of the loops.  To the best of my knowledge that
>>>>>>> would still require a separate code path like I am proposing to make the
>>>>>>> different processing phases generate code that the JVM can compile down to
>>>>>>> SIMD instructions.  The generated code is full of null checks for each
>>>>>>> element which would prevent the operations we want.  Also, the
>>>>>>> intermediate results are often stored in UnsafeRow instances.  This is
>>>>>>> really fast for row-based processing, but the complexity of how they work
>>>>>>> I believe would prevent the JVM from being able to vectorize the
>>>>>>> processing.  If you have a better way to take Java code and vectorize it
>>>>>>> we should put it into OpenJDK instead of Spark so everyone can benefit
>>>>>>> from it.
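
A minimal sketch of the two loop shapes contrasted above, assuming a column stored as a plain int array plus a separate array of null flags. The class and method names are hypothetical and come from neither Hive nor Spark. Whether the JVM actually emits SIMD instructions for the second form depends on the JIT compiler and the hardware.

```java
// Hypothetical illustration; none of these names exist in Spark or Hive.
final class BranchFreeAddSketch {

    // Row-at-a-time style: a null check guards every element, so the loop
    // body contains a branch.
    static void addWithNullChecks(int[] a, boolean[] aNull,
                                  int[] b, boolean[] bNull,
                                  int[] out, boolean[] outNull) {
        for (int i = 0; i < out.length; i++) {
            if (aNull[i] || bNull[i]) {
                outNull[i] = true;
            } else {
                out[i] = a[i] + b[i];
                outNull[i] = false;
            }
        }
    }

    // Column-at-a-time style: values and null flags are handled in separate
    // loops whose bodies contain no conditionals. Values in null slots are
    // computed anyway and are expected to be ignored by readers.
    static void addBranchFree(int[] a, boolean[] aNull,
                              int[] b, boolean[] bNull,
                              int[] out, boolean[] outNull) {
        for (int i = 0; i < out.length; i++) {
            out[i] = a[i] + b[i];
        }
        for (int i = 0; i < out.length; i++) {
            outNull[i] = aNull[i] | bNull[i];
        }
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3, 4};
        boolean[] aNull = {false, true, false, false};
        int[] b = {10, 20, 30, 40};
        boolean[] bNull = {false, false, false, true};
        int[] out = new int[a.length];
        boolean[] outNull = new boolean[a.length];
        addBranchFree(a, aNull, b, bNull, out, outNull);
        System.out.println(java.util.Arrays.toString(out));
        System.out.println(java.util.Arrays.toString(outNull));
    }
}
```

Both methods agree on the null flags and on every non-null slot; they differ only in what is left in the slots flagged as null.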
>>>>>>> 
>>>>>>> 
>>>>>>> Trying to compile directly from generated Java code to something a GPU can
>>>>>>> process is something we are tackling but we decided to go a different
>>>>>>> route from what you proposed.  From talking with several compiler experts
>>>>>>> here at NVIDIA my understanding is that IBM in partnership with NVIDIA
>>>>>>> attempted in the past to extend the JVM to run at least partially on GPUs,
>>>>>>> but it was really difficult to get right, especially with how Java does
>>>>>>> memory management and memory layout.
>>>>>>> 
>>>>>>> 
>>>>>>> To avoid that complexity we decided to split the JITing up into two
>>>>>>> separate pieces.  I didn't mention any of this before because this
>>>>>>> discussion was intended to just be around the memory layout support, and
>>>>>>> not GPU processing.  The first part would be to take the Catalyst AST and
>>>>>>> produce CUDA code directly from it.  If properly done we should be able to
>>>>>>> do the selection and projection phases within a single kernel.  The
>>>>>>> biggest issue comes with UDFs as they cannot easily be vectorized for the
>>>>>>> CPU or GPU.  So to deal with that we have a prototype written by the
>>>>>>> compiler team that is trying to tackle SPARK-14083 which can translate
>>>>>>> basic UDFs into Catalyst expressions.  If the UDF is too complicated or
>>>>>>> covers operations not yet supported it will fall back to the original UDF
>>>>>>> processing.  I don't know how close the team is to submitting a SPIP or a
>>>>>>> patch for it, but I do know that they have some very basic operations
>>>>>>> working.  The big issue is that it requires Java 11+ so it can use
>>>>>>> standard APIs to get the byte code of Scala UDFs.
>>>>>>> 
>>>>>>> 
>>>>>>> We split it this way because we thought it would be simplest to implement,
>>>>>>> and because it would provide a benefit to more than just GPU accelerated
>>>>>>> queries.
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> 
>>>>>>> Bobby
>>>>>>> 
>>>>>>> On Tue, Mar 26, 2019 at 11:59 PM Kazuaki Ishizaki < ISHIZAKI@jp.ibm.com >
>>>>>>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>>> This looks like an interesting discussion.
>>>>>>>> Let me describe the current structure and remaining issues.  This is
>>>>>>>> orthogonal to the cost-benefit trade-off discussion.
>>>>>>>> 
>>>>>>>> The code generation basically consists of three parts.
>>>>>>>> 1. Loading
>>>>>>>> 2. Selection (map, filter, ...)
>>>>>>>> 3. Projection
>>>>>>>> 
>>>>>>>> 1. Columnar storage (e.g. Parquet, Orc, Arrow, and table cache) is well
>>>>>>>> abstracted by using the ColumnVector class (
>>>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
>>>>>>>> ). By combining it with ColumnarBatchScan, the whole-stage code
>>>>>>>> generation generates code to directly get values from the columnar storage
>>>>>>>> if there is no row-based operation.
>>>>>>>> Note: The current master does not support Arrow as a data source. However,
>>>>>>>> I think it is not technically hard to support Arrow.
>>>>>>>> 
>>>>>>>> 2. The current whole-stage codegen generates code for element-wise
>>>>>>>> selection (excluding sort and join). The SIMDzation or GPUization
>>>>>>>> capability depends on a compiler that translates native code from the code
>>>>>>>> generated by the whole-stage codegen.
>>>>>>>> 
>>>>>>>> 3. The current Projection assumes that it stores row-oriented data. I
>>>>>>>> think that is the part that Wenchen pointed out.
>>>>>>>> 
>>>>>>>> My slides (
>>>>>>>> https://www.slideshare.net/ishizaki/making-hardware-accelerator-easier-to-use/41
>>>>>>>> ) may simplify the above issue and possible implementation.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> FYI. NVIDIA will present an approach to exploit GPU with Arrow thru Python
>>>>>>>> at SAIS 2019 (
>>>>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=110
>>>>>>>> ). I think that it uses Python UDF support with Arrow in Spark.
>>>>>>>> 
>>>>>>>> P.S. I will give a presentation about in-memory data storage for Spark at
>>>>>>>> SAIS 2019 (
>>>>>>>> https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=40
>>>>>>>> ) :)
>>>>>>>> 
>>>>>>>> Kazuaki Ishizaki
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> From:        Wenchen Fan < cloud0fan@gmail.com >
>>>>>>>> To:        Bobby Evans < bobby@apache.org >
>>>>>>>> Cc:        Spark dev list < dev@spark.apache.org >
>>>>>>>> Date:        2019/03/26 13:53
>>>>>>>> Subject:        Re: [DISCUSS] Spark Columnar Processing
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Do you have some initial perf numbers? It seems fine to me to remain
>>>>>>>> row-based inside Spark with whole-stage-codegen, and convert rows to
>>>>>>>> columnar batches when communicating with external systems.
>>>>>>>> 
>>>>>>>> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans < bobby@apache.org > wrote:
>>>>>>>> This thread is to discuss adding in support for data frame processing
>>>>>>>> using an in-memory columnar format compatible with Apache Arrow.  My main
>>>>>>>> goal in this is to lay the groundwork so we can add in support for GPU
>>>>>>>> accelerated processing of data frames, but this feature has a number of
>>>>>>>> other benefits.  Spark currently supports Apache Arrow formatted data as
>>>>>>>> an option to exchange data with Python for pandas UDF processing.  There
>>>>>>>> has also been discussion around extending this to allow for exchanging
>>>>>>>> data with other tools like pytorch, tensorflow, xgboost, ...  If Spark
>>>>>>>> supports processing on Arrow compatible data it could eliminate the
>>>>>>>> serialization/deserialization overhead when going between these systems.
>>>>>>>> It also would allow for doing optimizations on a CPU with SIMD
>>>>>>>> instructions similar to what Hive currently supports.  Accelerated
>>>>>>>> processing using a GPU is something that we will start a separate
>>>>>>>> discussion thread on, but I wanted to set the context a bit.
>>>>>>>> 
>>>>>>>> Jason Lowe, Tom Graves, and I created a prototype over the past few months
>>>>>>>> to try and understand how to make this work.  What we are proposing is
>>>>>>>> based off of lessons learned when building this prototype, but we really
>>>>>>>> wanted to get feedback early on from the community.  We will file a SPIP
>>>>>>>> once we can get agreement that this is a good direction to go in.
>>>>>>>> 
>>>>>>>> The current support for columnar processing lets a Parquet or Orc file
>>>>>>>> format return a ColumnarBatch inside an RDD[InternalRow] using Scala’s
>>>>>>>> type erasure.  The code generation is aware that the RDD actually holds
>>>>>>>> ColumnarBatches and generates code to loop through the data in each batch
>>>>>>>> as InternalRows.
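
A minimal sketch of the type-erasure trick described above, written in Java, which erases generic type parameters on the JVM in the same way. `Row` and `Batch` are hypothetical stand-ins, not Spark's InternalRow or ColumnarBatch, and the list stands in for the RDD.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; these are not Spark classes.
final class ErasureSketch {

    static final class Row {
        final int value;
        Row(int value) { this.value = value; }
    }

    static final class Batch {
        final int[] column;
        Batch(int[] column) { this.column = column; }
    }

    // Declared as a list of rows but actually filled with batches. Generic
    // type parameters are erased at runtime, so the JVM does not object.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Row> scanAsRows(int[][] batches) {
        List raw = new ArrayList();
        for (int[] b : batches) {
            raw.add(new Batch(b));
        }
        return (List<Row>) raw;
    }

    // A consumer that knows what the list really holds casts each element
    // back to a batch and loops over the values inside it.
    static long sum(List<Row> rows) {
        long total = 0;
        for (Object element : (List<?>) rows) {
            Batch batch = (Batch) element;
            for (int v : batch.column) {
                total += v;
            }
        }
        return total;
    }

    // A consumer that trusts the declared type fails at runtime, because the
    // compiler inserts a cast to Row here.
    static Row first(List<Row> rows) {
        return rows.get(0);
    }

    public static void main(String[] args) {
        List<Row> rows = scanAsRows(new int[][] {{1, 2, 3}, {4, 5}});
        System.out.println(sum(rows));
    }
}
```

The sketch shows why the approach is fragile: only code that is in on the secret can read the data, and nothing in the declared types records it.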
>>>>>>>> 
>>>>>>>> 
>>>>>>>> We propose a new set of APIs that work on an
>>>>>>>> RDD[InternalColumnarBatch] instead of abusing type erasure.  With this we
>>>>>>>> propose adding in a Rule similar to how WholeStageCodeGen currently works.
>>>>>>>> Each part of the physical SparkPlan would expose columnar support through
>>>>>>>> a combination of traits and method calls.  The rule would then decide when
>>>>>>>> columnar processing would start and when it would end.  Switching between
>>>>>>>> columnar and row based processing is not free, so the rule would make a
>>>>>>>> decision based off of an estimate of the cost to do the transformation and
>>>>>>>> the estimated speedup in processing time.
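
A minimal sketch of the kind of decision such a rule might make, assuming costs are expressed as plain numbers in some common unit. The class name, method name, parameters, and cost model are all hypothetical. The thread does not specify how the estimates would be computed.

```java
// Hypothetical illustration; this is not the proposed Spark rule.
final class ColumnarCostSketch {

    /**
     * Returns true when running a plan section in columnar form is estimated
     * to be cheaper than staying row-based, once the cost of converting rows
     * to columns on entry and columns back to rows on exit is included.
     */
    static boolean shouldRunColumnar(double rowCost,
                                     double columnarCost,
                                     double rowToColumnarCost,
                                     double columnarToRowCost) {
        double totalColumnar = rowToColumnarCost + columnarCost + columnarToRowCost;
        return totalColumnar < rowCost;
    }

    public static void main(String[] args) {
        // The columnar speedup outweighs the two transitions.
        System.out.println(shouldRunColumnar(100.0, 40.0, 20.0, 20.0));
        // The transitions cost more than the speedup saves.
        System.out.println(shouldRunColumnar(100.0, 70.0, 20.0, 20.0));
    }
}
```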
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> This should allow us to disable columnar support by simply disabling the
>>>>>>>> rule that modifies the physical SparkPlan.  It should be minimal risk to
>>>>>>>> the existing row-based code path, as that code should not be touched, and
>>>>>>>> in many cases could be reused to implement the columnar version.  This
>>>>>>>> also allows for small easily manageable patches.  No huge patches that no
>>>>>>>> one wants to review.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> As far as the memory layout is concerned OnHeapColumnVector and
>>>>>>>> OffHeapColumnVector are already really close to being Apache Arrow
>>>>>>>> compatible so shifting them over would be a relatively simple change.
>>>>>>>> Alternatively we could add in a new implementation that is Arrow
>>>>>>>> compatible if there are reasons to keep the old ones.
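
A minimal sketch of one layout difference such a shift could involve, assuming for illustration a vector that tracks nulls with one byte per value (1 = null). That starting layout is an assumption here, not something stated in the thread. Arrow's validity buffer is bit-packed, least-significant bit first, with 1 meaning valid. The class and method names are hypothetical.

```java
// Hypothetical illustration; not part of Spark or the Arrow Java library.
final class ValidityBitmapSketch {

    // Packs a byte-per-value null array (1 = null) into a bit-per-value
    // validity bitmap (1 = valid), least-significant bit first in each byte.
    static byte[] toValidityBitmap(byte[] nulls) {
        byte[] bitmap = new byte[(nulls.length + 7) / 8];
        for (int i = 0; i < nulls.length; i++) {
            if (nulls[i] == 0) {
                bitmap[i >> 3] |= (byte) (1 << (i & 7));
            }
        }
        return bitmap;
    }

    // Reads a single validity bit back out of the packed bitmap.
    static boolean isValid(byte[] bitmap, int i) {
        return (bitmap[i >> 3] & (1 << (i & 7))) != 0;
    }

    public static void main(String[] args) {
        byte[] nulls = {0, 1, 0, 0, 0, 0, 0, 0, 1, 0};
        byte[] bitmap = toValidityBitmap(nulls);
        for (int i = 0; i < nulls.length; i++) {
            System.out.println(i + " valid=" + isValid(bitmap, i));
        }
    }
}
```

The value buffers themselves would need a similar comparison against the Arrow format. The sketch covers only the null-tracking part.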
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Again this is just to get the discussion started, any feedback is welcome,
>>>>>>>> and we will file a SPIP on it once we feel like the major changes we are
>>>>>>>> proposing are acceptable.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Bobby Evans
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Renjie Liu
>>>> Software Engineer, MVAD
>>>> 
>>> 
>>> 
>> 
>> 
> 
>