flink-user mailing list archives

From Boris Lublinsky <boris.lublin...@lightbend.com>
Subject Re: Custom Kryo serializer
Date Mon, 24 Jul 2017 14:31:31 GMT
Thanks Chesnay,
Could you please point me to an example?

Boris Lublinsky
FDP Architect
boris.lublinsky@lightbend.com
https://www.lightbend.com/

> On Jul 24, 2017, at 9:27 AM, Chesnay Schepler <chesnay@apache.org> wrote:
> 
> Copy of a mail I sent to the user mailing list only:
> 
> Raw state can only be used when implementing an operator, not a function.
> 
> For functions you have to use Managed Operator State. Your function will have to implement
> the CheckpointedFunction interface, and create a ValueStateDescriptor that you register
> in initializeState.
> 
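Since an example was requested, here is a minimal sketch of a function that keeps a model in Managed Operator State through CheckpointedFunction. It is an illustration under assumptions, not code from this thread: the class name ModelHolder, the state name "currentModel", the hasModel accessor and the use of Array[Byte] as a stand-in for a Model are all hypothetical. As far as I can tell, the operator state store in the 1.3 API takes a ListStateDescriptor (a ValueStateDescriptor is for keyed state), so the sketch uses a list. Note that operator state is per parallel instance, not per key.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical function: input 1 carries records, input 2 carries serialized models.
class ModelHolder extends CoProcessFunction[Double, Array[Byte], Double]
    with CheckpointedFunction {

  // In-memory copy of the current model bytes (stand-in for Option[Model]).
  private var currentModel: Option[Array[Byte]] = None

  // Handle to the managed operator state; assigned in initializeState.
  @transient private var checkpointedModel: ListState[Array[Byte]] = _

  // Convenience accessor for inspection only (hypothetical).
  def hasModel: Boolean = currentModel.isDefined

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[Array[Byte]]("currentModel", classOf[Array[Byte]])
    checkpointedModel = context.getOperatorStateStore.getListState(descriptor)
    // After a restore, copy the checkpointed bytes back into the local field.
    if (context.isRestored) {
      val it = checkpointedModel.get().iterator()
      if (it.hasNext) currentModel = Some(it.next())
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Replace the previous snapshot contents with the current model, if any.
    checkpointedModel.clear()
    currentModel.foreach(bytes => checkpointedModel.add(bytes))
  }

  override def processElement1(
      value: Double,
      ctx: CoProcessFunction[Double, Array[Byte], Double]#Context,
      out: Collector[Double]): Unit =
    // Real scoring is elided; emit the record only once a model is present.
    currentModel.foreach(_ => out.collect(value))

  override def processElement2(
      model: Array[Byte],
      ctx: CoProcessFunction[Double, Array[Byte], Double]#Context,
      out: Collector[Double]): Unit =
    currentModel = Some(model)
}
```

With this shape Flink serializes the byte arrays itself, so no custom Kryo serializer is involved; converting between Model and bytes (for example via toBytes and a factory) would happen in snapshotState and initializeState.
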
> On 24.07.2017 16:26, Boris Lublinsky wrote:
>> Is there a chance this can be answered?
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublinsky@lightbend.com
>> https://www.lightbend.com/
>> 
>>> Begin forwarded message:
>>> 
>>> From: Boris Lublinsky <boris.lublinsky@lightbend.com>
>>> Subject: Re: Custom Kryo serializer
>>> Date: July 19, 2017 at 8:28:16 AM CDT
>>> To: user@flink.apache.org, chesnay@apache.org
>>> 
>>> Thanks for the reply, but I am not using it for managed state, but rather for the raw state.
>>> In my implementation I have the following:
>>> 
>>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]{
>>> 
>>>   // The managed keyed state, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
>>>   var modelState: ValueState[ModelToServeStats] = _
>>>   var newModelState: ValueState[ModelToServeStats] = _
>>>   // The raw state, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state
>>>   var currentModel : Option[Model] = None
>>>   var newModel : Option[Model] = None
>>> 
>>> Here the current and new models are instances of the trait for which I implemented the serializer.
>>> According to the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#raw-and-managed-state:
>>> 
>>> “Raw State is state that operators keep in their own data structures. When checkpointed,
>>> they only write a sequence of bytes into the checkpoint. Flink knows nothing about the
>>> state’s data structures and sees only the raw bytes.”
>>> 
>>> So I was assuming that I need to provide a serializer for this.
>>> Am I missing something?
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublinsky@lightbend.com
>>> https://www.lightbend.com/
>>>> 
>>>> ---------- Forwarded message ----------
>>>> From: Chesnay Schepler <chesnay@apache.org>
>>>> Date: Wed, Jul 19, 2017 at 1:34 PM
>>>> Subject: Re: Custom Kryo serializer
>>>> To: user@flink.apache.org
>>>> 
>>>> 
>>>> Hello,
>>>> 
>>>> I assume you're passing the class of your serializer in a StateDescriptor constructor.
>>>> 
>>>> If so, you could add a breakpoint in StateDescriptor#initializeSerializerUnlessSet,
>>>> and check what typeInfo is created and which serializer is created as a result.
>>>> 
>>>> One thing you could try right away is registering your serializer for the Model
>>>> implementations, instead of the trait.
>>>> 
>>>> Regards,
>>>> Chesnay
>>>> 
>>>> 
>>>> On 14.07.2017 15:50, Boris Lublinsky wrote:
>>>>> Hi 
>>>>> I have several implementations of my Model trait, 
>>>>> 
>>>>> trait Model {
>>>>>   def score(input : AnyVal) : AnyVal
>>>>>   def cleanup() : Unit
>>>>>   def toBytes() : Array[Byte]
>>>>>   def getType : Long
>>>>> }
>>>>> 
>>>>> None of them is serializable, but they are used in the state definition.
>>>>> So I implemented a custom serializer:
>>>>> 
>>>>> import com.esotericsoftware.kryo.io.{Input, Output}
>>>>> import com.esotericsoftware.kryo.{Kryo, Serializer}
>>>>> import com.lightbend.model.modeldescriptor.ModelDescriptor
>>>>> 
>>>>> 
>>>>> class ModelSerializerKryo extends Serializer[Model]{
>>>>>   
>>>>>   super.setAcceptsNull(false)
>>>>>   super.setImmutable(true)
>>>>> 
>>>>>   /** Reads bytes and returns a new object of the specified concrete type.
>>>>>     * <p>
>>>>>     * Before Kryo can be used to read child objects, {@link Kryo#reference(Object)} must be
>>>>>     * called with the parent object to ensure it can be referenced by the child objects. Any
>>>>>     * serializer that uses {@link Kryo} to read a child object may need to be reentrant.
>>>>>     * <p>
>>>>>     * This method should not be called directly, instead this serializer can be passed to
>>>>>     * {@link Kryo} read methods that accept a serializer.
>>>>>     *
>>>>>     * @return May be null if {@link #getAcceptsNull()} is true. */
>>>>>   
>>>>>   override def read(kryo: Kryo, input: Input, `type`: Class[Model]): Model = {
>>>>> 
>>>>>     import ModelSerializerKryo._
>>>>> 
>>>>>     val mType = input.readLong().toInt
>>>>>     // Read the length prefix written by write(), then exactly that many bytes.
>>>>>     // Scanning for -1 does not work: -1 is a valid byte value, and Kryo's Input
>>>>>     // throws on buffer underflow rather than returning an end marker.
>>>>>     val bytes = input.readBytes(input.readInt())
>>>>>     factories.get(mType) match {
>>>>>       case Some(factory) => factory.restore(bytes)
>>>>>       case _ => throw new Exception(s"Unknown model type $mType to restore")
>>>>>     }
>>>>>   }
>>>>> 
>>>>>   /** Writes the bytes for the object to the output.
>>>>>     * <p>
>>>>>     * This method should not be called directly, instead this serializer can be passed to
>>>>>     * {@link Kryo} write methods that accept a serializer.
>>>>>     *
>>>>>     * @param value May be null if {@link #getAcceptsNull()} is true. */
>>>>>   
>>>>>   override def write(kryo: Kryo, output: Output, value: Model): Unit = {
>>>>>     output.writeLong(value.getType)
>>>>>     // Write the payload length first so that read() knows how many bytes to consume.
>>>>>     val bytes = value.toBytes
>>>>>     output.writeInt(bytes.length)
>>>>>     output.write(bytes)
>>>>>   }
>>>>> }
>>>>> 
>>>>> object ModelSerializerKryo{
>>>>>   private val factories = Map(ModelDescriptor.ModelType.PMML.value -> PMMLModel,
>>>>>     ModelDescriptor.ModelType.TENSORFLOW.value -> TensorFlowModel)
>>>>> }
>>>>> And added the following 
>>>>> 
>>>>> // Add custom serializer
>>>>> env.getConfig.addDefaultKryoSerializer(classOf[Model], classOf[ModelSerializerKryo])
>>>>> 
>>>>> To configure it.
>>>>> I can see checkpoint messages on the output console, but I never hit a breakpoint in the serializer.
>>>>> Any suggestions?
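A side note on how the model bytes are framed, separate from the question of why the serializer is never called: a reader that scans for a -1 end marker cannot recover an arbitrary payload, because -1 (0xFF) is a legal byte value. Writing the payload length before the payload avoids this. The sketch below shows the difference using only java.io, not Kryo; the names Framing, frame, unframe and takeUntilMinusOne are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object Framing {

  /** Writes the model type, then the payload length, then the payload itself. */
  def frame(modelType: Long, payload: Array[Byte]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(modelType)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
    buffer.toByteArray
  }

  /** Reads back exactly what frame() wrote, using the length prefix. */
  def unframe(bytes: Array[Byte]): (Long, Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val modelType = in.readLong()
    val payload = new Array[Byte](in.readInt())
    in.readFully(payload)
    (modelType, payload)
  }

  /** Sentinel-based reading, for comparison: stops at the first -1 byte. */
  def takeUntilMinusOne(payload: Array[Byte]): Array[Byte] =
    payload.takeWhile(_ != (-1).toByte)
}
```

For a payload of the three bytes 1, -1, 2, unframe(frame(...)) returns all three bytes, while the sentinel-based reader stops after the first one.
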
>>>>>  
>>>>> 
>>>>> 
>>>>> 
>>>>> Boris Lublinsky
>>>>> FDP Architect
>>>>> boris.lublinsky@lightbend.com
>>>>> https://www.lightbend.com/
>>>> 
>>>> 
>>> 
>> 
> 

