spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Venkat Subramanian <vsubr...@gmail.com>
Subject Re: Spark SQL table Join, one task is taking long
Date Thu, 04 Dec 2014 20:57:34 GMT
Hi Cheng,

Thank you very much for taking your time and providing a detailed
explanation.
I tried a few things you suggested and some more things.

The ContactDetail table (8 GB) is the fact table and DAgents is the Dim
table (<500 KB), reverse of what you are assuming, but your ideas still
apply.

I tried the following:

a) Cached the smaller Dim table to memory.
 sqlContext.setConf("spark.sql.autoBroadcastJoinShreshold", "10000000")
 sqlContext.cacheTable("DAgents")

UI -> Stage -> Storage shows it to be cached in RDD when I run it.

val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")

CDJoinQry.map(ta => ta(4)).count

I see no difference in terms of performance. It takes the same amount of
time for the query ~1.2 min

b)  I reversed both the order of tables and where clause in the query 

val CDJoinQry= sqlContext.sql("SELECT  * FROM DAgents, ContactDetail  WHERE
DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

The performance went  bad. It took 6-7 min to complete.

Just changing the order of table in Select for this join, keeping the same
where clause order, perf was similar (1.2-1.4 min).

c)  Using query in a), I tried to keep the storage in columnar fashion with 
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in terms of performance. It takes the same amount of
time for the query ~1.2 min.
Not sure if it even works.

d) I tried changing the comma separated HDFS files to Parquet format in HDFS
and reading it as parquet and then running query on it.

DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")


val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")

val FContactDetailParquetRDD =
sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")

val CDJoinQryParquet= sqlContext.sql("SELECT  * FROM ContactDetailParquet,
DAgentsParquet  WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually more for this join query.* It ended up taking
3.4 min with more data read (2GB) in shuffle reads. Parquet performed worse
than non parquet for this join.

I changed the query where table order and where clause was reversed and ran
it for parquet

val CDJoinQryParquetReversed= sqlContext.sql("SELECT  * FROM DAgentsParquet,
ContactDetailParquet  WHERE   DAgentsParquet.f1 = 902 and
DAgentsParquet.f1=ContactDetailParquet.f6 ")
CDJoinQryParquetReversed.map(ta => ta(4)).count

it took > 18 min and had to kill it as it kept on running.

*But queries where there is no join, Parquet's performance was extremely
good.*
For example, this query below where there is no join, ran in 8 seconds,
whereas the same query in non parquet  took 30 seconds.
val CDJoinQryParquet0= sqlContext.sql("SELECT  * FROM ContactDetailParquet
WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count

*Some potential conclusions (pl. comment) :*
* Order in where clause seems to matter in Spark SQL optimizer. In
relational DBs  that I have worked with, when I noticed, order of where
clause is typically a hint . Would be nice of Spark SQL optimizer is fixed
to ignore order of clauses and optimize it automatically.
* I tried changing just the table order  in Select statement for a join and
it also seems to matter when reading data from HDFS (for parquet and to a
less extent for non parquet in my case) even when the where clause order is
same. Would be nice of SQL optimizer  optimizes it automatically.
* Table joins for huge table(s) are costly. Fact and Dimension concepts from
star schema don't translate well to Big Data (Hadoop, Spark). It may be
better to de-normalize and store huge tables to avoid Joins. Joins seem to
be evil. (Have tried de-normalizing when using Cassandra, but that has its
own problem of resulting in full table scan when running ad-hoc queries when
the keys are not known)

Regards,

Venkat



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message