spark-user mailing list archives

From Darin McBeath <ddmcbe...@yahoo.com.INVALID>
Subject Dataset Filter performance - trying to understand
Date Thu, 01 Sep 2016 17:40:41 GMT
I've been trying to understand the performance of Datasets (and filters) in Spark 2.0. 

I have a Dataset which I've read from a parquet file and cached in memory (deserialized).  It
is spread across 8 partitions and consumes a total of 826MB of memory on my cluster.  I verified
in the Spark UI that the dataset was 100% cached in memory.

I'm using an AWS c3.2xlarge for my 1 worker (8 cores).

There are 108,587,678 total records in my cached dataset (om).

I run the following command (against this cached Dataset) and it takes 13.56s.

om.filter(textAnnotation => textAnnotation.annotType == "ce:para").count

This returns a count of 1,039,993
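For context, a minimal sketch of the setup I'm describing. The field names and Long/String types are taken from the columns in the explain plans; the case class name, parquet path, and session setup here are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Field names inferred from the explain plan's column list; fields with an
// "L" suffix in the plan (startOffset#397L etc.) are Longs, the rest are
// assumed to be Strings.
case class TextAnnotation(
  docId: String, annotSet: String, annotType: String,
  startOffset: Long, endOffset: Long, annotId: Long, parentId: Long,
  orig: String, lemma: String, pos: String, xmlId: String)

val spark = SparkSession.builder.appName("annotation-filter").getOrCreate()
import spark.implicits._

// Hypothetical path; cache() defaults to MEMORY_AND_DISK, deserialized.
val om = spark.read.parquet("/path/to/annotations.parquet")
  .as[TextAnnotation]
  .cache()
```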

When I look at the explain() for this query, I see the following:

== Physical Plan ==

*Filter <function1>.apply
+- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [<function1>.apply]
   +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- Exchange hashpartitioning(docId#394, 8)

...
I was a bit perplexed why this takes so long, as I had read that Spark could filter a billion
rows per second on a single CPU.  Granted, my rows are likely more complex, but I thought it
should take less than 13+ seconds to scan 100M rows that had been cached in memory.

So, I modified the above query to the following:

om.filter("annotType == 'ce:para'").count

The query now completes in just over 1s (a huge improvement).
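The same predicate can also be written as a Column expression rather than a SQL string; a sketch, assuming the Dataset `om` from above. Either form should produce the same optimized plan, since both stay as Catalyst expressions:

```scala
import org.apache.spark.sql.functions.col

// Unlike the typed lambda, this comparison is a Catalyst expression the
// optimizer can inspect, rather than an opaque Scala closure.
om.filter(col("annotType") === "ce:para").count()
```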

When I do the explain plan for this query, I see the following:

== Physical Plan ==
*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))
+- InMemoryTableScan [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], [isnotnull(annotType#396), (annotType#396 = ce:para)]
   +- InMemoryRelation [docId#394, annotSet#395, annotType#396, startOffset#397L, endOffset#398L, annotId#399L, parentId#400L, orig#401, lemma#402, pos#403, xmlId#404], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- Exchange hashpartitioning(docId#394, 8)

This is very similar to the first with the notable exception of 

*Filter (isnotnull(annotType#396) && (annotType#396 = ce:para))  
instead of

*Filter <function1>.apply

I'm guessing the improved performance is because a TextAnnotation object must be created for
every row in the first example (but not the second), although this isn't clear from the explain
plans.  Is that correct?  Or is there some other reason why the second approach is significantly
faster?  I would really like to get a solid understanding of why the second query is so much
faster.

I also want to clarify whether the InMemoryTableScan and InMemoryRelation are part of
whole-stage code generation.  I'm thinking they aren't, as they aren't prefixed by a "*".
If not, is there something I could do to make this part of whole-stage code generation?
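One way I've seen to inspect which operators were fused, and what Java source was generated, is the debug helper that ships with Spark 2.0; a sketch, assuming `om` as above:

```scala
import org.apache.spark.sql.execution.debug._

// Prints the generated Java source for each WholeStageCodegen subtree.
// Operators without a "*" prefix in explain() run on the ordinary iterator
// path and won't show up as fused code here.
om.filter("annotType == 'ce:para'").debugCodegen()
```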

My goal is to make the above operation as fast as possible.  I could of course increase the
partitions (and the size of my cluster) but I also want to clarify my understanding of whole-stage
code generation. 

Any thoughts/suggestions would be appreciated.  Also, if anyone has found good resources that
further explain the details of the DAG and whole-stage code generation, I would appreciate
those as well.

Thanks.

Darin.

