spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sea aj <saj3...@gmail.com>
Subject Re: SPARK Issue in Standalone cluster
Date Tue, 22 Aug 2017 08:56:20 GMT
Hi everyone,

I have a huge dataframe with 1 billion rows and each row is a nested list.
That being said, I want to train some ML models on this df but due to the
huge size, I get out memory error on one of my nodes when I run fit
function.

currently, my configuration is:
144 cores, 16 cores for each of the 8 nodes.
100gb of ram for each slave and 100gb of ram for the driver. I set the
maxResultSize to be 20gb.

Do you have any suggestion so far?

I can think of splitting the data to multiple dataframes and then training
the model on each individually but besides the longer runtime, I learned
that fit function overwrites the previous model each time I call it. Isn't
there a way to get the fit function to train the new model with regard to
the previously trained model?

Thanks





On Sun, Aug 6, 2017 at 11:04 PM, Gourav Sengupta <gourav.sengupta@gmail.com>
wrote:

> Hi Marco,
>
> thanks a ton, I will surely use those alternatives.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mmistroni@gmail.com>
> wrote:
>
>> Sengupta
>>  further to this, if you try the following notebook in databricks cloud,
>> it will read a .csv file , write to a parquet file and read it again (just
>> to count the number of rows stored)
>> Please note that the path to the csv file might differ for you.....
>> So, what you will need todo is
>> 1 - create an account to community.cloud.databricks.com
>> 2 - upload the .csv file onto the Data of your databricks private cluster
>> 3  - run the script. that will store the data on the distrubuted
>> filesystem of the databricks cloudn (dbfs)
>>
>> It's worth investing in this free databricks cloud as it can create a
>> cluster for you with minimal effort, and it's  a very easy way to test your
>> spark scripts on a real cluster
>>
>> hope this helps
>> kr
>>
>> ##################################
>> from pyspark.sql import SQLContext
>>
>> from random import randint
>> from time import sleep
>> from pyspark.sql.session import SparkSession
>> import logging
>> logger = logging.getLogger(__name__)
>> logger.setLevel(logging.INFO)
>> ch = logging.StreamHandler()
>> logger.addHandler(ch)
>>
>>
>> import sys
>>
>> def read_parquet_file(parquetFileName):
>>   logger.info('Reading now the parquet files we just created...:%s',
>> parquetFileName)
>>   parquet_data = sqlContext.read.parquet(parquetFileName)
>>   logger.info('Parquet file has %s', parquet_data.count())
>>
>> def dataprocessing(filePath, count, sqlContext):
>>     logger.info( 'Iter count is:%s' , count)
>>     if count == 0:
>>         print 'exiting'
>>     else:
>>         df_traffic_tmp = sqlContext.read.format("csv").
>> option("header",'true').load(filePath)
>>         logger.info( '#############################DataSet has:%s' ,
>> df_traffic_tmp.count())
>>         logger.info('WRting to a parquet file')
>>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>>         df_traffic_tmp.write.parquet(parquetFileName)
>>         sleepInterval = randint(10,100)
>>         logger.info( '#############################Sleeping for %s' ,
>> sleepInterval)
>>         sleep(sleepInterval)
>>         read_parquet_file(parquetFileName)
>>         dataprocessing(filePath, count-1, sqlContext)
>>
>> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'#This
>> path might differ for you
>> iterations = 1
>> logger.info('----------------------')
>> logger.info('Filename:%s', filename)
>> logger.info('Iterations:%s', iterations )
>> logger.info('----------------------')
>>
>> logger.info ('Initializing sqlContext')
>> logger.info( '........Starting spark..........Loading from%s for %s
>> iterations' , filename, iterations)
>> logger.info(  'Starting up....')
>> sc = SparkSession.builder.appName("Data Processsing").getOrCreate()
>> logger.info ('Initializing sqlContext')
>> sqlContext = SQLContext(sc)
>> dataprocessing(filename, iterations, sqlContext)
>> logger.info('Out of here..')
>> ######################################
>>
>>
>> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistroni@gmail.com>
>> wrote:
>>
>>> Uh believe me there are lots of ppl on this list who will send u code
>>> snippets if u ask... 😀
>>>
>>> Yes that is what Steve pointed out, suggesting also that for that simple
>>> exercise you should perform all operations on a spark standalone instead
>>> (or alt. Use an nfs on the cluster)
>>> I'd agree with his suggestion....
>>> I suggest u another alternative:
>>> https://community.cloud.databricks.com/
>>>
>>> That's a ready made cluster and you can run your spark app as well store
>>> data on the cluster (well I haven't tried myself but I assume it's
>>> possible).   Try that out... I will try ur script there as I have an
>>> account there (though I guess I'll get there before me.....)
>>>
>>> Try that out and let me know if u get stuck....
>>> Kr
>>>
>>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengupta@gmail.com>
>>> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> For the first time in several years FOR THE VERY FIRST TIME. I am
>>>> seeing someone actually executing code and providing response. It feel
>>>> wonderful that at least someone considered to respond back by executing
>>>> code and just did not filter out each and every technical details to brood
>>>> only on my superb social skills, while claiming the reason for ignoring
>>>> technical details is that it elementary. I think that Steve also is the
>>>> first person who could answer the WHY of an elementary question instead of
>>>> saying that is how it is and pointed out to the correct documentation.
>>>>
>>>> That code works fantastically. But the problem which I have tried to
>>>> find out is while writing out the data and not reading it.
>>>>
>>>>
>>>> So if you see try to read the data from the same folder which has the
>>>> same file across all the nodes then it will work fine. In fact that is what
>>>> should work.
>>>>
>>>> What does not work is that if you try to write back the file and then
>>>> read it once again from the location you have written that is when the
>>>> issue starts happening.
>>>>
>>>> Therefore if in my code you were to save the pandas dataframe as a CSV
>>>> file and then read it then you will find the following observations:
>>>>
>>>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ---------------------------
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>> columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>> header=True, sep=",", index=0)
>>>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/spa
>>>> rkdata/testdir/")
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ---------------------------
>>>>
>>>>
>>>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN
>>>> WHICH THE DATA DOES NOT EXISTS
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ---------------------------
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>> columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>> header=True, sep=",", index=0)
>>>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/sp
>>>> ark/sparkdata/testdir/")
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ------------------------------------------------------------
>>>> ---------------------------
>>>>
>>>>
>>>> if you execute my code then also you will surprisingly see that the
>>>> writes in the nodes which is not the master node does not complete moving
>>>> the files from the _temporary folder to the main one.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistroni@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello
>>>>>  please have a look at this. it'sa simple script that just read a
>>>>> dataframe for n time, sleeping at random interval. i used it to test
memory
>>>>> issues that another user was experiencing on a spark cluster
>>>>>
>>>>> you should run it like this e.g
>>>>> spark-submit dataprocessing_Sample.-2py <path to tree_addhealth.csv>
>>>>> <num of iterations>
>>>>>
>>>>> i ran it on the cluster like this
>>>>>
>>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-
>>>>> west-2.compute.amazonaws.com:7077   /root/pyscripts/dataprocessing_Sample-2.py
>>>>> file:///root/pyscripts/tree_addhealth.csv
>>>>>
>>>>> hth, ping me back if you have issues
>>>>> i do agree with Steve's comments.... if you want to test your  spark
>>>>> script s just for playing, do it on  a standaone server on your localhost.
>>>>> Moving to a c luster is just a matter of deploying your script and mke
sure
>>>>> you have a common place where to read and store the data..... SysAdmin
>>>>> should give you this when they setup the cluster...
>>>>>
>>>>> kr
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> I am sincerely obliged for your kind time and response. Can you
>>>>>> please try the solution that you have so kindly suggested?
>>>>>>
>>>>>> It will be a lot of help if you could kindly execute the code that
I
>>>>>> have given. I dont think that anyone has yet.
>>>>>>
>>>>>> There are lots of fine responses to my question here, but if you
read
>>>>>> the last response from Simon, it comes the closest to being satisfactory.
I
>>>>>> am sure even he did not execute the code, but at least he came quite
close
>>>>>> to understanding what the problem is.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistroni@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello
>>>>>>>  my 2 cents here, hope it helps
>>>>>>> If you want to just to play around with Spark, i'd leave Hadoop
out,
>>>>>>> it's an unnecessary dependency that you dont need for just running
a python
>>>>>>> script
>>>>>>> Instead do the following:
>>>>>>> - got to the root of our master / slave node. create a directory
>>>>>>> /root/pyscripts
>>>>>>> - place your csv file there as well as the python script
>>>>>>> - run the script to replicate the whole directory  across the
>>>>>>> cluster (i believe it's called copy-script.sh)
>>>>>>> - then run your spark-submit , it will be something lke
>>>>>>>     ./spark-submit /root/pyscripts/mysparkscripts.py
>>>>>>> file:///root/pyscripts/tree_addhealth.csv 10 --master
>>>>>>> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>>>>> - in your python script, as part of your processing, write the
>>>>>>> parquet file in directory /root/pyscripts
>>>>>>>
>>>>>>> If you have an AWS account and you are versatile with that -
you
>>>>>>> need to setup bucket permissions etc - , you can just
>>>>>>> - store your file in one of your S3 bucket
>>>>>>> - create an EMR cluster
>>>>>>> - connect to master or slave
>>>>>>> - run your  scritp that reads from the s3 bucket and write to
the
>>>>>>> same s3 bucket
>>>>>>>
>>>>>>>
>>>>>>> Feel free to mail me privately, i have a working script i have
used
>>>>>>> to test some code on spark standalone cluster
>>>>>>> hth
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>>
>>>>>>>> I am now going through the documentation (
>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP
>>>>>>>> -13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/t
>>>>>>>> ools/hadoop-aws/s3a_committer_architecture.md) and it makes
much
>>>>>>>> much more sense now.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>>>> stevel@hortonworks.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <
>>>>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Steve,
>>>>>>>>>
>>>>>>>>> I have written a sincere note of apology to everyone
in a separate
>>>>>>>>> email. I sincerely request your kind forgiveness before
hand if anything
>>>>>>>>> does sound impolite in my emails, in advance.
>>>>>>>>>
>>>>>>>>> Let me first start by thanking you.
>>>>>>>>>
>>>>>>>>> I know it looks like I formed all my opinion based on
that
>>>>>>>>> document, but that is not the case at all. If you or
anyone tries to
>>>>>>>>> execute the code that I have given then they will see
what I mean. Code
>>>>>>>>> speaks louder and better than words for me.
>>>>>>>>>
>>>>>>>>> So I am not saying you are wrong. I am asking verify
and expecting
>>>>>>>>> someone will be able to correct  a set of understanding
that a moron like
>>>>>>>>> me has gained after long hours of not having anything
better to do.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> SCENARIO: there are two files file1.csv and file2.csv
stored in
>>>>>>>>> HDFS with replication 2 and there is a HADOOP cluster
of three nodes. All
>>>>>>>>> these nodes have SPARK workers (executors) running in
them.  Both are
>>>>>>>>> stored in the following way:
>>>>>>>>> -----------------------------------------------------
>>>>>>>>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>>>>>>>>> | (worker1)   |  (worker2)    |  (worker3)   |
>>>>>>>>> | (master)     |                     |              
     |
>>>>>>>>> -----------------------------------------------------
>>>>>>>>> | file1.csv      |                     | file1.csv  
  |
>>>>>>>>> -----------------------------------------------------
>>>>>>>>> |                    |  file2.csv      | file2.csv  
  |
>>>>>>>>> -----------------------------------------------------
>>>>>>>>> | file3.csv      |  file3.csv      |                
  |
>>>>>>>>> -----------------------------------------------------
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN
DRAWN:
>>>>>>>>> HDFS replication does not store the same file in all
the nodes in
>>>>>>>>> the cluster. So if I have three nodes and the replication
is two then the
>>>>>>>>> same file will be stored physically in two nodes in the
cluster. Does that
>>>>>>>>> sound right?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> HDFS breaks files up into blocks (default = 128MB). If
a .csv file
>>>>>>>>> is > 128 then it will be broken up into blocks
>>>>>>>>>
>>>>>>>>> file1.cvs -> [block0001, block002, block0003]
>>>>>>>>>
>>>>>>>>> and each block will be replicated. With replication =
2 there will
>>>>>>>>> be two copies of each block, but the file itself can
span > 2 hosts.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>>>>> If SPARK is trying to process to the records then I am
expecting
>>>>>>>>> that WORKER2 should not be processing file1.csv, and
similary WORKER 1
>>>>>>>>> should not be processing file2.csv and WORKER3 should
not be processing
>>>>>>>>> file3.csv. Because in case WORKER2 was trying to process
file1.csv then it
>>>>>>>>> will actually causing network transmission of the file
unnecessarily.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Spark prefers to schedule work locally, so as to save
on network
>>>>>>>>> traffic, but it schedules for execution time over waiting
for workers free
>>>>>>>>> on the node with the data. IF a block is on nodes 2 and
3 but there is only
>>>>>>>>> a free thread on node 1, then node 1 gets the work
>>>>>>>>>
>>>>>>>>> There's details on whether/how work across blocks takes
place
>>>>>>>>> which I'm avoiding. For now know those formats which
are "splittable" will
>>>>>>>>> have work scheduled by block. If you use Parquet/ORC/avro
for your data and
>>>>>>>>> compress with snappy, it will be split. This gives you
maximum performance
>>>>>>>>> as >1 thread can work on different blocks. That is,
if file1 is split into
>>>>>>>>> three blocks, three worker threads can process it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN,
PLEASE
>>>>>>>>> CLARIFY THIS):
>>>>>>>>> if WORKER 2 is not processing file1.csv then how does
it matter
>>>>>>>>> whether the file is there or not at all in the system?
Should not SPARK
>>>>>>>>> just ask the workers to process the files which are avialable
in the worker
>>>>>>>>> nodes? In case both WORKER2 and WORKER3 fails and are
not available then
>>>>>>>>> file2.csv will not be processed at all.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE
CODE
>>>>>>>>> SHOULD BE EXECUTED (Its been pointed out that I am learning
SPARK, and even
>>>>>>>>> I did not take more than 13 mins to set up the cluster
and run the code).
>>>>>>>>>
>>>>>>>>> Once you execute the code then you will find that:
>>>>>>>>> 1.  if the path starts with file:/// while reading back
then
>>>>>>>>> there is no error reported, but the number of records
reported back are
>>>>>>>>> only those records in the worker which also has the server.
>>>>>>>>> 2. also you will notice that once you cache the file
before
>>>>>>>>> writing the partitions are ditributed nicely across the
workers, and while
>>>>>>>>> writing back, the dataframe partitions does write properly
to the worker
>>>>>>>>> node in the Master, but the workers in the other system
have the files
>>>>>>>>> written in _temporary folder which does not get copied
back to the main
>>>>>>>>> folder. Inspite of this the job is not reported as failed
in SPARK.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This gets into the "commit protocol". You don't want
to know all
>>>>>>>>> the dirty details (*) but essentially its this
>>>>>>>>>
>>>>>>>>> 1. Every worker writes its output to a directory under
the
>>>>>>>>> destination directory, something like '$dest/_temporary/$appAtt
>>>>>>>>> emptId/_temporary/$taskAttemptID'
>>>>>>>>> 2. it is the spark driver which "commits" the job by
moving the
>>>>>>>>> output from the individual workers from the temporary
directories into
>>>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>>>> 3. For which it needs to be able to list all the output
in
>>>>>>>>> $dest/_temporary
>>>>>>>>>
>>>>>>>>> In your case, only the output on the same node of the
driver is
>>>>>>>>> being committed, because only those files can be listed
and moved. The
>>>>>>>>> output on the other nodes isn't seen, so isn't committed,
nor cleaned up.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now in my own world, if I see, the following things are
happening,
>>>>>>>>> something is going wrong (with me):
>>>>>>>>> 1. SPARK transfers files from different systems to process,
>>>>>>>>> instead of processing them locally (I do not have code
to prove this, and
>>>>>>>>> therefore its just an assumption)
>>>>>>>>> 2. SPARK cannot determine when the writes are failing
in
>>>>>>>>> standalone clusters workers and reports success (code
is there for this)
>>>>>>>>> 3. SPARK reports back number of records in the worker
running in
>>>>>>>>> the master node when count() is given without reporting
an error while
>>>>>>>>> using file:/// and reports an error when I mention the
path
>>>>>>>>> without file:/// (for SPARK 2.1.x onwards, code is there
for this)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> s everyone's been saying, file:// requires a shared filestore,
>>>>>>>>> with uniform paths everywhere. That's needed to list
the files to process,
>>>>>>>>> read the files in the workers and commit the final output.
NFS
>>>>>>>>> cross-mounting is the simplest way to do this, especially
as for three
>>>>>>>>> nodes HDFS is overkill: more services to keep running,
no real fault
>>>>>>>>> tolerance. Export a directory tree from one of the servers,
give the rest
>>>>>>>>> access to it, don't worry about bandwidth use as the
shared disk itself
>>>>>>>>> will become the bottleneck
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I very sincerely hope with your genuine help the bar
of language
>>>>>>>>> and social skills will be lowered for me. And everyone
will find a way to
>>>>>>>>> excuse me and not qualify this email as a means to measure
my extremely
>>>>>>>>> versatile and amazingly vivid social skills. It will
be a lot of help to
>>>>>>>>> just focus on the facts related to machines, data, error
and (the language
>>>>>>>>> that I somehow understand better) code.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My sincere apologies once again, as I am 100% sure that
I did not
>>>>>>>>> meet the required social and language skills.
>>>>>>>>>
>>>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>>>> understanding.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Gourav Sengupta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> * for the curious, the details of the v1 and v2 commit
protocols
>>>>>>>>> are
>>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-
>>>>>>>>> 13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/to
>>>>>>>>> ols/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>>
>>>>>>>>> Like I said: you don't want to know the details, and
you really
>>>>>>>>> don't want to step through Hadoop's FileOutputCommitter
to see what's going
>>>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Mime
View raw message