spark-user mailing list archives

From Cheng Lian <lian.cs....@gmail.com>
Subject Re: Spark SQL table Join, one task is taking long
Date Wed, 03 Dec 2014 08:18:55 GMT
Hey Venkat,

This behavior seems reasonable. Judging by the table names, I guess 
`DAgents` is the fact table and `ContactDetails` is the dim table. 
Below is the explanation of a similar query; you may read `src` as 
`DAgents` and `src1` as `ContactDetails`.

0: jdbc:hive2://localhost:10000> explain extended select * from src, src1 where src.key
= src1.key and src.key = 100;
+------------------------------------------------------------------------------------+
|                                        plan                                        |
+------------------------------------------------------------------------------------+
| == Parsed Logical Plan ==                                                          |
| 'Project [*]                                                                       |
|  'Filter (('src.key = 'src1.key) && ('src.key = 100))                             |
|   'Join Inner, None                                                                |
|    'UnresolvedRelation None, src, None                                             |
|    'UnresolvedRelation None, src1, None                                            |
|                                                                                    |
| == Analyzed Logical Plan ==                                                        |
| Project [key#81,value#82,key#83,value#84]                                          |
|  Filter ((key#81 = key#83) && (key#81 = 100))                                     |
|   Join Inner, None                                                                 |
|    MetastoreRelation default, src, None                                            |
|    MetastoreRelation default, src1, None                                           |
|                                                                                    |
| == Optimized Logical Plan ==                                                       |
| Project [key#81,value#82,key#83,value#84]                                          |
|  Join Inner, Some((key#81 = key#83))                                               |
|   Filter (key#81 = 100)                                                            |
|    MetastoreRelation default, src, None                                            |
|   MetastoreRelation default, src1, None                                            |
|                                                                                    |
| == Physical Plan ==                                                                |
| Project [key#81,value#82,key#83,value#84]                                          |
|  ShuffledHashJoin [key#81], [key#83], BuildRight                                   |
|   Exchange (HashPartitioning [key#81], 200)                                        |
|    Filter (key#81 = 100)                                                           |
|     HiveTableScan [key#81,value#82], (MetastoreRelation default, src, None), None  |
|   Exchange (HashPartitioning [key#83], 200)                                        |
|    HiveTableScan [key#83,value#84], (MetastoreRelation default, src1, None), None  |
|                                                                                    |
| Code Generation: false                                                             |
| == RDD ==                                                                          |
+------------------------------------------------------------------------------------+

Please notice the `Filter` node in the physical plan. In your case, all 
the filtered rows are shuffled into a single partition, because 
`DAgents.f1` is both the predicate key and the shuffle key, and that 
partition is handled by the single task that lasts for more than 1 
second. All the other tasks in the count stage cost only a few ms 
because they don't receive any rows from `DAgents`.
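The skew can be sketched with a toy model of hash partitioning (plain Python, not Spark's actual partitioner; the key value 100 and the 200-partition figure mirror the query above):

```python
# Toy model of hash-partitioned shuffle skew (plain Python, not Spark).
# Every row surviving the filter `key = 100` carries the same join key,
# so hash partitioning sends all of them to one of the 200 partitions.

NUM_PARTITIONS = 200

def partition_for(key: int) -> int:
    """Assign a join key to a shuffle partition (simplified)."""
    return hash(key) % NUM_PARTITIONS

# Rows from the fact table after `Filter (key = 100)`: all share key 100.
filtered_rows = [(100, f"value-{i}") for i in range(10_000)]
partitions = {partition_for(key) for key, _ in filtered_rows}
print(len(partitions))  # 1 -- every filtered row lands in one partition

# By contrast, rows with distinct keys spread across all partitions.
spread = {partition_for(k) for k in range(10_000)}
print(len(spread))  # 200
```

Whichever task owns that single partition does essentially all the shuffle write and CPU work for the stage, which matches the one long task you observed.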

If `ContactDetails` is small enough, you can cache `ContactDetails` 
first and set `spark.sql.autoBroadcastJoinThreshold` to a value larger 
than the size of `ContactDetails`. Then a broadcast join rather than a 
shuffled hash join would be performed, which usually results in better 
performance.
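For intuition, here is a toy contrast (plain Python, not Spark; the table contents are made up for illustration): a broadcast join ships the small dim table to every worker, so each fact-table partition joins locally and no shuffle by join key is needed.

```python
# Toy map-side (broadcast) join: the small dim table becomes a dict
# that is "broadcast" to each fact-table partition, so no partition has
# to receive all the skewed rows over a shuffle.

# Small dim table (stand-in for the smaller side): join_key -> row.
dim_table = {100: "dim-row-for-100"}

# Fact-table partitions (stand-in for the large side, split across workers).
fact_partitions = [
    [(100, "row-a"), (100, "row-b")],
    [(100, "row-c"), (200, "row-d")],  # key 200 has no dim match
]

def broadcast_join(partition, broadcast_dim):
    """Join one fact partition against the broadcast dim table locally."""
    return [
        (key, fact_val, broadcast_dim[key])
        for key, fact_val in partition
        if key in broadcast_dim
    ]

# Each partition is joined independently; no rows move between partitions.
joined = [row for part in fact_partitions
          for row in broadcast_join(part, dim_table)]
print(len(joined))  # 3 matched rows
```

Spark's broadcast join works analogously: the work stays evenly spread across the tasks scanning the fact table instead of piling onto the one task that owns the hot key's shuffle partition.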

Cheng

On 12/2/14 6:35 AM, Venkat Subramanian wrote:

> Environment: Spark 1.1, 4 Node Spark and Hadoop Dev cluster - 6 cores, 32 GB
> Ram each. Default serialization, Standalone, no security
>
> Data was sqooped from relational DB to HDFS and Data is partitioned across
> HDFS uniformly. I am reading a fact table about 8 GB in size and one small
> dim table from HDFS and then doing a join on them based on a criterion.
> Running the Driver on Spark shell on Spark master.
>
> ContactDetail and DAgents are read as RDDs and registered as tables already.
> Each of these tables has 60 to 90 fields, and I am using a Product class.
>
> val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
> ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
>
> CDJoinQry.map(ta => ta(4)).count   // result is a small number
>
> This works fine and returns the result fine. The Hadoop mapPartition reads and
> creation of RDDs are all fine. But in the count stage, I see that one of the
> tasks (out of 200) does a huge amount of shuffle write (some 1 GB or more)
> and takes about 1.1 seconds to complete out of the 1.2 seconds of total
> execution time. This task is usually around the 3/4 mark (say 160/200) of
> the total tasks. While that task is running, one of the CPUs on one
> worker node goes to 100% for the duration of the task. The rest of the tasks
> take a few ms and do only < 5 MB of shuffle write. I have run it
> repeatedly, and this happens regardless of which worker node this particular
> task runs on. I turned on Spark debug on all nodes to understand, but
> it was difficult to figure out where the delay is from the logs. There are
> no errors or retries in the logs.
>
> Not sure if I can post logs here for someone to look at; if so, I can (about
> 10 MB). Also, not sure if it is normal in such a table join for one task to
> take most of the time. Let me know if you have any suggestions.
>
> Regards,
>
> Venkat
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>
