spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Cody Koeninger <c...@koeninger.org>
Subject Re: spark streaming 1.3 kafka topic error
Date Thu, 27 Aug 2015 14:11:28 GMT
Map is lazy.  You need an actual action, or nothing will happen.  Use
foreachPartition, or do an empty foreach after the map.

On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.nawar@gmail.com> wrote:

> Dears,
>
>     I needs to commit DB Transaction for each partition,Not for each row.
> below didn't work for me.
>
>
> rdd.mapPartitions(partitionOfRecords => {
>
> DBConnectionInit()
>
> val results = partitionOfRecords.map(......)
>
> DBConnection.commit()
>
>
> })
>
>
>
> Best regards,
>
> Ahmed Atef Nawwar
>
> Data Management & Big Data Consultant
>
>
>
>
>
>
> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <cody@koeninger.org>
> wrote:
>
>> Your kafka broker died or you otherwise had a rebalance.
>>
>> Normally spark retries take care of that.
>>
>> Is there something going on with your kafka installation, that rebalance
>> is taking especially long?
>>
>> Yes, increasing backoff / max number of retries will "help", but it's
>> better to figure out what's going on with kafka.
>>
>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>> shushantarora09@gmail.com> wrote:
>>
>>> Hi
>>>
>>> My streaming application gets killed with below error
>>>
>>> 5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>> [testtopic,193]))
>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>> for time 1440626120000 ms
>>> org.apache.spark.SparkException:
>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([testtopic,115]))
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>> at
>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>> at
>>>
>>>
>>>
>>> Kafka params in job logs printed are :
>>>  value.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>>         key.serializer = class
>>> org.apache.kafka.common.serialization.StringSerializer
>>>         block.on.buffer.full = true
>>>         retry.backoff.ms = 100
>>>         buffer.memory = 1048576
>>>         batch.size = 16384
>>>         metrics.sample.window.ms = 30000
>>>         metadata.max.age.ms = 300000
>>>         receive.buffer.bytes = 32768
>>>         timeout.ms = 30000
>>>         max.in.flight.requests.per.connection = 5
>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>         metric.reporters = []
>>>         client.id =
>>>         compression.type = none
>>>         retries = 0
>>>         max.request.size = 1048576
>>>         send.buffer.bytes = 131072
>>>         acks = all
>>>         reconnect.backoff.ms = 10
>>>         linger.ms = 0
>>>         metrics.num.samples = 2
>>>         metadata.fetch.timeout.ms = 60000
>>>
>>>
>>> Is it kafka broker getting down and job is getting killed ? Whats the
>>> best way to handle it ?
>>> Increasing retries and backoff time  wil help and to what values those
>>> should be set to never have streaming application failure - rather it keep
>>> on retrying after few seconds and send a event so that my custom code can
>>> send notification of kafka broker down if its because of that.
>>>
>>>
>>> Thanks
>>>
>>>
>>
>

Mime
View raw message