kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sourabh Chandak <sourabh3...@gmail.com>
Subject Re: ERROR BoundedByteBufferReceive: OOME with size 352518400
Date Fri, 25 Sep 2015 15:37:00 GMT
Thanks Cody I was able to find out the issue yesterday after sending the
last email.

On Friday, September 25, 2015, Cody Koeninger <cody@koeninger.org> wrote:

> So you're still having a problem getting partitions or offsets from kafka
> when creating the stream.  You can try each of those kafka operations
> individually (getPartitions / getLatestLeaderOffsets)
>
> checkErrors should be dealing with an arraybuffer of throwables, not just
> a single one.  Is that the only error you're seeing, or are there more?
>
> You can also modify it to call printStackTrace or whatever on each
> individual error, instead of only printing the message.
>
>
>
>
> On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak <sourabh3934@gmail.com
> <javascript:_e(%7B%7D,'cvml','sourabh3934@gmail.com');>> wrote:
>
>> I was able to get pass this issue. I was pointing the SSL port whereas
>> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
>> am getting the following error:
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.BufferUnderflowException
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>         at scala.util.Either.fold(Either.scala:97)
>>         at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>         at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>>         at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>> Sourabh
>>
>> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger <cody@koeninger.org
>> <javascript:_e(%7B%7D,'cvml','cody@koeninger.org');>> wrote:
>>
>>> That looks like the OOM is in the driver, when getting partition
>>> metadata to create the direct stream.  In that case, executor memory
>>> allocation doesn't matter.
>>>
>>> Allocate more driver memory, or put a profiler on it to see what's
>>> taking up heap.
>>>
>>>
>>>
>>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak <sourabh3934@gmail.com
>>> <javascript:_e(%7B%7D,'cvml','sourabh3934@gmail.com');>> wrote:
>>>
>>>> Adding Cody and Sriharsha
>>>>
>>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak <sourabh3934@gmail.com
>>>> <javascript:_e(%7B%7D,'cvml','sourabh3934@gmail.com');>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have ported receiver less spark streaming for kafka to Spark 1.2 and
>>>>> am trying to run a spark streaming job to consume data form my broker,
but
>>>>> I am getting the following error:
>>>>>
>>>>> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
>>>>> 352518400
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>>>>>         at
>>>>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>         at
>>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>         at
>>>>> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>>>>         at
>>>>> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
>>>>>         at
>>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
>>>>>         at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>>>>         at
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>         at
>>>>> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>>>>         at org.apache.spark.streaming.kafka.KafkaCluster.org
>>>>> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>>>>         at
>>>>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:296)
>>>>>         at
>>>>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:35)
>>>>>         at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:58)
>>>>>         at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>         at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>>>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>
>>>>>
>>>>>
>>>>> I have tried allocating 100G of memory with 1 executor but it is still
>>>>> failing.
>>>>>
>>>>> Spark version: 1.2.2
>>>>> Kafka version ported: 0.8.2
>>>>> Kafka server version: trunk version with SSL enabled
>>>>>
>>>>> Can someone please help me debug this.
>>>>>
>>>>> Thanks,
>>>>> Sourabh
>>>>>
>>>>
>>>>
>>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message