spark-user mailing list archives

From Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com>
Subject Re: spark streaming 1.3 kafka error
Date Sat, 22 Aug 2015 16:28:28 GMT
You can also give this consumer a try in your environment:
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer. It has been
running fine for topics with a large number of Kafka partitions (> 200), like
yours, without any issues. There are no connection problems, since this
consumer reuses Kafka connections, and it can also recover from failures
(network loss, the Kafka leader going down, ZooKeeper going down, etc.).
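
Roughly, it is wired up like this (a minimal sketch; the property names,
receiver count, and ReceiverLauncher import below are illustrative, so please
verify the exact options against the package page):

    import org.apache.spark.storage.StorageLevel
    import consumer.kafka.ReceiverLauncher  // package path per the README

    // Illustrative receiver properties; see the package page for the full list
    val props = new java.util.Properties()
    props.put("zookeeper.hosts", "zkhost1,zkhost2")
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "test_hbrealtimeevents")
    props.put("kafka.consumer.id", "my-consumer")

    // One receiver per group of partitions; connections are reused across batches
    val numberOfReceivers = 3
    val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,
      StorageLevel.MEMORY_ONLY)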


Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora <shushantarora09@gmail.com>
wrote:

> When I try the consumer without external connections, or with a low number
> of external connections, it works fine.
>
> So the doubt is how the socket got closed:
>
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>
>
>
> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>> Can you try some other consumer and see if the issue still exists?
>> On Aug 22, 2015 12:47 AM, "Shushant Arora" <shushantarora09@gmail.com>
>> wrote:
>>
>>> The exception comes when the client also has many connections to another
>>> external server.
>>> So I think the exception is caused by a client-side issue only; there is
>>> no issue on the server side.
>>>
>>>
>>> I want to understand: is the executor (simple consumer) not making a new
>>> connection to the Kafka broker at the start of each task? Or is the
>>> connection created only once, and is it somehow getting closed?
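>>>
>>> From the stack trace, the fetch goes through kafka.consumer.SimpleConsumer,
>>> so per task I would expect something roughly like the following (a
>>> simplified sketch of my understanding, not the actual Spark source;
>>> leaderHost / leaderPort stand in for the partition's leader broker):
>>>
>>>   import kafka.consumer.SimpleConsumer
>>>
>>>   // Open a fresh connection to the partition's leader broker...
>>>   val consumer = new SimpleConsumer(leaderHost, leaderPort,
>>>     30000 /* soTimeout ms */, 65536 /* bufferSize */, "client-id")
>>>   // ...fetch this task's offset range, then close on task completion:
>>>   // consumer.close()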
>>>
>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> It comes at the start of each task when new data is inserted into Kafka
>>>> (the amount of data inserted is very small).
>>>> The Kafka topic has 300 partitions; the data inserted is ~10 MB.
>>>>
>>>> Tasks fail and are retried; the retries succeed, but after a certain
>>>> number of failed tasks the job is killed.
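>>>>
>>>> For reference, the number of failures of a single task tolerated before
>>>> the job is aborted is controlled by spark.task.maxFailures (default 4).
>>>> A minimal sketch of raising it, assuming you build your own SparkConf:
>>>>
>>>>   import org.apache.spark.SparkConf
>>>>
>>>>   // Tolerate more failures of the same task before aborting the job
>>>>   val conf = new SparkConf().set("spark.task.maxFailures", "8")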
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <akhil@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> That looks like you are choking your Kafka machine. Run top on the
>>>>> Kafka machines and check the workload; it may be that you are spending
>>>>> too much time on disk I/O, etc.
>>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <cody@koeninger.org> wrote:
>>>>>
>>>>>> Sounds like that's happening consistently, not an occasional network
>>>>>> problem?
>>>>>>
>>>>>> Look at the Kafka broker logs
>>>>>>
>>>>>> Make sure you've configured the correct Kafka broker hosts / ports
>>>>>> (note that the direct stream does not use the ZooKeeper host / port).
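>>>>>>
>>>>>> For example, something like this (broker addresses and the topic name
>>>>>> below are placeholders):
>>>>>>
>>>>>>   import kafka.serializer.StringDecoder
>>>>>>   import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>
>>>>>>   // The direct stream takes the Kafka brokers, not the ZK quorum
>>>>>>   val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
>>>>>>   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>>>>     ssc, kafkaParams, Set("test_hbrealtimeevents"))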
>>>>>>
>>>>>> Make sure that host / port is reachable from your driver and worker
>>>>>> nodes, ie telnet or netcat to it (e.g. nc -vz <broker-host> 9092). It
>>>>>> looks like your driver can reach it (since there's partition info in
>>>>>> the logs), but that doesn't mean the worker can.
>>>>>>
>>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>>> job is running, or tcpdump if you need to.
>>>>>>
>>>>>> If you can't figure out what's going on from a networking point of
>>>>>> view, post a minimal reproducible code sample that demonstrates the
>>>>>> issue, so it can be tested in a different environment.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>>
>>>>>>> I am getting the error below in Spark Streaming 1.3 while consuming
>>>>>>> from Kafka using the direct Kafka stream. A few tasks fail in each run.
>>>>>>>
>>>>>>>
>>>>>>> What is the reason for, and the solution to, this error?
>>>>>>>
>>>>>>>
>>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>>> 	at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>> 	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>> 	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>> 	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>>> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>> 	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>> 	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>>> 	at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>>> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>>> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>> 	at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>
