kafka-users mailing list archives

From Ricardo Ferreira <rifer...@riferrei.com>
Subject Re: NPE in kafka-streams with Avro input
Date Wed, 17 Jun 2020 15:50:34 GMT
Hi Dumitru,

According to the stack trace you've shared, the NPE is being thrown by *Avro4S*, the serialization framework you're using. This helps isolate the problem: the issue is not in Kafka Streams itself but in your serialization framework.

Nevertheless, the Avro specification allows a field to hold null if you explicitly declare it as nullable in the schema. For instance:

```
{
  "type": "record",
  "name": "MyRecord",
  "fields" : [
    {"name": "userId", "type": "long"},              // mandatory field
    {"name": "userName", "type": ["null", "string"]} // optional field
  ]
}
```

The field *userName* above is declared as a union with "null", so it can hold null values and is treated as optional. You may want to check whether you can make this change in your Avro schema or, if it is already in place, whether the serialization framework you're using handles nullable fields like this correctly.
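The behavior of such unions can be sketched in plain Python (a toy checker for illustration only, not the real Avro library; the names `matches`, `fields`, and `record` are made up for this example):

```python
# Toy illustration of Avro's null handling: a plain type rejects None,
# while a union that contains "null" accepts it.

def matches(value, avro_type):
    """Return True if `value` is acceptable for the given Avro-style type.

    Supports just enough of the spec for illustration: "long", "string",
    "null", and unions written as Python lists such as ["null", "string"].
    """
    if isinstance(avro_type, list):          # union: any branch may match
        return any(matches(value, t) for t in avro_type)
    if avro_type == "null":
        return value is None
    if avro_type == "long":
        return isinstance(value, int) and not isinstance(value, bool)
    if avro_type == "string":
        return isinstance(value, str)
    raise ValueError(f"unsupported type: {avro_type}")

# Mirrors the MyRecord schema above: userId mandatory, userName optional.
fields = {"userId": "long", "userName": ["null", "string"]}

record = {"userId": 42, "userName": None}
for name, ftype in fields.items():
    print(f"{name}: {'ok' if matches(record[name], ftype) else 'REJECTED'}")
```

With `userName` declared as `["null", "string"]` the null value passes, whereas a null `userId` would be rejected, which is the kind of mismatch that typically surfaces as an NPE in the decoder.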

Thanks,

-- Ricardo

On 6/17/20 11:29 AM, Dumitru-Nicolae Marasoui wrote:
> Hello kafka community,
> When the following kafka-streams starts with input topic values in avro
> format, we get this NPE below. The input is a record and a field of it is
> an array of other records. Reading the stack trace below what I understand
> is that at some point in deserializing a value structure it encounters an
> unexpected null value and hence the NPE. Do you have any hints as to what
> may be the problem? In this kafka-streams ETL job we emit multiple messages
> from a single input message (flatMapping the array field to the output).
> Thank you
> Exception in thread "global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1"
> java.lang.NullPointerException
> at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
> at com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
> at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
> at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
> at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
> at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
> at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
> at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
> at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
> at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
> at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
> at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
> at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Thank you,
> Nicolae
>
