spark-user mailing list archives

From Ahmed Nawar <ahmed.na...@gmail.com>
Subject Commit DB Transaction for each partition
Date Thu, 27 Aug 2015 18:42:43 GMT
Thanks for the foreach idea, but once I used it I got an empty RDD. I think
that's because "results" is an iterator.

Yes, I know "map is lazy", but I expected there to be a way to force the action.

I can't use foreachPartition because I need to reuse the new RDD after some
maps.
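
For the archive, one way to keep mapPartitions (so the resulting RDD stays
reusable) and still commit once per partition is to materialize the lazy
iterator before committing. This is a sketch, not a tested solution, reusing
the hypothetical DBConnectionInit / DBConnection / process(...) names from the
original post:

```scala
// Sketch: Iterator.map is lazy, so toList forces every record through the
// mapping function before the commit fires. Returning results.iterator
// keeps the mapPartitions contract, so the new RDD can still be reused.
// DBConnectionInit, DBConnection and process(...) are hypothetical
// stand-ins for the poster's own code.
val transformed = rdd.mapPartitions { partitionOfRecords =>
  DBConnectionInit()
  val results = partitionOfRecords.map(process).toList  // forces evaluation
  DBConnection.commit()
  results.iterator  // mapPartitions must return an Iterator
}
transformed.cache()  // without this, each later action would recompute the
                     // partition and re-run the commit
```

Caching matters here: without it, every action on the transformed RDD would
re-run the partition function and commit the transaction again.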



On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <cody@koeninger.org> wrote:

>
> Map is lazy.  You need an actual action, or nothing will happen.  Use
> foreachPartition, or do an empty foreach after the map.
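
A minimal sketch of the foreachPartition approach suggested above, for cases
where the result does not need to be reused; DBConnectionInit, DBConnection
and process(...) are hypothetical stand-ins matching the original post:

```scala
// Sketch only: foreachPartition is an action, so it runs immediately and
// lets you commit once per partition rather than once per row.
// DBConnectionInit, DBConnection and process(...) are hypothetical.
rdd.foreachPartition { partitionOfRecords =>
  DBConnectionInit()
  partitionOfRecords.foreach(record => process(record))
  DBConnection.commit()
}
```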
>
> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.nawar@gmail.com>
> wrote:
>
>> Dears,
>>
>>     I need to commit a DB transaction for each partition, not for each
>> row. The code below didn't work for me.
>>
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>   DBConnectionInit()
>>   val results = partitionOfRecords.map(......)
>>   DBConnection.commit()
>> })
>>
>>
>>
>> Best regards,
>>
>> Ahmed Atef Nawwar
>>
>> Data Management & Big Data Consultant
>>
>>
>>
>>
>>
>>
>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> Your kafka broker died or you otherwise had a rebalance.
>>>
>>> Normally spark retries take care of that.
>>>
>>> Is there something going on with your Kafka installation such that the
>>> rebalance is taking especially long?
>>>
>>> Yes, increasing backoff / max number of retries will "help", but it's
>>> better to figure out what's going on with kafka.
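
For reference, the two knobs being discussed can be set roughly as below.
This is a sketch assuming the Spark 1.x direct Kafka stream used in this
thread; spark.streaming.kafka.maxRetries and refresh.leader.backoff.ms are
the settings I believe govern this retry path, but verify them against your
Spark and Kafka versions. Values are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Driver-side: how many times the direct stream retries fetching leader
// offsets before failing the batch (default is 1 in Spark 1.x).
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRetries", "5")

// Consumer-side: how long to wait between retries while a new leader is
// being elected after a broker failure or rebalance.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  "refresh.leader.backoff.ms" -> "2000"
)
```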
>>>
>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>> shushantarora09@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> My streaming application gets killed with the error below:
>>>>
>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>> [testtopic,193]))
>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs
>>>> for time 1440626120000 ms
>>>> org.apache.spark.SparkException:
>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>> Set([testtopic,115]))
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>> at
>>>>
>>>>
>>>>
>>>> Kafka params in job logs printed are :
>>>>  value.serializer = class
>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>         key.serializer = class
>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>         block.on.buffer.full = true
>>>>         retry.backoff.ms = 100
>>>>         buffer.memory = 1048576
>>>>         batch.size = 16384
>>>>         metrics.sample.window.ms = 30000
>>>>         metadata.max.age.ms = 300000
>>>>         receive.buffer.bytes = 32768
>>>>         timeout.ms = 30000
>>>>         max.in.flight.requests.per.connection = 5
>>>>         bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
>>>>         metric.reporters = []
>>>>         client.id =
>>>>         compression.type = none
>>>>         retries = 0
>>>>         max.request.size = 1048576
>>>>         send.buffer.bytes = 131072
>>>>         acks = all
>>>>         reconnect.backoff.ms = 10
>>>>         linger.ms = 0
>>>>         metrics.num.samples = 2
>>>>         metadata.fetch.timeout.ms = 60000
>>>>
>>>>
>>>> Is the Kafka broker going down and killing the job? What's the best
>>>> way to handle this?
>>>> Will increasing retries and backoff time help, and to what values
>>>> should they be set so that the streaming application never fails, but
>>>> instead keeps retrying every few seconds and emits an event, so my
>>>> custom code can send a notification if a Kafka broker is down?
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>>
>
