kafka-users mailing list archives

From Elizabeth Bennett <ebenn...@loggly.com>
Subject Thread safety of Encoder implementations
Date Tue, 13 Jan 2015 01:32:59 GMT
Hi Kafka Users,
I have written my own implementation of the Kafka Encoder class for
serializing objects to Messages. It uses Kryo, a Java serialization
library that is not thread-safe. I'm using Kafka 0.7.2.

We recently increased the number of Kafka brokers for our Kafka producer
from 1 to 2. When we did this, we started seeing exceptions that seem to be
caused by Kryo being used concurrently by multiple threads. So my question
is: do I need to modify my Encoder class to be thread-safe? I dug through
the Kafka documentation and couldn't find anything that said one way or
another. Any information would be great.
Thank you!

--Liz Bennett

P.S. For what it's worth, here is a stack trace from one of the exceptions
we saw:

2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
Serialization trace:
fieldGroups (com.loggly.core.event.Event)
event (com.loggly.core.event.FailedEvent)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
        at com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
        at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
        at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
        at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
        at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
        at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
        at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
        at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
        at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
        at scala.collection.Iterator$class.foreach(Iterator.scala:660)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
        at scala.collection.mutable.HashMap.map(HashMap.scala:43)
        at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
        at com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
        at scala.collection.immutable.Stream.foreach(Stream.scala:291)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
        at com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
        at com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
        at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
        at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
        at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
        at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
        ... 40 more
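
The trace above fails inside ObjectMap.resize while Kryo registers a class, which is consistent with one Kryo instance being mutated by more than one thread; the message itself does not confirm how many threads call the Encoder. One common way to make such an encoder safe regardless is to give each thread its own serializer through a ThreadLocal. The following is only a minimal sketch under stated assumptions: UnsafeSerializer is a hypothetical stand-in for Kryo, ThreadLocalEncoder is a hypothetical stand-in for the custom Encoder (it returns byte[] rather than a Kafka Message), and none of these names come from Kafka or Kryo.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadLocalEncoderSketch {

    /** Hypothetical stand-in for a non-thread-safe serializer such as Kryo. */
    static final class UnsafeSerializer {
        // Mutable scratch state reused across calls; this is what makes a
        // single instance unsafe to share between threads.
        private final StringBuilder scratch = new StringBuilder();

        byte[] serialize(String event) {
            scratch.setLength(0);
            scratch.append("event:").append(event);
            return scratch.toString().getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical encoder: every calling thread lazily gets its own serializer. */
    static final class ThreadLocalEncoder {
        private final ThreadLocal<UnsafeSerializer> serializer =
                ThreadLocal.withInitial(UnsafeSerializer::new);

        byte[] encode(String event) {
            // get() returns the instance owned by the current thread only.
            return serializer.get().serialize(event);
        }
    }

    /** Encodes from several threads through one shared encoder and counts bad results. */
    static int countMismatches(int threadCount, int eventsPerThread) {
        ThreadLocalEncoder encoder = new ThreadLocalEncoder();
        AtomicInteger mismatches = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    String event = id + "-" + i;
                    String decoded = new String(encoder.encode(event), StandardCharsets.UTF_8);
                    if (!decoded.equals("event:" + event)) {
                        mismatches.incrementAndGet();
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(4, 10_000));
    }
}
```

The trade-off in this sketch is one serializer instance per calling thread, which costs some memory but avoids locking on every message. Synchronizing the encode method around a single shared instance would be the simpler alternative if contention is low.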
