spark-user mailing list archives

From Jim Blomo <jim.bl...@gmail.com>
Subject Re: pySpark memory usage
Date Wed, 14 May 2014 17:47:02 GMT
That worked amazingly well, thank you Matei!  Numbers that worked for
me were 400 for the textFile()s, 1500 for the join()s.
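
For anyone landing on this thread later, here is a minimal sketch of where those two numbers go, assuming an existing SparkContext `sc` and tab-separated input. The function name, the paths and the file layout are hypothetical; only the second arguments to textFile() and join() reflect the workaround quoted below.

```python
def load_and_join(sc, left_path, right_path, map_tasks=400, reduce_tasks=1500):
    """Read two tab-separated text files as (key, value) pairs and join them.

    `sc` is an existing SparkContext. The function name, the paths and the
    "key<TAB>value" layout are hypothetical; only the partition arguments
    matter here.
    """
    def to_pair(line):
        # Assumes every line has the form "key<TAB>value".
        key, value = line.split("\t", 1)
        return key, value

    # Second argument to textFile(): minimum number of partitions (map tasks).
    left = sc.textFile(left_path, map_tasks).map(to_pair)
    right = sc.textFile(right_path, map_tasks).map(to_pair)

    # Second argument to join(): number of partitions (reduce tasks).
    return left.join(right, reduce_tasks)
```

More partitions means each task holds a smaller slice of the data in memory at once, which is the point of the workaround. The right values depend on the data and the cluster.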

On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
> Hey Jim, unfortunately external spilling is not implemented in Python right now. While
> it would be possible to update combineByKey to do smarter stuff here, one simple workaround
> you can try is to launch more map tasks (or more reduce tasks). To set the minimum number
> of map tasks, you can pass it as a second argument to textFile and such (e.g.
> sc.textFile("s3n://foo.txt", 1000)).
>
> Matei
>
> On May 12, 2014, at 5:47 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>
>> Thanks, Aaron, this looks like a good solution!  Will be trying it out shortly.
>>
>> I noticed that the S3 exceptions seem to occur more frequently when the
>> box is swapping.  Why is the box swapping?  combineByKey seems to make
>> the assumption that it can fit an entire partition in memory when
>> doing the combineLocally step.  I'm going to try to break this apart
>> but will need some sort of heuristic.  Options include looking at memory
>> usage via the resource module and trying to keep below
>> 'spark.executor.memory', or using batchSize to limit the number of
>> entries in the dictionary.  Let me know if you have any opinions.
>>
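
The second option above (capping the number of dictionary entries) could look roughly like the sketch below. This is plain Python, not PySpark's actual combineByKey code; the function name and the `max_entries` parameter are hypothetical. It emits partial combiners whenever the dictionary is full, on the assumption that a later reduce step merges combiners for the same key.

```python
def combine_locally_bounded(iterator, create_combiner, merge_value, max_entries=1000):
    """Combine (key, value) pairs while holding at most max_entries keys.

    Hypothetical helper: when the dictionary is full and a new key arrives,
    the current partial combiners are emitted and the dictionary is reset.
    The same key may therefore be emitted more than once.
    """
    combiners = {}
    for key, value in iterator:
        if key in combiners:
            combiners[key] = merge_value(combiners[key], value)
            continue
        if len(combiners) >= max_entries:
            # Flush partial results downstream instead of growing further.
            yield from combiners.items()
            combiners = {}
        combiners[key] = create_combiner(value)
    # Emit whatever is left at the end of the partition.
    yield from combiners.items()
```

The trade-off is more records shipped to the reduce side in exchange for a bounded dictionary on the map side. A cap on entry count only bounds memory if individual combiners stay small.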
>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilikerps@gmail.com> wrote:
>>> I'd just like to update this thread by pointing to the PR based on our
>>> initial design: https://github.com/apache/spark/pull/640
>>>
>>> This solution is a little more general and avoids catching IOException
>>> altogether. Long live exception propagation!
>>>
>>>
>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>>>>
>>>> Hey Jim,
>>>>
>>>> This IOException thing is a general issue that we need to fix and your
>>>> observation is spot-on. There is actually a JIRA for it here that I created
>>>> a few days ago:
>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>
>>>> Aaron is assigned on that one but not actively working on it, so we'd
>>>> welcome a PR from you on this if you are interested.
>>>>
>>>> The first thought we had was to set a volatile flag when the reader sees
>>>> an exception (indicating there was a failure in the task) and avoid
>>>> swallowing the IOException in the writer if this happens. But I think there
>>>> is a race here where the writer sees the error first before the reader knows
>>>> what is going on.
>>>>
>>>> Anyways maybe if you have a simpler solution you could sketch it out in
>>>> the JIRA and we could talk over there. The current proposal in the JIRA is
>>>> somewhat complicated...
>>>>
>>>> - Patrick
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>
>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>> caused by a break in the connection to S3, from which the data was being
>>>>> pulled.  A recent commit to PythonRDD noted that the current exception
>>>>> catching can potentially mask an exception for the data source, and that is
>>>>> indeed what I see happening.  The underlying libraries (jets3t and
>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>> setting them through Spark code.  Instead I added the patch below which
>>>>> kills the worker on the exception.  This allows me to completely load the
>>>>> data source after a few worker retries.
>>>>>
>>>>> Unfortunately, java.net.SocketException is the same error that is
>>>>> sometimes expected from the client when using methods like take().  One
>>>>> approach around this conflation is to create a new locally scoped exception
>>>>> class, eg. WriterException, catch java.net.SocketException during output
>>>>> writing, then re-throw the new exception.  The worker thread could then
>>>>> distinguish between the reasons java.net.SocketException might be thrown.
>>>>> Perhaps there is a more elegant way to do this in Scala, though?
>>>>>
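
The wrap-and-rethrow idea in the paragraph above can be sketched as follows. It is shown in Python purely to illustrate the pattern; it is not the Scala code in PythonRDD, and `WriterError` and `write_all` are hypothetical names. The key point is that only the write call sits inside the try block, so a connection error raised while reading from the data source propagates unchanged.

```python
class WriterError(Exception):
    """Raised when writing a record to the consumer fails (hypothetical name)."""


def write_all(records, write):
    """Pass every record from `records` to the callable `write`.

    Pulling the next record happens in the for statement, outside the try
    block, so errors from the data source are not converted. Only connection
    errors raised by `write` itself are re-raised as WriterError.
    """
    for record in records:
        try:
            write(record)
        except ConnectionError as exc:
            # The consumer went away, e.g. it stopped reading after take().
            raise WriterError("consumer closed the stream") from exc
```

A caller can then treat WriterError as the expected "reader stopped early" case and let any other connection error fail the task.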
>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>> list instead.  Best,
>>>>>
>>>>> Jim
>>>>>
>>>>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> index 0d71fdb..f31158c 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>             readerException = e
>>>>>             Try(worker.shutdownOutput()) // kill Python worker process
>>>>>
>>>>> +          case e: java.net.SocketException =>
>>>>> +           // This can happen if a connection to the datasource, eg S3, resets
>>>>> +           // or is otherwise broken
>>>>> +            readerException = e
>>>>> +            Try(worker.shutdownOutput()) // kill Python worker process
>>>>> +
>>>>>           case e: IOException =>
>>>>>             // This can happen for legitimate reasons if the Python code stops returning data
>>>>>             // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>
>>>>>
>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>
>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min:
>>>>>> 343)
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaharia@gmail.com>
>>>>>> wrote:
>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>
>>>>>>> Matei
>>>>>>>
>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>
>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>> 0.9.0:
>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>
>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>> - When persist() is used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>> Java heap space.  To me this indicates a memory leak since Spark
>>>>>>>> should simply be counting records of size < 3MB
>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs
>>>>>>>> the application, unknown root cause
>>>>>>>>
>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308, with
>>>>>>>> debugging turned on.  This gives me the stacktrace on the new "stdin"
>>>>>>>> problem:
>>>>>>>>
>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>       at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>       at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>       at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>       at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>       at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>       at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>       at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>       at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>       at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>       at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>       at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>       at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>       at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>       at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>       at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>       at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>       at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>       at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>       at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>       at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>       at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia
>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with
>>>>>>>>> this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what
>>>>>>>>> memory leak issue are you referring to, is it separate from this? (Couldn't
>>>>>>>>> find it earlier in the thread.)
>>>>>>>>>
>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>> conf/log4j.properties and change the line log4j.rootCategory=INFO, console
>>>>>>>>> to log4j.rootCategory=DEBUG, console. Then make sure this file is present in
>>>>>>>>> "conf" on all workers.
>>>>>>>>>
>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably
>>>>>>>>> large S3 data (multiple GB) and it was fine. It might happen only if records
>>>>>>>>> are large, or something like that. How much heap are you giving to your
>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>
>>>>>>>>> Matei
>>>>>>>>>
>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.blomo@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>
>>>>>>>>>> When I kill the python process, the stacktrace I get indicates that
>>>>>>>>>> this happens at initialization.  It looks like the initial write to
>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>> waiting for output.  I haven't had luck turning on debugging for the
>>>>>>>>>> executor process.  Still trying to learn the log4j properties that
>>>>>>>>>> need to be set.
>>>>>>>>>>
>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>
>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>      at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>      at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>      at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>      at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>      at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.blomo@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>>>>>>>>> finished early` so frequently I wasn't able to load even a 1GB file.
>>>>>>>>>>> Let me know if I can provide any other info!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia
>>>>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or
>>>>>>>>>>>> 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>
>>>>>>>>>>>> Matei
>>>>>>>>>>>>
>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.blomo@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1 and each got me further but none
>>>>>>>>>>>>> have succeeded.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
>>>>>>>>>>>>> of the 175 executors hung, and I had to kill the python process to get
>>>>>>>>>>>>> things going again.  The only indication of this in the logs was
>>>>>>>>>>>>> `INFO python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>>>>>>>>> tasks, before the app was failed:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>
>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>>>>>>>>> eg. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia
>>>>>>>>>>>>> <matei.zaharia@gmail.com> wrote:
>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that
>>>>>>>>>>>>>> makes it group multiple objects together before passing them between Java
>>>>>>>>>>>>>> and Python, but this may be too high by default. Try passing batchSize=10 to
>>>>>>>>>>>>>> your SparkContext constructor to lower it (the default is 1024). Or even
>>>>>>>>>>>>>> batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>
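
To make the batching idea above concrete, here is a small plain-Python sketch of grouping objects before serializing them. It is not PySpark's actual serializer; `dump_in_batches` is a hypothetical name and pickle is an assumption chosen for illustration. It only shows why a smaller batch size means fewer records are held in memory per serialized chunk.

```python
import pickle
from itertools import islice


def dump_in_batches(records, batch_size):
    """Yield pickled chunks, each holding at most batch_size records.

    Hypothetical helper: each chunk is built from a list of up to
    batch_size records, so peak memory per chunk grows with batch_size
    times the record size.
    """
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield pickle.dumps(batch)
```

With batch_size=1 every record is serialized on its own, which corresponds to the "match earlier versions" setting mentioned above, at the cost of more per-record overhead.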
>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.blomo@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce the
>>>>>>>>>>>>>>> memory needed by the PythonRDD when computing simple stats.  I am
>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>>>>>>>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>>>>>>>>>>>>>> too many of these records in memory, when all that is needed is to
>>>>>>>>>>>>>>> stream through them and count.  Any tips for getting through this
>>>>>>>>>>>>>>> workload?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this
>>>>>>>>>>>>>>> error is not propagated to the shell.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap,
>>>>>>>>>>>>>>> spark.executor.memory=10GB)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>    at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>