spark-user mailing list archives

From Ahmed Nawar <ahmed.na...@gmail.com>
Subject Re: Commit DB Transaction for each partition
Date Thu, 27 Aug 2015 20:30:39 GMT
Thanks a lot for your support. It is working now.
I wrote it as shown below:


val newRDD = rdd.mapPartitions { partition => {

  val result = partition.map(.....)

  result
}
}

newRDD.foreach {

}
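
For anyone reading this thread later, here is a minimal sketch of the two pitfalls discussed in the quoted messages below, written with plain Scala iterators so it runs without Spark. Everything in it is an assumption made for illustration: FakeConnection is a hypothetical stand-in for a real DB connection, and the function names are mine, not part of Spark or of my actual job. It shows that an iterator is empty after a foreach, that a lazy map has not run yet when commit() is called, and that materializing the partition first puts the commit after the writes.

```scala
object PartitionCommitSketch {
  // Hypothetical stand-in for a DB connection; it only records the call order.
  final class FakeConnection {
    val log = scala.collection.mutable.ArrayBuffer.empty[String]
    def write(row: Int): Unit = { log += s"write $row"; () }
    def commit(): Unit = { log += "commit"; () }
  }

  // Pitfall 1: an Iterator can be traversed only once.
  // After foreach has consumed it, nothing is left to return.
  def exhaustedHasNext(): Boolean = {
    val results = Iterator(1, 2, 3).map(_ * 2)
    results.foreach(_ => ())
    results.hasNext
  }

  // Pitfall 2: map on an Iterator is lazy, so commit() runs before any
  // row has been written. The writes only happen when the caller
  // consumes the returned iterator.
  def lazyCommit(partition: Iterator[Int], conn: FakeConnection): Iterator[Int] = {
    val results = partition.map { row => conn.write(row); row * 2 }
    conn.commit()
    results
  }

  // One possible fix: force the map by materializing the partition,
  // commit once, then hand back a fresh iterator over the buffered rows.
  // This holds the whole partition in memory.
  def eagerCommit(partition: Iterator[Int], conn: FakeConnection): Iterator[Int] = {
    val results = partition.map { row => conn.write(row); row * 2 }.toVector
    conn.commit()
    results.iterator
  }
}
```

In a Spark job, the body of eagerCommit would be the closure passed to rdd.mapPartitions, with the connection opened inside the closure. The job still needs an output action to run at all, and unless the resulting RDD is cached, each later action may recompute the partitions and repeat the writes.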


On Thu, Aug 27, 2015 at 10:34 PM, Cody Koeninger <cody@koeninger.org> wrote:

> This job contains a spark output action, and is what I originally meant:
>
>
> rdd.mapPartitions {
>   result
> }.foreach {
>
> }
>
> This job is just a transformation, and won't do anything unless you have
> another output action.  Not to mention, it will exhaust the iterator, as
> you noticed:
>
> rdd.mapPartitions {
>   result.foreach
>   result
> }
>
>
>
> On Thu, Aug 27, 2015 at 2:22 PM, Ahmed Nawar <ahmed.nawar@gmail.com>
> wrote:
>
>> Yes, of course, I am doing that. But once I added results.foreach(row =>
>> {}), I got an empty RDD.
>>
>>
>>
>> rdd.mapPartitions(partitionOfRecords => {
>>
>> DBConnectionInit()
>>
>> val results = partitionOfRecords.map(......)
>>
>> DBConnection.commit()
>>
>> results.foreach(row=> {})
>>
>> results
>>
>> })
>>
>>
>>
>> On Thu, Aug 27, 2015 at 10:18 PM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> You need to return an iterator from the closure you provide to
>>> mapPartitions
>>>
>>> On Thu, Aug 27, 2015 at 1:42 PM, Ahmed Nawar <ahmed.nawar@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the foreach idea. But once I used it, I got an empty RDD. I
>>>> think that is because "results" is an iterator.
>>>>
>>>> Yes, I know "Map is lazy", but I expected there to be a way to force an
>>>> action.
>>>>
>>>> I cannot use foreachPartition because I need to reuse the new RDD after
>>>> some maps.
>>>>
>>>>
>>>>
>>>> On Thu, Aug 27, 2015 at 5:11 PM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>>
>>>>> Map is lazy.  You need an actual action, or nothing will happen.  Use
>>>>> foreachPartition, or do an empty foreach after the map.
>>>>>
>>>>> On Thu, Aug 27, 2015 at 8:53 AM, Ahmed Nawar <ahmed.nawar@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Dear all,
>>>>>>
>>>>>>     I need to commit a DB transaction for each partition, not for each
>>>>>> row. The code below didn't work for me.
>>>>>>
>>>>>>
>>>>>> rdd.mapPartitions(partitionOfRecords => {
>>>>>>
>>>>>> DBConnectionInit()
>>>>>>
>>>>>> val results = partitionOfRecords.map(......)
>>>>>>
>>>>>> DBConnection.commit()
>>>>>>
>>>>>>
>>>>>> })
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Ahmed Atef Nawwar
>>>>>>
>>>>>> Data Management & Big Data Consultant
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 27, 2015 at 4:16 PM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Your kafka broker died or you otherwise had a rebalance.
>>>>>>>
>>>>>>> Normally spark retries take care of that.
>>>>>>>
>>>>>>> Is there something going on with your kafka installation, that
>>>>>>> rebalance is taking especially long?
>>>>>>>
>>>>>>> Yes, increasing backoff / max number of retries will "help", but
>>>>>>> it's better to figure out what's going on with kafka.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2015 at 9:07 PM, Shushant Arora <
>>>>>>> shushantarora09@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> My streaming application gets killed with the error below:
>>>>>>>>
>>>>>>>> 15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
>>>>>>>> [testtopic,193]))
>>>>>>>> 15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating
>>>>>>>> jobs for time 1440626120000 ms
>>>>>>>> org.apache.spark.SparkException:
>>>>>>>> ArrayBuffer(kafka.common.NotLeaderForPartitionException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([testtopic,115]))
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>>> at
>>>>>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>>> at
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> The Kafka params printed in the job logs are:
>>>>>>>>  value.serializer = class
>>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>>         key.serializer = class
>>>>>>>> org.apache.kafka.common.serialization.StringSerializer
>>>>>>>>         block.on.buffer.full = true
>>>>>>>>         retry.backoff.ms = 100
>>>>>>>>         buffer.memory = 1048576
>>>>>>>>         batch.size = 16384
>>>>>>>>         metrics.sample.window.ms = 30000
>>>>>>>>         metadata.max.age.ms = 300000
>>>>>>>>         receive.buffer.bytes = 32768
>>>>>>>>         timeout.ms = 30000
>>>>>>>>         max.in.flight.requests.per.connection = 5
>>>>>>>>         bootstrap.servers = [broker1:9092, broker2:9092,
>>>>>>>> broker3:9092]
>>>>>>>>         metric.reporters = []
>>>>>>>>         client.id =
>>>>>>>>         compression.type = none
>>>>>>>>         retries = 0
>>>>>>>>         max.request.size = 1048576
>>>>>>>>         send.buffer.bytes = 131072
>>>>>>>>         acks = all
>>>>>>>>         reconnect.backoff.ms = 10
>>>>>>>>         linger.ms = 0
>>>>>>>>         metrics.num.samples = 2
>>>>>>>>         metadata.fetch.timeout.ms = 60000
>>>>>>>>
>>>>>>>>
>>>>>>>> Is the Kafka broker going down, and is that why the job is getting
>>>>>>>> killed? What's the best way to handle it?
>>>>>>>> Will increasing the retries and backoff time help, and to what values
>>>>>>>> should those be set so that the streaming application never fails,
>>>>>>>> but instead keeps retrying after a few seconds and sends an event so
>>>>>>>> that my custom code can send a notification that the Kafka broker is
>>>>>>>> down, if that is the cause?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
