nifi-users mailing list archives

From "michael.griffiths3@baesystems.com" <michael.griffit...@baesystems.com>
Subject RE: NiFi-Spark receiver not serializable
Date Fri, 04 Sep 2015 17:16:07 GMT
Hi Mark,

I've just left a comment on the JIRA ticket, but yes, that has solved the issue, thanks! I
tried making everything else serializable but missed that one.

Thanks again,
Michael

-----Original Message-----
From: Mark Payne [mailto:markap14@hotmail.com]
Sent: 04 September 2015 17:24
To: users@nifi.apache.org
Subject: RE: NiFi-Spark receiver not serializable

Michael,

I have submitted a patch for NIFI-927, if you are interested in giving that a try.

Thanks
-Mark

----------------------------------------
> From: markap14@hotmail.com
> To: users@nifi.apache.org
> Subject: RE: NiFi-Spark receiver not serializable
> Date: Fri, 4 Sep 2015 11:19:48 -0500
>
>
> Michael,
>
> It looks like it is actually not the Runnable that needs to be
> serializable but rather the inner class of the Runnable (referenced as
> ReceiveRunnable$1 in the serialization trace). This is the implementation
> of the NiFiDataPacket interface. I have created a ticket to address this:
>
> https://issues.apache.org/jira/browse/NIFI-927
>
> Thanks
> -Mark
>
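The serialization trace in the original report walks from `ReceiveRunnable$1` through `this$1` and `this$0` into the receiver and its executor, which is consistent with how a Java inner class keeps a reference to its enclosing instance. The following is a minimal sketch of that effect, not the NIFI-927 patch. `Packet`, `Receiver`, and `StandalonePacket` are hypothetical stand-ins for the NiFi classes, and plain Java serialization is used in place of Kryo so that the example needs only the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the NiFiDataPacket interface; not the real NiFi API.
interface Packet extends Serializable {
    byte[] getContent();
    String getSource();
}

// Hypothetical stand-in for a receiver that owns state which must not be serialized.
class Receiver {
    private final String source;
    private final Thread worker = new Thread(() -> { }); // Thread is not Serializable

    Receiver(String source) {
        this.source = source;
    }

    // Anonymous inner class: it reads an outer field, so every instance keeps
    // a reference to the enclosing Receiver (compare this$0 / this$1 in the trace).
    Packet anonymousPacket(final byte[] data) {
        return new Packet() {
            @Override public byte[] getContent() { return data; }
            @Override public String getSource() { return source; }
        };
    }

    // Static nested class: copies the values it needs and holds no outer reference.
    static Packet standalonePacket(byte[] data, String source) {
        return new StandalonePacket(data, source);
    }

    private static final class StandalonePacket implements Packet {
        private static final long serialVersionUID = 1L;
        private final byte[] content;
        private final String source;

        StandalonePacket(byte[] content, String source) {
            this.content = content;
            this.source = source;
        }

        @Override public byte[] getContent() { return content; }
        @Override public String getSource() { return source; }
    }
}

class InnerClassCapture {
    // Returns true if Java serialization can write the object graph rooted at o.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object reachable from o is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Receiver receiver = new Receiver("demo-port");
        byte[] data = {1, 2, 3};
        System.out.println(canSerialize(receiver.anonymousPacket(data)));
        System.out.println(canSerialize(Receiver.standalonePacket(data, "demo-port")));
    }
}
```

In this sketch the anonymous packet fails to serialize because the enclosing `Receiver` is pulled into the object graph, while the static nested version succeeds. Kryo's field-based serializer does not stop at the first non-`Serializable` object; as the trace in this thread shows, it keeps walking into the receiver's internals.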
> ________________________________
>> From: michael.griffiths3@baesystems.com
>> To: users@nifi.apache.org
>> Subject: NiFi-Spark receiver not serializable
>> Date: Fri, 4 Sep 2015 16:02:10 +0000
>>
>>
>> Hi all,
>>
>>
>>
>> I keep hitting an odd error that happens frequently (but not always; a
>> Spark job can run for ~10 minutes before it is thrown). The
>> NiFiReceiver's Runnable cannot be serialized because it is being
>> modified at the same time as it is being serialized. The full stack
>> trace is below. I've looked through the NiFiReceiver code, and tried
>> taking a copy and making the Runnable implement Serializable, but that
>> did not solve it either.
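The `Caused by` frames in the trace further down (`Vector$Itr.checkForComodification`) point at a fail-fast iterator: a `java.util.Vector` was structurally modified while something was iterating over it. A minimal sketch of that mechanism follows. The class and method names are hypothetical, and the modification happens on the same thread to keep the example deterministic, whereas in the report it presumably came from another thread (for example, a class loader recording newly loaded classes while Kryo iterated the same list).

```java
import java.util.ConcurrentModificationException;
import java.util.Vector;

class VectorComodification {
    // Returns true if iterating the Vector while adding to it triggers
    // the iterator's fail-fast check.
    static boolean iterateWhileAdding() {
        Vector<String> classes = new Vector<>();
        classes.add("A");
        classes.add("B");
        try {
            for (String ignored : classes) {
                // Structural modification during iteration; the next call to
                // the iterator's next() detects the changed modification count.
                classes.add("C");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding());
    }
}
```

`Vector` synchronizes its individual methods, but that does not protect an iteration as a whole, which is why a serializer walking the collection can still fail intermittently.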
>>
>>
>>
>> Has anyone seen this before? To be frank, I'm unsure whether it's the
>> receiver or my own code affecting the receiver.
>>
>>
>>
>> Many thanks,
>>
>>
>>
>> Michael
>>
>>
>>
>> 15/09/04 16:55:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 48.0
>> (TID 411, slave6.localdomain): java.lang.RuntimeException:
>> com.esotericsoftware.kryo.KryoException:
>> java.util.ConcurrentModificationException
>> Serialization trace:
>> classes (sun.misc.Launcher$AppClassLoader)
>> classloader (java.security.ProtectionDomain)
>> cachedPDs (javax.security.auth.SubjectDomainCombiner)
>> combiner (java.security.AccessControlContext)
>> acc (org.apache.spark.util.MutableURLClassLoader)
>> contextClassLoader (org.apache.spark.streaming.util.RecurringTimer$$anon$1)
>> thread (org.apache.spark.streaming.util.RecurringTimer)
>> blockIntervalTimer (org.apache.spark.streaming.receiver.BlockGenerator)
>> blockGenerator (org.apache.spark.streaming.receiver.ReceiverSupervisorImpl)
>> executor_ (org.apache.nifi.spark.NiFiReceiver)
>> this$0 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable)
>> this$1 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable$1)
>>
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
>> at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>> at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>> at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
>> at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
>> at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:190)
>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.util.ConcurrentModificationException
>> at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
>> at java.util.Vector$Itr.next(Vector.java:1137)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> ... 78 more
>>
>>
>>
>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>>
>>
>>
>> Michael Griffiths
>> NationProtect Developer
>> BAE Systems Applied Intelligence
>> ___________________________________________________________
>>
>> E: michael.griffiths3@baesystems.com
>>
>> BAE Systems Applied Intelligence, Surrey Research Park, Guildford,
>> Surrey, GU2 7RQ.
>> www.baesystems.com/ai
>>
>>
>>
>> Please consider the environment before printing this email. This
>> message should be regarded as confidential. If you have received this
>> email in error please notify the sender and destroy it immediately.
>> Statements of intent shall only become binding when confirmed in hard
>> copy by an authorised signatory. The contents of this email may
>> relate to dealings with other companies under the control of BAE
>> Systems Applied Intelligence Limited, details of which can be found
>> at http://www.baesystems.com/Businesses/index.htm.
>

