spark-user mailing list archives

From Eduardo Cusa <eduardo.c...@usmediaconsulting.com>
Subject Re: python : Out of memory: Kill process
Date Mon, 30 Mar 2015 14:18:05 GMT
Hi, I changed my process flow.

Now I am processing one file per hour, instead of processing everything at
the end of the day.

This decreased the memory consumption.

Regards
Eduardo

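The hourly flow can be sketched roughly like this (a minimal sketch; the directory layout, file naming, and the per-file job are hypothetical stand-ins, not the real pipeline):

```python
import os
from typing import Iterator, List

def hourly_paths(date: str, base: str = "logs") -> Iterator[str]:
    # Hypothetical layout: one compressed log file per hour of the day.
    for hour in range(24):
        yield os.path.join(base, date, "%02d.log.gz" % hour)

def process_day(date: str) -> List[str]:
    # Handle each hourly file on its own, so peak memory is bounded by
    # one hour of data instead of the whole day's logs at once.
    processed = []
    for path in hourly_paths(date):
        # Placeholder for the real per-file job (load, query, aggregate).
        processed.append(path)
    return processed
```

The point is only the shape of the loop: each hourly chunk is loaded, processed, and released before the next one starts.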
On Thu, Mar 26, 2015 at 3:16 PM, Davies Liu <davies@databricks.com> wrote:

> Could you narrow it down to the step that causes the OOM? Something like:
>
> log2= self.sqlContext.jsonFile(path)
> log2.count()
> ...
> out.count()
> ...
>
> On Thu, Mar 26, 2015 at 10:34 AM, Eduardo Cusa
> <eduardo.cusa@usmediaconsulting.com> wrote:
> > The last try was without log2.cache() and I'm still getting out of memory.
> >
> > I'm using the following conf, maybe it helps:
> >
> >   conf = (SparkConf()
> >           .setAppName("LoadS3")
> >           .set("spark.executor.memory", "13g")
> >           .set("spark.driver.memory", "13g")
> >           .set("spark.driver.maxResultSize","2g")
> >           .set("spark.default.parallelism","200")
> >           .set("spark.kryoserializer.buffer.mb","512"))
> >   sc = SparkContext(conf=conf)
> >   sqlContext = SQLContext(sc)
> >
> > On Thu, Mar 26, 2015 at 2:29 PM, Davies Liu <davies@databricks.com>
> wrote:
> >>
> >> Could you try removing the line `log2.cache()`?
> >>
> >> On Thu, Mar 26, 2015 at 10:02 AM, Eduardo Cusa
> >> <eduardo.cusa@usmediaconsulting.com> wrote:
> >> > I'm running on EC2:
> >> >
> >> > 1 Master: 4 CPU, 15 GB RAM (2 GB swap)
> >> >
> >> > 2 Slaves: 4 CPU, 15 GB RAM
> >> >
> >> > The uncompressed dataset size is 15 GB.
> >> >
> >> > On Thu, Mar 26, 2015 at 10:41 AM, Eduardo Cusa
> >> > <eduardo.cusa@usmediaconsulting.com> wrote:
> >> >>
> >> >> Hi Davies, I upgraded to 1.3.0 and I'm still getting Out of Memory.
> >> >>
> >> >> I ran the same code as before; do I need to make any changes?
> >> >>
> >> >> On Wed, Mar 25, 2015 at 4:00 PM, Davies Liu <davies@databricks.com>
> >> >> wrote:
> >> >>>
> >> >>> With batchSize = 1, I think it will become even worse.
> >> >>>
> >> >>> I'd suggest going with 1.3, to get a taste of the new DataFrame API.
> >> >>>
> >> >>> On Wed, Mar 25, 2015 at 11:49 AM, Eduardo Cusa
> >> >>> <eduardo.cusa@usmediaconsulting.com> wrote:
> >> >>> > Hi Davies, I'm running 1.1.0.
> >> >>> >
> >> >>> > Now I'm following this thread, which recommends using the batchSize
> >> >>> > parameter = 1:
> >> >>> >
> >> >>> > http://apache-spark-user-list.1001560.n3.nabble.com/pySpark-memory-usage-td3022.html
> >> >>> >
> >> >>> > If this does not work I will install 1.2.1 or 1.3.
> >> >>> >
> >> >>> > Regards
> >> >>> >
> >> >>> > On Wed, Mar 25, 2015 at 3:39 PM, Davies Liu <davies@databricks.com>
> >> >>> > wrote:
> >> >>> >>
> >> >>> >> What's the version of Spark you are running?
> >> >>> >>
> >> >>> >> There is a bug in the SQL Python API [1]; it's fixed in 1.2.1 and 1.3.
> >> >>> >>
> >> >>> >> [1] https://issues.apache.org/jira/browse/SPARK-6055
> >> >>> >>
> >> >>> >> On Wed, Mar 25, 2015 at 10:33 AM, Eduardo Cusa
> >> >>> >> <eduardo.cusa@usmediaconsulting.com> wrote:
> >> >>> >> > Hi Guys, I'm running the following function with spark-submit,
> >> >>> >> > and the OS is killing my process:
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >   def getRdd(self,date,provider):
> >> >>> >> >     path='s3n://'+AWS_BUCKET+'/'+date+'/*.log.gz'
> >> >>> >> >     log2= self.sqlContext.jsonFile(path)
> >> >>> >> >     log2.registerTempTable('log_test')
> >> >>> >> >     log2.cache()
> >> >>> >>
> >> >>> >> You only visit the table once, so cache() does not help here.
> >> >>> >>
> >> >>> >> >     out = self.sqlContext.sql("SELECT user, tax FROM log_test WHERE provider = '" + provider + "' AND country <> ''").map(lambda row: (row.user, row.tax))
> >> >>> >> >     print "out1"
> >> >>> >> >     return map(lambda (x, y): (x, list(y)),
> >> >>> >> >                sorted(out.groupByKey(2000).collect()))
> >> >>> >>
> >> >>> >> 100 partitions (or fewer) will be enough for a 2 GB dataset.
> >> >>> >>
> >> >>> >> >
> >> >>> >> >
> >> >>> >> > The input dataset has 57 gzip files (2 GB).
> >> >>> >> >
> >> >>> >> > The same process with a smaller dataset completed successfully.
> >> >>> >> >
> >> >>> >> > Any ideas to debug are welcome.
> >> >>> >> >
> >> >>> >> > Regards
> >> >>> >> > Eduardo
> >> >>> >> >
> >> >>> >> >
> >> >>> >
> >> >>> >
> >> >>
> >> >>
> >> >
> >
> >
>
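The step-by-step count() approach Davies suggests above can be sketched without Spark at all. The functions below are hypothetical stand-ins for jsonFile, the SQL filter, and so on; the point is just that forcing an action after each lazy stage pins an out-of-memory failure to a single step:

```python
def load(records):
    # Stand-in for sqlContext.jsonFile(path): a lazy source of rows.
    return iter(records)

def select(rows):
    # Stand-in for the SQL filter: keep rows with a non-empty country.
    return (r for r in rows if r["country"])

def run_with_checkpoints(records):
    # Force each stage with a count-like action before moving on, so a
    # failure can be attributed to exactly one stage of the pipeline.
    counts = {}
    stage1 = list(load(records))    # like log2.count()
    counts["load"] = len(stage1)
    stage2 = list(select(stage1))   # like out.count()
    counts["select"] = len(stage2)
    return counts
```

In the real job the same bisection works by calling `.count()` on each intermediate RDD or DataFrame and watching which call triggers the OOM.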
