spark-user mailing list archives

From Jim Blomo <jim.bl...@gmail.com>
Subject Re: pySpark memory usage
Date Wed, 14 May 2014 17:50:28 GMT
Should add that I had to tweak the numbers a bit to keep above swap
threshold, but below the "Too many open files" error (`ulimit -n` is
32768).
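A quick way to check that open-file headroom from Python is the stdlib `resource` module. This is a hypothetical sketch; the helper name and the `headroom` safety factor are my own assumptions, not numbers from this thread:

```python
import resource

def max_safe_partitions(headroom=0.5):
    """Suggest a partition count that stays well below `ulimit -n`.

    `headroom` is an assumed safety factor; shuffles can open roughly one
    file per reduce partition per concurrent map task, so be conservative.
    """
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return max(1, int(soft * headroom))
```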

On Wed, May 14, 2014 at 10:47 AM, Jim Blomo <jim.blomo@gmail.com> wrote:
> That worked amazingly well, thank you Matei!  Numbers that worked for
> me were 400 for the textFile()s, 1500 for the join()s.
>
> On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>> Hey Jim, unfortunately external spilling is not implemented in Python right
>> now. While it would be possible to update combineByKey to do smarter stuff
>> here, one simple workaround you can try is to launch more map tasks (or
>> more reduce tasks). To set the minimum number of map tasks, you can pass it
>> as a second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).
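As a rough illustration of Matei's workaround (the helper name and the 64 MB per-task target are my own assumptions, not from this thread), one could derive the minimum partition count from the input size:

```python
def partitions_for(total_bytes, target_bytes=64 * 1024 * 1024):
    """Ceiling-divide the input size by a per-task target so each
    partition (and thus each combineLocally step) stays small."""
    return max(1, -(-total_bytes // target_bytes))

# Hypothetical usage, mirroring the suggestion above (~54GB input):
# sc.textFile("s3n://foo.txt", partitions_for(54 * 1024**3))
```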
>>
>> Matei
>>
>> On May 12, 2014, at 5:47 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>
>>> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
>>>
>>> I noticed that the S3 exceptions seem to occur more frequently when the
>>> box is swapping.  Why is the box swapping?  combineByKey seems to make
>>> the assumption that it can fit an entire partition in memory when
>>> doing the combineLocally step.  I'm going to try to break this apart,
>>> but will need some sort of heuristic; options include looking at memory
>>> usage via the resource module and trying to keep below
>>> 'spark.executor.memory', or using batchSize to limit the number of
>>> entries in the dictionary.  Let me know if you have any opinions.
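The resource-module heuristic Jim mentions could look something like this sketch (my own assumptions throughout; note `ru_maxrss` is kilobytes on Linux but bytes on macOS):

```python
import resource
import sys

def over_memory_budget(budget_bytes):
    """Return True if this worker's peak resident memory exceeds the
    budget (e.g. some fraction of spark.executor.memory)."""
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform != "darwin":
        peak *= 1024  # Linux reports kilobytes
    return peak > budget_bytes

# A combiner loop could check this periodically and flush its dict
# when the budget is exceeded (PySpark itself cannot spill here).
```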
>>>
>>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilikerps@gmail.com> wrote:
>>>> I'd just like to update this thread by pointing to the PR based on our
>>>> initial design: https://github.com/apache/spark/pull/640
>>>>
>>>> This solution is a little more general and avoids catching IOException
>>>> altogether. Long live exception propagation!
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>>>
>>>>> Hey Jim,
>>>>>
>>>>> This IOException thing is a general issue that we need to fix and your
>>>>> observation is spot-on. There is actually a JIRA for it here I created
>>>>> a few days ago:
>>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>>
>>>>> Aaron is assigned on that one but not actively working on it, so we'd
>>>>> welcome a PR from you on this if you are interested.
>>>>>
>>>>> The first thought we had was to set a volatile flag when the reader sees
>>>>> an exception (indicating there was a failure in the task) and avoid
>>>>> swallowing the IOException in the writer if this happens. But I think
>>>>> there is a race here where the writer sees the error first before the
>>>>> reader knows what is going on.
>>>>>
>>>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>>>> the JIRA and we could talk over there. The current proposal in the JIRA
>>>>> is somewhat complicated...
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>
>>>>>> FYI, it looks like this "stdin writer to Python finished early" error
>>>>>> was caused by a break in the connection to S3, from which the data was
>>>>>> being pulled.  A recent commit to PythonRDD noted that the current
>>>>>> exception catching can potentially mask an exception from the data
>>>>>> source, and that is indeed what I see happening.  The underlying
>>>>>> libraries (jets3t and httpclient) do have retry capabilities, but I
>>>>>> don't see a great way of setting them through Spark code.  Instead I
>>>>>> added the patch below which kills the worker on the exception.  This
>>>>>> allows me to completely load the data source after a few worker retries.
>>>>>>
>>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>>> sometimes expected from the client when using methods like take().  One
>>>>>> approach around this conflation is to create a new locally scoped
>>>>>> exception class, eg. WriterException, catch java.net.SocketException
>>>>>> during output writing, then re-throw the new exception.  The worker
>>>>>> thread could then distinguish between the reasons
>>>>>> java.net.SocketException might be thrown.  Perhaps there is a more
>>>>>> elegant way to do this in Scala, though?
>>>>>>
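The wrapper-exception idea above can be sketched in Python for illustration (the real change would be in Scala; `WriterException` is the hypothetical name from the message, and the helper is my own invention):

```python
class WriterException(Exception):
    """Marks a socket error raised while writing to the output,
    so it can be told apart from an expected client-side reset."""

def write_with_marker(write_fn):
    """Run a write callback, re-throwing socket errors under the
    locally scoped wrapper type so the caller can distinguish them."""
    try:
        write_fn()
    except OSError as exc:  # socket errors surface as OSError in Python
        raise WriterException("output write failed") from exc
```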
>>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>>> list instead.  Best,
>>>>>>
>>>>>> Jim
>>>>>>
>>>>>> diff --git
>>>>>> a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> index 0d71fdb..f31158c 100644
>>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>>             readerException = e
>>>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>>>>
>>>>>> +          case e: java.net.SocketException =>
>>>>>> +            // This can happen if a connection to the datasource, eg S3, resets
>>>>>> +            // or is otherwise broken
>>>>>> +            readerException = e
>>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> +
>>>>>>           case e: IOException =>
>>>>>>             // This can happen for legitimate reasons if the Python code stops returning data
>>>>>>             // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>
>>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343)
>>>>>>>
>>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaharia@gmail.com>
>>>>>>> wrote:
>>>>>>>> Okay, thanks. Do you have any info on how large your records and
>>>>>>>> data file are? I'd like to reproduce and fix this.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>>
>>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>>> 0.9.0:
>>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>
>>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>>> - When persist() used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>>>> should simply be counting records of size < 3MB
>>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>>>> the application, unknown root cause
>>>>>>>>>
>>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308, with
>>>>>>>>> debugging turned on.  This gives me the stacktrace on the new
>>>>>>>>> "stdin" problem:
>>>>>>>>>
>>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>>       at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>>       at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>>       at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>>       at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>>       at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>>       at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>>       at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>>       at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>>       at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>>       at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>       at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>>       at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate)? Also,
>>>>>>>>>> what memory leak issue are you referring to, is it separate from
>>>>>>>>>> this? (Couldn't find it earlier in the thread.)
>>>>>>>>>>
>>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO,
>>>>>>>>>> console to log4j.rootCategory=DEBUG, console. Then make sure this
>>>>>>>>>> file is present in "conf" on all workers.
>>>>>>>>>>
>>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only
>>>>>>>>>> if records are large, or something like that. How much heap are
>>>>>>>>>> you giving to your executors, and does it show that much in the
>>>>>>>>>> web UI?
>>>>>>>>>>
>>>>>>>>>> Matei
>>>>>>>>>>
>>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>>
>>>>>>>>>>> When I kill the python process, the stacktrace I get indicates
>>>>>>>>>>> that this happens at initialization.  It looks like the initial
>>>>>>>>>>> write to the Python process does not go through, and then the
>>>>>>>>>>> iterator hangs waiting for output.  I haven't had luck turning on
>>>>>>>>>>> debugging for the executor process.  Still trying to learn the
>>>>>>>>>>> log4j properties that need to be set.
>>>>>>>>>>>
>>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>>
>>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly
>>>>>>>>>>> (crashed)
>>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>>      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to
>>>>>>>>>>>> Python finished early` error so frequently I wasn't able to load
>>>>>>>>>>>> even a 1GB file.  Let me know if I can provide any other info!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9
>>>>>>>>>>>>> or 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for
>>>>>>>>>>>>>> Hadoop 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further
>>>>>>>>>>>>>> but none have succeeded.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I
>>>>>>>>>>>>>> omit `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set
>>>>>>>>>>>>>> batchSize=1.  5 of the 175 executors hung, and I had to kill
>>>>>>>>>>>>>> the python process to get things going again.  The only
>>>>>>>>>>>>>> indication of this in the logs was `INFO python.PythonRDD:
>>>>>>>>>>>>>> stdin writer to Python finished early`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in
>>>>>>>>>>>>>> several tasks before the app was failed:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception
>>>>>>>>>>>>>> in thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the
>>>>>>>>>>>>>> above, eg. org.apache.spark.SparkException: Error sending
>>>>>>>>>>>>>> message to BlockManagerMaster
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I
>>>>>>>>>>>>>> should try a newer snapshot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>>> makes it group multiple objects together before passing them
>>>>>>>>>>>>>>> between Java and Python, but this may be too high by default.
>>>>>>>>>>>>>>> Try passing batchSize=10 to your SparkContext constructor to
>>>>>>>>>>>>>>> lower it (the default is 1024). Or even batchSize=1 to match
>>>>>>>>>>>>>>> earlier versions.
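One might derive Matei's batchSize from the largest expected record, along these lines (a hedged sketch; the 64 MB budget constant and helper name are my own assumptions, not from this thread):

```python
def batch_size_for(max_record_bytes, budget_bytes=64 * 1024 * 1024):
    """Pick a batchSize so one serialized batch of worst-case records
    stays under a rough memory budget."""
    return max(1, budget_bytes // max_record_bytes)

# Hypothetical usage with ~3MB records, as in this thread:
# sc = SparkContext(batchSize=batch_size_for(3 * 1024 * 1024))
```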
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to
>>>>>>>>>>>>>>>> reduce the memory needed by the PythonRDD when computing
>>>>>>>>>>>>>>>> simple stats.  I am getting OutOfMemoryError exceptions while
>>>>>>>>>>>>>>>> calculating count() on big, but not absurd, records.  It
>>>>>>>>>>>>>>>> seems like PythonRDD is trying to keep too many of these
>>>>>>>>>>>>>>>> records in memory, when all that is needed is to stream
>>>>>>>>>>>>>>>> through them and count.  Any tips for getting through this
>>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t", 1)).map(lambda (y, s): (loads(y), loads(s)))
>>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>>>> error is not propagated to the shell.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>>    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>>    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
