Hi Steve,

I love you mate, thanks a ton once again for ACTUALLY RESPONDING.

I am now going through the documentation (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md) and it makes much much more sense now.

Regards,
Gourav Sengupta

On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <stevel@hortonworks.com> wrote:

On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengupta@gmail.com> wrote:

Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I sincerely request your kind forgiveness in advance if anything sounds impolite in my emails.

Let me first start by thanking you. 

I know it looks like I formed all my opinions based on that document, but that is not the case at all. If you or anyone else tries to execute the code that I have given, they will see what I mean. Code speaks louder and better than words for me.

So I am not saying you are wrong. I am asking you to verify, and expecting that someone will be able to correct the understanding that a moron like me has gained after long hours of not having anything better to do.


SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All these nodes have SPARK workers (executors) running on them. The files are stored in the following way:
-------------------------------------
| SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
| (worker1) | (worker2) | (worker3) |
| (master)  |           |           |
-------------------------------------
| file1.csv |           | file1.csv |
-------------------------------------
|           | file2.csv | file2.csv |
-------------------------------------
| file3.csv | file3.csv |           |
-------------------------------------





CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the cluster. So if I have three nodes and the replication is two then the same file will be stored physically in two nodes in the cluster. Does that sound right?


HDFS breaks files up into blocks (default = 128MB). If a .csv file is > 128MB then it will be broken up into blocks

file1.csv -> [block0001, block0002, block0003]

and each block will be replicated. With replication = 2 there will be two copies of each block, but the file itself can span > 2 hosts.
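
As a rough illustration of the arithmetic above, here is a minimal sketch. The function name block_layout and the 300MB file size are made up for the example, and it only counts blocks and replicas; it says nothing about which hosts HDFS actually picks.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # the 128MB default block size mentioned above


def block_layout(file_size_bytes, replication=2, block_size=BLOCK_SIZE):
    """Return (number of blocks, total number of block replicas) for one file.

    Hypothetical helper for illustration only, not an HDFS API.
    """
    blocks = math.ceil(file_size_bytes / block_size)
    return blocks, blocks * replication


# A 300MB file with replication = 2: three blocks, six block replicas in total.
blocks, replicas = block_layout(300 * 1024 * 1024)
print(blocks, replicas)  # 3 6
```

With three blocks and two copies of each, the six replicas can easily end up spread over all three nodes, which is why the file as a whole can span more than two hosts.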


ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that WORKER2 should not be processing file1.csv, and similarly WORKER1 should not be processing file2.csv and WORKER3 should not be processing file3.csv. Because if WORKER2 were to process file1.csv then it would actually cause network transmission of the file unnecessarily.


Spark prefers to schedule work locally, so as to save on network traffic, but it prioritises execution time over waiting for a free worker on a node with the data. If a block is on nodes 2 and 3 but there is only a free thread on node 1, then node 1 gets the work.
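
A toy sketch of that preference, to make the idea concrete. This is not Spark's scheduler: pick_node and the node names are invented for the example, and the real scheduler has more locality levels and timing rules than this.

```python
def pick_node(replica_hosts, free_slots):
    """Toy placement rule: prefer a node holding the block, else any free node.

    replica_hosts: hosts that store a copy of the block.
    free_slots: mapping of host -> number of free worker threads.
    Returns (host, is_local), or (None, False) if nothing is free.
    """
    # First choice: a host that already has the data and a free thread.
    for host in replica_hosts:
        if free_slots.get(host, 0) > 0:
            return host, True
    # Otherwise take any host with a free thread and read over the network.
    for host, slots in free_slots.items():
        if slots > 0:
            return host, False
    return None, False


# Block is on node2 and node3, but only node1 has a free thread.
print(pick_node(["node2", "node3"], {"node1": 1, "node2": 0, "node3": 0}))
# ('node1', False)
```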

There are details on whether/how work across blocks takes place which I'm avoiding. For now, know that those formats which are "splittable" will have work scheduled by block. If you use Parquet/ORC/Avro for your data and compress with Snappy, it will be split. This gives you maximum performance as >1 thread can work on different blocks. That is, if file1 is split into three blocks, three worker threads can process it.


ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY THIS):
if WORKER 2 is not processing file1.csv then how does it matter whether the file is there or not at all in the system? Should SPARK not just ask the workers to process the files which are available on the worker nodes? In case both WORKER2 and WORKER3 fail and are not available then file2.csv will not be processed at all.


Locality is best-effort, not guaranteed.


ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did not take more than 13 mins to set up the cluster and run the code).

Once you execute the code then you will find that:
1. if the path starts with file:/// while reading back then there is no error reported, but the records reported back are only those on the worker which also has the server.
2. also you will notice that once you cache the file before writing, the partitions are distributed nicely across the workers, and while writing back, the dataframe partitions do get written properly on the worker node on the Master, but the workers on the other systems have their files written in the _temporary folder, which does not get copied back to the main folder. In spite of this the job is not reported as failed in SPARK.

This gets into the "commit protocol". You don't want to know all the dirty details (*) but essentially it's this:

1. Every worker writes its output to a directory under the destination directory, something like '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
2. It is the Spark driver which "commits" the job by moving the output from the individual workers from the temporary directories into $dest, then deleting $dest/_temporary
3. For which it needs to be able to list all the output in $dest/_temporary

In your case, only the output on the same node as the driver is being committed, because only those files can be listed and moved. The output on the other nodes isn't seen, so isn't committed, nor cleaned up.
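
To make that concrete, here is a heavily simplified simulation of the three steps above. It is not Hadoop's FileOutputCommitter or Spark code: task_write and driver_commit are invented names, and two temporary directories stand in for the separate local disks of two nodes.

```python
import os
import shutil
import tempfile


def task_write(node_root, attempt_id, name, data, app_attempt="0"):
    """Step 1: a task writes its output under $dest/_temporary/... on its own disk."""
    task_dir = os.path.join(
        node_root, "dest", "_temporary", app_attempt, "_temporary", attempt_id
    )
    os.makedirs(task_dir, exist_ok=True)
    with open(os.path.join(task_dir, name), "w") as f:
        f.write(data)


def driver_commit(driver_root):
    """Steps 2 and 3: the driver moves whatever it can list into $dest."""
    dest = os.path.join(driver_root, "dest")
    temp = os.path.join(dest, "_temporary")
    os.makedirs(dest, exist_ok=True)
    # The driver can only walk the filesystem it can see.
    for dirpath, _dirs, files in os.walk(temp):
        for name in files:
            shutil.move(os.path.join(dirpath, name), os.path.join(dest, name))
    shutil.rmtree(temp, ignore_errors=True)
    return sorted(os.listdir(dest))


# Two "nodes", each with its own private local filesystem (no shared store).
node1 = tempfile.mkdtemp()  # the driver runs here
node2 = tempfile.mkdtemp()
task_write(node1, "attempt_0", "part-00000", "a\n")
task_write(node2, "attempt_1", "part-00001", "b\n")

committed = driver_commit(node1)
print(committed)  # ['part-00000']
# node2's output was never listed, so it is neither committed nor cleaned up.
print(os.path.exists(os.path.join(node2, "dest", "_temporary")))  # True
```

If both "nodes" wrote under the same root, as they would with a shared filestore, the driver would list and commit both part files.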



Now in my own world, as I see it, the following things are happening, so something is going wrong (with me):
1. SPARK transfers files from different systems to process, instead of processing them locally (I do not have code to prove this, and therefore it's just an assumption)
2. SPARK cannot determine when the writes are failing on the workers of a standalone cluster and reports success (code is there for this)
3. SPARK reports back the number of records on the worker running on the master node when count() is called, without reporting an error, while using file:///, and reports an error when I mention the path without file:/// (for SPARK 2.1.x onwards, code is there for this)


As everyone's been saying, file:// requires a shared filestore, with uniform paths everywhere. That's needed to list the files to process, read the files in the workers and commit the final output. NFS cross-mounting is the simplest way to do this, especially as for three nodes HDFS is overkill: more services to keep running, no real fault tolerance. Export a directory tree from one of the servers, give the rest access to it, don't worry about bandwidth use as the shared disk itself will become the bottleneck.



I very sincerely hope with your genuine help the bar of language and social skills will be lowered for me. And everyone will find a way to excuse me and not qualify this email as a means to measure my extremely versatile and amazingly vivid social skills. It will be a lot of help to just focus on the facts related to machines, data, error and (the language that I somehow understand better) code.


My sincere apologies once again, as I am 100% sure that I did not meet the required social and language skills. 

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta


(*) For the curious, the details of the v1 and v2 commit protocols are at
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md 

Like I said: you don't want to know the details, and you really don't want to step through Hadoop's FileOutputCommitter to see what's going on. The Spark side is much easier to follow.