kafka-users mailing list archives

From Elizabeth Bennett <ebenn...@loggly.com>
Subject Re: Thread safety of Encoder implementations
Date Tue, 13 Jan 2015 19:40:52 GMT
Hi Guozhang,
Thanks for your response. We've only got one producer client (per Kryo
instance) but the producer client is configured (via the broker.list
config) to produce to two Kafka brokers. When we create the Producer, we
pass in an instance of the serializer. What if we used the serializer.class
config to specify the class name of the serializer rather than pass in an
instance? Would Kafka then create a separate serializer instance for each
broker that it produces to? That would solve our problem assuming that the
Producer spawns new threads for each kafka broker that it produces to,
which I'm not sure about.
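In the meantime, the workaround we're considering is wrapping the non-thread-safe serializer in a ThreadLocal so each producer thread gets its own instance. Here's a rough self-contained sketch of the pattern (using SimpleDateFormat as a stand-in for Kryo, since both are non-thread-safe; the class and method names are just for illustration, not our actual code):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalDemo {
    // SimpleDateFormat is not thread safe (like Kryo), so instead of
    // sharing one instance across producer threads, lazily create one
    // instance per thread.
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

    // Stand-in for an Encoder's serialize/toMessage method: each calling
    // thread uses its own underlying instance, so no synchronization needed.
    static String serialize(Date d) {
        return FORMAT.get().format(d);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Two threads serialize the same value concurrently without
        // touching the same SimpleDateFormat instance.
        Future<String> a = pool.submit(() -> serialize(new Date(0L)));
        Future<String> b = pool.submit(() -> serialize(new Date(0L)));
        System.out.println(a.get().equals(b.get())); // prints true
        pool.shutdown();
    }
}
```

The same shape would apply to a Kryo-backed Encoder: a ThreadLocal<Kryo> initialized with all the class registrations, with toMessage() calling get() instead of using a shared field. The per-thread instances cost some memory, but the producer only has a handful of send threads.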

--Liz

On Mon, Jan 12, 2015 at 10:55 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Liz,
>
> Do you have multiple producer clients that share the same Kryo serializer
> object? Each client has only one background thread that calls serialize(),
> so it is only when several clients share one serializer that you get
> concurrent access.
>
> Guozhang
>
>
> On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett <ebennett@loggly.com>
> wrote:
>
> > Hi Kafka Users,
> > I have written my own implementation of the kafka Encoder class for
> > serializing objects to Messages. It uses Kryo, which is a non-thread-safe
> > Java serialization library. I'm using Kafka 0.7.2.
> >
> > We recently ran into an issue where we increased the number of kafka
> > brokers for our kafka producer from 1 to 2. When we did this, we ran into
> > exceptions that seemed related to Kryo being used concurrently by
> multiple
> > threads. So, my question is, do I need to modify my Encoder class to be
> > thread safe? I dug through the Kafka documentation and couldn't find
> > anything that said one way or another. Any information would be great.
> > Thank you!
> >
> > --Liz Bennett
> >
> > p.s. for what it's worth here is a stack trace from one of the exceptions
> > we saw:
> >
> > 2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
> > com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
> > Serialization trace:
> > fieldGroups (com.loggly.core.event.Event)
> > event (com.loggly.core.event.FailedEvent)
> >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
> >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
> >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
> >         at com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
> >         at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
> >         at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> >         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >         at scala.collection.immutable.List.foreach(List.scala:45)
> >         at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
> >         at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
> >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> >         at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> >         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
> >         at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
> >         at scala.collection.mutable.HashMap.map(HashMap.scala:43)
> >         at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
> >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> >         at com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
> >         at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> >         at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> >         at scala.collection.immutable.Stream.foreach(Stream.scala:291)
> >         at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> >         at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
> >         at com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
> >         at com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
> >         at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
> >         at com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
> >         at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
> >         at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
> >         at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> >         at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
> >         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
> >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
> >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
> >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
> >         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
> >         ... 40 more
> >
>
>
>
> --
> -- Guozhang
>
