flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Custom Kryo serializer
Date Wed, 19 Jul 2017 10:34:13 GMT
Hello,

I assume you're passing the class of your serializer in a 
StateDescriptor constructor.

If so, you could add a breakpoint in 
StateDescriptor#initializeSerializerUnlessSet
and check which typeInfo is created and which serializer is created as a 
result.
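
If attaching a debugger is inconvenient, the same check can be approximated in a few lines. This is only a sketch, assuming Flink is on the classpath and that the descriptor is built from the class; the empty Model trait and the state name "model-state" are stand-ins, not taken from the original job.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.ValueStateDescriptor

// Stand-in for the Model trait from the original post (assumption).
trait Model

object InspectStateSerializer {
  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig

    // "model-state" is a hypothetical state name used only for this sketch.
    val descriptor = new ValueStateDescriptor[Model]("model-state", classOf[Model])

    // Resolve the serializer from the type information, as the breakpoint
    // location named above does, then print which serializer was chosen.
    descriptor.initializeSerializerUnlessSet(config)
    println(descriptor.getSerializer.getClass.getName)
  }
}
```

If the printed class is Flink's Kryo-backed serializer, the state does go through Kryo, and the remaining question is which Kryo serializer gets picked for the concrete model classes.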

One thing you could try right away is registering your serializer for 
the Model implementations,
instead of the trait.
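
A rough sketch of what that registration could look like follows. It assumes the concrete classes are named PMMLModel and TensorFlowModel (the original post only shows those names as factories), and the stub classes exist only to keep the snippet compilable on its own; in the real job they would be the classes from the post.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Stubs standing in for the types from the original post (assumptions).
trait Model
class PMMLModel extends Model
class TensorFlowModel extends Model

// Placeholder serializer; the real read/write logic is in the quoted post.
class ModelSerializerKryo extends Serializer[Model] {
  override def write(kryo: Kryo, output: Output, value: Model): Unit = ()
  override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = new PMMLModel
}

object RegisterForImplementations {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Register the serializer once per concrete class rather than for the trait.
    env.getConfig.addDefaultKryoSerializer(classOf[PMMLModel], classOf[ModelSerializerKryo])
    env.getConfig.addDefaultKryoSerializer(classOf[TensorFlowModel], classOf[ModelSerializerKryo])
  }
}
```

ExecutionConfig also offers registerTypeWithKryoSerializer, which takes the same kind of arguments and could be tried if the default-serializer registration is still not picked up.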

Regards,
Chesnay

On 14.07.2017 15:50, Boris Lublinsky wrote:
> Hi
> I have several implementations of my Model trait,
>
> trait Model {
>    def score(input: AnyVal): AnyVal
>    def cleanup(): Unit
>    def toBytes(): Array[Byte]
>    def getType: Long
> }
>
> None of them is serializable, but they are used in the state 
> definition.
> So I implemented a custom serializer:
>
> import com.esotericsoftware.kryo.io.{Input, Output}
> import com.esotericsoftware.kryo.{Kryo, Serializer}
> import com.lightbend.model.modeldescriptor.ModelDescriptor
>
>
> class ModelSerializerKryo extends Serializer[Model] {
>    
>    super.setAcceptsNull(false)
>    super.setImmutable(true)
>
>    /**
>     * Reads bytes and returns a new object of the specified concrete type.
>     * <p>
>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)}
>     * must be called with the parent object to ensure it can be referenced by the
>     * child objects. Any serializer that uses {@link Kryo} to read a child object
>     * may need to be reentrant.
>     * <p>
>     * This method should not be called directly, instead this serializer can be
>     * passed to {@link Kryo} read methods that accept a serializer.
>     *
>     * @return May be null if {@link #getAcceptsNull()} is true.
>     */
>    override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>
>      import ModelSerializerKryo._
>
>      val mType = input.readLong().asInstanceOf[Int]
>      val bytes = Stream.continually(input.readByte()).takeWhile(_ != -1).toArray
>      factories.get(mType) match {
>        case Some(factory) => factory.restore(bytes)
>        case _ => throw new Exception(s"Unknown model type $mType to restore")
>      }
>    }
>
>    /**
>     * Writes the bytes for the object to the output.
>     * <p>
>     * This method should not be called directly, instead this serializer can be
>     * passed to {@link Kryo} write methods that accept a serializer.
>     *
>     * @param value May be null if {@link #getAcceptsNull()} is true.
>     */
>    override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>      output.writeLong(value.getType)
>      output.write(value.toBytes)
>    }
> }
>
> object ModelSerializerKryo {
>    private val factories = Map(
>      ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>      ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
> }
> And added the following
>
> // Add custom serializer
> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>
> To configure it.
> I can see checkpoint messages on the output console, but I never hit 
> a breakpoint in the serializer.
> Any suggestions?
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublinsky@lightbend.com
> https://www.lightbend.com/
>

