kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Stream stops polling new messages
Date Tue, 02 May 2017 18:20:15 GMT
Did you check the logs? You might need to increase the log level to DEBUG
to get more information.

Did you double check committed offsets via bin/kafka-consumer-groups.sh?
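
As a rough illustration of what to look for in that output: for a Streams
application the consumer group name is the application.id, and the tool
reports a committed offset (CURRENT-OFFSET) and a LOG-END-OFFSET per
partition. If the committed offset stays put across two runs while the
log-end offset is ahead of it, the thread is most likely not making
progress. The sketch below only spells out that arithmetic in plain Java.
The class name, method names, and sample numbers are hypothetical, and it
does not talk to a broker.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: interprets offsets copied by hand from the tool output.
public class OffsetCheck {

    /** Lag per partition = log-end offset minus committed offset. */
    static Map<Integer, Long> lag(Map<Integer, Long> committed,
                                  Map<Integer, Long> logEnd) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            // Simplification: a partition without a committed offset counts as 0.
            long c = committed.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - c);
        }
        return result;
    }

    /** True if the committed offset did not move between two samples
     *  although records are still waiting behind it. */
    static boolean stalled(long committedBefore, long committedAfter, long logEnd) {
        return committedAfter == committedBefore && logEnd > committedAfter;
    }

    public static void main(String[] args) {
        // Made-up sample values, not taken from the reported cluster.
        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(0, 1200L);
        committed.put(1, 950L);

        Map<Integer, Long> logEnd = new TreeMap<>();
        logEnd.put(0, 1200L);
        logEnd.put(1, 4000L);

        System.out.println("lag per partition: " + lag(committed, logEnd));
        System.out.println("partition 1 stalled: " + stalled(950L, 950L, 4000L));
    }
}
```

A partition with zero lag and an unchanged committed offset is simply idle,
which is why the check also requires the log-end offset to be ahead.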

-Matthias

On 4/28/17 9:22 AM, João Peixoto wrote:
> My stream goes stale after a while and simply stops receiving new
> messages, i.e. it stops polling.
> 
> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
> brokers are running 0.10.1.1.
> 
> The stream state is RUNNING and there are no exceptions in the logs.
> 
> Looking at the JMX metrics, the threads are there and running, just not
> doing anything.
> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
> (The max time taken to receive a response to a heartbeat request) reads
> 43,361 seconds (about 12 hours), which is consistent with the time of the
> hang. Shouldn't this trigger a failure somehow?
> 
> The stream configuration looks something like this:
> 
> Properties props = new Properties();
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>           CustomTimestampExtractor.class.getName());
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>           myConfig.getBrokerList());
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>           Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>           Serdes.ByteArray().getClass().getName());
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>           myConfig.getCommitIntervalMs()); // 5000
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>           myConfig.getStreamThreadsCount()); // 1
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>           myConfig.getMaxCacheBytes()); // 524_288_000L
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
> 
> The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
> another topic.
> 
> Thanks in advance for the help!
> 

