spark-dev mailing list archives

From Bobby Evans <bo...@apache.org>
Subject Re: [DISCUSS] Spark Columnar Processing
Date Tue, 26 Mar 2019 13:57:33 GMT
Cloudera reports a 26% improvement in Hive query runtimes from enabling
vectorization. I would expect to see similar improvements here, but at the cost
of keeping more data in memory.  Remember that this also enables a number of
different hardware acceleration techniques.  If the data format is Arrow
compatible and off-heap, someone could offload the processing to native code,
which typically results in a 2x improvement over Java, and the cost of a
JNI call would be amortized over processing an entire batch at once.
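
To make that JNI point concrete, here is a rough sketch. The native method
and the glue around it are hypothetical, nothing like this exists today:

// One native call processes an entire batch, so the fixed JNI overhead is
// paid once per batch rather than once per row.
class NativeOps {
  @native def sumIntColumn(dataBufferAddress: Long, numRows: Int): Long
}

// With Arrow's off-heap layout the column's buffer address can be handed
// straight to native code with no copy, something like:
//   val sum = new NativeOps().sumIntColumn(vector.getDataBufferAddress, numRows)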

Also, we plan on adding GPU acceleration and ideally making it a
standard part of Spark.  In our initial prototype, we saw queries that we
could make fully columnar/GPU-enabled run 5-6x faster.  That really
was just a proof of concept, and we expect to be able to do quite a bit
better when we are completely done.  Many commercial GPU-enabled SQL
engines claim to be 20x to 200x faster than Spark, depending on the use
case. Digging deeper, you see that these are not apples-to-apples
comparisons, e.g. reading from cached GPU memory while Spark reads from
a file, or taking Parquet as input but asking Spark to read CSV.  That being
said, I would expect that we can achieve something close to the 20x range
for most queries, and possibly more if they are computationally intensive.

Also, as a side note: we initially thought that the conversion would not be
too expensive and that we could just move computationally intensive
processing onto the GPU piecemeal, with conversions on both ends.  In
practice, we found that the cost of conversion quickly starts to dominate
the runtime of the queries we were testing.

On Mon, Mar 25, 2019 at 11:53 PM Wenchen Fan <cloud0fan@gmail.com> wrote:

> Do you have some initial perf numbers? It seems fine to me to remain
> row-based inside Spark with whole-stage-codegen, and convert rows to
> columnar batches when communicating with external systems.
>
> On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bobby@apache.org> wrote:
>
>> This thread is to discuss adding support for data frame processing
>> using an in-memory columnar format compatible with Apache Arrow.  My main
>> goal in this is to lay the groundwork so we can add support for GPU
>> accelerated processing of data frames, but this feature has a number of
>> other benefits.  Spark currently supports Apache Arrow formatted data as an
>> option to exchange data with Python for pandas UDF processing. There has
>> also been discussion around extending this to allow for exchanging data
>> with other tools like PyTorch, TensorFlow, XGBoost, etc. If Spark supported
>> processing on Arrow compatible data directly, it could eliminate the
>> serialization/deserialization overhead of moving between these systems.
>> It would also allow for optimizations on the CPU with SIMD instructions,
>> similar to what Hive's vectorized execution currently supports. Accelerated
>> processing using a GPU is something that we will start a separate discussion
>> thread on, but I wanted to set the context a bit.
>>
>> Jason Lowe, Tom Graves, and I created a prototype over the past few
>> months to try to understand how to make this work.  What we are proposing
>> is based on lessons learned while building this prototype, but we really
>> wanted to get feedback from the community early on. We will file a SPIP
>> once we can get agreement that this is a good direction to go in.
>>
>> The current support for columnar processing lets the Parquet and ORC file
>> formats return a ColumnarBatch inside an RDD[InternalRow] using Scala's type
>> erasure. The code generation is aware that the RDD actually holds
>> ColumnarBatches and generates code to loop through the data in each batch as
>> InternalRows.
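>>
>> Roughly, the existing trick looks something like this (a simplified
>> sketch, not the actual internals):
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.sql.catalyst.InternalRow
>> import org.apache.spark.sql.vectorized.ColumnarBatch
>>
>> // The scan actually produces batches, but the RDD is typed as rows;
>> // erasure means the JVM never checks the element type.
>> def scan(): RDD[ColumnarBatch] = ???  // stand-in for the Parquet/ORC reader
>> val rows: RDD[InternalRow] = scan().asInstanceOf[RDD[InternalRow]]
>>
>> // The generated code is in on the secret and casts each "row" back:
>> rows.foreach { r =>
>>   val batch = r.asInstanceOf[ColumnarBatch]
>>   var i = 0
>>   while (i < batch.numRows()) {
>>     val row: InternalRow = batch.getRow(i)  // per-row view of columnar data
>>     i += 1
>>   }
>> }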
>>
>> Instead, we propose a new set of APIs to work on an
>> RDD[InternalColumnarBatch] instead of abusing type erasure. With this, we
>> propose adding a Rule similar to how WholeStageCodeGen currently works.
>> Each part of the physical SparkPlan would expose columnar support through a
>> combination of traits and method calls. The rule would then decide where
>> columnar processing would start and where it would end. Switching between
>> columnar and row-based processing is not free, so the rule would make that
>> decision based on an estimate of the cost of the transformation and the
>> estimated speedup in processing time.
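>>
>> To make that a bit more concrete, here is a very rough sketch of the kind
>> of API we have in mind. Every name below is a placeholder, not a real or
>> final Spark API, and the cost model is deliberately left open:
>>
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.sql.catalyst.rules.Rule
>> import org.apache.spark.sql.execution.SparkPlan
>> import org.apache.spark.sql.vectorized.ColumnarBatch
>>
>> // An operator opts in to columnar execution through a trait.
>> trait ColumnarSupport { self: SparkPlan =>
>>   def supportsColumnar: Boolean = false
>>   // Columnar analogue of doExecute(); batches would be Arrow compatible.
>>   def doExecuteColumnar(): RDD[ColumnarBatch] =
>>     throw new UnsupportedOperationException(s"$nodeName is row-based only")
>> }
>>
>> // Walks the physical plan and decides where a columnar section should
>> // begin and end, weighing transition cost against estimated speedup.
>> case class InsertColumnarTransitions() extends Rule[SparkPlan] {
>>   override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
>>     case p: SparkPlan with ColumnarSupport
>>         if p.supportsColumnar && estimatedSpeedup(p) > transitionCost(p) =>
>>       p  // the real rule would wrap p in row<->columnar transition operators
>>   }
>>   private def estimatedSpeedup(p: SparkPlan): Double = ???  // cost model TBD
>>   private def transitionCost(p: SparkPlan): Double = ???
>> }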
>>
>> This should allow us to disable columnar support by simply disabling the
>> rule that modifies the physical SparkPlan.  It should pose minimal risk to
>> the existing row-based code path, as that code would not be touched, and
>> in many cases could be reused to implement the columnar version.  It also
>> allows for small, easily manageable patches rather than huge patches that
>> no one wants to review.
>>
>> As far as the memory layout is concerned, OnHeapColumnVector and
>> OffHeapColumnVector are already really close to being Apache Arrow
>> compatible, so shifting them over would be a relatively simple change.
>> Alternatively, we could add a new implementation that is Arrow compatible
>> if there are reasons to keep the old ones.
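>>
>> For a sense of what Arrow compatibility buys us, Spark already ships an
>> ArrowColumnVector wrapper, so a batch can sit directly on Arrow memory
>> with no copying. A small sketch (the values and names are just examples):
>>
>> import org.apache.arrow.memory.RootAllocator
>> import org.apache.arrow.vector.IntVector
>> import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}
>>
>> val allocator = new RootAllocator(Long.MaxValue)
>> val ints = new IntVector("col0", allocator)
>> ints.allocateNew(3)
>> (0 until 3).foreach(i => ints.set(i, i * 10))
>> ints.setValueCount(3)
>>
>> // Wrap the off-heap Arrow buffer in Spark's ColumnVector API and expose
>> // it to operators as a ColumnarBatch -- no conversion, no copy.
>> val cols: Array[ColumnVector] = Array(new ArrowColumnVector(ints))
>> val batch = new ColumnarBatch(cols)
>> batch.setNumRows(3)
>> assert(batch.getRow(1).getInt(0) == 10)
>> batch.close()      // also releases the underlying Arrow buffers
>> allocator.close()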
>>
>> Again this is just to get the discussion started, any feedback is
>> welcome, and we will file a SPIP on it once we feel like the major changes
>> we are proposing are acceptable.
>>
>> Thanks,
>>
>> Bobby Evans
>>
>
