spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jegan <jega...@gmail.com>
Subject Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Date Mon, 05 Oct 2015 23:37:10 GMT
Thanks for your suggestion Ted.

Unfortunately at this point of time I cannot go beyond 1000 partitions. I
am writing this data to BigQuery and it has a limit of 1000 jobs per day
for a table(they have some limits on this)  I currently create 1 load job
per partition. Is there any other work-around?

Thanks again.

Regards,
Jegan

On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhihong@gmail.com> wrote:

> As a workaround, can you set the number of partitions higher in the
> sc.textFile method ?
>
> Cheers
>
> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jegansp@gmail.com> wrote:
>
>> Hi All,
>>
>> I am facing the below exception when the size of the file being read in a
>> partition is above 2GB. This is apparently because Java's limitation on
>> memory mapped files. It supports mapping only 2GB files.
>>
>> Caused by: java.lang.IllegalArgumentException: Size exceeds
>> Integer.MAX_VALUE
>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>     at
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>     at
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>     at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>     at
>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>     at
>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>     at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>     at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> My use case is to read the files from S3 and do some processing. I am
>> caching the data like below in order to avoid SocketTimeoutExceptions from
>> another library I am using for the processing.
>>
>> val rdd1 = sc.textFile("*******").coalesce(1000)
>> rdd1.persist(DISK_ONLY_2) // replication factor 2
>> rdd1.foreachPartition { iter => } // one pass over the data to download
>>
>> The 3rd line fails with the above error when a partition contains a file
>> of size more than 2GB file.
>>
>> Do you think this needs to be fixed in Spark? One idea may be is to use a
>> wrapper class (something called BigByteBuffer) which keeps an array of
>> ByteBuffers and keeps the index of the current buffer being read etc. Below
>> is the modified DiskStore.scala.
>>
>> private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer]
= {
>>   val channel = new RandomAccessFile(file, "r").getChannel
>>   Utils.tryWithSafeFinally {
>>     // For small files, directly read rather than memory map
>>     if (length < minMemoryMapBytes) {
>>       // Map small file in Memory
>>     } else {
>>       // TODO Create a BigByteBuffer
>>
>>     }
>>   } {
>>     channel.close()
>>   }
>> }
>>
>> class BigByteBuffer extends ByteBuffer {
>>   val buffers: Array[ByteBuffer]
>>   var currentIndex = 0
>>
>>   ... // Other methods
>> }
>>
>> Please let me know if there is any other work-around for the same. Thanks for your
time.
>>
>> Regards,
>> Jegan
>>
>
>

Mime
View raw message