samza-dev mailing list archives

From Jordi Blasi Uribarri <jbl...@nextel.es>
Subject RE: cannot be cast to java.lang.String
Date Thu, 26 Mar 2015 15:16:36 GMT
Got it!!!

It was all about (as Chinmay pointed out) defining the serializer correctly:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.streams.syslog.samza.msg.serde=string
systems.kafka.streams.samzaout.samza.msg.serde=string
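
Conceptually, a string serde converts between String and bytes at the stream boundary, so the task sees a String on input and the producer receives bytes on output. The following is a minimal sketch in plain Java, not Samza's actual StringSerde implementation; the names StringSerdeSketch, toBytes and fromBytes are hypothetical, and UTF-8 is an assumed encoding:

```java
import java.nio.charset.StandardCharsets;

public class StringSerdeSketch {
    // Outgoing direction: turn the String the task sends into bytes for the wire.
    static byte[] toBytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Incoming direction: turn the bytes read from the topic back into a String.
    static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = toBytes("test syslog line");
        String roundTrip = fromBytes(wire);
        // The round trip should give back the original text.
        System.out.println(roundTrip.equals("test syslog line"));
    }
}
```

Without a serde on a stream, no such conversion happens on that side, which matches the two ClassCastExceptions seen earlier in this thread.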

Now going for the next step.

Thanks a lot,

  Jordi
-----Original Message-----
From: Jordi Blasi Uribarri [mailto:jblasi@nextel.es]
Sent: Thursday, March 26, 2015 9:21
To: dev@samza.apache.org
Subject: RE: cannot be cast to java.lang.String

I am not sure I understand what you mean. Does this mean that the Kafka producer
is not valid? Should I test a producer made specifically for Samza, or is it a configuration
issue? I have working producers that send strings in different data structures,
but they all end up being sent as Strings through the Kafka API (props.put("serializer.class",
"kafka.serializer.StringEncoder");). I have had the same issue with this code.

Thanks,

  	Jordi

-----Original Message-----
From: Chinmay Soman [mailto:chinmay.cerebro@gmail.com]
Sent: Wednesday, March 25, 2015 16:59
To: dev@samza.apache.org
Subject: Re: cannot be cast to java.lang.String

I think this is a bit specific to Samza. In the KafkaSystemProducer class, it does something
like this:

envelope.getMessage.asInstanceOf[Array[Byte]]
and not just 'byte[]'. This is why we need to be explicit about the serialization format.
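
The failure mode can be reproduced without Samza or Kafka. The sketch below is only an illustration in plain Java (the class CastSketch and method canCastToBytes are hypothetical names): casting an Object that holds a String to byte[] throws ClassCastException, and "[B" in the error message is the JVM's name for the byte[] class.

```java
import java.nio.charset.StandardCharsets;

public class CastSketch {
    // Returns true if the object can be cast to byte[], false if the cast throws.
    static boolean canCastToBytes(Object message) {
        try {
            byte[] ignored = (byte[]) message;
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Object asString = "hello";
        Object asBytes = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println(canCastToBytes(asString)); // a String is not a byte[]
        System.out.println(canCastToBytes(asBytes));  // an actual byte[] casts fine
        System.out.println(byte[].class.getName());   // JVM name of the byte[] class
    }
}
```

This is why an outgoing String needs a serde configured on the output stream: something has to turn it into bytes before the producer performs the cast.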


On Wed, Mar 25, 2015 at 3:14 AM, Jordi Blasi Uribarri <jblasi@nextel.es>
wrote:

> I am using the Kafka command line producer, so I understand that I am 
> sending a String.
>
>         bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog
>
> What is actually the difference between a string and a JSON message? Is it
> just a matter of deserialization, or is there some kind of metadata
> included that specifies the content type?
>
> How do I enable the debug mode?
>
> Thanks,
>
>         Jordi
>
> -----Original Message-----
> From: Chinmay Soman [mailto:chinmay.cerebro@gmail.com]
> Sent: Monday, March 23, 2015 17:55
> To: dev@samza.apache.org
> Subject: Re: cannot be cast to java.lang.String
>
> Hey Jordi,
>
> This is because you're sending String and not json in your output topic.
> Try setting string on the output stream as well (if you haven't already).
>
> If you have done that - then please enable debug mode and attach the 
> log somewhere so that we can take a look.
>
> On Mon, Mar 23, 2015 at 9:52 AM, Jordi Blasi Uribarri 
> <jblasi@nextel.es>
> wrote:
>
> > Looks like that was one error.
> >
> > I have set the property like this:
> >         systems.kafka.streams.syslog.samza.msg.serde=string
> >
> > But I am still getting the same error. Now I see something different
> > in the log just before the exception:
> >
> > 23 mar 2015 05:49:31  INFO KafkaSystemProducer - Creating a new producer for system kafka.
> > 23 mar 2015 05:49:31  INFO ProducerConfig - ProducerConfig values:
> >         value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> >         key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
> >         block.on.buffer.full = true
> >         retry.backoff.ms = 100
> >         buffer.memory = 33554432
> >         batch.size = 16384
> >         metrics.sample.window.ms = 30000
> >         metadata.max.age.ms = 300000
> >         receive.buffer.bytes = 32768
> >         timeout.ms = 30000
> >         max.in.flight.requests.per.connection = 1
> >         bootstrap.servers = [broker01:9092]
> >         metric.reporters = []
> >         client.id = samza_producer-samzafroga_job1-1-1427086163149-3
> >         compression.type = none
> >         retries = 2147483647
> >         max.request.size = 1048576
> >         send.buffer.bytes = 131072
> >         acks = 1
> >         reconnect.backoff.ms = 10
> >         linger.ms = 0
> >         metrics.num.samples = 2
> >         metadata.fetch.timeout.ms = 60000
> >
> > 23 mar 2015 05:49:31 ERROR SamzaContainer - Caught exception in process loop.
> > java.lang.ClassCastException: java.lang.String cannot be cast to [B
> >         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:80)
> >         at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
> >         at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
> >         at samzafroga.job1.process(job1.java:21)
> >         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
> >         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> >         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
> >         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
> >         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >         at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
> >         at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
> >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
> >         at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> >
> > Thanks.
> >
> >         Jordi
> > -----Original Message-----
> > From: Chinmay Soman [mailto:chinmay.cerebro@gmail.com]
> > Sent: Monday, March 23, 2015 17:36
> > To: dev@samza.apache.org
> > Subject: Re: cannot be cast to java.lang.String
> >
> > Have you tried setting this:
> >
> > systems.kafka.streams.syslog.samza.msg.serde=string   // assuming you've defined a 'string' serializer in your config
> >
> > OR
> >
> > systems.kafka.streams.syslog.samza.msg.serde=json     // depending on the format of your input data
> >
> > On Mon, Mar 23, 2015 at 9:24 AM, Jordi Blasi Uribarri 
> > <jblasi@nextel.es>
> > wrote:
> >
> > > Hi,
> > >
> > > As I understand it, I am setting "kafka" as the system name, "beste"
> > > as the output topic in the system and "syslog" as the input topic.
> > > Both topics, syslog and beste, are working correctly: I am
> > > streaming some syslogs to the "syslog" topic and I am testing
> > > "beste" with an internal application designed specifically for it.
> > > I am not sure about the kafka part.
> > >
> > > Thanks.
> > >
> > >         Jordi
> > >
> > > -----Original Message-----
> > > From: Chinmay Soman [mailto:chinmay.cerebro@gmail.com]
> > > Sent: Monday, March 23, 2015 17:16
> > > To: dev@samza.apache.org
> > > Subject: Re: cannot be cast to java.lang.String
> > >
> > > Hey Jordi,
> > >
> > > I see 3 different stream names.
> > >
> > > 1. new SystemStream("kafka", "beste");
> > >
> > > 2. task.inputs=kafka.syslog
> > >
> > > 3. systems.kafka.streams.frogain.samza.msg.serde=json
> > >
> > > Just as a sanity check, can you double-check that you're setting the
> > > config params for the correct stream?
> > >
> > >
> > > On Mon, Mar 23, 2015 at 3:31 AM, Jordi Blasi Uribarri 
> > > <jblasi@nextel.es>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > I have managed to get Samza up and running with a simple test job
> > > > that just forwards the received message. This is the code:
> > > >
> > > > public class job1 implements StreamTask {
> > > >     private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "beste");
> > > >
> > > >     public void process(IncomingMessageEnvelope envelope,
> > > >                         MessageCollector collector,
> > > >                         TaskCoordinator coordinator)
> > > >     {
> > > >         String msg = (String)envelope.getMessage();
> > > >         String outmsg = msg;
> > > >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
> > > >     }
> > > > }
> > > >
> > > > The properties file that runs it is this one:
> > > >
> > > > task.class=samzafroga.job1
> > > > job.factory.class=org.apache.samza.job.local.ThreadJobFactory
> > > > job.name=samzafroga.job1
> > > >
> > > >
> > > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > > > systems.kafka.consumer.zookeeper.connect=broker01:2181
> > > > systems.kafka.producer.bootstrap.servers=broker01:9092
> > > >
> > > > task.inputs=kafka.syslog
> > > >
> > > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > > > systems.kafka.streams.frogain.samza.msg.serde=json
> > > >
> > > > When a message arrives, the following line:
> > > >                 String msg = (String)envelope.getMessage();
> > > >
> > > > throws an exception like this:
> > > >
> > > > 22 mar 2015 23:16:25 ERROR SamzaContainer - Caught exception in process loop.
> > > > java.lang.ClassCastException: [B cannot be cast to java.lang.String
> > > >         at samzafroga.job1.process(job1.java:19)
> > > >         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:129)
> > > >         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> > > >         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:128)
> > > >         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:114)
> > > >         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > > >         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > > >         at org.apache.samza.container.RunLoop.process(RunLoop.scala:100)
> > > >         at org.apache.samza.container.RunLoop.run(RunLoop.scala:83)
> > > >         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:549)
> > > >         at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > > >
> > > > Is this a configuration issue? I tried changing the serializer
> > > > to String but it had the same effect.
> > > >
> > > > Thanks,
> > > >
> > > >                Jordi
> > > > ________________________________
> > > > Jordi Blasi Uribarri
> > > > R&D&I Area
> > > >
> > > > jblasi@nextel.es
> > > > Bilbao Office
> > > >
> > > > [http://www.nextel.es/wp-content/uploads/Firma_Nextel_2014.png]
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks and regards
> > >
> > > Chinmay Soman
> > >
> >
> >
> >
> > --
> > Thanks and regards
> >
> > Chinmay Soman
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>



--
Thanks and regards

Chinmay Soman