spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: Orc predicate pushdown with Spark Sql
Date Wed, 25 Oct 2017 05:37:46 GMT
Well, the meta information is stored in the file itself, so I am not surprised that it opens the file. It should not read all of the content, though, and that is most likely not happening either.
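
If you want to see what the reader is able to skip, you can dump the stripe and column statistics from the file footer; row groups whose statistics (or bloom filter) cannot match the pushed predicate are not read. A rough sketch with the Hive 1.2 ORC reader API (the file name is a placeholder; hive --orcfiledump on the file prints the same information):

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.OrcFile

// open one ORC file and print its stripe layout and footer column statistics
val path = new Path("maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers/<orc-file>")
val reader = OrcFile.createReader(path, OrcFile.readerOptions(new Configuration()))
println(s"rows = ${reader.getNumberOfRows}")
reader.getStripes.asScala.foreach { s =>
  println(s"stripe offset=${s.getOffset} rows=${s.getNumberOfRows} dataLength=${s.getDataLength}")
}
reader.getStatistics.foreach(println)  // per-column min/max/count used for skipping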

> On 24. Oct 2017, at 18:16, Siva Gudavalli <gudavalli.siva@yahoo.com.INVALID> wrote:
> 
> 
> Hello,
>  
> I have an update here. 
>  
> Spark SQL is pushing predicates down when I load the ORC files directly into the Spark context, but not when I read the Hive table. Please let me know if I am missing something here.
>  
> Is this supported in Spark?
>  
> When I load the files in the Spark context:
> scala> val hhhhhlogsv5 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
> hhhhhlogsv5: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]
> scala> hhhhhlogsv5.registerTempTable("tempo")
> scala> sqlContext.sql ( "selecT id from tempo where cdt=20171003 and usrpartkey =
'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
> 17/10/24 16:11:22 INFO ParseDriver: Parsing command: selecT id from tempo where cdt=20171003
and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
> 17/10/24 16:11:22 INFO ParseDriver: Parse Completed
> 17/10/24 16:11:22 INFO DataSourceStrategy: Selected 1 partitions out of 1, pruned 0.0%
partitions.
> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated
size 164.5 KB, free 468.0 KB)
> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory
(estimated size 18.3 KB, free 486.4 KB)
> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.21.158.61:43493
(size: 18.3 KB, free: 511.4 MB)
> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 6 from explain at <console>:33
> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated
size 170.2 KB, free 656.6 KB)
> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory
(estimated size 18.8 KB, free 675.4 KB)
> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 172.21.158.61:43493
(size: 18.8 KB, free: 511.4 MB)
> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 7 from explain at <console>:33
> == Physical Plan ==
> TakeOrderedAndProject(limit=10, orderBy=[id#145 DESC], output=[id#145])
> +- ConvertToSafe
>    +- Project [id#145]
>       +- Filter (usr#152 = AA0YP)
>          +- Scan OrcRelation[id#145,usr#152] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(usr,AA0YP)]
>  
> When I read this as a Hive table:
>  
> scala> sqlContext.sql ( "selecT id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
> 17/10/24 16:11:32 INFO ParseDriver: Parsing command: selecT id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
> 17/10/24 16:11:32 INFO ParseDriver: Parse Completed
> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 399.1 KB, free 1074.6 KB)
> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 42.7 KB, free 1117.2 KB)
> 17/10/24 16:11:32 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 172.21.158.61:43493 (size: 42.7 KB, free: 511.4 MB)
> 17/10/24 16:11:32 INFO SparkContext: Created broadcast 8 from explain at <console>:33
> == Physical Plan ==
> TakeOrderedAndProject(limit=10, orderBy=[id#192 DESC], output=[id#192])
> +- ConvertToSafe
>    +- Project [id#192]
>       +- Filter (usr#199 = AA0YP)
>          +- HiveTableScan [id#192,usr#199], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#189 = 20171003),(usrpartkey#191 = hhhUsers)]
>  
>  
> Please let me know if I am missing anything here. Thank you.
> 
> 
> On Monday, October 23, 2017 1:56 PM, Siva Gudavalli <gss.subbu@gmail.com> wrote:
> 
> 
> Hello,
>  
> I am working with Spark SQL to query a Hive managed table (in ORC format).
>  
> My data is organized by partitions, and I was asked to set row indexes for every 50,000 rows by setting ('orc.row.index.stride'='50000').
>  
> Let's say that, after evaluating the partition, the data is organized into around 50 files.
>  
> Each file contains data specific to one given "cat", and I have set up a bloom filter on cat.
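>  
> For reference, the ORC settings on the table look roughly like this; this is a sketch rather than the exact DDL ('orc.bloom.filter.columns' is the standard ORC property name for bloom filters, and these writer-side properties only apply to data written after they are set):
>  
> // sketch only: ORC table properties matching the setup described above
> sqlContext.sql("""
>   ALTER TABLE logs SET TBLPROPERTIES (
>     'orc.row.index.stride'     = '50000',
>     'orc.bloom.filter.columns' = 'cat'
>   )""")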
>  
> My Spark SQL query looks like this:
>  
> select * from logs where cdt= 20171002 and catpartkey= others and usrpartkey= logUsers and cat = 24;
>  
> I have set the following property in my Spark SQL context, assuming it will push the filters down:
> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
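>  
> To double-check whether the setting takes effect, I look for a PushedFilters entry on the scan in the physical plan. A rough sketch comparing the Hive table read with reading the ORC path directly (the path is the table location from the debug logs below):
>  
> // sketch: compare the two read paths for the same predicate
> val direct = sqlContext.read.format("orc").load("/apps/spark/auditlogsv5")
> val q = direct.filter("cdt = 20171002 and catpartkey = 'others' and usrpartkey = 'logUsers' and cat = 24").select("id")
> q.explain(true)   // look for a PushedFilters entry on the ORC scan, e.g. [EqualTo(cat,24)]
> sqlContext.sql("select id from logs where cdt = 20171002 and catpartkey = 'others' and usrpartkey = 'logUsers' and cat = 24").explain(true)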
>  
> My filters are never pushed down, and after partition pruning the scan still seems to touch all the files. I don't understand why, no matter what my query is, it triggers 50 tasks and reads every file.
>  
> Here are my debug logs:
>  
> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
> 17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
> 17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
> 17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0 with {include: [true, true, false, false, false, false, true, false, false, false, false, false, false, false, false, false, false, false], offset: 0, length: 9223372036854775807}
> 17/10/23 17:26:43 DEBUG MapRClient: Open: path = /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
> 17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 15790993, range start: 21131541 end: 21146035]
> 17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 15790993), size: 15723309 type: array-backed, data range [21131541, 21146035), size: 14494 type: array-backed]
> 17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not initiated, use thread based class loader instead
> 17/10/23 17:26:43 DEBUG HadoopTableReader: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
> 17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, IntegerType] = 27)':
>  
> And here is my execution plan:
> == Parsed Logical Plan ==
> 'Limit 1000
> +- 'Sort ['id DESC], true
>    +- 'Project [unresolvedalias('id)]
>       +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey = logUsers)) && ('cat = 27))
>          +- 'UnresolvedRelation `auditlogsv5`, None
> == Analyzed Logical Plan ==
> id: string
> Limit 1000
> +- Sort [id#165 DESC], true
>    +- Project [id#165]
>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>          +- MetastoreRelation default, auditlogsv5, None
> == Optimized Logical Plan ==
> Limit 1000
> +- Sort [id#165 DESC], true
>    +- Project [id#165]
>       +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
>          +- MetastoreRelation default, auditlogsv5, None
> == Physical Plan ==
> TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
> +- ConvertToSafe
>    +- Project [id#165]
>       +- Filter (cat#170 = 27)
>          +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162 = 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]
>  
>  
> Am I missing something here? I am on Spark 1.6.1 and Hive 1.2.0.
>  
> Please correct me. Thank you.
> 
> 
