kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: kafka streams passes org.apache.kafka.streams.kstream.internals.Change to my app!!
Date Fri, 09 Dec 2016 08:06:28 GMT
Hi Ara,

This is a bug in 0.10.1 that has since been fixed:
https://issues.apache.org/jira/browse/KAFKA-4311
To work around it, set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0,
which disables the record cache.
The fix is available on trunk and on the 0.10.1 branch, and a 0.10.1.1
release is expected any day now.
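
For illustration, a minimal sketch of the workaround, assuming the Streams
configuration is built from a plain java.util.Properties object. The constant
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG resolves to the key
"cache.max.bytes.buffering"; the literal key is used here so the snippet
compiles without the Kafka jars. The application id and broker address are
hypothetical placeholders, not values from this thread.

```java
import java.util.Properties;

public class CacheWorkaround {

    // Builds a Streams configuration with the record cache disabled.
    static Properties buildConfig() {
        Properties props = new Properties();
        // Hypothetical placeholder values; substitute your own.
        props.setProperty("application.id", "my-streams-app");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        // A value of 0 turns off caching for the whole application.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

The resulting Properties object would then be passed to the KafkaStreams
constructor as usual. With the cache disabled, every update is forwarded
downstream immediately, so expect more records on downstream topics until
you upgrade and re-enable caching.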

Thanks,
Damian

On Fri, 9 Dec 2016 at 01:12 Ara Ebrahimi <ara.ebrahimi@argyledata.com>
wrote:

> Hi,
>
> Once in a while, and seemingly at random, this happens. It occurs roughly
> once every few hundred thousand messages:
>
> 2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread [StreamThread-4] Streams application error during processing:
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change cannot be cast to com.argyledata.streams.entity.Activity
> at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown Source)
> at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
> at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
> at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
> at org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> Has anyone else seen this weird problem?
>
> Ara.
