flink-user mailing list archives

From "Nortman, Bill" <William.Nort...@pimco.com>
Subject RE: KafkaProducer with generic (Avro) serialization schema
Date Wed, 25 Apr 2018 22:42:54 GMT
The first thing I would try is to check whether your classes Person and Address have getters
and setters and a no-argument constructor.

From: Wouter Zorgdrager [mailto:zorgdragerw@gmail.com]
Sent: Wednesday, April 25, 2018 7:17 AM
To: user@flink.apache.org
Subject: KafkaProducer with generic (Avro) serialization schema

Dear reader,

I'm currently working on writing a KafkaProducer which is able to serialize a generic type
using avro4s.
However, this serialization schema is not itself serializable. Here is my code:

The serialization schema:
class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends SerializationSchema[IN]
{

  override def serialize(element: IN): Array[Byte] = {
    val byteArray = new ByteArrayOutputStream()
    val avroSer = AvroOutputStream.binary[IN](byteArray)
    avroSer.write(element)
    avroSer.flush()
    avroSer.close()

    return byteArray.toByteArray
  }
}

The job code:
case class Person(name : String, age : Int, address : Address)
case class Address(city : String, street : String)

class SimpleJob {

  @transient
  private lazy val serSchema : AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

  def start() = {
    val testPerson = Person("Test", 100, Address("Test", "Test"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.
      fromCollection(Seq(testPerson)).
      addSink(createKafkaSink())

    env.execute("Flink sample job")
  }


  def createKafkaSink() : RichSinkFunction[Person] = {
    //set some properties
    val properties = new Properties()
    properties.put("bootstrap.servers", "127.0.0.1:9092")
    properties.put("zookeeper.connect", "127.0.0.1:2181")

    new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code compiles, but at runtime it fails with the following error: InvalidProgramException:
Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
is not serializable.

I assume this means that my custom SerializationSchema is not serializable due to the use
of SchemaFor, FromRecord and ToRecord.
Does anyone know a solution or workaround?

Thanks in advance!
Wouter
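
For illustration, the suspected mechanism can be reproduced without Flink or avro4s. The sketch below rests on assumptions and does not describe actual avro4s behaviour: Encoder, EagerSchema, LazyStringSchema and isJavaSerializable are hypothetical names, and Encoder only stands in for a type class (such as ToRecord) whose instances may not be Serializable. A context bound such as [T : Encoder] becomes an implicit constructor parameter, and because serialize uses it, the compiler keeps it as a field, so Java serialization of the schema object has to serialize that instance too.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical type class standing in for something like avro4s's ToRecord.
// It deliberately does NOT extend Serializable.
trait Encoder[T] { def encode(value: T): Array[Byte] }

object Encoder {
  implicit val stringEncoder: Encoder[String] = new Encoder[String] {
    def encode(value: String): Array[Byte] = value.getBytes("UTF-8")
  }
}

// The context bound is an implicit constructor parameter. Since `serialize`
// uses it, it is stored as a (non-transient) field of every instance.
class EagerSchema[T : Encoder] extends Serializable {
  def serialize(value: T): Array[Byte] = implicitly[Encoder[T]].encode(value)
}

// One possible direction (an assumption, not verified against avro4s):
// fix the type and resolve the instance lazily in a transient field, so it is
// skipped during serialization and re-created on first use after deserialization.
class LazyStringSchema extends Serializable {
  @transient private lazy val encoder: Encoder[String] = implicitly[Encoder[String]]
  def serialize(value: String): Array[Byte] = encoder.encode(value)
}

object SerializabilityCheck {
  // Returns true if plain Java serialization of `obj` succeeds.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new EagerSchema[String])) // false: Encoder field is not serializable
    println(isJavaSerializable(new LazyStringSchema))    // true: the only field is transient
  }
}
```

The lazy variant gives up the generic type parameter, because the implicit has to be resolvable at the point where the lazy val is defined. Whether the same pattern fits the avro4s type classes in the job above would need to be checked against that library.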