kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: producer exceptions when broker dies
Date Fri, 25 Oct 2013 18:35:02 GMT
Aniket is exactly right. In general, Kafka provides an "at least once"
guarantee rather than "exactly once".
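
A minimal sketch of one way to live with at-least-once delivery, assuming the
application attaches its own unique id to every message and the consumer
filters on that id. None of this is a Kafka API: Envelope, Retry.sendWithRetry
and Deduplicator are hypothetical names, the "id|payload" wire format is an
assumption made for illustration, and the send function stands in for whatever
producer call is actually used.

```scala
import java.util.UUID
import scala.annotation.tailrec
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical envelope: a payload plus an id chosen by the application.
final case class Envelope(id: String, payload: String)

object Envelope {
  // Generate the id once, before the first send attempt, so retries reuse it.
  def wrap(payload: String): Envelope =
    Envelope(UUID.randomUUID().toString, payload)

  // Assumed wire format: "id|payload" (a UUID string never contains '|').
  def encode(e: Envelope): String = e.id + "|" + e.payload

  def decode(s: String): Envelope = {
    val i = s.indexOf('|')
    require(i >= 0, "missing id separator")
    Envelope(s.substring(0, i), s.substring(i + 1))
  }
}

object Retry {
  // Retries the same encoded message until `send` stops throwing or the
  // attempts run out. A send that threw may still have been committed,
  // so this can produce duplicates on purpose.
  @tailrec
  def sendWithRetry(e: Envelope, attemptsLeft: Int)(send: String => Unit): Boolean =
    if (attemptsLeft <= 0) false
    else {
      val ok =
        try { send(Envelope.encode(e)); true }
        catch { case NonFatal(_) => false }
      if (ok) true else sendWithRetry(e, attemptsLeft - 1)(send)
    }
}

// Consumer-side filter: remembers the ids it has already processed.
final class Deduplicator {
  private val seen = mutable.HashSet.empty[String]

  // True only the first time a given id is delivered.
  def firstDelivery(e: Envelope): Boolean = seen.add(e.id)
}
```

On the consumer side, each received string would be decoded and processed
only when firstDelivery returns true. The in-memory set here grows without
bound and is lost on restart; a real processing layer would need to bound or
persist it.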

Guozhang


On Fri, Oct 25, 2013 at 11:13 AM, Aniket Bhatnagar <
aniket.bhatnagar@gmail.com> wrote:

> As I understand it, if the broker says the message is committed, it is
> guaranteed to have been committed according to your ack config. If it says
> the message was not committed, it is very hard to tell whether that was
> just a false error. Since there is no concept of unique ids for messages,
> replaying the same message will result in duplication. I think that is
> reasonable behaviour, considering Kafka prefers to simply append data to
> partitions for performance reasons.
> The best way to deal with duplicate messages right now is to build the
> processing engine (the layer where your consumer sits) to handle the
> at-least-once semantics of the broker.
> On 25 Oct 2013 23:23, "Kane Kane" <kane.isturm@gmail.com> wrote:
>
> > Or, to rephrase it more generally, is there a way to know for certain
> > whether a message was committed or not?
> >
> >
> > On Fri, Oct 25, 2013 at 10:43 AM, Kane Kane <kane.isturm@gmail.com> wrote:
> >
> > > Hello Guozhang,
> > >
> > > My partitions are split almost evenly between the brokers, so yes, the
> > > broker that I shut down is the leader for some of them. Does that mean I
> > > can get an exception while the data is still being written? Is there any
> > > setting on the broker where I can control this? I.e., can I make the
> > > broker replication timeout shorter than the producer timeout, so that if
> > > I get an exception I can be sure the data is not being committed?
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Oct 25, 2013 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > >> Hello Kane,
> > >>
> > >> As discussed in the other thread, even if a timeout response is sent
> > >> back to the producer, the message may still be committed.
> > >>
> > >> Did you shut down the leader broker of the partition or a follower
> > >> broker?
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Oct 25, 2013 at 8:45 AM, Kane Kane <kane.isturm@gmail.com> wrote:
> > >>
> > >> > I have a cluster of 3 Kafka brokers. With the following script I send
> > >> > some data to Kafka and, in the middle, do a controlled shutdown of 1
> > >> > broker. All 3 brokers are in the ISR before I start sending. When I
> > >> > shut down the broker I get a couple of exceptions, and I expect that
> > >> > the data for those sends shouldn't be written. Say I send 1500 lines
> > >> > and get 50 exceptions. I expect to consume 1450 lines, but instead I
> > >> > always consume more, e.g. 1480 or 1490. I want to decide myself whether
> > >> > to retry sending, rather than using message.send.max.retries. But it
> > >> > looks like if I retry sending whenever there is an exception, I will
> > >> > end up with duplicates. Is there anything I'm doing wrong, or do I have
> > >> > wrong assumptions about Kafka?
> > >> >
> > >> > Thanks.
> > >> >
> > >> > import java.util.Properties
> > >> > import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> > >> > import kafka.serializer.StringEncoder
> > >> > import scala.collection.mutable.ArrayBuffer
> > >> > import scala.io.Source
> > >> >
> > >> > val prod = new MyProducer("10.80.42.147:9092,10.80.42.154:9092,10.80.42.156:9092")
> > >> > var count = 0
> > >> > for (line <- Source.fromFile(file).getLines()) {
> > >> >   try {
> > >> >     // `file` and `buffer` are defined elsewhere in the script (not shown)
> > >> >     prod.send("benchmark", buffer.toList)
> > >> >     count += 1
> > >> >     println("sent %s".format(count))
> > >> >   } catch {
> > >> >     // with retries disabled, a failed send surfaces here as an exception
> > >> >     case _: Exception => println("Exception!")
> > >> >   }
> > >> > }
> > >> >
> > >> > class MyProducer(brokerList: String) {
> > >> >   val sync = true
> > >> >   val requestRequiredAcks = "-1" // wait for all in-sync replicas
> > >> >
> > >> >   val props = new Properties()
> > >> >   props.put("metadata.broker.list", brokerList)
> > >> >   props.put("producer.type", if (sync) "sync" else "async")
> > >> >   props.put("request.required.acks", requestRequiredAcks)
> > >> >   props.put("key.serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("serializer.class", classOf[StringEncoder].getName)
> > >> >   props.put("message.send.max.retries", "0") // no automatic retries
> > >> >   props.put("request.timeout.ms", "2000")
> > >> >
> > >> >   val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
> > >> >
> > >> >   def send(topic: String, messages: List[String]) = {
> > >> >     val requests = new ArrayBuffer[KeyedMessage[AnyRef, AnyRef]]
> > >> >     for (message <- messages) {
> > >> >       // arguments: topic, key, partition key, message
> > >> >       requests += new KeyedMessage(topic, null, message, message)
> > >> >     }
> > >> >     // Producer.send takes varargs, so expand the buffer
> > >> >     producer.send(requests: _*)
> > >> >   }
> > >> > }
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang
