kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joe Stein <joe.st...@stealth.ly>
Subject Re: Partition Key Cannot be Send Out by Producer
Date Fri, 21 Nov 2014 01:29:44 GMT
iI would helpful to see the full stack trace. Also, how have you
instantiated your Producer class? Did you set a value for
"serializer.class" in the property?

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Thu, Nov 20, 2014 at 8:15 PM, Haoming Zhang <haoming.zhang@outlook.com>
wrote:

> Hi Harsha,
>
> I just tried to hard code a string message, then convert the message to
> byte array, but no lucky...
>
> The following is how my program works:
>
> Create a hardcode key, which is String, then convert to byte array,
> iterate a network message, send the message one by one:
>       networkelements foreach {
>         case networkelement =>
>           val bytes = Injection(networkelement)
>           logger.info(s"Synchronously sending Tweet $networkelement to
> topic ${producerApp.defaultTopic}")
>
>           val hardKey = "2"
>           val parkey = hardKey.getBytes("UTF8")
>           val topic = producerApp.defaultTopic
>           producerApp.send(parkey, bytes, topic)
>       }
>
>
> Here is how the networkelements created, where NetworkElement is a class
> that created by avro, I think you can ignore it:
> val networkelements = fixture.messages
>
>   val fixture = {
>     val BeginningOfEpoch = 0.seconds
>     val AnyTimestamp = 1234.seconds
>     val now = System.currentTimeMillis().millis
>
>     new {
>       val t1 = new NetworkElement("ANY_USER_1", "ANY_TEXT_1",
> now.toSeconds)
>       val t2 = new NetworkElement("ANY_USER_2", "ANY_TEXT_2",
> BeginningOfEpoch.toSeconds)
>       val t3 = new NetworkElement("ANY_USER_3", "ANY_TEXT_3",
> AnyTimestamp.toSeconds)
>
>       val messages = Seq(t1, t2, t3)
>     }
>   }
>
> BTW, I defined the Key and Val types as following:
>
>   type Key = Array[Byte]
>   type Val = Array[Byte]
>
> Thanks,
> Haoming
>
> > From: kafka@harsha.io
> > To: users@kafka.apache.org
> > Subject: Re: Partition Key Cannot be Send Out by Producer
> > Date: Thu, 20 Nov 2014 16:59:19 -0800
> >
> > also the (key: Key, value: Val, topic: Option[String]) "value" should be
> > a string converted to a byte array.
> > Can you send a example of your key and value data.
> >
> >
> > On Thu, Nov 20, 2014, at 04:53 PM, Haoming Zhang wrote:
> > > Hi Harsha,
> > >
> > > Thanks for suggestion!
> > >
> > > I have checked this link before, and I tried to create the partition
> key
> > > like the following:
> > >           val hardKey = "2"
> > >           val parkey = hardKey.getBytes("UTF8")
> > >
> > > But I still get the same exception. I also tried set "UTF8" as "UTF-8",
> > > but no luck...
> > >
> > > Regards,
> > > Haoming
> > >
> > > > From: kafka@harsha.io
> > > > To: users@kafka.apache.org
> > > > Subject: Re: Partition Key Cannot be Send Out by Producer
> > > > Date: Thu, 20 Nov 2014 16:43:11 -0800
> > > >
> > > > Hi Haoming,
> > > >           Take a look at the code here
> > > >
> https://github.com/stealthly/scala-kafka/blob/master/src/main/scala/KafkaProducer.scala
> > > > for your partKey it should be string and when you converting it into
> > > > byte array you can use partKey.getBytes("UTF8")
> > > > -Harsha
> > > >
> > > > On Thu, Nov 20, 2014, at 03:57 PM, Haoming Zhang wrote:
> > > > > Hi all,
> > > > >
> > > > > I'm a beginner of Kafka, currently I'm stuck by how to send out a
> > > > > KeyedMessage by producer. I would like to design a partition
> > > > > function to route the message based on the key, but the producer
> cannot
> > > > > send the KeyedMessage and I got this exception:
> > > > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > > >
> > > > > What I tried is hardcode a partition key ( I tried String and
> Integer,
> > > > > currently it is Integer ), then convert the partition key to Byte
> Array:
> > > > >           val converter = new DataTypeConvertion
> > > > >           val hardKey = 2
> > > > >           val partkey = converter.intToByteArray(hardKey)
> > > > >
> > > > >  Then create a KeyedMessage by the following function:
> > > > >
> > > > >   private def toMessage(value: Val, key: Option[Key] = None, topic:
> > > > >   Option[String] = None): KeyedMessage[Key, Val] = {
> > > > >     val t = topic.get
> > > > >     require(!t.isEmpty, "Topic must not be empty")
> > > > >     key match {
> > > > >       case Some(key) => new KeyedMessage(t, key, value)
> > > > >       case _ => new KeyedMessage(t, value)
> > > > >     }
> > > > >   }
> > > > >
> > > > > Then try to send the KeyedMessage by a Kafka producer:
> > > > >
> > > > >   def send(key: Key, value: Val, topic: Option[String] = None) {
> > > > >     val msg = toMessage(value, Option(key), topic)
> > > > >     print(msg + "\n")
> > > > >     print("msg.key" + msg.key + "\n")
> > > > >     print("msg.message" + msg.message + "\n")
> > > > >     print("msg.partKey" + msg.partKey + "\n")
> > > > >     print("msg.topic" + msg.topic + "\n")
> > > > >     try {
> > > > >       p.send(msg) //P is an instance of producer, exception
> happens in
> > > > >       this line
> > > > >     } catch {
> > > > >       case e: Exception =>
> > > > >         e.printStackTrace()
> > > > >         System.exit(1)
> > > > >     }
> > > > >   }
> > > > >
> > > > > As you can see, I added many print statement in the above
> function, and
> > > > > the following is the output of above function:
> > > > > KeyedMessage(testingInput,[B@7ad40950,[B@7ad40950,[B@7ce18764)
> > > > > msg.key: [B@7ad40950
> > > > > msg.message: [B@7ce18764
> > > > > msg.partKey: [B@7ad40950
> > > > > msg.topic: testingInput
> > > > >
> > > > > The key of KeyedMessage is displayed as [B@7ad40950 , I think it
> is a
> > > > > memory address and the exception (java.lang.ClassCastException: [B
> cannot
> > > > > be cast to java.lang.String) happens when "send" function try to
> convert
> > > > > the Byte Array to String.
> > > > >
> > > > > Am I wrong on creating a key in Byte Array type?
> > > > > Some examples of how to use KeyedMessage will be great!
> > > > >
> > > > > Regards,
> > > > > Haoming
> > > > >
> > > > >
> > >
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message