spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Joining a compressed ORC table with a non compressed text table
Date Wed, 29 Jun 2016 16:29:48 GMT
and also this issue

756018_0005_r_000000_0_-348705088_1, offset: 0, srvID:
4708e3eb-9b97-4fac-becd-0e77584945ad, blockid:
BP-1648199869-50.140.197.217-1462266926537:blk_1074784678_1043903,
duration: 17900868000
2016-06-29 17:27:53,089 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: PacketResponder:
BP-1648199869-50.140.197.217-1462266926537:blk_1074784678_1043903,
type=HAS_DOWNSTREAM_IN_PIPELINE terminating
2016-06-29 17:27:53,093 INFO
org.apache.hadoop.hdfs.server.datanode.DataNode: Receiving
BP-1648199869-50.140.197.217-1462266926537:blk_1074784679_1043904 src: /
50.140.197.217:60800 dest: /50.140.197.217:50010
2016-06-29 17:28:20,644 WARN
org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write
packet to mirror took 26593ms (threshold=300ms)

Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 June 2016 at 17:24, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

>
>
> Thanks all.
>
>  As always it is good to remove systematics.
>
>  As I mentioned the big 22 million rows table sales2 was an ORC
> compressed file. I just created a text file sales2_text in Hive withwith
>
>  create table sales2_text as select * from sales2
>
>  then updated stats on sales2_text.
>
>  Now doing the classic join between two text files in Hive with
> map-reduce. Cannot be simpler
>
>  hive> select a.prod_id from sales2_text a, sales_staging b where
> a.prod_id = b.prod_id order by a.prod_id;
> WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in
> the future versions. Consider using a different execution engine (i.e.
> spark, tez) or using Hive 1.X releases.
> Query ID = hduser_20160629165751_fa6587c7-d36a-46f1-bc5c-3fff7b6d7e2b
> Total jobs = 2
> Stage-1 is selected by condition resolver.
> Launching Job 1 out of 2
> Number of reduce tasks not specified. Estimated from input data size: 4
> In order to change the average load for a reducer (in bytes):
>   set hive.exec.reducers.bytes.per.reducer=<number>
> In order to limit the maximum number of reducers:
>   set hive.exec.reducers.max=<number>
> In order to set a constant number of reducers:
>   set mapreduce.job.reduces=<number>
> Starting Job = job_1467142756018_0005, Tracking URL =
> http://rhes564:8088/proxy/application_1467142756018_0005/
> Kill Command = /home/hduser/hadoop-2.6.0/bin/hadoop job  -kill
> job_1467142756018_0005
> Hadoop job information for Stage-1: number of mappers: 5; number of
> reducers: 4
> 2016-06-29 16:58:13,986 Stage-1 map = 0%,  reduce = 0%
> 2016-06-29 16:58:26,408 Stage-1 map = 7%,  reduce = 0%, Cumulative CPU
> 12.87 sec
> 2016-06-29 16:58:32,573 Stage-1 map = 13%,  reduce = 0%, Cumulative CPU
> 20.09 sec
> 2016-06-29 16:58:35,652 Stage-1 map = 15%,  reduce = 0%, Cumulative CPU
> 23.27 sec
> 2016-06-29 16:58:41,807 Stage-1 map = 17%,  reduce = 0%, Cumulative CPU
> 29.62 sec
> 2016-06-29 16:58:47,968 Stage-1 map = 18%,  reduce = 0%, Cumulative CPU
> 35.69 sec
> 2016-06-29 16:58:51,042 Stage-1 map = 20%,  reduce = 0%, Cumulative CPU
> 38.72 sec
> 2016-06-29 16:59:05,435 Stage-1 map = 27%,  reduce = 0%, Cumulative CPU
> 53.33 sec
> 2016-06-29 16:59:11,586 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU
> 60.08 sec
> 2016-06-29 16:59:14,656 Stage-1 map = 35%,  reduce = 0%, Cumulative CPU
> 63.37 sec
> 2016-06-29 16:59:17,726 Stage-1 map = 37%,  reduce = 0%, Cumulative CPU
> 66.44 sec
> 2016-06-29 16:59:23,866 Stage-1 map = 38%,  reduce = 0%, Cumulative CPU
> 72.65 sec
> 2016-06-29 16:59:26,934 Stage-1 map = 40%,  reduce = 0%, Cumulative CPU
> 75.68 sec
> 2016-06-29 16:59:42,346 Stage-1 map = 47%,  reduce = 0%, Cumulative CPU
> 91.77 sec
> 2016-06-29 16:59:48,497 Stage-1 map = 53%,  reduce = 0%, Cumulative CPU
> 97.93 sec
> 2016-06-29 16:59:51,566 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
> 101.13 sec
> 2016-06-29 16:59:57,702 Stage-1 map = 57%,  reduce = 0%, Cumulative CPU
> 107.45 sec
> 2016-06-29 17:00:00,768 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
> 110.49 sec
> 2016-06-29 17:00:03,840 Stage-1 map = 60%,  reduce = 0%, Cumulative CPU
> 113.53 sec
> 2016-06-29 17:00:16,134 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU
> 125.29 sec
> 2016-06-29 17:00:26,408 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
> 133.42 sec
> 2016-06-29 17:00:34,595 Stage-1 map = 100%,  reduce = 12%, Cumulative CPU
> 141.06 sec
> 2016-06-29 17:00:37,661 Stage-1 map = 100%,  reduce = 14%, Cumulative CPU
> 144.12 sec
> 2016-06-29 17:00:40,732 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 147.28 sec
> 2016-06-29 17:01:41,688 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 164.3 sec
> 2016-06-29 17:02:42,621 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 179.17 sec
> 2016-06-29 17:03:43,552 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 188.02 sec
> 2016-06-29 17:04:44,384 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 221.39 sec
> 2016-06-29 17:05:45,285 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 248.88 sec
> 2016-06-29 17:06:46,191 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 257.72 sec
> 2016-06-29 17:07:47,080 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 263.46 sec
> 2016-06-29 17:08:47,969 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 271.19 sec
> 2016-06-29 17:09:48,851 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 272.13 sec
> 2016-06-29 17:10:49,705 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU
> 272.68 sec
>
>
>
> Now it is stuck in reduce state!
>
>
>
>  Now when I look at logsNow[image: Inline images 1]
>
>
>  Now when I look at logs I see slow read processors that indicates IO or
> network problem that I need to sort out in the Hadoop cluster.
>
> [image: Inline images 2]
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 29 June 2016 at 14:40, Michael Segel <msegel_hadoop@hotmail.com> wrote:
>
>> Hi,
>>
>> I’m not sure I understand your initial question…
>>
>> Depending on the compression algo, you may or may not be able to split
>> the file.
>> So if its not splittable, you have a single long running thread.
>>
>> My guess is that you end up with a very long single partition.
>> If so, if you repartition, you may end up seeing better performance in
>> the join.
>>
>> I see that you’re using a hive context.
>>
>> Have you tried to manually do this using just data frames and compare the
>> DAG to the SQL DAG?
>>
>> HTH
>>
>> -Mike
>>
>> On Jun 29, 2016, at 9:14 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>> Hi all,
>>
>> It finished in 2 hours 18 minutes!
>>
>> Started at
>> [29/06/2016 10:25:27.27]
>> [148]
>> [148]
>> [148]
>> [148]
>> [148]
>> Finished at
>> [29/06/2016 12:43:33.33]
>>
>> I need to dig in more.
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 29 June 2016 at 10:42, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>>> Focusing on Spark job, as I mentioned before Spark is running in local
>>> mode with 8GB of memory for both the driver and executor memory.
>>>
>>> However, I still see this enormous Duration time which indicates
>>> something is wrong badly!
>>>
>>> Also I got rid of groupBy
>>>
>>>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>>>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>>>   val rs =
>>> s2.join(s,"prod_id").sort(desc("prod_id")).take(5).foreach(println)
>>>
>>>
>>> <image.png>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 29 June 2016 at 10:18, Jörn Franke <jornfranke@gmail.com> wrote:
>>>
>>>>
>>>> I think the TEZ engine is much more maintained with respect to
>>>> optimizations related to Orc , hive , vectorizing, querying than the mr
>>>> engine. It will be definitely better to use it.
>>>> Mr is also deprecated in hive 2.0.
>>>> For me it does not make sense to use mr with hive larger than 1.1.
>>>>
>>>> As I said, order by might be inefficient to use (not sure if this has
>>>> changed). You may want to use sort by.
>>>>
>>>> That being said there are many optimizations methods.
>>>>
>>>> On 29 Jun 2016, at 00:27, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>>> wrote:
>>>>
>>>> That is a good point.
>>>>
>>>> The ORC table property is as follows
>>>>
>>>> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>>>> "orc.stripe.size"="268435456",
>>>> "orc.row.index.stride"="10000")
>>>>
>>>> which puts each stripe at 256MB
>>>>
>>>> Just to clarify this is spark running on Hive tables. I don't think the
>>>> use of TEZ, MR or Spark as execution engines is going to make any
>>>> difference?
>>>>
>>>> This is the same query with Hive on MR
>>>>
>>>> select a.prod_id from sales2 a, sales_staging b where a.prod_id =
>>>> b.prod_id order by a.prod_id;
>>>>
>>>> 2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
>>>> 2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
>>>> 7.32 sec
>>>> 2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
>>>> 18.21 sec
>>>> 2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
>>>> 22.34 sec
>>>> 2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
>>>> 30.33 sec
>>>> 2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
>>>> 33.45 sec
>>>> 2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
>>>> 37.5 sec
>>>> 2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
>>>> 42.0 sec
>>>> 2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU
>>>> 45.62 sec
>>>> 2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU
>>>> 49.69 sec
>>>> 2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU
>>>> 52.92 sec
>>>> 2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU
>>>> 56.78 sec
>>>> 2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU
>>>> 60.36 sec
>>>> 2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU
>>>> 63.68 sec
>>>> 2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU
>>>> 66.92 sec
>>>> 2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative
>>>> CPU 70.18 sec
>>>> 2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative
>>>> CPU 127.99 sec
>>>> 2016-06-28 23:25:57,494 Stage-1 map = 100%,  reduce = 67%, Cumulative
>>>> CPU 134.64 sec
>>>> 2016-06-28 23:26:57,847 Stage-1 map = 100%,  reduce = 67%, Cumulative
>>>> CPU 141.01 sec
>>>>
>>>> which basically sits at 67% all day
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>> On 28 June 2016 at 23:07, Jörn Franke <jornfranke@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> Bzip2 is splittable for text files.
>>>>>
>>>>> Btw in Orc the question of splittable does not matter because each
>>>>> stripe is compressed individually.
>>>>>
>>>>> Have you tried tez? As far as I recall (at least it was in the first
>>>>> version of Hive) mr uses for order by a single reducer which is a
>>>>> bottleneck.
>>>>>
>>>>> Do you see some errors in the log file?
>>>>>
>>>>> On 28 Jun 2016, at 23:53, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I have a simple join between table sales2 a compressed (snappy)
>>>>> ORC with 22 million rows and another simple table sales_staging under
a
>>>>> million rows stored as a text file with no compression.
>>>>>
>>>>> The join is very simple
>>>>>
>>>>>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>>>>>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>>>>>
>>>>>   val rs =
>>>>> s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)
>>>>>
>>>>>
>>>>> Now what is happening is it is sitting on SortMergeJoin operation
>>>>> on ZippedPartitionRDD as shown in the DAG diagram below
>>>>>
>>>>>
>>>>> <image.png>
>>>>>
>>>>>
>>>>> And at this rate  only 10% is done and will take for ever to finish :(
>>>>>
>>>>> Stage 3:==>                                                     (10
+
>>>>> 2) / 200]
>>>>>
>>>>> Ok I understand that zipped files cannot be broken into blocks and
>>>>> operations on them cannot be parallelized.
>>>>>
>>>>> Having said that what are the alternatives? Never use compression and
>>>>> live with it. I emphasise that any operation on the compressed table
itself
>>>>> is pretty fast as it is a simple table scan. However, a join between
two
>>>>> tables on a column as above suggests seems to be problematic?
>>>>>
>>>>> Thanks
>>>>>
>>>>> P.S. the same is happening using Hive with MR
>>>>>
>>>>> select a.prod_id from sales2 a inner join sales_staging b on a.prod_id
>>>>> = b.prod_id order by a.prod_id;
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Mime
View raw message