spark-user mailing list archives

From Venkat Subramanian <>
Subject Re: Spark SQL table Join, one task is taking long
Date Thu, 04 Dec 2014 20:57:34 GMT
Hi Cheng,

Thank you very much for taking the time to provide a detailed reply.
I tried a few of the things you suggested, and some others as well.

The ContactDetail table (8 GB) is the fact table and DAgents is the Dim
table (<500 KB), the reverse of what you assumed, but your ideas still apply.

I tried the following:

a) Cached the smaller Dim table in memory and set the broadcast join threshold:
 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000000")

UI -> Stage -> Storage shows it as a cached RDD when I run it.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count

I see no difference in performance: the query takes the same amount of
time, ~1.2 min.
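For context on a): `spark.sql.autoBroadcastJoinThreshold` is the maximum size, in bytes, of a table that Spark SQL will broadcast to every worker node for a join, so `10000000` is roughly 10 MB. When the planner's size estimate for the small table is under that limit, the large table can be joined without being shuffled. The plain-Scala sketch below only illustrates the broadcast hash join idea on in-memory collections. The row types `Contact` and `Agent` and the sample data are hypothetical. This is not Spark's implementation.

```scala
// Hypothetical row types standing in for ContactDetail (fact) and DAgents (dimension).
final case class Contact(id: Long, f6: Int)
final case class Agent(f1: Int, name: String)

object BroadcastHashJoinSketch {
  // Build side: turn the small dimension table into a hash table keyed on the
  // join column. In a cluster, this table is what would be broadcast.
  def buildSide(dim: Seq[Agent]): Map[Int, Seq[Agent]] = dim.groupBy(_.f1)

  // Probe side: stream the large fact table once and look each row up in the
  // hash table, so the fact rows never need to be repartitioned by key.
  def broadcastHashJoin(fact: Iterator[Contact], dim: Seq[Agent]): Iterator[(Contact, Agent)] = {
    val table = buildSide(dim)
    fact.flatMap(c => table.getOrElse(c.f6, Seq.empty[Agent]).iterator.map(a => (c, a)))
  }

  def main(args: Array[String]): Unit = {
    val dim  = Seq(Agent(902, "agent-902"), Agent(903, "agent-903"))
    val fact = Iterator(Contact(1, 902), Contact(2, 903), Contact(3, 902), Contact(4, 999))
    // Same shape as: ... WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902
    val rows = broadcastHashJoin(fact, dim).filter { case (_, a) => a.f1 == 902 }.toList
    rows.foreach(println)
  }
}
```

The fact side is an `Iterator` to reflect that it is scanned once and never materialized as a whole. Only the small side is held in memory.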

b) I reversed both the order of the tables and the order of the WHERE-clause predicates in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

Performance got much worse: it took 6-7 min to complete.

When I changed only the table order in the SELECT for this join and kept the
original WHERE-clause order, performance was similar (1.2-1.4 min).
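The results in b) suggest that the plan depends on how the query is written, so it may help to see why applying filters early matters. The plain-Scala sketch below compares two plans. The naive plan pairs every row and filters afterwards. The rewritten plan applies `f1 = 902` to the dimension side, and because the join requires `f6 = f1`, it also applies the implied `f6 = 902` to the fact side before joining. Both plans return the same rows, but the rewritten one compares far fewer pairs. This illustrates predicate pushdown and transitive-predicate inference on hypothetical data. It is not a description of what the Spark SQL optimizer in this version actually does.

```scala
// Hypothetical row types standing in for ContactDetail (fact) and DAgents (dimension).
final case class Contact(id: Long, f6: Int)
final case class Agent(f1: Int, name: String)

object PushdownSketch {
  // Naive plan: pair every fact row with every dimension row, then apply the
  // join condition and the constant filter to each pair.
  def joinThenFilter(fact: Seq[Contact], dim: Seq[Agent], key: Int): Seq[(Contact, Agent)] =
    for {
      c <- fact
      a <- dim
      if c.f6 == a.f1 && a.f1 == key
    } yield (c, a)

  // Rewritten plan: push a.f1 = key down to the dimension side and, because the
  // join requires c.f6 = a.f1, push the implied c.f6 = key down to the fact side.
  // Every surviving pair then satisfies the join condition automatically.
  def filterThenJoin(fact: Seq[Contact], dim: Seq[Agent], key: Int): Seq[(Contact, Agent)] = {
    val smallDim  = dim.filter(_.f1 == key)
    val smallFact = fact.filter(_.f6 == key)
    for {
      c <- smallFact
      a <- smallDim
    } yield (c, a)
  }

  // How many (fact, dimension) pairs each plan has to look at.
  def pairsNaive(fact: Seq[Contact], dim: Seq[Agent]): Long = fact.size.toLong * dim.size
  def pairsPushed(fact: Seq[Contact], dim: Seq[Agent], key: Int): Long =
    fact.count(_.f6 == key).toLong * dim.count(_.f1 == key)

  def main(args: Array[String]): Unit = {
    val fact = (1 to 1000).map(i => Contact(i.toLong, 900 + i % 10))
    val dim  = (900 to 909).map(k => Agent(k, s"agent-$k"))
    assert(joinThenFilter(fact, dim, 902) == filterThenJoin(fact, dim, 902))
    // prints: naive pairs: 10000, pushed-down pairs: 100
    println(s"naive pairs: ${pairsNaive(fact, dim)}, pushed-down pairs: ${pairsPushed(fact, dim, 902)}")
  }
}
```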

c) Using the query from a), I tried compressing the in-memory columnar storage with
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in performance: the query still takes the same amount of
time, ~1.2 min.
I am not sure the setting has any effect.
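On c): according to the Spark SQL documentation, tables cached with `sqlContext.cacheTable` are already held in an in-memory columnar format. `spark.sql.inMemoryColumnarStorage.compressed` does not switch columnar storage on. It lets Spark choose a compression codec for each column based on statistics of the data. Run-length encoding is one of the simplest schemes of that kind. The plain-Scala sketch below shows the idea on a hypothetical, sorted `f6` column. It is an illustration only, not Spark's codec.

```scala
object RunLengthSketch {
  // Encode a column as (value, runLength) pairs: long runs of repeated values,
  // as in a sorted low-cardinality key column, collapse to a few entries.
  def encode[A](column: Seq[A]): List[(A, Int)] =
    column.foldLeft(List.empty[(A, Int)]) {
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      case (acc, x)                      => (x, 1) :: acc
    }.reverse

  // Expand the runs back into the original column.
  def decode[A](runs: Seq[(A, Int)]): Seq[A] =
    runs.flatMap { case (v, n) => Seq.fill(n)(v) }

  def main(args: Array[String]): Unit = {
    // Hypothetical f6 column, sorted by key.
    val f6   = Seq.fill(4)(902) ++ Seq.fill(3)(903) ++ Seq(904)
    val runs = encode(f6)
    println(runs) // List((902,4), (903,3), (904,1))
    assert(decode(runs) == f6)
  }
}
```

Compression like this reduces the memory footprint of the cache. It does not change how much work the join itself has to do.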

d) I converted the comma-separated HDFS files to Parquet format in HDFS,
read them back as Parquet, and then ran the query on them.


val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")

val FContactDetailParquetRDD =

val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet, DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*This join query actually took longer.* It took 3.4 min and read more
data (2 GB) in shuffle reads. Parquet performed worse than the non-Parquet
files for this join.

I then ran the Parquet version with both the table order and the
WHERE-clause order reversed:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet, ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It ran for more than 18 min and I had to kill it because it kept running.

*But for queries with no join, Parquet's performance was extremely good.*
For example, the query below, which has no join, ran in 8 seconds,
whereas the same query on the non-Parquet data took 30 seconds.
val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count
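One plausible factor in the join-free result is column layout. A columnar format such as Parquet makes it possible, in principle, to evaluate the filter on `f6` by reading just that column and then reassembling only the matching rows. Comma-separated text must instead be read and split line by line before any filter can run. The query selects `*`, so matching rows still need every column. The plain-Scala sketch below counts how many field values each layout touches for the same filter, using hypothetical in-memory data. It does not model Parquet's real on-disk structure (row groups, pages, encodings), and it does not show whether Spark pushes the filter into the Parquet reader.

```scala
object ColumnarScanSketch {
  // Row layout (like CSV text): every line has to be split into fields before
  // the predicate on one field can be evaluated.
  def rowScan(lines: Seq[String], colIdx: Int, key: String): (Seq[Array[String]], Long) = {
    val parsed  = lines.map(_.split(','))
    val touched = parsed.map(_.length.toLong).sum
    (parsed.filter(r => r(colIdx) == key), touched)
  }

  // Columnar layout: one sequence per column. The predicate reads a single
  // column, and only matching positions are stitched back into full rows.
  def columnarScan(columns: IndexedSeq[IndexedSeq[String]], colIdx: Int, key: String): (Seq[Array[String]], Long) = {
    val filterCol = columns(colIdx)
    val hits      = filterCol.indices.filter(i => filterCol(i) == key)
    val rows      = hits.map(i => columns.map(col => col(i)).toArray)
    (rows, filterCol.size.toLong + hits.size.toLong * columns.size)
  }

  // Transpose CSV lines into one sequence per column, roughly what a columnar
  // writer does before encoding and compressing each column.
  def toColumns(lines: Seq[String]): IndexedSeq[IndexedSeq[String]] =
    lines.map(_.split(',').toIndexedSeq).toIndexedSeq.transpose

  def main(args: Array[String]): Unit = {
    // Hypothetical 3-column data: id, f6, note
    val lines = (1 to 6).map(i => s"$i,${900 + i % 3},note$i")
    val (rowHits, rowTouched) = rowScan(lines, 1, "902")
    val (colHits, colTouched) = columnarScan(toColumns(lines), 1, "902")
    assert(rowHits.map(_.toSeq) == colHits.map(_.toSeq))
    // prints: row scan touched 18 fields, columnar scan touched 12
    println(s"row scan touched $rowTouched fields, columnar scan touched $colTouched")
  }
}
```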

*Some potential conclusions (please comment):*
* The order of the WHERE-clause predicates seems to matter to the Spark SQL
optimizer. In the relational DBs I have worked with, the order of WHERE-clause
predicates is typically just a hint. It would be nice if the Spark SQL optimizer
ignored the order of the clauses and optimized the query automatically.
* I tried changing just the table order in the SELECT statement for a join, and
it also seems to matter when reading data from HDFS (for Parquet and, to a
lesser extent, for non-Parquet data in my case), even when the WHERE-clause
order is the same. It would be nice if the SQL optimizer handled this automatically.
* Joins involving huge tables are costly. Fact and dimension concepts from
the star schema don't translate well to Big Data (Hadoop, Spark). It may be
better to de-normalize and store huge tables to avoid joins. Joins seem to
be evil. (I have tried de-normalizing with Cassandra, but that has its
own problem: ad hoc queries result in full table scans when the keys are
not known.)
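The de-normalization idea in the last point can be sketched in plain Scala. The dimension lookup is paid once, when the wide (de-normalized) table is written, and later ad hoc filters run against that single table with no join. Both filters below still scan the whole wide table, which mirrors the full-scan trade-off mentioned for Cassandra. The types `Contact`, `Agent`, and `WideRow` and the sample data are hypothetical. In Spark, the write step would be a separate one-off job whose output is stored, which this sketch does not model.

```scala
// Hypothetical row types: the original fact and dimension rows, plus a wide row
// that carries the dimension attribute directly.
final case class Contact(id: Long, f6: Int)
final case class Agent(f1: Int, name: String)
final case class WideRow(id: Long, f6: Int, agentName: Option[String])

object DenormalizeSketch {
  // One-off step: resolve the dimension lookup once and store the result with
  // each fact row. Assumes f1 is unique in the dimension table; fact rows with
  // no matching agent are kept with agentName = None (left-outer semantics).
  def denormalize(fact: Seq[Contact], dim: Seq[Agent]): Seq[WideRow] = {
    val nameByKey: Map[Int, String] = dim.map(a => a.f1 -> a.name).toMap
    fact.map(c => WideRow(c.id, c.f6, nameByKey.get(c.f6)))
  }

  // Later ad hoc queries are plain filters over the wide table; no join runs.
  def byAgentKey(wide: Seq[WideRow], key: Int): Seq[WideRow] = wide.filter(_.f6 == key)
  def byAgentName(wide: Seq[WideRow], name: String): Seq[WideRow] =
    wide.filter(_.agentName.contains(name))

  def main(args: Array[String]): Unit = {
    val fact = Seq(Contact(1, 902), Contact(2, 903), Contact(3, 999))
    val dim  = Seq(Agent(902, "alice"), Agent(903, "bob"))
    val wide = denormalize(fact, dim)
    wide.foreach(println)
    println(byAgentName(wide, "alice").map(_.id))
  }
}
```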


