spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ulanov, Alexander" <alexander.ula...@hp.com>
Subject RE: Force inner join to shuffle the smallest table
Date Fri, 26 Jun 2015 02:15:10 GMT
The problem is that it shuffles the wrong table which even compressed won’t fit to my disk.

Actually, I found the source of the problem, although I could not reproduce it with synthetic
data (but remains true for my original data: big table 2B rows, small 500K):

When I do join on two fields like this “select big.f1, big.f2 from small inner join big
on big.s=small.s and big.d=small.d” then the query planner does ShuffledHashJoin with the
bigger table and the query fails due to Shuffle write of the whole big table (out of space):
== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
ShuffledHashJoin [time#38,bedId#35], [time#31,bedId#28], BuildLeft
  Exchange (HashPartitioning [time#38,bedId#35], 2000), []
    PhysicalRDD [bedId#35,time#38], MapPartitionsRDD[75] at explain at <console>:25
  Exchange (HashPartitioning [time#31,bedId#28], 2000), []
   PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[77] at explain
at <console>:25

When I do join on one field like this “select big.f1, big.f2 from small inner join big on
big.s=small.s” then the query planner does BroadcastHashJoin which writes just what’s
needed and therefore executes without problems:

== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
BroadcastHashJoin [time#38], [time#31], BuildLeft
  Limit 498340
   PhysicalRDD [time#38], MapPartitionsRDD[66] at explain at <console>:25
  PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[68] at explain
at <console>:25

Could Spark SQL developers suggest why it happens?

Best regards, Alexander


From: Stephen Carman [mailto:scarman@coldlight.com]
Sent: Wednesday, June 24, 2015 12:33 PM
To: Ulanov, Alexander
Cc: CC GP; dev@spark.apache.org
Subject: Re: Force inner join to shuffle the smallest table

Have you tried shuffle compression?

spark.shuffle.compress (true|false)

if you have a filesystem capable also I’ve noticed file consolidation helps disk usage a
bit.

spark.shuffle.consolidateFiles (true|false)

Steve

On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander <alexander.ulanov@hp.com<mailto:alexander.ulanov@hp.com>>
wrote:

It also fails, as I mentioned in the original question.

From: CC GP [mailto:chandrika.gopalakrishna@gmail.com]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org<mailto:dev@spark.apache.org>
Subject: Re: Force inner join to shuffle the smallest table

Try below and see if it makes a difference:

val result = sqlContext.sql(“select big.f1, big.f2 from small inner join big on big.s=small.s
and big.d=small.d”)

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <alexander.ulanov@hp.com<mailto:alexander.ulanov@hp.com>>
wrote:
Hi,

I try to inner join of two tables on two fields(string and double). One table is 2B rows,
the second is 500K. They are stored in HDFS in Parquet. Spark v 1.4.
val big = sqlContext.paquetFile(“hdfs://big”)
data.registerTempTable(“big”)
val small = sqlContext.paquetFile(“hdfs://small”)
data.registerTempTable(“small”)
val result = sqlContext.sql(“select big.f1, big.f2 from big inner join small on big.s=small.s
and big.d=small.d”)

This query fails in the middle due to one of the workers “disk out of space” with shuffle
reported 1.8TB which is the maximum size of my spark working dirs (on total 7 worker nodes).
This is surprising, because the “big” table takes 2TB disk space (unreplicated) and “small”
about 5GB and I would expect that optimizer will shuffle the small table. How to force Spark
to shuffle the small table? I tried to write “small inner join big” however it also fails
with 1.8TB of shuffle.

Best regards, Alexander

This e-mail is intended solely for the above-mentioned recipient and it may contain confidential
or privileged information. If you have received it in error, please notify us immediately
and delete the e-mail. You must not copy, distribute, disclose or take any action in reliance
on it. In addition, the contents of an attachment to this e-mail may contain software viruses
which could damage your own computer system. While ColdLight Solutions, LLC has taken every
reasonable precaution to minimize this risk, we cannot accept liability for any damage which
you sustain as a result of software viruses. You should perform your own virus checks before
opening the attachment.
Mime
View raw message