spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: broadcast variable of Kafka producer throws ConcurrentModificationException
Date Tue, 18 Aug 2015 23:28:16 GMT
Why are you even trying to broadcast a producer? A broadcast variable is
some immutable piece of serializable DATA that can be used for processing
on the executors. A Kafka producer is neither DATA nor immutable, and
definitely not serializable.
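To make the serializability point concrete, here is a small stdlib-only Java sketch. `FakeProducer` is a hypothetical stand-in, not the real `KafkaProducer`. Like the real client (as Cody notes in the quoted reply), it owns a background thread, and `java.lang.Thread` does not implement `Serializable`, so plain Java serialization rejects the whole object. This job used Kryo, which does not require `Serializable` and instead walked into the thread's fields (`ioThread` -> `contextClassLoader` -> ... in the serialization trace). That is why the failure surfaced as a `ConcurrentModificationException` rather than a `NotSerializableException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ProducerNotSerializable {

    // Hypothetical stand-in for a client such as KafkaProducer (NOT the real class):
    // it holds a background I/O thread. Declaring Serializable does not help,
    // because the Thread field itself is not serializable.
    static class FakeProducer implements Serializable {
        private final Thread ioThread;

        FakeProducer() {
            ioThread = new Thread(() -> { /* would flush batches to brokers */ }, "fake-io");
            ioThread.setDaemon(true); // never started in this demo
        }
    }

    // Returns true if Java serialization rejects the object.
    static boolean failsJavaSerialization(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return false;
        } catch (NotSerializableException e) {
            // Thrown for java.lang.Thread, which does not implement Serializable
            return true;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failsJavaSerialization(new FakeProducer())); // true
        System.out.println(failsJavaSerialization("plain data"));       // false
    }
}
```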
The right way to use a producer from Spark is to create it on the executors.
Please see the discussion of this pattern in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
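A minimal sketch of the executor-side pattern, again using stdlib-only stand-ins so it runs without Spark or Kafka on the classpath. `FakeProducer`, `ProducerHolder`, `PartitionWriter` and `ship` are hypothetical names chosen for illustration. The closure captures only serializable configuration (a `Properties` object plus a topic name), and a static, lazily initialized holder builds at most one producer per JVM, i.e. per executor, on first use. In a real job, `write` would be called from `rdd.foreachPartition(...)` inside `foreachRDD`, and `ship` stands in for Spark serializing the closure to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class ExecutorSideProducer {

    // Hypothetical stand-in for KafkaProducer (not the real API):
    // expensive to build and never serialized.
    static class FakeProducer {
        static int instancesCreated = 0; // demo bookkeeping only
        final List<String> sent = new ArrayList<>();

        FakeProducer(Properties config) {
            instancesCreated++;
        }

        void send(String topic, String value) {
            sent.add(topic + ":" + value); // a real producer would batch and send asynchronously
        }
    }

    // One producer per JVM (per executor), created lazily with double-checked locking.
    static final class ProducerHolder {
        private static volatile FakeProducer instance;

        static FakeProducer get(Properties config) {
            FakeProducer local = instance;
            if (local == null) {
                synchronized (ProducerHolder.class) {
                    local = instance;
                    if (local == null) {
                        local = new FakeProducer(config);
                        instance = local;
                    }
                }
            }
            return local;
        }
    }

    // What the closure captures: only serializable configuration data.
    static class PartitionWriter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Properties config;
        private final String topic;

        PartitionWriter(Properties config, String topic) {
            this.config = config;
            this.topic = topic;
        }

        // Runs on the executor, once per partition.
        void write(Iterator<String> records) {
            FakeProducer producer = ProducerHolder.get(config);
            while (records.hasNext()) {
                producer.send(topic, records.next());
            }
        }
    }

    // Simulates Spark shipping the closure to an executor via Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
        PartitionWriter onExecutor = ship(new PartitionWriter(config, "events"));
        onExecutor.write(Arrays.asList("a", "b").iterator()); // first partition
        onExecutor.write(Arrays.asList("c").iterator());      // second partition
        System.out.println(FakeProducer.instancesCreated);          // 1
        System.out.println(ProducerHolder.get(config).sent.size()); // 3
    }
}
```

The static holder is one choice among several; the programming guide also discusses creating a connection per partition or drawing from a static, lazily created connection pool. A real producer should also be closed when the executor shuts down, which this sketch omits.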

On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <cody@koeninger.org> wrote:

> I wouldn't expect a Kafka producer to be serializable at all... among
> other things, it has a background thread.
>
> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
> wanshenghua@gmail.com> wrote:
>
>> Hi,
>> Has anyone seen a java.util.ConcurrentModificationException when using
>> broadcast variables?
>> I encountered this exception when wrapping a Kafka producer in a broadcast
>> variable in the Spark Streaming driver.
>>
>> Here is what I did.
>> KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
>> final Broadcast<KafkaProducer<String, String>> bCastProducer
>>     = streamingContext.sparkContext().broadcast(producer);
>>
>> Then, within a closure passed to foreachRDD, I tried to get the
>> wrapped producer, i.e.
>>  KafkaProducer<String, String> p = bCastProducer.value();
>>
>> After rebuilding and rerunning, I got this stack trace:
>>
>> Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
>> Serialization trace:
>> classes (sun.misc.Launcher$AppClassLoader)
>> classloader (java.security.ProtectionDomain)
>> context (java.security.AccessControlContext)
>> acc (org.apache.spark.util.MutableURLClassLoader)
>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>> producer ("my driver")
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>> at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>> at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>> at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>> at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>> at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>> at "my driver"
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.util.ConcurrentModificationException
>> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>> at java.util.Vector$Itr.next(Vector.java:1133)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> ... 41 more
>>
>> Thanks.
>>
>> --
>>
>> Regards,
>> Shenghua (Daniel) Wan
>>
>
>
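The "Caused by" section of the quoted trace shows Kryo's `CollectionSerializer` iterating a `java.util.Vector` (the `classes` field of the application class loader, according to the serialization trace) when `Vector$Itr.checkForComodification` fired. `Vector`'s iterators are fail-fast: a structural change made while iterating is detected on the next call to `next()`. The sketch below reproduces that check single-threaded so the outcome is deterministic; the method name `iterateWhileAdding` is hypothetical. In the original report the modification most likely came from another thread loading a class while the broadcast was being serialized, which is an inference from the trace rather than something the thread confirms.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Vector;

public class VectorComodification {

    // Iterates a Vector (as Kryo's CollectionSerializer did over the class loader's
    // "classes" field) while the Vector is structurally modified mid-iteration.
    static boolean iterateWhileAdding() {
        Vector<String> classes = new Vector<>();
        classes.add("A");
        classes.add("B");
        try {
            for (Iterator<String> it = classes.iterator(); it.hasNext(); ) {
                String name = it.next();
                if (name.equals("A")) {
                    // Stand-in for another thread loading a class: any add()
                    // bumps the Vector's modCount and trips the fail-fast check.
                    classes.add("C");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // thrown from Vector$Itr.next -> checkForComodification
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding()); // true
    }
}
```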
