kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: AW: Consumer with another group.id conflicts with streams()
Date Wed, 03 May 2017 21:50:34 GMT
I just confirmed.

`KafkaConsumer.close()` should be idempotent. It's a bug in the consumer.

https://issues.apache.org/jira/browse/KAFKA-5169
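
For reference, a minimal self-contained sketch that reproduces the
non-idempotent behavior on affected versions (the class name and broker
address are placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  public class CloseTwice {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.close();
      consumer.close(); // on affected versions this second call throws
                        // IllegalStateException ("This consumer has already been closed.")
    }
  }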


-Matthias



On 5/3/17 2:20 PM, Matthias J. Sax wrote:
> Yes, Streams might call "close" multiple times, as we assume it's an
> idempotent operation.
> 
> Thus, if you close "your" Consumer in `DebugTransformer.close()`, the
> operation is not idempotent anymore and the second call fails:
> `KafkaConsumer.close()` is not idempotent :(
> 
> You can just use a "try-catch" when you call `Consumer.close()` and
> swallow the exception. This should fix the issue.
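> 
> A minimal sketch of that workaround inside `DebugTransformer.close()`,
> catching only the already-closed case:
> 
>   @Override
>   public void close() {
>     try {
>       consumer.close();
>     } catch (IllegalStateException e) {
>       // consumer was already closed by a previous close() call -- ignore
>     }
>   }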
> 
> If you have only one Streams instance, this issue does not occur, as we
> don't call "close" twice.
> 
> 
> -Matthias
> 
> 
> On 5/3/17 9:34 AM, Andreas Voss wrote:
>> Hi again,
>>
>> One more observation: The problem only occurs when the two application
>> instances are started one after the other with some delay in between
>> (7 seconds in my test setup). So the first instance had already started
>> processing the events that were in the queue when the second instance
>> came up. It looks like the second instance tries to initiate a rebalance
>> that fails.
>>
>> Thanks,
>> Andreas
>>
>>
>> -----Original Message-----
>> From: Narendra Kumar
>> Sent: Wednesday, May 3, 2017 13:11
>> To: Andreas Voss <AndreasVoss@fico.com>; users@kafka.apache.org
>> Cc: Wladimir Schmidt <WladimirSchmidt@fico.com>; Christian Leiner <ChristianLeiner@fico.com>
>> Subject: RE: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> I enabled logging in the application and this is what I observed:
>>
>> While rebalancing, ProcessorNode.close() gets called twice, once from
>> StreamThread.suspendTasksAndState() and once from
>> StreamThread.closeNonAssignedSuspendedTasks(). DebugTransformer has an
>> instance field of type KafkaConsumer that it uses to do some lookups.
>> DebugTransformer.close() throws an exception when called the second time
>> (an IllegalStateException from the KafkaConsumer instance, because it is
>> already closed), so we fail to close the task's state manager (i.e. the
>> call to task.closeStateManager(true) fails). The StateManager then still
>> holds the lock on the task's state directory. After the rebalance, if the
>> same task id is launched on the same application instance but in a
>> different thread, the task gets stuck because it fails to acquire the
>> lock on the task's state directory. The task is stuck forever and can't
>> process any data from that point on. Depending on how much data was on
>> the partition at the time of the rebalance, we see a different number of
>> alerts.
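>>
>> As an alternative to catching the exception, the double invocation
>> described above could also be tolerated with an explicit guard flag in
>> DebugTransformer (a sketch):
>>
>>   private boolean closed = false;
>>
>>   @Override
>>   public void close() {
>>     if (closed) {
>>       return; // second call from the rebalance path -- nothing to do
>>     }
>>     closed = true;
>>     consumer.close();
>>   }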
>>
>> I have attached the log file for reference.
>>
>> Thanks,
>> Narendra
>>
>>
>> -----Original Message-----
>> From: Andreas Voss
>> Sent: Tuesday, April 25, 2017 1:27 PM
>> To: users@kafka.apache.org
>> Cc: Narendra Kumar <NarendraKumar@fico.com>; Wladimir Schmidt <WladimirSchmidt@fico.com>; Christian Leiner <ChristianLeiner@fico.com>
>> Subject: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> thank you for your response. Here are some more details:
>>
>> - my input and output topics have 6 partitions each.
>> - my application instances run from docker images, so there is no state
>>   left over from a previous run.
>> - I have a single docker container running kafka + zookeeper (spotify/kafka).
>> - when the second consumer is in place, I receive a random number of
>>   records in the target topic (e.g. I send 1000 records and receive 439
>>   on the first run and 543 on the second run).
>> - the problem only occurs if two instances of the application are
>>   running. If I only start one instance, it's slow, but when I send 1000
>>   records I also receive 1000 records. I think this is also an indicator
>>   of a bug, because a streaming app should behave the same independent of
>>   whether one or two instances are running.
>> - I added the properties you suggested, but the behavior did not change.
>>
>> I think this is a bug; consumers of different groups should not interact
>> with each other. Should I submit a bug report, and if so, any suggestions
>> on how to do that?
>>
>> Andreas
>>
>>
>> -----Original Message-----
>> From: Matthias J. Sax [mailto:matthias@confluent.io]
>> Sent: Monday, April 24, 2017 19:18
>> To: users@kafka.apache.org
>> Subject: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi,
>>
>> hard to diagnose. The new consumer should not affect the Streams app,
>> though I am wondering why you need it.
>>
>>> KafkaConsumer (with a UUID as group.id) that reads some historical
>>> data from input topic
>>
>> Maybe using GlobalKTable instead might be a better solution?
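>>
>> For illustration, a sketch of what that could look like with the
>> KStreamBuilder API, reusing the builder and topic names from your posted
>> code (the store name "history-store" is a placeholder; requires
>> org.apache.kafka.streams.kstream.GlobalKTable):
>>
>>   GlobalKTable<String, String> history =
>>       builder.globalTable(Serdes.String(), Serdes.String(), SOURCE_TOPIC, "history-store");
>>   builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>          .join(history,
>>                (key, value) -> key,           // map each record to its table key
>>                (value, historical) -> value)  // combine with the historical value
>>          .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);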
>>
>>> (i.e. I feed 1000 records into source topic and receive around 200 on
>>> the target topic)
>>
>> Are these the first 200 records? Or are those 200 records "random ones"
>> from your input topic? How many partitions do you have for the input and
>> output topics?
>>
>>> looks like a lot of rebalancing happens.
>>
>> We recommend changing the following StreamsConfig values to improve
>> rebalance behavior:
>>
>>> // increase retries from the default of 0
>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
>>> // increase max.poll.interval.ms to "infinity" from the default of 300 s
>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>>           Integer.toString(Integer.MAX_VALUE));
>>
>> We will change the default values accordingly in a future release, but
>> for now you should set these manually.
>>
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>> On 4/24/17 10:01 AM, Andreas Voss wrote:
>>> Hi, I have a simple streaming app that copies data from one topic to
>>> another, so when I feed 1000 records into the source topic I receive
>>> 1000 records in the target topic. The app also contains a transform()
>>> step which does nothing except instantiate a KafkaConsumer (with a UUID
>>> as group.id) that reads some historical data from the input topic. As
>>> soon as this consumer is in place, the streaming app no longer works:
>>> records get lost (i.e. I feed 1000 records into the source topic and
>>> receive around 200 on the target topic) and it's terribly slow -- it
>>> looks like a lot of rebalancing happens.
>>>
>>> To me this looks like a bug, because the KStreamBuilder uses the
>>> application id as group.id ("kafka-smurfing" in this case), and the
>>> transformer uses a different one (a UUID).
>>>
>>> Full source code:
>>>
>>> import java.util.Properties;
>>>
>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>> import org.apache.kafka.common.serialization.Serdes;
>>> import org.apache.kafka.streams.KafkaStreams;
>>> import org.apache.kafka.streams.StreamsConfig;
>>> import org.apache.kafka.streams.kstream.KStreamBuilder;
>>>
>>> public class Main {
>>>
>>>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>>>   public static final String SOURCE_TOPIC = "transactions";
>>>   public static final String TARGET_TOPIC =  "alerts";
>>>
>>>   public static void main(String[] args) throws Exception {
>>>
>>>     KStreamBuilder builder = new KStreamBuilder();
>>>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>>            .transform(() -> new DebugTransformer())
>>>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>>>
>>>     Properties props = new Properties();
>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>
>>>     KafkaStreams streams = new KafkaStreams(builder, props);
>>>     streams.start();
>>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>
>>>   }
>>>
>>> }
>>>
>>> import java.util.Arrays;
>>> import java.util.Properties;
>>> import java.util.UUID;
>>>
>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>> import org.apache.kafka.common.TopicPartition;
>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>> import org.apache.kafka.streams.KeyValue;
>>> import org.apache.kafka.streams.kstream.Transformer;
>>> import org.apache.kafka.streams.processor.ProcessorContext;
>>>
>>> public class DebugTransformer
>>>     implements Transformer<String, String, KeyValue<String, String>> {
>>>
>>>   private KafkaConsumer<String, String> consumer;
>>>   private ProcessorContext context;
>>>
>>>   @Override
>>>   public void init(ProcessorContext context) {
>>>     this.context = context;
>>>     Properties props = new Properties();
>>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>>     consumer = new KafkaConsumer<>(props);
>>>   }
>>>
>>>   @Override
>>>   public KeyValue<String, String> transform(String key, String value) {
>>>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>>>     consumer.assign(Arrays.asList(partition));
>>>     consumer.seek(partition, 0);
>>>     consumer.poll(100);
>>>     return KeyValue.pair(key, value);
>>>   }
>>>
>>>   @Override
>>>   public void close() {
>>>     consumer.close();
>>>   }
>>>
>>>   @Override
>>>   public KeyValue<String, String> punctuate(long timestamp) {
>>>     return null;
>>>   }
>>>
>>> }
>>>
>>> Thanks for any hints,
>>> Andreas
>>>
>>
> 

