spark-user mailing list archives

From Marco Mistroni <mmistr...@gmail.com>
Subject Re: SPARK Issue in Standalone cluster
Date Sat, 05 Aug 2017 20:09:27 GMT
Uh believe me there are lots of ppl on this list who will send u code
snippets if u ask... 😀

Yes, that is what Steve pointed out, suggesting also that for that simple
exercise you should perform all operations on a Spark standalone setup instead
(or, alternatively, use an NFS mount on the cluster).
I'd agree with his suggestion....
I'd suggest another alternative as well:
https://community.cloud.databricks.com/

That's a ready-made cluster and you can run your Spark app as well as store
data on the cluster (well, I haven't tried it myself but I assume it's
possible).   Try that out... I will try your script there as I have an
account (though I guess you'll get there before me.....)

Try that out and let me know if u get stuck....
Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengupta@gmail.com> wrote:

> Hi Marco,
>
> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
> someone actually executing code and providing a response. It feels wonderful
> that at least someone considered responding by executing the code, and did
> not filter out each and every technical detail to brood only on my
> superb social skills, while claiming the reason for ignoring the technical
> details is that they are elementary. I think that Steve is also the first person
> who could answer the WHY of an elementary question instead of saying that
> is how it is, and who pointed to the correct documentation.
>
> That code works fantastically. But the problem I have been trying to pin
> down is with writing out the data, not reading it.
>
>
> So, if you try to read the data from a folder which has the same
> file across all the nodes, then it will work fine. In fact that is what
> should work.
>
> What does not work is writing the file back and then reading
> it once again from the location you have written to; that is when the issue
> starts.
>
> Therefore, if in my code you were to save the pandas dataframe as a CSV
> file and then read it back, you would observe the following:
>
> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
> ------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
>
>
> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT USE THE NODE ON WHICH
> THE DATA DOES NOT EXIST
> ------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
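>
> Concretely, the write-back step I am referring to is something like the
> following sketch (the output path here is just an example):
>
> outdir = "file:///Users/gouravsengupta/Development/spark/sparkdata/outdir/"
> testdf.write.mode("overwrite").csv(outdir)
> spark.read.csv(outdir).count()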
>
>
> If you execute my code you will also, surprisingly, see that on the nodes
> which are not the master node the writes never complete moving the
> files from the _temporary folder to the main one.
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistroni@gmail.com>
> wrote:
>
>> Hello
>> please have a look at this. It's a simple script that just reads a
>> dataframe n times, sleeping for a random interval between reads. I used it to
>> test memory issues that another user was experiencing on a spark cluster
>>
>> you should run it like this, e.g.
>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv> <num of iterations>
>>
>> i ran it on the cluster like this
>>
>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>>
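>> roughly, the script just does this (a sketch of the idea, not the exact file):
>>
>> import random
>> import sys
>> import time
>> from pyspark.sql import SparkSession
>>
>> if __name__ == "__main__":
>>     path, iterations = sys.argv[1], int(sys.argv[2])
>>     spark = SparkSession.builder.appName("dataprocessing_sample").getOrCreate()
>>     for i in range(iterations):
>>         df = spark.read.csv(path, header=True, inferSchema=True)
>>         print("iteration %d: %d rows" % (i, df.count()))   # force the read
>>         time.sleep(random.randint(1, 10))                  # random sleep between reads
>>     spark.stop()
>>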
>> hth, ping me back if you have issues
>> i do agree with Steve's comments.... if you want to test your spark
>> scripts just for playing, do it on a standalone server on your localhost.
>> Moving to a cluster is just a matter of deploying your script and making sure
>> you have a common place to read and store the data..... the SysAdmin
>> should give you this when they set up the cluster...
>>
>> kr
>>
>>
>>
>>
>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> I am sincerely obliged for your kind time and response. Can you please
>>> try the solution that you have so kindly suggested?
>>>
>>> It will be a lot of help if you could kindly execute the code that I
>>> have given. I don't think that anyone has yet.
>>>
>>> There are lots of fine responses to my question here, but if you read
>>> the last response from Simon, it comes the closest to being satisfactory. I
>>> am sure even he did not execute the code, but at least he came quite close
>>> to understanding what the problem is.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistroni@gmail.com>
>>> wrote:
>>>
>>>> Hello
>>>>  my 2 cents here, hope it helps
>>>> If you just want to play around with Spark, i'd leave Hadoop out;
>>>> it's an unnecessary dependency that you don't need for just running a python
>>>> script
>>>> Instead do the following:
>>>> - go to the root of your master / slave nodes and create a directory
>>>> /root/pyscripts
>>>> - place your csv file there as well as the python script
>>>> - run the script to replicate the whole directory across the cluster
>>>> (i believe it's called copy-script.sh)
>>>> - then run your spark-submit, it will be something like
>>>>     ./spark-submit /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10 --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>> - in your python script, as part of your processing, write the parquet
>>>> file in directory /root/pyscripts (see the sketch just after these steps)
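>>>>
>>>> A minimal sketch of such a script (just to show the shape; the names and
>>>> paths are examples):
>>>>
>>>> import sys
>>>> from pyspark.sql import SparkSession
>>>>
>>>> if __name__ == "__main__":
>>>>     csv_path = sys.argv[1]   # e.g. file:///root/pyscripts/tree_addhealth.csv
>>>>     spark = SparkSession.builder.appName("mysparkscripts").getOrCreate()
>>>>     df = spark.read.csv(csv_path, header=True, inferSchema=True)
>>>>     print("rows read: %d" % df.count())
>>>>     # write the parquet output into /root/pyscripts as part of the processing
>>>>     df.write.mode("overwrite").parquet("file:///root/pyscripts/output.parquet")
>>>>     spark.stop()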
>>>>
>>>> If you have an AWS account and you are versatile with that (you need
>>>> to set up bucket permissions etc.), you can just
>>>> - store your file in one of your S3 buckets
>>>> - create an EMR cluster
>>>> - connect to the master or a slave
>>>> - run your script so that it reads from the s3 bucket and writes to the same
>>>> s3 bucket
>>>>
>>>>
>>>> Feel free to mail me privately; i have a working script that i have used to
>>>> test some code on a spark standalone cluster
>>>> hth
>>>>
>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>> gourav.sengupta@gmail.com> wrote:
>>>>
>>>>> Hi Steve,
>>>>>
>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>
>>>>> I am now going through the documentation (
>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>> ) and it makes much much more sense now.
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>> stevel@hortonworks.com> wrote:
>>>>>
>>>>>>
>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengupta@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>> email. I sincerely request your kind forgiveness before hand if anything
>>>>>> does sound impolite in my emails, in advance.
>>>>>>
>>>>>> Let me first start by thanking you.
>>>>>>
>>>>>> I know it looks like I formed all my opinion based on that document,
>>>>>> but that is not the case at all. If you or anyone tries to execute the code
>>>>>> that I have given then they will see what I mean. Code speaks louder and
>>>>>> better than words for me.
>>>>>>
>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>> someone will be able to correct a set of understandings that a moron like
>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>
>>>>>>
>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored
>>>>>> in HDFS with replication 2, and there is a HADOOP cluster of three nodes.
>>>>>> All these nodes have SPARK workers (executors) running on them. The files
>>>>>> are stored in the following way:
>>>>>> ----------------------------------------
>>>>>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>>>>>> | (worker1)  | (worker2)  | (worker3)  |
>>>>>> | (master)   |            |            |
>>>>>> ----------------------------------------
>>>>>> | file1.csv  |            | file1.csv  |
>>>>>> ----------------------------------------
>>>>>> |            | file2.csv  | file2.csv  |
>>>>>> ----------------------------------------
>>>>>> | file3.csv  | file3.csv  |            |
>>>>>> ----------------------------------------
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>> HDFS replication does not store the same file in all the nodes in the
>>>>>> cluster. So if I have three nodes and the replication is two then the same
>>>>>> file will be stored physically in two nodes in the cluster. Does that sound
>>>>>> right?
>>>>>>
>>>>>>
>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>>>>>> > 128MB then it will be broken up into multiple blocks:
>>>>>>
>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>
>>>>>> and each block will be replicated. With replication = 2 there will be
>>>>>> two copies of each block, but the file itself can span > 2 hosts.
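>>>>>>
>>>>>> A rough way to see this from the Spark side (a sketch; the path is a
>>>>>> placeholder, and the exact count depends on file size and on settings such
>>>>>> as spark.sql.files.maxPartitionBytes):
>>>>>>
>>>>>> df = spark.read.csv("hdfs:///data/file1.csv", header=True)
>>>>>> df.rdd.getNumPartitions()   # roughly one input partition per HDFS block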
>>>>>>
>>>>>>
>>>>>> ASSUMPTION (STEVE PLEASE CLARIFY THIS):
>>>>>> If SPARK is trying to process the records then I am expecting that
>>>>>> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
>>>>>> not be processing file2.csv and WORKER3 should not be processing file3.csv.
>>>>>> Because if WORKER2 were trying to process file1.csv then it would
>>>>>> actually cause network transmission of the file unnecessarily.
>>>>>>
>>>>>>
>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>> traffic, but it optimises for execution time rather than waiting for free
>>>>>> workers on the node with the data. If a block is on nodes 2 and 3 but there is only
>>>>>> a free thread on node 1, then node 1 gets the work.
>>>>>>
>>>>>> There are details on whether/how work across blocks takes place which
>>>>>> I'm avoiding. For now, know that those formats which are "splittable" will have
>>>>>> work scheduled by block. If you use Parquet/ORC/Avro for your data and
>>>>>> compress with snappy, it will be split. This gives you maximum performance
>>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>>> three blocks, three worker threads can process it.
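>>>>>>
>>>>>> e.g. (a one-line sketch; the output path is just an example):
>>>>>>
>>>>>> df.write.option("compression", "snappy").parquet("hdfs:///data/file1.parquet")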
>>>>>>
>>>>>>
>>>>>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>> CLARIFY THIS):
>>>>>> If WORKER2 is not processing file1.csv then how does it matter
>>>>>> whether the file is there or not in the system? Should not SPARK
>>>>>> just ask the workers to process the files which are available in the worker
>>>>>> nodes? In case both WORKER2 and WORKER3 fail and are not available then
>>>>>> file2.csv will not be processed at all.
>>>>>>
>>>>>>
>>>>>> locality is best-effort, not guaranteed.
>>>>>>
>>>>>>
>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>> BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did
>>>>>> not take more than 13 mins to set up the cluster and run the code).
>>>>>>
>>>>>> Once you execute the code then you will find that:
>>>>>> 1. if the path starts with file:/// while reading back then there
>>>>>> is no error reported, but the number of records reported back are only
>>>>>> those records in the worker which is on the same node as the master.
>>>>>> 2. also you will notice that once you cache the file before writing,
>>>>>> the partitions are distributed nicely across the workers, and while writing
>>>>>> back, the dataframe partitions do write properly to the worker node on
>>>>>> the master, but the workers on the other systems have the files written in
>>>>>> the _temporary folder, which does not get copied back to the main folder.
>>>>>> In spite of this the job is not reported as failed in SPARK.
>>>>>>
>>>>>>
>>>>>> This gets into the "commit protocol". You don't want to know all the
>>>>>> dirty details (*) but essentially it's this:
>>>>>>
>>>>>> 1. Every worker writes its output to a directory under the
>>>>>> destination directory, something like
>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>> 2. It is the spark driver which "commits" the job by moving the
>>>>>> output of the individual workers from the temporary directories into
>>>>>> $dest, then deleting $dest/_temporary
>>>>>> 3. For which it needs to be able to list all the output in
>>>>>> $dest/_temporary
>>>>>>
>>>>>> In your case, only the output on the same node as the driver is being
>>>>>> committed, because only those files can be listed and moved. The output on
>>>>>> the other nodes isn't seen, so it isn't committed, nor cleaned up.
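>>>>>>
>>>>>> A toy, local-filesystem illustration of step 2 (not the real
>>>>>> FileOutputCommitter, just the shape of it): the driver can only promote
>>>>>> files it can actually list on its own filesystem.
>>>>>>
>>>>>> import os
>>>>>> import shutil
>>>>>>
>>>>>> def commit_job(dest):
>>>>>>     # walk $dest/_temporary, promote every file into $dest, then delete
>>>>>>     # _temporary; output sitting on another node's local disk is never seen
>>>>>>     tmp = os.path.join(dest, "_temporary")
>>>>>>     for root, _, files in os.walk(tmp):
>>>>>>         for name in files:
>>>>>>             shutil.move(os.path.join(root, name), os.path.join(dest, name))
>>>>>>     shutil.rmtree(tmp)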
>>>>>>
>>>>>>
>>>>>>
>>>>>> Now in my own world, if I see the following things happening,
>>>>>> something is going wrong (with me):
>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>> of processing them locally (I do not have code to prove this, and therefore
>>>>>> it's just an assumption)
>>>>>> 2. SPARK cannot determine when the writes are failing in standalone
>>>>>> cluster workers and reports success (code is there for this)
>>>>>> 3. SPARK reports back the number of records in the worker running on the
>>>>>> master node when count() is given, without reporting an error while using
>>>>>> file:///, and reports an error when I mention the path without
>>>>>> file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>
>>>>>>
>>>>>>
>>>>>> As everyone's been saying, file:// requires a shared filestore, with
>>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>>> the files in the workers and commit the final output. NFS cross-mounting is
>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>>> directory tree from one of the servers, give the rest access to it, and don't
>>>>>> worry about bandwidth use as the shared disk itself will become the
>>>>>> bottleneck.
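>>>>>>
>>>>>> e.g., assuming the export ends up mounted at the same path, say
>>>>>> /srv/sparkdata, on every node (a sketch; the paths are placeholders):
>>>>>>
>>>>>> df = spark.read.csv("file:///srv/sparkdata/testdir/", header=True)
>>>>>> df.write.mode("overwrite").parquet("file:///srv/sparkdata/output/")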
>>>>>>
>>>>>>
>>>>>>
>>>>>> I very sincerely hope with your genuine help the bar of language and
>>>>>> social skills will be lowered for me. And everyone will find a way to
>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>> just focus on the facts related to machines, data, error and (the language
>>>>>> that I somehow understand better) code.
>>>>>>
>>>>>>
>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>> meet the required social and language skills.
>>>>>>
>>>>>> Thanks a ton once again for your kindness, patience and understanding.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> * for the curious, the details of the v1 and v2 commit protocols are at
>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>
>>>>>> Like I said: you don't want to know the details, and you really don't
>>>>>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>>>>>> The Spark side is much easier to follow.
>>>>>> The Spark side is much easier to follow.
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
