kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: producer exceptions when broker dies
Date Fri, 25 Oct 2013 17:36:30 GMT
Hello Kane,

As discussed in the other thread, even if a timeout response is sent back
to the producer, the message may still be committed.
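
To make that concrete, here is a minimal toy model in Scala. It is not Kafka code: FlakyBroker, AckTimeout and the ackLost rule are names made up for this sketch, and it assumes the simplest case, where every timed-out write had already been appended before the acknowledgement was lost. In a real cluster only some of the timed-out requests would end up committed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical exception standing in for a producer-side request timeout.
class AckTimeout(msg: String) extends Exception(msg)

// Toy "broker": every message is appended to the log, but the
// acknowledgement is lost for requests selected by ackLost (an assumption).
class FlakyBroker(ackLost: Int => Boolean) {
  val log = ArrayBuffer[String]()
  private var requests = 0

  def append(message: String): Unit = {
    requests += 1
    log += message               // the write is committed first ...
    if (ackLost(requests))       // ... then the ack never reaches the producer
      throw new AckTimeout("no ack for request " + requests)
  }
}

object TimeoutDemo {
  // Sends `total` lines once each, without retrying.
  // Returns (exceptions seen by the producer, messages in the broker log).
  def run(total: Int): (Int, Int) = {
    val broker = new FlakyBroker(n => n % 30 == 0)
    var exceptions = 0
    for (i <- 1 to total) {
      try broker.append("line-" + i)
      catch { case _: AckTimeout => exceptions += 1 }
    }
    (exceptions, broker.log.size)
  }

  def main(args: Array[String]): Unit = {
    val (exceptions, committed) = run(1500)
    // prints "exceptions=50 committed=1500"
    println("exceptions=" + exceptions + " committed=" + committed)
  }
}
```

The producer in this sketch sees 50 exceptions, yet all 1500 lines are in the log, so counting exceptions does not tell you how many messages were lost. For the same reason, blindly resending after an exception would append a line that is already there.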

Did you shut down the leader broker of the partition or a follower broker?

Guozhang

On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.isturm@gmail.com> wrote:

> I have a cluster of 3 Kafka brokers. With the following script I send some
> data to Kafka and, in the middle, do a controlled shutdown of 1 broker. All
> 3 brokers are in the ISR before I start sending. When I shut down the broker
> I get a couple of exceptions, and I expect that data not to be written. Say I
> send 1500 lines and get 50 exceptions. I expect to consume 1450 lines, but
> instead I always consume more, e.g. 1480 or 1490. I want to decide myself
> whether to retry sending, rather than using message.send.max.retries. But it
> looks like if I retry whenever there is an exception, I will end up with
> duplicates. Am I doing something wrong, or making wrong assumptions
> about Kafka?
>
> Thanks.
>
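> import scala.io.Source
>
> val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> var count = 0
> for (line <- Source.fromFile(file).getLines()) {
>     try {
>       // Assumption: each input line is sent as its own one-message batch
>       prod.send("benchmark", List(line))
>       count += 1
>       println("sent %s".format(count))
>     } catch {
>       // Catch exceptions only, not every Throwable
>       case e: Exception => println("Exception!")
>     }
> }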
>
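> import java.util.Properties
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import kafka.serializer.StringEncoder
> import scala.collection.mutable.ArrayBuffer
>
> class MyProducer(brokerList: String) {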
>   val sync = true
>   val requestRequiredAcks = "-1"
>
>   val props = new Properties()
>   props.put("metadata.broker.list", brokerList)
>   props.put("producer.type", if(sync) "sync" else "async")
>   props.put("request.required.acks", requestRequiredAcks)
>   props.put("key.serializer.class", classOf[StringEncoder].getName)
>   props.put("serializer.class", classOf[StringEncoder].getName)
>   props.put("message.send.max.retries", "0")
>   props.put("request.timeout.ms", "2000")
>
>   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
>
>   def send(topic: String, messages: List[String]) = {
>     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
>     for (message <- messages) {
>       requests += new KeyedMessage(topic, null, message, message)
>     }
>     // Producer.send takes varargs, so expand the buffer
>     producer.send(requests: _*)
>   }
> }
>

