kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Another odd error
Date Wed, 14 Dec 2016 12:11:06 GMT
That is correct

On Wed, 14 Dec 2016 at 12:09 Jon Yeargers <jon.yeargers@cedexis.com> wrote:

> I have the app running on 5 machines. Is that what you mean?
>
> On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Jon,
> >
> > Do you have more than one instance of the app running? The reason I ask
> > is because the task (task 1_3) that fails with the
> > "java.lang.IllegalStateException" in this log was previously running as a
> > Standby Task. This would mean the active task for this store would have
> > been running elsewhere, but I don't see that in the logs. The exception
> > occurs as StreamThread-1 starts to run task 1_3 as an active task. The
> > exception might indicate that another thread/instance is still writing to
> > the changelog topic for the State Store.
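The invariant Damian describes can be illustrated with a small sketch. This is not Kafka's actual restore code; the class, method names, and the list-backed "changelog" are all hypothetical. It only shows the check that fires: a task snapshots the changelog's end offset before restoring, and fails if any other writer appends in the meantime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only (not Kafka internals): restore snapshots the
// changelog's end offset, then fails if a concurrent writer appended records.
public class RestoreInvariant {
    static void restore(List<String> changelog, Runnable concurrentWriter) {
        long endOffset = changelog.size();   // snapshot of the log end offset
        concurrentWriter.run();              // work that may append concurrently
        if (changelog.size() != endOffset) {
            throw new IllegalStateException(
                "Log end offset should not change while restoring");
        }
    }

    public static void main(String[] args) {
        List<String> changelog = new ArrayList<>(Arrays.asList("r0", "r1"));
        restore(changelog, () -> {});   // quiet changelog: restore succeeds
        try {
            // another thread/instance still writing, as suspected above:
            restore(changelog, () -> changelog.add("r2"));
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```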
> >
> > Thanks,
> > Damian
> >
> > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <jon.yeargers@cedexis.com>
> > wrote:
> >
> > > As near as I can see it's rebalancing constantly.
> > >
> > > I'll up that value and see what happens.
> > >
> > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <damian.guy@gmail.com>
> > wrote:
> > >
> > > > Hi Jon,
> > > >
> > > > I haven't had much of a chance to look at the logs in detail yet,
> > > > but I have noticed that your app seems to be rebalancing frequently.
> > > > It seems that it is usually around the 300 second mark, which usually
> > > > would mean that poll hasn't been called for at least that long. You
> > > > might want to try setting the config
> > > > ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to something higher than
> > > > 300000 (which is the default).
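Damian's suggestion can be sketched as follows. The string key is used so the snippet runs without Kafka on the classpath; "max.poll.interval.ms" is the property behind ConsumerConfig.MAX_POLL_INTERVAL_CONFIG, and the 600000 ms value is an assumed example, not a recommendation.

```java
import java.util.Properties;

// Sketch: raise max.poll.interval.ms above the 300000 ms default so that
// long gaps between poll() calls do not trigger a rebalance.
public class PollIntervalConfig {
    public static Properties build() {
        Properties config = new Properties();
        // Allow up to 10 minutes between poll() calls before the consumer
        // is considered failed and its partitions are reassigned.
        config.put("max.poll.interval.ms", "600000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.interval.ms"));
    }
}
```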
> > > >
> > > > I'll continue to look at your logs and get back to you.
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yeargers@cedexis.com>
> > > > wrote:
> > > >
> > > > > Attached is a log with lots of disconnections and a small amount of
> > > > > actual, useful activity.
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> > > > > wrote:
> > > > >
> > > > > n/m - I understand the logging issue now. Am generating a new one.
> > > > > Will send shortly.
> > > > >
> > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> > > > > wrote:
> > > > >
> > > > > Yes - saw that one. There were plenty of smaller records available
> > > > > though.
> > > > >
> > > > > I sent another log this morning with the level set to DEBUG.
> > > > > Hopefully you received it.
> > > > >
> > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian.guy@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Jon,
> > > > >
> > > > > It looks like you have the logging level for KafkaStreams set to at
> > > > > least WARN. I can only see ERROR level logs being produced from
> > > > > Streams.
> > > > >
> > > > > However, I did notice an issue in the logs (not related to your
> > > > > specific error, but you will need to fix it anyway):
> > > > >
> > > > > There are lots of messages like:
> > > > >
> > > > >   task [2_9] Error sending record to topic
> > > > >   PRTMinuteAgg-prt_hour_agg_stream-changelog
> > > > >   org.apache.kafka.common.errors.RecordTooLargeException: The message
> > > > >   is 2381750 bytes when serialized which is larger than the maximum
> > > > >
> > > > > This means you need to add some extra config to your StreamsConfig:
> > > > >
> > > > >   config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> > > > >              expectedMaximumMessageSizeBytes)
> > > > >
> > > > > You will also possibly need to adjust the broker properties and
> > > > > increase message.max.bytes - it will need to be at least as large
> > > > > as the setting above.
> > > > >
> > > > > At the moment all of the change-logs for your state-stores are
> > > > > being dropped due to this issue.
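The fix suggested above can be sketched like this. String keys are used so the snippet runs without Kafka on the classpath; "max.request.size" is the property behind ProducerConfig.MAX_REQUEST_SIZE_CONFIG, and the 4 MB bound is an assumed value for illustration, not taken from the thread.

```java
import java.util.Properties;

// Sketch: raise the producer's max.request.size so large aggregate records
// can be written to the state-store changelog topics.
public class MaxRequestSizeConfig {
    // Assumed upper bound on serialized record size; the broker-side
    // message.max.bytes must be at least this large as well.
    static final int EXPECTED_MAX_MESSAGE_BYTES = 4 * 1024 * 1024; // 4 MB

    public static Properties build() {
        Properties config = new Properties();
        config.put("max.request.size",
                   Integer.toString(EXPECTED_MAX_MESSAGE_BYTES));
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.request.size"));
    }
}
```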
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > >
> > > > >
> > > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yeargers@cedexis.com>
> > > > > wrote:
> > > > >
> > > > > > (am attaching a debug log - note that the app terminated with no
> > > > > > further messages)
> > > > > >
> > > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> > > > > >                  \-> groupByKey.aggregate(hour)   -> foreach
> > > > > >
> > > > > >
> > > > > > config:
> > > > > >
> > > > > >   Properties config = new Properties();
> > > > > >   config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > > > > >   config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> > > > > >   config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> > > > > >   config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > > > >              AggKey.class.getName());
> > > > > >   config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > > > >              Serdes.String().getClass().getName());
> > > > > >   config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > > > > >   config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > > > > >   config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> > > > > >   config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> > > > > >
> > > > > >
> > > > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangguoz@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > Jon,
> > > > > >
> > > > > > To help investigate this issue, could you let me know 1) your
> > > > > > topology sketch and 2) your app configs? For example, did you
> > > > > > enable caching in your apps with the cache.max.bytes.buffering
> > > > > > config?
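The caching setting Guozhang asks about can be sketched as follows. The string key is used so the snippet runs without Kafka on the classpath; "cache.max.bytes.buffering" is the Streams property he names, and the 10 MB value is an assumed example.

```java
import java.util.Properties;

// Sketch: bound the memory used for record caching across all threads of a
// Kafka Streams instance (0 disables caching entirely).
public class CacheConfig {
    public static Properties build() {
        Properties config = new Properties();
        config.put("cache.max.bytes.buffering",
                   Long.toString(10L * 1024 * 1024)); // 10 MB, assumed
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("cache.max.bytes.buffering"));
    }
}
```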
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yeargers@cedexis.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I get this one quite a bit. It kills my app after a short time
> > > > > > > of running. Driving me nuts.
> > > > > > >
> > > > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matthias@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Not sure about this one.
> > > > > > > >
> > > > > > > > Can you describe what you do exactly? Can you reproduce the
> > > > > > > > issue? We definitely want to investigate this.
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > > > > > (Am reporting these as I have moved to 0.10.1.0-cp2)
> > > > > > > > >
> > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> > > > > > > > > group MinuteAgg failed on partition assignment
> > > > > > > > >
> > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset
> > > > > > > > > should not change while restoring
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > > > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > > > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
