kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dumitru-Nicolae Marasoui <nicolae.maras...@ovoenergy.com>
Subject NPE in kafka-streams with Avro input
Date Wed, 17 Jun 2020 15:29:37 GMT
Hello kafka community,
When the following kafka-streams starts with input topic values in avro
format, we get this NPE below. The input is a record and a field of it is
an array of other records. Reading the stack trace below what I understand
is that at some point in deserializing a value structure it encounters an
unexpected null value and hence the NPE. Do you have any hints as to what
may be the problem? In this kafka-streams ETL job we emit multiple messages
from a single input message (flatMapping the array field to the output).
Thank you
Exception in thread
“global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1"
java.lang.NullPointerException
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
at
com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
at
com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
at
org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
at
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
at
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
at
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Thank you,
Nicolae

-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message