This is what I am getting in the container log for MR:

2016-06-28 23:25:53,808 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_task_tmp.-mr-10004/_tmp.000000_0
2016-06-28 23:25:53,808 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: New Final Path: FS hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_tmp.-mr-10004/000000_0
2016-06-28 23:25:53,836 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1
2016-06-28 23:25:53,837 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10
2016-06-28 23:25:53,837 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100
2016-06-28 23:25:53,844 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1000
2016-06-28 23:25:53,875 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10000
2016-06-28 23:25:53,954 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100000
2016-06-28 23:25:55,072 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1000000
2016-06-28 23:26:56,236 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 10000000
2016-06-28 23:27:58,499 WARN [ResponseProcessor for block BP-1648199869-50.140.197.217-1462266926537:blk_1074784072_1043287] org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took 35556ms (threshold=30000ms); ack: seqno: 6815 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 35566795000, targets: [50.140.197.217:50010, 50.140.197.216:50010]
2016-06-28 23:31:38,437 INFO [main] org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 100000000
2016-06-28 23:35:27,631 WARN [ResponseProcessor for block BP-1648199869-50.140.197.217-1462266926537:blk_1074784086_1043301] org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took 31118ms (threshold=30000ms); ack: seqno: 36303 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 31128701000, targets: [50.140.197.217:50010, 50.140.197.216:50010]
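
One reading of the log above, offered as an interpretation rather than something verified against the data: the reducer has already written 100,000,000 records, far more than the 22 million rows in sales2, so prod_id is presumably repeated in both tables and every matching pair becomes an output row. A minimal Scala sketch of that arithmetic, using made-up key values (JoinFanOut and joinOutputRows are illustrative names only):

```scala
object JoinFanOut {
  // Rows produced by an inner equi-join: for every key present on both
  // sides, each left row pairs with each right row.
  def joinOutputRows(left: Seq[Int], right: Seq[Int]): Long = {
    val leftCounts: Map[Int, Long] =
      left.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    val rightCounts: Map[Int, Long] =
      right.groupBy(identity).map { case (k, v) => k -> v.size.toLong }
    leftCounts.iterator.map { case (k, n) => n * rightCounts.getOrElse(k, 0L) }.sum
  }

  def main(args: Array[String]): Unit = {
    // Made-up data: key 13 appears 4 times on the left and 3 times on the right.
    val left = Seq(13, 13, 13, 13, 14)
    val right = Seq(13, 13, 13, 15)
    println(joinOutputRows(left, right)) // 12 = 4 * 3; keys 14 and 15 have no match
  }
}
```

Running the same kind of count per prod_id on the real tables would show whether a handful of heavily repeated keys account for most of the output.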




Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


On 28 June 2016 at 23:27, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
That is a good point.

The ORC table properties are as follows:

TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="10000")

which puts each stripe at 256 MB.
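
As a quick check of that figure, the byte count converts as follows (a small Scala sketch; the object name StripeSize is only for illustration):

```scala
object StripeSize {
  // Value of "orc.stripe.size" from the table properties, in bytes.
  val stripeBytes: Long = 268435456L

  // Convert bytes to binary megabytes (1 MiB = 1024 * 1024 bytes).
  def toMiB(bytes: Long): Long = bytes / (1024L * 1024L)

  def main(args: Array[String]): Unit =
    println(s"${toMiB(stripeBytes)} MiB per stripe") // 256 MiB per stripe
}
```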

Just to clarify, this is Spark running on Hive tables. I don't think the choice of Tez, MR or Spark as the execution engine is going to make any difference?

This is the same query with Hive on MR:

select a.prod_id from sales2 a, sales_staging b where a.prod_id = b.prod_id order by a.prod_id;

2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 7.32 sec
2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU 18.21 sec
2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU 22.34 sec
2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU 30.33 sec
2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU 33.45 sec
2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU 37.5 sec
2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU 42.0 sec
2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU 45.62 sec
2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU 49.69 sec
2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU 52.92 sec
2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU 56.78 sec
2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU 60.36 sec
2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU 63.68 sec
2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU 66.92 sec
2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 70.18 sec
2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 127.99 sec
2016-06-28 23:25:57,494 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 134.64 sec
2016-06-28 23:26:57,847 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU 141.01 sec

after which the reduce basically sits at 67% all day.
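
For context, and as far as I understand MapReduce progress reporting, the reduce percentage of a task is split into three roughly equal parts: copy (shuffle), sort (merge) and the reduce function itself. With a single reducer, 67% would then mean the shuffle and merge are finished and the time is going into the reduce phase. A rough Scala sketch of that mapping (the names are illustrative, not Hadoop APIs):

```scala
object ReduceProgress {
  // Assumption: reduce progress is reported in three equal phases,
  // copy up to 33%, sort up to 67%, then the reduce function.
  def phase(reducePercent: Int): String = {
    require(reducePercent >= 0 && reducePercent <= 100, "percentage must be 0..100")
    if (reducePercent < 33) "copy (shuffle)"
    else if (reducePercent < 67) "sort (merge)"
    else "reduce"
  }

  def main(args: Array[String]): Unit =
    Seq(0, 50, 67).foreach(p => println(s"$p% -> ${phase(p)}"))
}
```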






 


On 28 June 2016 at 23:07, Jörn Franke <jornfranke@gmail.com> wrote:


Bzip2 is splittable for text files.

By the way, in ORC the question of splittability does not matter because each stripe is compressed individually.

Have you tried Tez? As far as I recall (at least this was the case in the first version of Hive), MR uses a single reducer for ORDER BY, which is a bottleneck.
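
If only the top few rows are needed, as with take(5) in the Spark version of the query, a full global sort through one reducer is not strictly required: each partition can keep its own top k, and the small partial results can then be merged. A minimal sketch in plain Scala, with partitions modelled as ordinary sequences (topKPerPartition and mergeTopK are hypothetical helper names, not Hive or Spark APIs):

```scala
object TopK {
  // Largest k values of one partition, in descending order.
  def topKPerPartition(partition: Seq[Int], k: Int): Seq[Int] =
    partition.sorted(Ordering[Int].reverse).take(k)

  // Merge the per-partition winners and keep the overall top k.
  def mergeTopK(partitions: Seq[Seq[Int]], k: Int): Seq[Int] =
    partitions.flatMap(topKPerPartition(_, k)).sorted(Ordering[Int].reverse).take(k)

  def main(args: Array[String]): Unit = {
    // Made-up data standing in for three partitions of prod_id values.
    val partitions = Seq(Seq(5, 1, 9), Seq(7, 7, 2), Seq(3, 8))
    println(mergeTopK(partitions, 5)) // List(9, 8, 7, 7, 5)
  }
}
```

Only k values per partition reach the final merge, so the last step stays small however large the input is.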

Do you see any errors in the log file?

On 28 Jun 2016, at 23:53, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

Hi,


I have a simple join between table sales2, a Snappy-compressed ORC table with 22 million rows, and another simple table, sales_staging, with under a million rows, stored as a text file with no compression.

The join is very simple:

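  import org.apache.spark.sql.functions.desc

  // HiveContext below is assumed to be an already created HiveContext instance
  val s2 = HiveContext.table("sales2").select("PROD_ID")
  val s = HiveContext.table("sales_staging").select("PROD_ID")

  // orderBy is an alias of sort, so the ascending orderBy before sort(desc(...)) was redundant
  s2.join(s, "prod_id").sort(desc("prod_id")).take(5).foreach(println)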


What is happening now is that it sits on the SortMergeJoin operation on ZippedPartitionRDD, as shown in the DAG diagram below:


<image.png>


At this rate, with only about 10% done, it will take forever to finish :(

[Stage 3:==>                                                     (10 + 2) / 200]

OK, I understand that zipped files cannot be broken into blocks and that operations on them cannot be parallelized.

Having said that, what are the alternatives? Never use compression and live with it? I emphasise that any operation on the compressed table itself is pretty fast, as it is a simple table scan. However, a join between the two tables on a column, as above, seems to be problematic.
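
One alternative that does not involve giving up compression, offered as a sketch rather than a tested fix: since sales_staging is small, it could be held in memory as a hash table and probed while sales2 is scanned. That is the idea behind a map-side (broadcast) join, which avoids shuffling and sorting the large table for the join itself. In plain Scala, with made-up rows and hypothetical names:

```scala
object HashJoinSketch {
  // Build a key -> count table from the small side, then stream the large
  // side and emit one output row per matching pair.
  def hashJoin(large: Iterator[Int], small: Seq[Int]): Iterator[Int] = {
    val smallCounts: Map[Int, Int] =
      small.groupBy(identity).map { case (k, v) => k -> v.size }
    large.flatMap(k => Iterator.fill(smallCounts.getOrElse(k, 0))(k))
  }

  def main(args: Array[String]): Unit = {
    val sales2 = Iterator(13, 14, 13, 20) // stand-in for the large table's prod_id column
    val salesStaging = Seq(13, 13, 20)    // stand-in for the small table's prod_id column
    println(hashJoin(sales2, salesStaging).toList) // List(13, 13, 13, 13, 20)
  }
}
```

The large side is consumed as an iterator, so only the small table's counts need to fit in memory.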

Thanks

P.S. The same is happening using Hive with MR:

select a.prod_id from sales2 a inner join sales_staging b on a.prod_id = b.prod_id order by a.prod_id;
