spark-user mailing list archives

From Siva Gudavalli <gudavalli.s...@yahoo.com.INVALID>
Subject Re: Orc predicate pushdown with Spark Sql
Date Fri, 27 Oct 2017 10:55:27 GMT

I found a workaround: when I create the Hive table using Spark’s “saveAsTable”, I see filters
being pushed down.

The other approaches I tried, where filters are not pushed down, are:

1) creating the Hive table upfront and loading ORC into it using Spark SQL
2) creating ORC files with Spark SQL and then defining a Hive external table over them

If my understanding is correct, when I use saveAsTable, Spark registers the table in the Hive
metastore with its custom SerDe and is therefore able to push down filters.
Please correct me.
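
For reference, a minimal sketch of that workaround (Spark 1.6 style; “df” and the table name are illustrative, not taken from the thread):

    // assumes sqlContext is a HiveContext and df holds the log data
    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
    df.write
      .format("orc")
      .partitionBy("cdt", "catpartkey", "usrpartkey")
      .saveAsTable("hhhhhlogsv5_ds")
    // a query against the new table should now show PushedFilters on the ORC scan
    sqlContext.sql("select id from hhhhhlogsv5_ds where usr = 'AA0YP'").explain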

Another question,

When I am writing ORC to Hive using “saveAsTable”, is there any way I can provide ORC file settings?
For instance stripe.size, and can I create bloom filters, etc.?
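
For reference, one possible sketch: newer Spark versions forward standard ORC writer properties passed through option() before saveAsTable; whether the ORC data source on Spark 1.6 honors these is an assumption, and the names below are standard ORC property keys:

    // sketch only: ORC writer properties passed as data source options
    df.write
      .format("orc")
      .option("orc.stripe.size", "67108864")           // 64 MB stripes
      .option("orc.row.index.stride", "50000")
      .option("orc.bloom.filter.columns", "cat,usr")
      .option("orc.bloom.filter.fpp", "0.05")
      .saveAsTable("hhhhhlogsv5_tuned")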


Regards
Shiv



> On Oct 25, 2017, at 1:37 AM, Jörn Franke <jornfranke@gmail.com> wrote:
> 
> Well, the meta information is in the file, so I am not surprised that it reads the file, but it should not read all the content, which is probably also not happening.
> 
> On 24. Oct 2017, at 18:16, Siva Gudavalli <gudavalli.siva@yahoo.com.INVALID> wrote:
> 
>> 
>> Hello,
>>  
>> I have an update here. 
>>  
>> Spark SQL is pushing predicates down when I load the ORC files directly into the Spark context, but not when I read the Hive table directly.
>> Please let me know if I am missing something here.
>>  
>> Is this supported in Spark?
>>  
>> When I load the files directly in the Spark context:
>> scala> val hhhhhlogsv5 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5
on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003
on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others
on driver
>> 17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers
on driver
>> hhhhhlogsv5: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string,
br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int,
c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string,
usrpartkey: string]
>> scala> hhhhhlogsv5.registerTempTable("tempo")
>> scala> sqlContext.sql ( "selecT id from tempo where cdt=20171003 and usrpartkey
= 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
>> 17/10/24 16:11:22 INFO ParseDriver: Parsing command: selecT id from tempo where cdt=20171003
and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
>> 17/10/24 16:11:22 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:22 INFO DataSourceStrategy: Selected 1 partitions out of 1, pruned
0.0% partitions.
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6 stored as values in memory
(estimated size 164.5 KB, free 468.0 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory
(estimated size 18.3 KB, free 486.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.21.158.61:43493
(size: 18.3 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 6 from explain at <console>:33
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7 stored as values in memory
(estimated size 170.2 KB, free 656.6 KB)
>> 17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory
(estimated size 18.8 KB, free 675.4 KB)
>> 17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 172.21.158.61:43493
(size: 18.8 KB, free: 511.4 MB)
>> 17/10/24 16:11:22 INFO SparkContext: Created broadcast 7 from explain at <console>:33
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#145 DESC], output=[id#145])
>> +- ConvertToSafe
>> +- Project [id#145]
>> +- Filter (usr#152 = AA0YP)
>> +- Scan OrcRelation[id#145,usr#152] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5,
PushedFilters: [EqualTo(usr,AA0YP)]
>>  
>> When I read this as a Hive table:
>>  
>> scala> sqlContext.sql ( "selecT id from hhhhhlogsv5 where cdt=20171003 and usrpartkey
= 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
>> 17/10/24 16:11:32 INFO ParseDriver: Parsing command: selecT id from hhhhhlogsv5 where
cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
>> 17/10/24 16:11:32 INFO ParseDriver: Parse Completed
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8 stored as values in memory
(estimated size 399.1 KB, free 1074.6 KB)
>> 17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory
(estimated size 42.7 KB, free 1117.2 KB)
>> 17/10/24 16:11:32 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 172.21.158.61:43493
(size: 42.7 KB, free: 511.4 MB)
>> 17/10/24 16:11:32 INFO SparkContext: Created broadcast 8 from explain at <console>:33
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=10, orderBy=[id#192 DESC], output=[id#192])
>> +- ConvertToSafe
>> +- Project [id#192]
>> +- Filter (usr#199 = AA0YP)
>> +- HiveTableScan [id#192,usr#199], MetastoreRelation default, hhhhhlogsv5, None,
[(cdt#189 = 20171003),(usrpartkey#191 = hhhUsers)]
>>  
>>  
>> Please let me know if I am missing anything here. Thank you.
>> 
>> 
>> On Monday, October 23, 2017 1:56 PM, Siva Gudavalli <gss.subbu@gmail.com> wrote:
>> 
>> 
>> Hello,
>>  
>> I am working with Spark SQL to query a Hive managed table (in ORC format).
>>  
>> I have my data organized by partitions, and I have configured an index entry for every 50,000 rows by setting ('orc.row.index.stride'='50000').
>>  
>> Let's say that, after the partition predicates are evaluated, there are around 50 files in which the data is organized.
>>  
>> Each file contains data specific to one given "cat" and I have set up a bloom filter
on cat.
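
(For reference, a Hive DDL along these lines would match the setup described above; the column list is abbreviated and the table name is illustrative:)

    sqlContext.sql("""
      CREATE TABLE logs (id STRING, cat INT, usr STRING)
      PARTITIONED BY (cdt INT, catpartkey STRING, usrpartkey STRING)
      STORED AS ORC
      TBLPROPERTIES ('orc.row.index.stride'='50000', 'orc.bloom.filter.columns'='cat')
    """)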
>>  
>> my spark SQL query looks like this ->
>>  
>> select * from logs where cdt= 20171002 and catpartkey= others and usrpartkey= logUsers
and cat = 24;
>>  
>> I have set following property in my spark Sql context and assuming this will push
down the filters 
>> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
>>  
>> My filters are never being pushed down, and it seems like only partition pruning is happening, with all files being read. I don't understand why, no matter what my query is, it triggers 50 tasks and reads all the files.
>>  
>> Here are my debug logs:
>>  
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0,
size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
>> 17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
>> 17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
with {include: [true, true, false, false, false, false, true, false, false, false, false,
false, false, false, false, false, false, false], offset: 0, length: 9223372036854775807}
>> 17/10/23 17:26:43 DEBUG MapRClient: Open: path = /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
>> 17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0,
size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 15790993,
range start: 21131541 end: 21146035]
>> 17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 15790993),
size: 15723309 type: array-backed, data range [21131541, 21146035), size: 14494 type: array-backed]
>> 17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not initiated,
use thread based class loader instead
>> 17/10/23 17:26:43 DEBUG HadoopTableReader: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive..serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
>> 17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, IntegerType]
= 27)':
>>  
>> And here is my execution plan:
>> == Parsed Logical Plan ==
>> 'Limit 1000
>> +- 'Sort ['id DESC], true
>> +- 'Project [unresolvedalias('id)]
>> +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey
= logUsers)) && ('cat = 27))
>> +- 'UnresolvedRelation `auditlogsv5`, None
>> == Analyzed Logical Plan ==
>> id: string
>> Limit 1000
>> +- Sort [id#165 DESC], true
>> +- Project [id#165]
>> +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) &&
(usrpartkey#164 = logUsers)) && (cat#170 = 27))
>> +- MetastoreRelation default, auditlogsv5, None
>> == Optimized Logical Plan ==
>> Limit 1000
>> +- Sort [id#165 DESC], true
>> +- Project [id#165]
>> +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) &&
(usrpartkey#164 = logUsers)) && (cat#170 = 27))
>> +- MetastoreRelation default, auditlogsv5, None
>> == Physical Plan ==
>> TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
>> +- ConvertToSafe
>> +- Project [id#165]
>> +- Filter (cat#170 = 27)
>> +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162
= 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]
>>  
>>  
>> Am I missing something here? I am on Spark 1.6.1 and Hive 1.2.0.
>>  
>> Please correct me. Thank you.
>> 
>> 

