spark-user mailing list archives

From ogoh <oke...@gmail.com>
Subject SparkSQL's performance gets degraded depending on number of partitions of Hive tables..is it normal?
Date Mon, 01 Jun 2015 19:26:16 GMT

Hello,
I posted this question a while ago and am reposting it in the hope of
getting some attention.

I am using SparkSQL 1.3.1 and Hive 0.13.1 on AWS with YARN (I see the same
behavior under both Spark 1.3.0 and 1.3.1).
My Hive table is partitioned.
I noticed that query response time degrades as the number of partitions in
the table grows, even when the query targets only a small subset of the
partitions. TRACE-level logs from the ThriftServer show that it issues
getFileInfo, getListing, and getBlockLocations calls for every partition
(and getBlockLocations for every file), including partitions that are not
part of the query.

I don't understand why this is necessary. Is it a bug in SparkSQL? Is there
a way to avoid it?
The details of the issue, including logs, are below.

Thanks,


----------

My Hive table is an external table partitioned by date and hour.
I expected a query restricted to certain partitions to read only the data
files of those partitions.
Because the response time was very long even for queries over a narrow set
of partitions, I turned on TRACE-level logging for the ThriftServer.
I found that all available partitions are checked during several steps.

The logs show the following execution flow:
==
Step 1: Contacted the Hive Metastore to get partition info (cmd:
get_partitions)

Step 2: Came up with an execution rule

Step 3: Contacted the NameNode and made at least 4 calls (the data is in
HDFS) for every available partition of the table:
   getFileInfo once, getListing once, and then the same two calls again,
for each partition.

Step 4: Contacted the NameNode to find the block locations for all the
partitions

Step 5: Contacted a DataNode for each file of all the partitions

Step 6: Contacted the NameNode again for all the partitions

Step 7: SparkSQL generated an optimized plan

Step 8: Contacted the corresponding DataNodes for the narrowed partitions
(it seems)
And more...
=== 
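
To get a rough feel for why the partition count matters, the calls in Steps 3 and 4 can be tallied with a back-of-the-envelope sketch. The figure of 4 NameNode calls per partition and one getBlockLocations call per file come from the description above. The day count, hours per day, and files-per-partition values are purely illustrative assumptions, not measurements from this table, and `estimate_namenode_calls` is a hypothetical helper name.

```python
# Rough lower bound on NameNode RPCs issued before the plan is finalized.
# CALLS_PER_PARTITION = 4 is taken from Step 3 (getFileInfo + getListing, repeated).
# Every other number here is hypothetical and chosen only for illustration.

CALLS_PER_PARTITION = 4


def estimate_namenode_calls(num_partitions, files_per_partition):
    """Estimate NameNode calls for Steps 3 and 4.

    Step 3: at least CALLS_PER_PARTITION calls per partition directory.
    Step 4: one getBlockLocations call per data file.
    """
    listing_calls = num_partitions * CALLS_PER_PARTITION
    block_location_calls = num_partitions * files_per_partition
    return listing_calls + block_location_calls


if __name__ == "__main__":
    days, hours_per_day, files = 30, 24, 2   # assumed table shape
    all_partitions = days * hours_per_day    # every pdate/phour combination
    queried_partitions = hours_per_day       # a single pdate value
    print("all partitions:    ", estimate_namenode_calls(all_partitions, files))
    print("queried partitions:", estimate_namenode_calls(queried_partitions, files))
```

Under these assumed numbers the gap between the two estimates grows linearly with the number of days kept in the table, which would be consistent with the slowdown described here.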

Why do Steps 3, 4, 5, and 6 need to check all partitions?
After I removed partitions from the table, the query was much quicker while
processing the same volume of data.
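
The expected behavior can be sketched as follows: the partition predicate is applied to the partition directory names first, and only the surviving directories are then listed on HDFS. This is a minimal illustration in plain Python, not SparkSQL's actual implementation. `parse_partition_path` and `prune_partitions` are hypothetical helper names, and the paths simply follow the `pdate=.../phour=...` layout seen in the logs.

```python
def parse_partition_path(path):
    """Extract Hive-style key=value partition components from a path."""
    spec = {}
    for segment in path.strip("/").split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            spec[key] = value
    return spec


def prune_partitions(paths, predicate):
    """Keep only the partition paths whose spec satisfies the predicate."""
    return [p for p in paths if predicate(parse_partition_path(p))]


if __name__ == "__main__":
    base = "/user/datawarehouse/api_hdfs_perf/api_search"
    # Two dates with 24 hourly partitions each (illustrative only).
    paths = [
        f"{base}/pdate={d}/phour={h:02d}"
        for d in ("2015-05-08", "2015-05-23")
        for h in range(24)
    ]
    selected = prune_partitions(paths, lambda s: s.get("pdate") == "2015-05-23")
    # Only the selected directories would need getFileInfo/getListing calls.
    print(len(paths), "partitions total,", len(selected), "after pruning")
```

With pruning applied this early, the number of filesystem calls would depend on the partitions matched by the query rather than on the total number of partitions in the table.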

I don't know whether this is normal, a Hive issue, a SparkSQL issue, or a
problem with my configuration.
I have added logs for some of the steps below.

I would appreciate any advice.

Thanks a lot, 
Okehee 

==== Logs for some of the steps

Query: select count(*) from api_search where pdate='2015-05-23'; 
Step 2: 

2015-05-25 16:37:43 TRACE HiveContext$$anon$3:67 - 

=== Applying Rule
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates === 

!'Project [COUNT(1) AS _c0#25L]                        Aggregate [],
[COUNT(1) AS _c0#25L] 

  Filter (pdate#26 = 2015-05-23)                        Filter (pdate#26 =
2015-05-23) 

   MetastoreRelation api_hdfs_perf, api_search, None     MetastoreRelation
api_hdfs_perf, api_search, None 
.. 

Step 3: 

2015-05-25 16:37:44 TRACE ProtobufRpcEngine:206 - 84: Call ->
/10.128.193.211:9000: getFileInfo {src:
"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00"} 

2015-05-25 16:37:44 DEBUG Client:424 - The ping interval is 60000 ms. 

2015-05-25 16:37:44 DEBUG Client:693 - Connecting to /10.128.193.211:9000 

2015-05-25 16:37:44 DEBUG Client:1007 - IPC Client (2100771791) connection
to /10.128.193.211:9000 from ogoh sending #151 

2015-05-25 16:37:44 DEBUG Client:944 - IPC Client (2100771791) connection to
/10.128.193.211:9000 from ogoh: starting, having connections 2 

2015-05-25 16:37:44 DEBUG Client:1064 - IPC Client (2100771791) connection
to /10.128.193.211:9000 from ogoh got value #151 

2015-05-25 16:37:44 DEBUG ProtobufRpcEngine:235 - Call: getFileInfo took
13ms 

2015-05-25 16:37:44 TRACE ProtobufRpcEngine:250 - 84: Response <-
/10.128.193.211:9000: getFileInfo {fs { fileType: IS_DIR path: "" length: 0
permission { perm: 493 } owner: "hadoop" group: "supergroup"
modification_time: 1432364487906 access_time: 0 block_replication: 0
blocksize: 0 fileId: 100602 childrenNum: 2 }} 

2015-05-25 16:37:44 TRACE ProtobufRpcEngine:206 - 84: Call ->
/10.128.193.211:9000: getFileInfo {src:
"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00"} 

2015-05-25 16:37:44 DEBUG Client:1007 - IPC Client (2100771791) connection
to /10.128.193.211:9000 from ogoh sending #152 

2015-05-25 16:37:44 DEBUG Client:1064 - IPC Client (2100771791) connection
to /10.128.193.211:9000 from ogoh got value #152 

2015-05-25 16:37:44 DEBUG ProtobufRpcEngine:235 - Call: getFileInfo took
2ms. 
...... 


Step 4: 

2015-05-25 16:37:47 TRACE ProtobufRpcEngine:206 - 89: Call ->
/10.128.193.211:9000: getBlockLocations {src:
"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00/part-r-2015050800-00001.parquet"
offset: 0 length: 1342177280} 
... 


Step 5: 

2015-05-25 16:37:48 DEBUG DFSClient:951 - Connecting to datanode
10.191.137.197:9200 

2015-05-25 16:37:48 TRACE BlockReaderFactory:653 -
BlockReaderFactory(fileName=/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00/part-r-2015050800-00002.parquet,
block=BP-1843960649-10.128.193.211-1427923845046:blk_1073758677_981812):
trying to create a remote block reader from a TCP socket 
... 

Step 6: 

2015-05-25 16:37:56 TRACE ProtobufRpcEngine:206 - 84: Call ->
/10.128.193.211:9000: getFileInfo {src:
"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00"} 
... 


Step 7: 

=== Applying Rule
org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions === 

== Optimized Logical Plan == 

Aggregate [], [COUNT(1) AS _c0#25L] 

 Project [] 

  Filter (pdate#111 = 2015-05-23) 

  
Relation[timestamp#84,request_id#85,request_timestamp#86,response_timestamp#87,request_query_url#88,request_query_params#89,response_status#90,q#91,session_id#92,partner_id#93,partner_name#94,partner_ip#95,partner_useragent#96,search_id#97,user_id#98,client_ip#99,client_country#100,client_useragent#101,client_platform#102,search_appids#103,search_topeditionidsbyappid#104,query_categories#105,ad_log#106,ad_log_app_editions#107,ad_log_app_id#108,trace#109,trace_annotations#110,pdate#111,phour#112]
ParquetRelation2(ArrayBuffer(hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=01,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=02,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=03,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=04,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=05,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=06,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=07,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=08,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=09,
hdfs://10.128.193.211:9000/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=10,

.... 
... 


Step 8:

2015-05-25 16:38:06 TRACE ProtobufRpcEngine:206 - 51: Call ->
/10.128.193.211:9000: getBlockLocations {src:
"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-23/phour=01/part-r-00001.parquet"
offset: 0 length: 23423005} 
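
For anyone who wants to reproduce the per-step observations, a small script along these lines could summarize which partitions each NameNode call touches. It is only a sketch: it assumes the wrapped log lines have been re-joined into one record per call, and the regular expressions are fitted to the `ProtobufRpcEngine` TRACE lines shown above rather than to any documented log format. `summarize_calls` is a hypothetical helper name.

```python
import re
from collections import Counter

# Fitted to lines such as:
#   ... Call -> /10.128.193.211:9000: getFileInfo {src: "/.../pdate=2015-05-08/phour=00"}
CALL_RE = re.compile(r'Call -> \S+ (\w+) \{src:\s*"([^"]+)"')
PDATE_RE = re.compile(r"pdate=([^/]+)")


def summarize_calls(lines):
    """Count NameNode RPCs per (method, pdate) pair; non-matching lines are skipped."""
    counts = Counter()
    for line in lines:
        match = CALL_RE.search(line)
        if not match:
            continue
        method, src = match.groups()
        pdate = PDATE_RE.search(src)
        counts[(method, pdate.group(1) if pdate else None)] += 1
    return counts


if __name__ == "__main__":
    # Two records copied from the logs above, re-joined onto single lines.
    sample = [
        '2015-05-25 16:37:44 TRACE ProtobufRpcEngine:206 - 84: Call -> '
        '/10.128.193.211:9000: getFileInfo {src: '
        '"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-08/phour=00"}',
        '2015-05-25 16:38:06 TRACE ProtobufRpcEngine:206 - 51: Call -> '
        '/10.128.193.211:9000: getBlockLocations {src: '
        '"/user/datawarehouse/api_hdfs_perf/api_search/pdate=2015-05-23/phour=01/part-r-00001.parquet" '
        'offset: 0 length: 23423005}',
    ]
    for (method, pdate), n in sorted(summarize_calls(sample).items(), key=str):
        print(method, pdate, n)
```

Any `pdate` value in the summary other than the one in the WHERE clause points to a partition that was contacted even though the query did not ask for it.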






