flink-user mailing list archives

From Wouter Zorgdrager <zorgdrag...@gmail.com>
Subject KafkaProducer with generic (Avro) serialization schema
Date Wed, 25 Apr 2018 14:16:43 GMT
Dear reader,

I'm currently writing a KafkaProducer that can serialize a generic type using
avro4s. However, the serialization schema itself is not serializable. Here is
my code:

The serialization schema:
class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
    extends SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)

    // Write the element and close the stream so the buffered bytes are flushed.
    avroSer.write(element)
    avroSer.close()

    byteArray.toByteArray
  }
}
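Note that serialize has to write the element and close (or flush) the stream
before calling toByteArray; a buffered output that is never flushed yields an
empty array. A quick stdlib-only illustration of the buffering, using a plain
OutputStreamWriter as an analogy (FlushDemo is a made-up name, not part of my
job):

```scala
import java.io.{ByteArrayOutputStream, OutputStreamWriter}

object FlushDemo {
  def main(args: Array[String]): Unit = {
    val baos   = new ByteArrayOutputStream()
    val writer = new OutputStreamWriter(baos, "UTF-8")

    writer.write("hello")
    // The bytes are still sitting in the writer's internal buffer:
    println(baos.toByteArray.length) // 0

    writer.close() // close() flushes the buffer into the underlying stream
    println(baos.toByteArray.length) // 5
  }
}
```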

The job code:
case class Person(name : String, age : Int, address : Address)
case class Address(city : String, street : String)

class SimpleJob {

  private lazy val serSchema: AvroSerializationSchema[Person] =
    new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(testPerson).addSink(createKafkaSink())

    env.execute("Flink sample job")
  }

  def createKafkaSink(): RichSinkFunction[Person] = {
    // set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "")
    properties.put("zookeeper.connect", "")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }
}


The code does compile, but at runtime it fails with:
InvalidProgramException: Object is not serializable.

I assume this means my custom SerializationSchema is not serializable due to
the implicit SchemaFor, FromRecord, and ToRecord instances it captures.
Does anyone know a solution or workaround?
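One direction I've been exploring: the context bounds (SchemaFor, FromRecord,
ToRecord) desugar to implicit constructor parameters, which end up stored as
fields of the schema object, so Java serialization tries (and fails) to
serialize them. If the instances are instead held in @transient lazy val
fields, serialization skips them and they are re-derived on the worker after
deserialization. A minimal stdlib-only sketch of that pattern (Derived,
CapturingSchema, TransientSchema, and SerializabilityDemo are made-up
stand-ins, not avro4s or Flink types):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable derived type class instance.
class Derived { def describe: String = "derived instance" }

// Capturing the instance as a regular field drags it into Java
// serialization, which is what makes the whole schema object fail.
class CapturingSchema(val derived: Derived) extends Serializable

// Holding it as @transient lazy val: serialization skips the field,
// and the lazy val re-derives it on first use after deserialization.
class TransientSchema extends Serializable {
  @transient private lazy val derived: Derived = new Derived
  def use(): String = derived.describe
}

object SerializabilityDemo {
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(canSerialize(new CapturingSchema(new Derived))) // false
    println(canSerialize(new TransientSchema))              // true
  }
}
```

In the real schema the lazy vals would derive the avro4s instances inside a
class fixed to a concrete type such as Person, since the macro derivation
happens at compile time.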

Thanks in advance!
