spark-user mailing list archives

From Ahmed Nawar <ahmed.na...@gmail.com>
Subject Re: spark streaming 1.3 kafka topic error
Date Thu, 27 Aug 2015 13:53:08 GMT
Dear all,

I need to commit the DB transaction once per partition, not once per row.
The code below didn't work for me.


rdd.mapPartitions(partitionOfRecords => {
  DBConnectionInit()
  val results = partitionOfRecords.map(......)
  DBConnection.commit()
})
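
A likely reason the snippet above fails: `Iterator.map` in Scala is lazy, so `DBConnection.commit()` runs before any row has been processed, and the closure returns `Unit` rather than the iterator that `mapPartitions` expects. The sketch below reproduces the ordering problem in plain Scala without Spark. `FakeConnection`, `lazyVersion` and `eagerVersion` are hypothetical names used only for illustration. In Spark, the eager body would typically go inside `rdd.foreachPartition`, which is an action and therefore actually runs.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionCommitSketch {
  // Hypothetical stand-in for a real DB connection; it only records call order.
  class FakeConnection {
    val log: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def insert(row: String): Unit = log += s"insert $row"
    def commit(): Unit = log += "commit"
  }

  // Mirrors the posted code: map is lazy, so commit() is logged before any insert.
  def lazyVersion(partition: Iterator[String], conn: FakeConnection): Iterator[Unit] = {
    val results = partition.map(row => conn.insert(row))
    conn.commit()
    results
  }

  // Eager variant: foreach consumes the iterator, so all inserts precede one commit.
  def eagerVersion(partition: Iterator[String], conn: FakeConnection): Unit = {
    partition.foreach(row => conn.insert(row))
    conn.commit()
  }

  def main(args: Array[String]): Unit = {
    val c1 = new FakeConnection
    lazyVersion(Iterator("a", "b"), c1).foreach(_ => ()) // consumed only after commit
    println(c1.log.mkString(", "))

    val c2 = new FakeConnection
    eagerVersion(Iterator("a", "b"), c2)
    println(c2.log.mkString(", "))
  }
}
```

With a real connection, the commit in the eager variant would normally be paired with a rollback and a close in a `try`/`finally`, so a failed partition does not leave a transaction open.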



Best regards,

Ahmed Atef Nawwar

Data Management & Big Data Consultant






On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Your kafka broker died or you otherwise had a rebalance.
>
> Normally spark retries take care of that.
>
> Is there something going on with your kafka installation, that rebalance
> is taking especially long?
>
> Yes, increasing backoff / max number of retries will "help", but it's
> better to figure out what's going on with kafka.
>
> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <shushantarora09@gmail.com> wrote:
>
>> Hi,
>>
>> My streaming application gets killed with the error below:
>>
>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>> [testtopic,193]))
>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
>> time 1440626120000 ms
>> org.apache.spark.SparkException:
>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([testtopic,115]))
>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>>
>>
>>
>> Kafka params printed in the job logs are:
>>         value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>         key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>>         block.on.buffer.full = true
>>         retry.backoff.ms = 100
>>         buffer.memory = 1048576
>>         batch.size = 16384
>>         metrics.sample.window.ms = 30000
>>         metadata.max.age.ms = 300000
>>         receive.buffer.bytes = 32768
>>         timeout.ms = 30000
>>         max.in.flight.requests.per.connection = 5
>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>         metric.reporters = []
>>         client.id =
>>         compression.type = none
>>         retries = 0
>>         max.request.size = 1048576
>>         send.buffer.bytes = 131072
>>         acks = all
>>         reconnect.backoff.ms = 10
>>         linger.ms = 0
>>         metrics.num.samples = 2
>>         metadata.fetch.timeout.ms = 60000
>>
>>
>> Is the Kafka broker going down and causing the job to be killed? What's the
>> best way to handle it?
>> Will increasing the retries and backoff time help, and to what values should
>> they be set so that the streaming application never fails, but instead keeps
>> retrying every few seconds and sends an event so that my custom code can
>> send a notification that the Kafka broker is down, if that is the cause?
>>
>>
>> Thanks
>>
>>
>
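
On the retry and backoff settings discussed in the quoted thread: as far as I know, the direct stream in Spark 1.3 takes its driver-side retry count from the Spark property `spark.streaming.kafka.maxRetries` and waits for the Kafka consumer parameter `refresh.leader.backoff.ms` between attempts. The parameters printed in the quoted log (`acks`, `key.serializer`, `retries = 0`) appear to be Kafka producer settings, so changing them would probably not affect the leader-offset lookup. The sketch below only collects the two relevant keys; the values are illustrative rather than recommended, and the object name is hypothetical.

```scala
object RetrySettingsSketch {
  // Spark-side property, set through SparkConf.set(...) or --conf on spark-submit.
  // Illustrative value: how many times the driver retries the leader-offset lookup.
  val sparkSettings: Map[String, String] = Map(
    "spark.streaming.kafka.maxRetries" -> "5"
  )

  // Kafka parameters passed as kafkaParams to KafkaUtils.createDirectStream.
  // Broker addresses are copied from the quoted log; the backoff is illustrative.
  val kafkaParams: Map[String, String] = Map(
    "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
    "refresh.leader.backoff.ms" -> "1000"
  )

  def main(args: Array[String]): Unit = {
    (sparkSettings ++ kafkaParams).foreach { case (k, v) => println(s"$k = $v") }
  }
}
```

Larger values only make the job tolerate a longer leader election; they do not explain why the rebalance is slow in the first place.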
