spark-user mailing list archives

From java8964 <java8...@hotmail.com>
Subject RE: Problem understanding spark word count execution
Date Fri, 02 Oct 2015 13:22:30 GMT
No problem.
From the mapper side, Spark is very similar to MapReduce; but on the reducer fetching side, MR uses a sort-merge while Spark uses a HashMap.
So keep in mind that you get your data automatically sorted on the reducer side in MR, but not in Spark.
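The difference can be sketched in plain Python (a conceptual illustration, not actual MR or Spark code): an MR-style reducer sees its keys grouped and in sorted order because of the sort-merge, while a hash-map aggregation produces the same totals with no ordering guarantee.

```python
from collections import defaultdict

# Map output for one reduce partition: (word, count) pairs from the mappers.
map_output = [("spark", 1), ("hadoop", 2), ("spark", 3), ("avro", 1)]

# MR-style reduce side: the framework sort-merges the pairs, so the
# reducer sees each key's values together and keys arrive sorted.
mr_result = []
current_key, current_sum = None, 0
for key, value in sorted(map_output):          # the framework's sort-merge
    if key != current_key:
        if current_key is not None:
            mr_result.append((current_key, current_sum))
        current_key, current_sum = key, 0
    current_sum += value
mr_result.append((current_key, current_sum))
print(mr_result)   # keys in sorted order: [('avro', 1), ('hadoop', 2), ('spark', 4)]

# Spark-style reduce side: a hash map aggregates with O(1) updates,
# no sort, and therefore no ordering guarantee over the keys.
spark_result = defaultdict(int)
for key, value in map_output:
    spark_result[key] += value
print(dict(spark_result))   # same totals, but not sorted by key
```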
Spark's performance comes from its caching ability and from smartly arranging the tasks into stages. Intermediate data between stages is never stored in HDFS, only on local disk; in MR, the intermediate data between one MR job and the next is stored in HDFS. Spark also uses threads to run tasks, instead of heavy processes as in MR.
Without caching, in my experience, Spark can get about 2x to 5x better performance than an MR job, depending on the job logic. If the data volume is small, Spark will do even better, as a process is far more expensive than a thread in this case.
I didn't see your Spark script, so my guess is that you are using "rdd.collect()", which will transfer the final result to the driver and dump it in the console.
Yong
Date: Fri, 2 Oct 2015 00:50:24 -0700
Subject: Re: Problem understanding spark word count execution
From: kartik@bluedata.com
To: java8964@hotmail.com
CC: nicolae.marasoiu@adswizz.com; user@spark.apache.org

Thanks Yong,
That was the explanation I was looking for. However, I have one doubt. You write: "Imagine that you have 2 mappers to read the data, then each mapper will generate the (word, count) tuple output in segments. Spark always outputs that in a local file. (In fact, one file with different segments to represent different partitions.)" If this is true, then Spark is very similar to Hadoop MapReduce (disk IO between phases); with so many IOs after each stage, how does Spark achieve the performance it does compared to MapReduce? Another doubt: you write "The 2000 bytes sent to driver is the final output aggregated on the reducers end, and merged back to the driver." Which part of our word count code takes care of this part? And yes, there are only 273 distinct words in the text, so that's not a surprise.
Thanks again,
Hope to get a reply.
--Kartik
On Thu, Oct 1, 2015 at 5:49 PM, java8964 <java8964@hotmail.com> wrote:



I am not sure about the original explanation of shuffle write.
In the word count example, the shuffle is needed, as Spark has to group by the word (reduceByKey is more accurate here). Imagine that you have 2 mappers to read the data; then each mapper will generate the (word, count) tuple output in segments. Spark always outputs that in a local file (in fact, one file with different segments to represent different partitions).
As you can imagine, the output of these segments will be small, as it only contains (word, count of word) tuples. After each mapper generates this segmented file for the different partitions, the reducers will fetch the partitions belonging to themselves.
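The map-side segments and reduce-side fetch can be sketched in plain Python (a conceptual sketch of hash partitioning, not Spark's actual shuffle code): each mapper pre-aggregates its (word, count) tuples and writes one segment per reduce partition, and each reducer then merges only its own segment from every mapper.

```python
from collections import Counter

NUM_REDUCERS = 2
mapper_inputs = [
    "to be or not to be".split(),       # data read by mapper 0
    "to see or not to see".split(),     # data read by mapper 1
]

# Map side: each mapper aggregates locally, then splits its (word, count)
# tuples into one segment per reduce partition, keyed by hash(word).
def map_side(words):
    segments = [dict() for _ in range(NUM_REDUCERS)]
    for word, count in Counter(words).items():
        segments[hash(word) % NUM_REDUCERS][word] = count
    return segments   # small: proportional to distinct words, not input lines

segment_files = [map_side(words) for words in mapper_inputs]

# Reduce side: reducer r fetches segment r from every mapper's file and
# merges the counts with a hash map (no sort, unlike MapReduce).
def reduce_side(r):
    merged = Counter()
    for segments in segment_files:
        merged.update(segments[r])
    return dict(merged)

totals = {}
for r in range(NUM_REDUCERS):
    totals.update(reduce_side(r))
print(totals)   # total counts for the five distinct words
```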
In your job summary, since your source is a text file, your data corresponds to 2 HDFS blocks, or 2 x 256 MB. There are 2 tasks concurrently reading these 2 partitions, with about 2.5M lines of data in each partition being processed.
The output of each partition is a shuffle write of 2.7 KB of data, which is the size of the segment file generated, corresponding to all the unique words and their counts for this partition. So the size is reasonable, at least for me.
The interesting number is the 273 shuffle write records. I am not 100% sure of its meaning. Does it mean that this partition has 273 unique words from these 2.5M lines of data? That is kind of low, but I really don't have another explanation of its meaning. If your final output shows hundreds of unique words, then that confirms it.
The 2000 bytes sent to the driver is the final output, aggregated on the reducers' end and merged back to the driver.
Yong

Date: Thu, 1 Oct 2015 13:33:59 -0700
Subject: Re: Problem understanding spark word count execution
From: kartik@bluedata.com
To: nicolae.marasoiu@adswizz.com
CC: user@spark.apache.org

Hi Nicolae,
Thanks for the reply. To further clarify things -
sc.textFile is reading from HDFS. Now, shouldn't the file be read in a way such that EACH executor works only on the local copy of the file part available? In this case it is a ~4.64 GB file and the block size is 256 MB, so approx 19 partitions will be created, and each task will run on 1 partition (which is what I am seeing in the stage logs). Also, I assume it will read the file in a way that each executor will have exactly the same amount of data, so there shouldn't be any shuffling in reading, at least.
During stage 0 (sc.textFile -> flatMap -> Map), this is the output I am seeing for every task:

Index | ID | Attempt | Status  | Locality Level | Executor ID / Host | Launch Time         | Duration | GC Time | Input Size / Records        | Shuffle Write Size / Records
0     | 44 | 0       | SUCCESS | NODE_LOCAL     | 1 / 10.35.244.10   | 2015/09/29 13:57:24 | 14 s     | 0.2 s   | 256.0 MB (hadoop) / 2595161 | 2.7 KB / 273
1     | 45 | 0       | SUCCESS | NODE_LOCAL     | 2 / 10.35.244.11   | 2015/09/29 13:57:24 | 13 s     | 0.2 s   | 256.0 MB (hadoop) / 2595176 | 2.7 KB / 273

I have the following questions -
1) What exactly is the 2.7 KB of shuffle write?
2) Is this 2.7 KB of shuffle write local to that executor?
3) In the executors' logs I am seeing 2000 bytes of results sent to the driver; if instead this number were much, much greater than 2000 bytes, such that it did not fit in the executor's memory, would the shuffle write increase?
4) For word count, 256 MB of data is a substantial amount of text; how come the result for this stage is only 2000 bytes? It should send every word with its respective count; for a 256 MB input this result should be much bigger?
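The size question can be checked with a back-of-the-envelope sketch in plain Python (an illustration of the word-count math with made-up text, standing in for the 256 MB partition): the reduced output grows with the number of DISTINCT words, not with the input size.

```python
from collections import Counter

# Made-up input: many lines built from a tiny vocabulary, mimicking a
# large partition that nevertheless contains few distinct words.
vocabulary = ["spark", "hadoop", "shuffle", "driver", "executor"]
lines = [" ".join(vocabulary) for _ in range(100_000)]   # 500k words of input

counts = Counter(word for line in lines for word in line.split())

# The reduced result has one (word, count) tuple per distinct word, so it
# stays tiny no matter how many input lines there were.
print(len(counts))       # 5 tuples, even though the input had 500k words
print(counts["spark"])   # 100000
```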
I hope I am clear this time.
Hope to get a reply,
Thanks,
Kartik


On Thu, Oct 1, 2015 at 12:38 PM, Nicolae Marasoiu <nicolae.marasoiu@adswizz.com> wrote:

Hi,




So you say "sc.textFile -> flatMap -> Map".

My understanding is like this:
The first step is that a number of partitions, p, is determined; you can give a hint on this. Then the nodes which will load the partitions are chosen, that is n nodes (where n <= p).



At relatively the same time or not, the n nodes start opening different sections of the file, the physical equivalent of the partitions: for instance in HDFS they would do an open and a seek, I guess, and just read from the stream there, converting to whatever the InputFormat dictates.


The shuffle can only be the part where a node opens an HDFS file, for instance, but does not have a local replica of the blocks which it needs to read (those pertaining to its assigned partitions). So it needs to pick them up from remote nodes which do have replicas of that data.



After the blocks are read into memory, flatMap and map are local computations generating new RDDs, and in the end the result is sent to the driver (whatever the terminating computation does on the RDD, like the result of reduce, or the side effects of rdd.foreach, etc.).



Maybe you can share more of your context if it is still unclear.
I just made assumptions to give clarity on a similar thing.



Nicu



From: Kartik Mathur <kartik@bluedata.com>

Sent: Thursday, October 1, 2015 10:25 PM

To: Nicolae Marasoiu

Cc: user

Subject: Re: Problem understanding spark word count execution
 


Thanks Nicolae,
So in my case all executors are sending results back to the driver, and "shuffle is just sending out the textFile to distribute the partitions". Could you please elaborate on this? What exactly is in this file?



On Wed, Sep 30, 2015 at 9:57 PM, Nicolae Marasoiu <nicolae.marasoiu@adswizz.com> wrote:

Hi,



2 - the end results are sent back to the driver; the shuffles are transmissions of intermediate results between nodes, such as between the -> steps, which are all intermediate transformations.



More precisely, since flatMap and map are narrow dependencies, meaning they can usually happen on the local node, I bet the shuffle is just sending out the textFile to a few nodes to distribute the partitions.
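The narrow-dependency point can be illustrated in plain Python (a conceptual sketch, not Spark internals): each output partition of flatMap and map depends on exactly one input partition, so every step runs locally with no data moving between partitions.

```python
# Two input partitions (lists of lines), as if read from two HDFS blocks.
partitions = [
    ["to be or not to be"],
    ["to see or not to see"],
]

# Narrow transformations: applied partition-at-a-time, independently.
def flat_map(part):
    return [w for line in part for w in line.split()]   # flatMap: line -> words

def to_pairs(part):
    return [(w, 1) for w in part]                       # map: word -> (word, 1)

# No cross-partition communication happens anywhere in this pipeline.
transformed = [to_pairs(flat_map(p)) for p in partitions]
print(transformed[0][:3])   # [('to', 1), ('be', 1), ('or', 1)]
```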







From: Kartik Mathur <kartik@bluedata.com>

Sent: Thursday, October 1, 2015 12:42 AM

To: user

Subject: Problem understanding spark word count execution

Hi All,



I tried running the Spark word count and I have a couple of questions -



I am analyzing stage 0, i.e.
sc.textFile -> flatMap -> Map (word count example)
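The stage can be mirrored in plain Python (a conceptual sketch of what each step emits, not actual Spark code):

```python
# Stand-in for sc.textFile: a list of lines, as if read from one HDFS block.
lines = ["to be or not to be", "to see or not to see"]

# flatMap: each line expands into many words.
words = [w for line in lines for w in line.split()]

# map: each word becomes a (word, 1) tuple, ready for the later reduce.
pairs = [(w, 1) for w in words]

print(words[:4])   # ['to', 'be', 'or', 'not']
print(pairs[0])    # ('to', 1)
```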



1) In the stage logs under the Application UI details, for every task I am seeing Shuffle Write as 2.7 KB.
Question - how can I know where this task wrote? Like how many bytes went to which executor?



2) In the executor's log, when I look for the same task, it says 2000 bytes of result was sent to the driver. My question is:
if the results were sent directly to the driver, what is this shuffle write?



Thanks,
Kartik