Right, your concern is that you expect the stored index in the ORC file to help the optimizer.

Frankly I do not know what write().mode(SaveMode.Overwrite).orc("orcFileToRead") actually does under the bonnet. From my experience, for the ORC index to be used you need to bucket the table. I have explained this before in here.

Now it is possible that you have not updated statistics on the table.
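If the ORC data is registered as a table in the Hive metastore, updating the statistics is along these lines (the table name here is just a placeholder, not taken from your job):

// hypothetical table name; assumes the ORC data is registered in the Hive metastore
hiveContext.sql("ANALYZE TABLE your_orc_table COMPUTE STATISTICS");
// column-level statistics, if your Hive setup supports them
hiveContext.sql("ANALYZE TABLE your_orc_table COMPUTE STATISTICS FOR COLUMNS");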

Even with Spark I tend to create my ORC table explicitly through Spark SQL.

You stated that the join scans the entire underlying ORC table. Your "id" column, I assume, is unique, so I would bucket the table on the id column. A rough sketch follows below.
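As an illustration only, not taken from your job (the table name fsdt_orc, the column types, the bucket count and the temp table fsdt_tmp are all my assumptions), creating the lookup table explicitly through Spark SQL and bucketing it on id could look something like this:

// rough sketch: table name, column types and bucket count are placeholders
hiveContext.sql("CREATE TABLE fsdt_orc (id STRING, payload STRING) "
  + "CLUSTERED BY (id) INTO 256 BUCKETS "
  + "STORED AS ORC TBLPROPERTIES ('orc.compress'='SNAPPY')");
// bucketing is only honoured on insert if this is set; note that Spark itself
// may not enforce Hive bucketing on write, in which case run the INSERT through Hive
hiveContext.sql("SET hive.enforce.bucketing = true");
// fsdt_tmp is assumed to be your 500-million-row DataFrame registered as a temp table
hiveContext.sql("INSERT OVERWRITE TABLE fsdt_orc SELECT id, payload FROM fsdt_tmp");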


HTH





On 20 June 2016 at 07:07, Mohanraj Ragupathiraj <mohanaugust@gmail.com> wrote:
Hi Mich,

Thank you for your reply. 

Let me explain more clearly.

File with 100 records needs to joined with a Big lookup File created in ORC format (500 million records). The Spark process i wrote is returing back the matching records and is working fine. My concern is that it loads the entire file (500 million) and matches with the 100 records instead of loading only the stripes with matching keys. I read that ORC file provides indexes (https://orc.apache.org/docs/indexes.html) and i assumned that when i join using Dataframes, the indexes will be used, resulting in loading of only matching records/stripes for processing instead of the whole table.

On Mon, Jun 20, 2016 at 1:00 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
Hi,

To start when you store the data in ORC file can you verify that the data is there?

For example register it as tempTable

processDF.register("tmp")
sql("select count(1) from tmp).show

Also what do you mean by index file in ORC?

HTH




I am trying to join a Dataframe(say 100 records) with an ORC file with 500 million records through Spark(can increase to 4-5 billion, 25 bytes each record).

I used Spark hiveContext API.

ORC File Creation Code

//fsdtRdd is JavaRDD, fsdtSchema is StructType schema
DataFrame fsdtDf = hiveContext.createDataFrame(fsdtRdd,fsdtSchema);
fsdtDf.write().mode(SaveMode.Overwrite).orc("orcFileToRead");

ORC File Reading Code

HiveContext hiveContext = new HiveContext(sparkContext);
DataFrame orcFileData= hiveContext.read().orc("orcFileToRead");
// allRecords is dataframe
DataFrame processDf = allRecords.join(orcFileData,allRecords.col("id").equalTo(orcFileData.col("id").as("ID")),"left_outer_join");
processDf.show();

When I read the ORC file, the get following in my Spark Logs:

Input split: file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc:0+3163348
min key = null, max key = null
Reading ORC rows from file:/C:/AOD_PID/PVP.vincir_frst_seen_tran_dt_ORC/part-r-00024-b708c946-0d49-4073-9cd1-5cc46bd5972b.orc with {include: [true, true, true], offset: 0, length: 9223372036854775807}
Finished task 55.0 in stage 2.0 (TID 59). 2455 bytes result sent to driver
Starting task 56.0 in stage 2.0 (TID 60, localhost, partition 56,PROCESS_LOCAL, 2220 bytes)
Finished task 55.0 in stage 2.0 (TID 59) in 5846 ms on localhost (56/84)
Running task 56.0 in stage 2.0 (TID 60)

Although the Spark job completes successfully, I think, its not able to utilize ORC index file capability and thus checks through entire block of ORC data before moving on.

Question

-- Is it a normal behaviour, or I have to set any configuration before saving the data in ORC format?

-- If it is NORMAL, what is the best way to join so that we discrad non-matching records on the disk level(maybe only the index file for ORC data is loaded)?





--
Thanks and Regards
Mohan
VISA Pte Limited, Singapore.