spark-user mailing list archives

From Gourav Sengupta <>
Subject Re: SPARK Issue in Standalone cluster
Date Thu, 03 Aug 2017 09:30:41 GMT
Hi Steve,

I love you mate, thanks a ton once again for ACTUALLY RESPONDING.

I am now going through the documentation (
and it makes much much more sense now.

Gourav Sengupta

On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <>

> On 2 Aug 2017, at 20:05, Gourav Sengupta <>
> wrote:
> Hi Steve,
> I have written a sincere note of apology to everyone in a separate email.
> I sincerely ask your forgiveness in advance if anything in my emails
> sounds impolite.
> Let me first start by thanking you.
> I know it looks like I formed all my opinion based on that document, but
> that is not the case at all. If you or anyone tries to execute the code
> that I have given then they will see what I mean. Code speaks louder and
> better than words for me.
> So I am not saying you are wrong. I am asking you to verify, and I expect
> someone will be able to correct a set of understandings that a moron like
> me has gained after long hours of not having anything better to do.
> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored
> in HDFS with replication 2, and there is a HADOOP cluster of three nodes.
> All these nodes have SPARK workers (executors) running on them. The files
> are stored in the following way:
> -------------------------------------------
> | SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
> | (worker1)   | (worker2)   | (worker3)   |
> | (master)    |             |             |
> -------------------------------------------
> | file1.csv   |             | file1.csv   |
> -------------------------------------------
> |             | file2.csv   | file2.csv   |
> -------------------------------------------
> | file3.csv   | file3.csv   |             |
> -------------------------------------------
> HDFS replication does not store the same file in all the nodes in the
> cluster. So if I have three nodes and the replication is two then the same
> file will be stored physically in two nodes in the cluster. Does that sound
> right?
> HDFS breaks files up into blocks (default = 128MB). If a .csv file is >
> 128MB then it will be broken up into blocks
> file1.csv -> [block0001, block0002, block0003]
> and each block will be replicated. With replication = 2 there will be two
> copies of each block, but the file itself can span > 2 hosts.
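A minimal Python sketch of the block and replica arithmetic above, assuming the default 128 MB block size and a hypothetical 300 MB file1.csv. The round-robin placement in `place_replicas` is a made-up stand-in (real HDFS uses a rack-aware placement policy); it only shows that with replication 2, different blocks of one file can end up spread over all three hosts.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default dfs.blocksize (128 MB)
MB = 1024 * 1024

def split_into_blocks(file_size: int, block_size: int = BLOCK_SIZE) -> list[int]:
    """Sizes of the blocks a file of file_size bytes is stored as."""
    if file_size == 0:
        return []
    n = math.ceil(file_size / block_size)
    # every block is full except possibly the last one
    return [block_size] * (n - 1) + [file_size - block_size * (n - 1)]

def place_replicas(num_blocks: int, hosts: list[str], replication: int = 2) -> list[list[str]]:
    """Hypothetical round-robin placement: block i goes to `replication`
    consecutive hosts. Real HDFS uses a rack-aware placement policy."""
    return [[hosts[(i + r) % len(hosts)] for r in range(replication)]
            for i in range(num_blocks)]

hosts = ["system1", "system2", "system3"]
blocks = split_into_blocks(300 * MB)  # a hypothetical 300 MB file1.csv
placement = place_replicas(len(blocks), hosts)
for i, (size, replicas) in enumerate(zip(blocks, placement), start=1):
    print(f"block{i:04d}: {size // MB} MB -> {replicas}")
# block0001: 128 MB -> ['system1', 'system2']
# block0002: 128 MB -> ['system2', 'system3']
# block0003: 44 MB -> ['system3', 'system1']
print(sorted({h for replicas in placement for h in replicas}))
# ['system1', 'system2', 'system3']
```

Each block has exactly two copies, yet the file as a whole touches every host.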
> If SPARK is trying to process the records, then I am expecting that
> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
> not be processing file2.csv and WORKER3 should not be processing file3.csv.
> Because if WORKER2 were to process file1.csv, it would cause unnecessary
> network transmission of the file.
> Spark prefers to schedule work locally, so as to save on network traffic,
> but it prioritises execution time over waiting for workers to become free
> on the node with the data. If a block is on nodes 2 and 3 but there is only
> a free thread on node 1, then node 1 gets the work.
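A simplified sketch of best-effort locality, assuming one free "slot" per idle executor thread. `Task`, `assign` and the slot counts are hypothetical names for illustration, not Spark's scheduler API. Real Spark also waits up to `spark.locality.wait` (3s by default) at each locality level before falling back; this sketch falls back immediately.

```python
from dataclasses import dataclass

@dataclass
class Task:
    name: str
    preferred_hosts: list[str]  # hosts holding a replica of the task's block

def assign(tasks: list[Task], free_slots: dict[str, int]) -> dict[str, str]:
    """Greedy best-effort locality: prefer a free slot on a host holding the
    data, otherwise take any free slot instead of waiting."""
    slots = dict(free_slots)  # don't mutate the caller's dict
    assignment: dict[str, str] = {}
    for task in tasks:
        local = [h for h in task.preferred_hosts if slots.get(h, 0) > 0]
        candidates = local or [h for h, n in slots.items() if n > 0]
        if not candidates:
            break  # no free slot anywhere; later tasks would have to wait too
        host = candidates[0]
        slots[host] -= 1
        assignment[task.name] = host
    return assignment

# The block is on nodes 2 and 3, but only node 1 has a free thread.
print(assign([Task("block0001", ["system2", "system3"])],
             {"system1": 1, "system2": 0, "system3": 0}))
# {'block0001': 'system1'}
```

This is why "WORKER2 should not process file1.csv" is not a rule Spark enforces: locality is a preference that loses to an idle thread.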
> There are details on whether/how work across blocks takes place which I'm
> avoiding. For now, know that formats which are "splittable" will have work
> scheduled by block. If you use Parquet/ORC/Avro for your data and compress
> with Snappy, it will be split. This gives you maximum performance as >1
> thread can work on different blocks. That is, if file1 is split into three
> blocks, three worker threads can process it.
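A rough sketch of how splittability changes parallelism, assuming one task per 128 MB block for splittable files. This is a simplification: Spark SQL's file sources size input splits with settings such as `spark.sql.files.maxPartitionBytes` (also 128 MB by default), so split and block boundaries need not line up. A gzip-compressed CSV is the usual example of a file that cannot be split.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default block size
MB = 1024 * 1024

def num_tasks(file_size: int, splittable: bool, block_size: int = BLOCK_SIZE) -> int:
    """Tasks needed to read one file under a simplified one-task-per-block model."""
    if file_size == 0:
        return 0
    if not splittable:
        # e.g. a gzip-compressed CSV: one task must read it from start to end
        return 1
    return math.ceil(file_size / block_size)

for fmt, splittable in [("parquet + snappy", True), ("csv.gz", False)]:
    print(fmt, num_tasks(300 * MB, splittable))
# parquet + snappy 3
# csv.gz 1
```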
> THIS):
> If WORKER2 is not processing file1.csv, then how does it matter whether
> the file is there or not at all in that system? Should SPARK not just ask
> the workers to process the files which are available on their own nodes?
> If both WORKER2 and WORKER3 fail and are not available, then file2.csv
> will not be processed at all.
> Locality is best-effort, not guaranteed.
> EXECUTED (It's been pointed out that I am learning SPARK, and even I did not
> take more than 13 mins to set up the cluster and run the code).
> Once you execute the code then you will find that:
> 1. If the path starts with file:/// while reading back, then no error is
> reported, but the number of records reported back covers only the records
> on the worker which runs on the same system as the master.
> 2. Also, you will notice that once you cache the file before writing, the
> partitions are distributed nicely across the workers. While writing back,
> the worker on the master's system writes its DataFrame partitions properly,
> but the workers on the other systems leave their files in the _temporary
> folder, which does not get copied back to the main folder.
> In spite of this, the job is not reported as failed in SPARK.
> This gets into the "commit protocol". You don't want to know all the dirty
> details (*) but essentially it's this:
> 1. Every worker writes its output to a directory under the destination
> directory, something like
> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
> 2. It is the Spark driver which "commits" the job by moving the output
> of the individual workers from the temporary directories into $dest, then
> deleting $dest/_temporary.
> 3. To do that, it needs to be able to list all the output in
> $dest/_temporary.
> In your case, only the output on the same node as the driver is being
> committed, because only those files can be listed and moved. The output on
> the other nodes isn't seen, so it isn't committed, nor cleaned up.
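A toy model of the commit flow in steps 1 to 3, using temporary directories on one machine. Each simulated node gets its own root directory standing in for its local disk, and node 0 hosts the driver. `simulate_job` and its layout follow the description above, not Hadoop's actual `FileOutputCommitter` code (which also has a separate task-commit step). Passing `shared_store=True` makes all nodes use one root, like a shared mount at the same path everywhere.

```python
import os
import shutil
import tempfile

def simulate_job(num_nodes: int, shared_store: bool, dest: str = "out"):
    """Toy commit flow. Node 0 hosts the driver. Each node gets its own root
    directory (its "local disk") unless shared_store is True, in which case all
    nodes use one root. Returns (files committed into dest, files left behind)."""
    base = tempfile.mkdtemp()
    try:
        roots = [base if shared_store else os.path.join(base, f"node{i}")
                 for i in range(num_nodes)]
        # Step 1: each node's task writes under
        # $dest/_temporary/$appAttemptId/_temporary/$taskAttemptID on *its* disk.
        for i, root in enumerate(roots):
            task_dir = os.path.join(root, dest, "_temporary", "0", "_temporary", f"attempt_{i}")
            os.makedirs(task_dir)
            with open(os.path.join(task_dir, f"part-{i:05d}.csv"), "w") as f:
                f.write(f"rows written on node {i}\n")
        # Steps 2-3: the driver lists the attempts it can see on its own disk,
        # moves their files into $dest, then deletes $dest/_temporary.
        final_dir = os.path.join(roots[0], dest)
        attempts = os.path.join(final_dir, "_temporary", "0", "_temporary")
        for attempt in sorted(os.listdir(attempts)):
            for name in os.listdir(os.path.join(attempts, attempt)):
                shutil.move(os.path.join(attempts, attempt, name), os.path.join(final_dir, name))
        shutil.rmtree(os.path.join(final_dir, "_temporary"))
        committed = sorted(os.listdir(final_dir))
        # Anything still under some node's $dest/_temporary was never committed.
        leftover = []
        for root in sorted(set(roots)):
            for _, _, files in os.walk(os.path.join(root, dest, "_temporary")):
                leftover.extend(files)
        return committed, sorted(leftover)
    finally:
        shutil.rmtree(base)

print(simulate_job(3, shared_store=False))
# (['part-00000.csv'], ['part-00001.csv', 'part-00002.csv'])
print(simulate_job(3, shared_store=True))
# (['part-00000.csv', 'part-00001.csv', 'part-00002.csv'], [])
```

With separate local disks only the driver node's part file is committed, and nothing in this flow notices the missing output, which is consistent with the job not being reported as failed.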
> Now, in my own world, I see the following things happening, which tells me
> something is going wrong (with me):
> 1. SPARK transfers files from different systems to process them, instead of
> processing them locally (I do not have code to prove this, and therefore
> it's just an assumption).
> 2. SPARK cannot detect when writes are failing on the workers of a
> standalone cluster, and reports success (code is there for this).
> 3. SPARK reports back only the number of records in the worker running on
> the master node when count() is called, without reporting an error, while
> using file:///, and reports an error when I give the path without file:///
> (for SPARK 2.1.x onwards, code is there for this).
> As everyone's been saying, file:// requires a shared filestore, with
> uniform paths everywhere. That's needed to list the files to process, read
> the files in the workers and commit the final output. NFS cross-mounting is
> the simplest way to do this, especially as for three nodes HDFS is
> overkill: more services to keep running, no real fault tolerance. Export a
> directory tree from one of the servers, give the rest access to it, and
> don't worry about bandwidth use, as the shared disk itself will become the
> bottleneck.
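A small check for the "uniform paths everywhere" requirement, simulated on one machine: each node is represented by a root directory, and `is_uniform` (a hypothetical helper, not part of Spark or Hadoop) tests whether every node sees exactly the same files under the same relative path. On a real cluster the listing would have to run on each node itself.

```python
import os
import tempfile

def visible_files(root: str, path: str) -> set[str]:
    """Relative paths of all files under root/path, as one node would see them."""
    full = os.path.join(root, path)
    found = set()
    for dirpath, _, files in os.walk(full):
        for name in files:
            found.add(os.path.relpath(os.path.join(dirpath, name), full))
    return found

def is_uniform(node_roots: dict[str, str], path: str) -> bool:
    """True if every node sees exactly the same files under the same path."""
    views = [visible_files(root, path) for root in node_roots.values()]
    return all(view == views[0] for view in views)

def touch(path: str) -> None:
    os.makedirs(os.path.dirname(path), exist_ok=True)
    open(path, "w").close()

hosts = ["system1", "system2", "system3"]

# Shared filestore: every node maps the path onto the same directory.
shared = tempfile.mkdtemp()
touch(os.path.join(shared, "data", "file1.csv"))
print(is_uniform({h: shared for h in hosts}, "data"))  # True

# Separate local disks: file1.csv only exists on system1.
local = {h: tempfile.mkdtemp() for h in hosts}
touch(os.path.join(local["system1"], "data", "file1.csv"))
print(is_uniform(local, "data"))  # False
```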
> I very sincerely hope with your genuine help the bar of language and
> social skills will be lowered for me. And everyone will find a way to
> excuse me and not qualify this email as a means to measure my extremely
> versatile and amazingly vivid social skills. It will be a lot of help to
> just focus on the facts related to machines, data, error and (the language
> that I somehow understand better) code.
> My sincere apologies once again, as I am 100% sure that I did not meet the
> required social and language skills.
> Thanks a ton once again for your kindness, patience and understanding.
> Regards,
> Gourav Sengupta
> * for the curious, the details of the v1 and v2 commit protocols are
> committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/
> hadoop-aws/
> Like I said: you don't want to know the details, and you really don't want
> to step through Hadoop's FileOutputCommitter to see what's going on. The
> Spark side is much easier to follow.
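For the curious, a toy contrast between the two algorithms, which Hadoop selects with `mapreduce.fileoutputcommitter.algorithm.version` (settable from Spark as `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version`). This is a simplified model, not the committer's code, and `run_job` and its directory names are hypothetical: in v1 a task commit only renames its attempt directory inside `_temporary` and the job commit moves everything into `$dest`; in v2 each task commit moves its files straight into `$dest`.

```python
import os
import shutil
import tempfile

def visible_output(dest: str) -> list[str]:
    """Files a reader of $dest would see (everything except _temporary)."""
    return sorted(n for n in os.listdir(dest) if n != "_temporary")

def run_job(algorithm: int, dest: str, num_tasks: int = 2) -> list[list[str]]:
    """Toy v1/v2 commit model. Returns a snapshot of visible output after each
    task commit, plus one after the job commit."""
    job_tmp = os.path.join(dest, "_temporary", "0")  # $dest/_temporary/$appAttemptId
    snapshots = []
    for t in range(num_tasks):
        attempt = os.path.join(job_tmp, "_temporary", f"attempt_{t}")
        os.makedirs(attempt)
        with open(os.path.join(attempt, f"part-{t:05d}"), "w") as f:
            f.write("rows\n")
        # Task commit.
        if algorithm == 1:
            # v1: rename the attempt dir to a committed-task dir, still under _temporary.
            os.rename(attempt, os.path.join(job_tmp, f"task_{t}"))
        else:
            # v2: move the task's files straight into $dest.
            for name in os.listdir(attempt):
                shutil.move(os.path.join(attempt, name), os.path.join(dest, name))
        snapshots.append(visible_output(dest))
    # Job commit.
    if algorithm == 1:
        # v1: merge every committed-task dir into $dest.
        for task_dir in sorted(d for d in os.listdir(job_tmp) if d.startswith("task_")):
            for name in os.listdir(os.path.join(job_tmp, task_dir)):
                shutil.move(os.path.join(job_tmp, task_dir, name), os.path.join(dest, name))
    # Both versions finish by deleting $dest/_temporary.
    shutil.rmtree(os.path.join(dest, "_temporary"))
    snapshots.append(visible_output(dest))
    return snapshots

for version in (1, 2):
    with tempfile.TemporaryDirectory() as tmp:
        print(version, run_job(version, os.path.join(tmp, "out")))
# 1 [[], [], ['part-00000', 'part-00001']]
# 2 [['part-00000'], ['part-00000', 'part-00001'], ['part-00000', 'part-00001']]
```

The trade-off in this model: v2 makes the job commit cheaper, but if a job fails part-way, output from tasks that already committed is left visible in `$dest`.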
