spark-user mailing list archives

From Michael Segel <msegel_had...@hotmail.com>
Subject Re: Joining a compressed ORC table with a non compressed text table
Date Wed, 29 Jun 2016 13:40:36 GMT
Hi, 

I’m not sure I understand your initial question… 

Depending on the compression algorithm, you may or may not be able to split the file. 
So if it's not splittable, you have a single long-running thread. 

My guess is that you end up with a single, very large partition. 
If so, repartitioning may give you better performance in the join. 
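
A minimal sketch of the repartitioning idea, assuming a Spark 1.6 spark-shell session where `sc` is already defined. The table and column names come from the quoted code; `hiveContext`, `numPartitions`, `s2r` and `sr` are hypothetical names, and the partition count of 16 is only a placeholder to tune.

```scala
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.hive.HiveContext

// Assumption: running in spark-shell (Spark 1.6), so `sc` already exists.
val hiveContext = new HiveContext(sc)

val s2 = hiveContext.table("sales2").select("prod_id")
val s  = hiveContext.table("sales_staging").select("prod_id")

// Check how many partitions each side starts with; a value of 1
// would point to an unsplittable input being read by one task.
println(s"sales2 partitions:        ${s2.rdd.partitions.length}")
println(s"sales_staging partitions: ${s.rdd.partitions.length}")

// Hypothetical partition count; tune it to the cores available.
val numPartitions = 16

// Spread the rows of both sides over more partitions before joining.
val s2r = s2.repartition(numPartitions)
val sr  = s.repartition(numPartitions)

s2r.join(sr, "prod_id").sort(desc("prod_id")).take(5).foreach(println)
```

If the partition counts printed at the start are already well above 1, the single-partition guess is probably wrong and the repartition step is unlikely to help.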

I see that you’re using a hive context. 

Have you tried doing this manually using just DataFrames, and comparing that DAG to the SQL DAG?
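
One way to make that comparison, sketched under the same assumptions (Spark 1.6 spark-shell with `sc` defined; `hiveContext`, `dfJoin` and `sqlJoin` are hypothetical names): build the join once with the DataFrame API and once as SQL, then print both plans with `explain(true)`.

```scala
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.hive.HiveContext

// Assumption: running in spark-shell (Spark 1.6), so `sc` already exists.
val hiveContext = new HiveContext(sc)

// The join expressed with the DataFrame API.
val s2 = hiveContext.table("sales2").select("prod_id")
val s  = hiveContext.table("sales_staging").select("prod_id")
val dfJoin = s2.join(s, "prod_id").sort(desc("prod_id"))

// The same join expressed as SQL through the Hive context.
val sqlJoin = hiveContext.sql(
  """SELECT a.prod_id
    |FROM sales2 a
    |JOIN sales_staging b ON a.prod_id = b.prod_id
    |ORDER BY a.prod_id DESC""".stripMargin)

// explain(true) prints the logical plans and the physical plan,
// so the two versions can be compared side by side.
dfJoin.explain(true)
sqlJoin.explain(true)
```

Neither `explain` call runs the join; the plans only show which join strategy and which exchanges Spark intends to use.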


HTH

-Mike

> On Jun 29, 2016, at 9:14 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
> 
> Hi all,
> 
> It finished in 2 hours 18 minutes!
> 
> Started at
> [29/06/2016 10:25:27.27]
> [148]
> [148]
> [148]
> [148]
> [148]
> Finished at
> [29/06/2016 12:43:33.33]
> 
> I need to dig in more. 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage
> or destruction of data or any other property which may arise from relying on this email's
> technical content is explicitly disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>  
> 
> On 29 June 2016 at 10:42, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
> Focusing on the Spark job: as I mentioned before, Spark is running in local mode with 8GB
> of memory for both the driver and the executor.
> 
> However, I still see this enormous Duration time, which indicates something is badly wrong!
> 
> Also I got rid of orderBy
> 
>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>   val rs = s2.join(s,"prod_id").sort(desc("prod_id")).take(5).foreach(println)
> 
> 
> <image.png>
> 
> 
> 
> 
> On 29 June 2016 at 10:18, Jörn Franke <jornfranke@gmail.com> wrote:
> 
> I think the Tez engine is much better maintained than the MR engine with respect to
> optimizations related to ORC, Hive, vectorization and querying. It will definitely be
> better to use it.
> MR is also deprecated in Hive 2.0.
> For me it does not make sense to use MR with Hive versions later than 1.1.
> 
> As I said, ORDER BY might be inefficient to use (not sure if this has changed). You may
> want to use SORT BY.
> 
> That being said, there are many optimization methods.
> 
> On 29 Jun 2016, at 00:27, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
> 
>> That is a good point.
>> 
>> The ORC table property is as follows
>> 
>> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>> "orc.stripe.size"="268435456",
>> "orc.row.index.stride"="10000")
>> 
>> which puts each stripe at 256MB
>> 
>> Just to clarify, this is Spark running on Hive tables. I don't think the use of Tez,
>> MR or Spark as the execution engine is going to make any difference?
>> 
>> This is the same query with Hive on MR
>> 
>> select a.prod_id from sales2 a, sales_staging b where a.prod_id = b.prod_id order by a.prod_id;
>> 
>> 2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
>> 2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 7.32 sec
>> 2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU 18.21 sec
>> 2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU 22.34 sec
>> 2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU 30.33 sec
>> 2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU 33.45 sec
>> 2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU 37.5 sec
>> 2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU 42.0 sec
>> 2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU 45.62 sec
>> 2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU 49.69 sec
>> 2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU 52.92 sec
>> 2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU 56.78 sec
>> 2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU 60.36 sec
>> 2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU 63.68 sec
>> 2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU 66.92 sec
>> 2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 70.18 sec
>> 2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 127.99 sec
>> 2016-06-28 23:25:57,494 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 134.64 sec
>> 2016-06-28 23:26:57,847 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 141.01 sec
>> 
>> which basically sits at 67% all day
>> 
>> 
>> 
>> 
>> 
>> 
>> On 28 June 2016 at 23:07, Jörn Franke <jornfranke@gmail.com> wrote:
>> 
>> 
>> Bzip2 is splittable for text files.
>> 
>> Btw, in ORC the question of splittability does not matter, because each stripe is
>> compressed individually.
>> 
>> Have you tried Tez? As far as I recall (at least this was the case in the first version
>> of Hive), MR uses a single reducer for ORDER BY, which is a bottleneck.
>> 
>> Do you see some errors in the log file?
>> 
>> On 28 Jun 2016, at 23:53, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> 
>>> I have a simple join between table sales2, a compressed (Snappy) ORC table with 22 million
>>> rows, and another simple table sales_staging, with under a million rows, stored as a text
>>> file with no compression.
>>> 
>>> The join is very simple
>>> 
>>>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>>>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>>> 
>>>   val rs = s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)
>>> 
>>> 
>>> What is happening now is that it is sitting on the SortMergeJoin operation on
>>> ZippedPartitionRDD, as shown in the DAG diagram below
>>> 
>>> 
>>> <image.png>
>>> 
>>> 
>>> And at this rate only 10% is done and it will take forever to finish :(
>>> 
>>> [Stage 3:==>                                                     (10 + 2) / 200]
>>> 
>>> OK, I understand that zipped files cannot be broken into blocks and operations
>>> on them cannot be parallelized.
>>> 
>>> Having said that, what are the alternatives? Never use compression and live with
>>> it? I emphasise that any operation on the compressed table itself is pretty fast, as it is
>>> a simple table scan. However, a join between the two tables on a column, as above, seems
>>> to be problematic?
>>> 
>>> Thanks
>>> 
>>> P.S. the same is happening using Hive with MR
>>> 
>>> select a.prod_id from sales2 a inner join sales_staging b on a.prod_id = b.prod_id order by a.prod_id;
>>> 
>> 
> 
> 

