spark-dev mailing list archives

From Imran Rashid <iras...@cloudera.com>
Subject Re: 2GB limit for partitions?
Date Tue, 03 Feb 2015 23:44:53 GMT
Thanks for the explanations, makes sense. For the record, it looks like this
was worked on a while back (and maybe the work is even close to a solution?)

https://issues.apache.org/jira/browse/SPARK-1476

and perhaps an independent solution was worked on here?

https://issues.apache.org/jira/browse/SPARK-1391


On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin <rxin@databricks.com> wrote:

> cc dev list
>
>
> How are you saving the data? There are two relevant 2GB limits:
>
> 1. Caching
>
> 2. Shuffle
>
>
> For caching, a partition is turned into a single block.
>
> For shuffle, each map partition is partitioned into R blocks, where R =
> number of reduce tasks. It is unlikely that a shuffle block exceeds 2GB,
> although it can still happen.
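As a rough illustration of why a cached partition can hit the limit while a shuffle block usually does not, the sketch below compares the two sizes for one hypothetical 5GB partition. It assumes map output is spread evenly across reducers, which real data often is not; `IntLimit` and `avgShuffleBlockBytes` are hypothetical names, not Spark APIs.

```scala
// Int.MaxValue bytes, just under 2GB: the limit discussed in this thread.
val IntLimit: Long = Int.MaxValue.toLong

// Average shuffle block size: one map partition split into R reducer pieces.
// Assumes an even split; with skewed keys the largest block can be much bigger.
def avgShuffleBlockBytes(mapPartitionBytes: Long, numReducers: Int): Long = {
  require(numReducers > 0, "need at least one reducer")
  mapPartitionBytes / numReducers
}

val partitionBytes = 5000000000L // a hypothetical 5GB partition

// Cached: the whole partition becomes a single block.
println(partitionBytes > IntLimit)                             // true
// Shuffled to 200 reducers: about 25MB per block on average.
println(avgShuffleBlockBytes(partitionBytes, 200))             // 25000000
println(avgShuffleBlockBytes(partitionBytes, 200) > IntLimit)  // false
```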
>
> I think the 2nd problem is easier to fix than the 1st, because we can
> handle that in the network transport layer. It'd require us to divide the
> transfer of a very large block into multiple smaller blocks.
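The idea of transferring a very large block as several smaller pieces can be illustrated with the range arithmetic alone. This is a minimal sketch, not Spark's network transport code; `chunkRanges` is a hypothetical helper that splits a block size into (offset, length) pairs, each small enough to fit in an `Int`.

```scala
// Split a block of `totalBytes` into (offset, length) pieces of at most
// `maxChunk` bytes each, so no piece needs a length beyond Int.MaxValue.
// Hypothetical helper for illustration; not Spark's transport code.
def chunkRanges(totalBytes: Long, maxChunk: Int): IndexedSeq[(Long, Int)] = {
  require(totalBytes >= 0, "size must be non-negative")
  require(maxChunk > 0, "chunk size must be positive")
  (0L until totalBytes by maxChunk.toLong).map { offset =>
    // The last piece may be shorter than maxChunk.
    val len = math.min(maxChunk.toLong, totalBytes - offset).toInt
    (offset, len)
  }
}

// A hypothetical 5GB block sent as 1GiB pieces.
val pieces = chunkRanges(5000000000L, 1 << 30)
println(pieces.size)                  // 5
println(pieces.map(_._2.toLong).sum)  // 5000000000
```

The receiver would reassemble the pieces by offset; how that is done (and whether the pieces are ever held in one buffer) is exactly the part that would still need care.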
>
>
>
> On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid <irashid@cloudera.com> wrote:
>
>> Michael,
>>
>> you are right, there is definitely some limit at 2GB.  Here is a trivial
>> example to demonstrate it:
>>
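>> import org.apache.spark.storage.StorageLevel
>> // 1e6 records of 5,000 bytes each (~5GB) in a single partition, on disk
>> val d = sc.parallelize(1 to 1e6.toInt, 1).map { _ =>
>>   new Array[Byte](5e3.toInt)
>> }.persist(StorageLevel.DISK_ONLY)
>> d.count() // runs the job, which fails with the error described below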
>>
>> It gives the same error you are observing.  I was under the same
>> impression as Sean about the limits only being on blocks, not partitions --
>> but clearly that isn't the case here.
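The arithmetic behind the example above shows why it fails: with a single partition, the whole payload ends up in one block. This sketch counts only the raw array bytes and ignores serialization overhead, which would only make the on-disk block larger.

```scala
// The example writes 1e6 records of 5,000 bytes each into ONE partition.
val numRecords     = 1000000L
val bytesPerRecord = 5000L
val payloadBytes   = numRecords * bytesPerRecord

// FileChannel.map cannot map a region larger than Int.MaxValue bytes,
// which is the "Size exceeds Integer.MAX_VALUE" error in this thread.
val mapLimit = Int.MaxValue.toLong

println(payloadBytes)            // 5000000000
println(payloadBytes > mapLimit) // true
```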
>>
>> I don't know the whole story yet, but I just wanted to at least let you
>> know you aren't crazy :)
>> At the very least this suggests that you might need to make smaller
>> partitions for now.
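One way to act on the "smaller partitions" advice is to derive the partition count from an estimate of the data size. A minimal sketch, assuming such an estimate is available; `partitionsFor` and its 128MB default target are illustrative choices, not Spark settings.

```scala
// Smallest partition count that keeps the AVERAGE partition at or below
// `targetBytes`. The 128MB default is an arbitrary illustrative target.
def partitionsFor(totalBytes: Long, targetBytes: Long = 128L * 1024 * 1024): Int = {
  require(totalBytes >= 0, "size must be non-negative")
  require(targetBytes > 0, "target must be positive")
  // Ceiling division in integer arithmetic.
  val n = (totalBytes + targetBytes - 1) / targetBytes
  math.max(1L, n).toInt
}

println(partitionsFor(5000000000L))               // 38
println(partitionsFor(100L * 1024 * 1024 * 1024)) // 800
```

In spark-shell the result could be passed as the `numSlices` argument of `sc.parallelize` or to `rdd.repartition(n)`. This only bounds the average partition size; skewed keys can still produce one oversized partition.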
>>
>> Imran
>>
>>
>> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
>> m_albert137@yahoo.com.invalid> wrote:
>>
>>> Greetings!
>>>
>>> Thanks for the response.
>>>
>>> Below is an example of the exception I saw.
>>> I'd rather not post code at the moment, so I realize it is completely
>>> unreasonable to ask for a diagnosis.
>>> However, I will say that adding a "partitionBy()" was the last change
>>> before this error appeared.
>>>
>>>
>>> Thanks for your time and any thoughts you might have.
>>>
>>> Sincerely,
>>>  Mike
>>>
>>>
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
>>> failure: Lost task 4.3 in stage 5.0 (TID 6012,
>>> ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
>>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>>>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>     at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>>
>>>
>>>   ------------------------------
>>> From: Sean Owen <sowen@cloudera.com>
>>> To: Michael Albert <m_albert137@yahoo.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> Sent: Monday, February 2, 2015 10:13 PM
>>> Subject: Re: 2GB limit for partitions?
>>>
>>> The limit is on blocks, not partitions. Partitions have many blocks.
>>>
>>> It sounds like you are creating very large values in memory, but I'm
>>> not sure given your description. You will run into problems if a
>>> single object is more than 2GB, of course. More of the stack trace
>>> might show what is mapping that much memory.
>>>
>>> If you simply want the data in 1000 files, it's a lot simpler. Just
>>> repartition into 1000 partitions and save the data. If you need more
>>> control over what goes into which partition, use a Partitioner, yes.
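The routing logic of a custom Partitioner for this use case can be sketched without Spark. In a real job the class would extend org.apache.spark.Partitioner, whose abstract members are `numPartitions: Int` and `getPartition(key: Any): Int`; here it is a plain class so the logic can run on its own. `FileIdPartitioner`, and the assumption that keys are Int file ids, are hypothetical.

```scala
// Standalone sketch of a "one file id per partition" partitioner.
// A Spark version would extend org.apache.spark.Partitioner instead.
class FileIdPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, "need at least one partition")

  def getPartition(key: Any): Int = key match {
    case null => 0
    // Assumed key shape: an Int file id in [0, numPartitions) maps to itself.
    case id: Int if id >= 0 && id < numPartitions => id
    // Anything else falls back to a non-negative hash modulo.
    case other =>
      val h = other.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
  }
}

val p = new FileIdPartitioner(1000)
println(p.getPartition(42)) // 42
println(p.getPartition(-1)) // 999
```

With one file per partition, a single file larger than about 2GB would still produce an oversized partition, so this helps only when each output file is comfortably below the limit.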
>>>
>>>
>>>
>>> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
>>> <m_albert137@yahoo.com.invalid> wrote:
>>> > Greetings!
>>> >
>>> > SPARK-1476 says that there is a 2G limit for "blocks".
>>> > Is this the same as a 2G limit for partitions (or approximately so)?
>>> >
>>> >
>>> > What I had been attempting to do is the following.
>>> > 1) Start with a moderately large data set (currently about 100GB, but
>>> > growing).
>>> > 2) Create about 1,000 files (yes, files), each representing a subset of
>>> > the data.
>>> >
>>> > The current attempt I am working on is something like this.
>>> > 1) Do a "map" whose output key indicates which of the 1,000 files it
>>> > will go into and whose value is what I will want to stick into the file.
>>> > 2) Partition the data and use the body of mapPartitions to open a file
>>> > and save the data.
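Whether this plan hits the limit depends on how many bytes each partition ends up holding, which depends on the partition count given to partitionBy and on how evenly the data spreads over the 1,000 keys. The sketch below estimates this from per-key sizes. It assumes keys are Int file ids routed by non-negative modulo, mirroring how a hash partitioner treats Int keys; `oversizedPartitions` and the sizes used are hypothetical.

```scala
// Given estimated bytes per output-file key, total the bytes per partition
// and return only the partitions that would exceed Int.MaxValue bytes.
def oversizedPartitions(bytesPerKey: Map[Int, Long], numPartitions: Int): Map[Int, Long] = {
  require(numPartitions > 0, "need at least one partition")
  val perPartition: Map[Int, Long] =
    bytesPerKey.toSeq
      .groupBy { case (key, _) => ((key % numPartitions) + numPartitions) % numPartitions }
      .map { case (p, entries) => p -> entries.map(_._2).sum }
  perPartition.filter { case (_, bytes) => bytes > Int.MaxValue.toLong }
}

// Roughly 100GB spread evenly over 1,000 keys: ~100MB per key.
val even = (0 until 1000).map(k => k -> 100000000L).toMap
println(oversizedPartitions(even, 1000).isEmpty) // true
// Squeezed into 40 partitions, each holds 25 keys (~2.5GB).
println(oversizedPartitions(even, 40).size)      // 40
```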
>>> >
>>> > My apologies, this is actually embedded in a bigger mess, so I won't
>>> > post it.
>>> >
>>> > However, I get errors telling me that there is an
>>> > "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with
>>> > sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me
>>> > to think that I have hit the limit on partition and/or block size.
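The exception comes from java.nio's FileChannel.map, which refuses to map a region larger than Integer.MAX_VALUE bytes. Outside Spark, one common workaround is to map a large file as several smaller windows. The sketch below illustrates that idea only; it is not how Spark's DiskStore reads blocks, and `mapInWindows` is a hypothetical helper.

```scala
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Path}

// Map a file as consecutive read-only windows of at most `windowBytes` each,
// so no single FileChannel.map call exceeds the Int.MaxValue limit.
def mapInWindows(path: Path, windowBytes: Int): IndexedSeq[MappedByteBuffer] = {
  require(windowBytes > 0, "window size must be positive")
  val raf = new RandomAccessFile(path.toFile, "r")
  try {
    val channel = raf.getChannel
    val size = channel.size()
    (0L until size by windowBytes.toLong).map { pos =>
      val len = math.min(windowBytes.toLong, size - pos)
      channel.map(FileChannel.MapMode.READ_ONLY, pos, len)
    }
  } finally raf.close() // existing mappings stay valid after the channel closes
}

// Tiny demo: a 10-byte file mapped in 4-byte windows.
val tmp = Files.createTempFile("blocks", ".bin")
tmp.toFile.deleteOnExit()
Files.write(tmp, Array.fill[Byte](10)(1))
println(mapInWindows(tmp, 4).map(_.remaining).mkString(",")) // 4,4,2
```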
>>> >
>>> > Perhaps this is not a good way to do it?
>>> >
>>> > I suppose I could run 1,000 passes over the data, each time collecting
>>> > the output for one of my 1,000 final files, but that seems likely to be
>>> > painfully slow to run.
>>> >
>>> > Am I missing something?
>>> >
>>> > Admittedly, this is an odd use case...
>>> >
>>> > Thanks!
>>> >
>>> > Sincerely,
>>> >  Mike Albert
>>>
>>>
>>
>
