kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrew Milkowski <amgm2...@gmail.com>
Subject Re: Hadoop example running DataGenerator causes kafka.message.Message cannot be cast to [B exception
Date Wed, 05 Jun 2013 16:37:32 GMT
Jun, thanks for the response

1. new jira https://issues.apache.org/jira/browse/KAFKA-933
2. locally I patched DataGenerator

public void run() throws Exception {

List<KeyedMessage<Integer, String>> list = new
ArrayList<KeyedMessage<Integer, String>>();
for (int i = 0; i < 50; i++) {
Long timestamp = RANDOM.nextLong();
if (timestamp < 0) timestamp = -timestamp;
String messageStr = timestamp.toString();
log.info(" creating message: " + messageStr);
list.add(new KeyedMessage<Integer, String>(topic, null, messageStr));
}

log.info(" send " + list.size() + " " + topic + " count events to " + uri);
producer.send(list);
producer.close();
 generateOffsets();
}

I have a separate request, having to do with upgrading kafka hadoop api's,
will follow up in the separte email to @users

thanks!


On Thu, May 30, 2013 at 11:50 PM, Jun Rao <junrao@gmail.com> wrote:

> This seems to be a bug. We should send the message string, instead the
> Message object in DataGenerator. Could you file a jira?
>
> Thanks,
>
> Jun
>
>
> On Thu, May 30, 2013 at 1:47 PM, Andrew Milkowski <amgm2006@gmail.com
> >wrote:
>
> > Hi,
> >
> > Working of git master codebase
> >
> > and following instructions at
> >
> >
> https://github.com/apache/kafka/blob/trunk/contrib/hadoop-consumer/README
> >
> > https://github.com/apache/kafka
> >
> > when running
> >
> > ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
> >
> > an exception is thrown
> >
> > Exception in thread "main" java.lang.ClassCastException:
> > kafka.message.Message cannot be cast to [B
> > at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:129)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:124)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> > at
> >
> >
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > at
> >
> >
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
> > at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > at
> >
> >
> scala.collection.JavaConversions$JListWrapper.map(JavaConversions.scala:615)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:124)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:54)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:41)
> >
> > please advice, thank you!
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message