kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: weird SerializationException when consumer is fetching and parsing record in streams application
Date Thu, 30 Mar 2017 10:50:08 GMT
Well, I tried to read that offset via kafka-console-consumer.sh too, and it
fails with the same error.

So I was wondering whether I can apply any of the suggestions from
http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages

If there is any other way just to get the contents of that message, it would
be helpful.

Thanks
Sachin

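For failures that do occur inside a deserializer, one way to keep the offending payload inspectable is to catch the parse error and hold on to the raw bytes instead of only printing a stack trace. The following is a minimal, Kafka-free sketch of that idea. `LenientDecodeSketch`, `Result`, `decode`, and `toHex` are hypothetical names, and the integer parser is only a stand-in for the real JSON mapping. It would not help with an error raised before the deserializer is invoked, which is what the stack trace quoted further down suggests (it contains no deserializer frame).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class LenientDecodeSketch {

    // Outcome of one decode attempt: a value on success, or the raw bytes as hex on failure.
    static final class Result<V> {
        final V value;        // null when decoding failed or the input was null
        final String rawHex;  // null unless decoding failed

        Result(V value, String rawHex) {
            this.value = value;
            this.rawHex = rawHex;
        }
    }

    // Applies the parser; on a runtime failure, keeps the payload for later inspection.
    static <V> Result<V> decode(byte[] raw, Function<byte[], V> parser) {
        if (raw == null) {
            return new Result<>(null, null);
        }
        try {
            return new Result<>(parser.apply(raw), null);
        } catch (RuntimeException e) {
            return new Result<>(null, toHex(raw));
        }
    }

    // Renders bytes as lowercase hex so a bad payload can be logged safely.
    static String toHex(byte[] raw) {
        StringBuilder sb = new StringBuilder(raw.length * 2);
        for (byte b : raw) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Stand-in parser (assumption): treats the payload as a decimal integer.
        Function<byte[], Integer> parser =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        Result<Integer> ok = decode("42".getBytes(StandardCharsets.UTF_8), parser);
        Result<Integer> bad = decode("oops".getBytes(StandardCharsets.UTF_8), parser);

        System.out.println("parsed value: " + ok.value);
        System.out.println("undecodable payload (hex): " + bad.rawHex);
    }
}
```

Because a failed decode yields a null value, any downstream step (for example the first filter in a pipeline) would need to drop nulls explicitly.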

On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian.guy@gmail.com> wrote:

> Hi Sachin,
>
> Have you tried firing up a consumer (non-streams), seeking to that offset
> on the topic and seeing what the message is? Might be easier to debug? Like
> you say, it is failing in the consumer.
> Thanks,
> Damian
>
> On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmittal@gmail.com> wrote:
>
> > I think I am following the third option.
> >
> > My pipeline is:
> >
> > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> >
> > builder.stream(Serdes.String(), serde, "advice-stream")
> >   .filter(new Predicate<String, V>() { ...})
> >   .groupByKey()
> >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V, V1>() {...},
> >       windows, supplier)
> >   .mapValues(new ValueMapper<V1, V2>() { ... })
> >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> >
> >
> > and in VDeserializer (implements Deserializer<V>) I am doing something like
> > this:
> >
> >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> >         if (paramArrayOfByte == null) { return null;}
> >         V data = null;
> >         try {
> >             data = objectMapper.readValue(paramArrayOfByte,
> >                     new TypeReference<V>() {});
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         }
> >         return data;
> >     }
> >
> > So I am catching any exception that may happen when deserializing the data.
> >
> > This is what the third option suggests (if I am not mistaken).
> >
> > Please let me know, given this pipeline, which option would be best and how
> > we can incorporate it into our pipeline.
> >
> > Also note that the exception is happening when reading from the source topic,
> > "advice-stream", so it looks like the flow never reaches the pipeline for us
> > to handle it. It is terminating right at the consumer poll.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <michael@confluent.io>
> > wrote:
> >
> > > Could this be a corrupted message ("poison pill") in your topic?
> > >
> > > If so, take a look at
> > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > >
> > > FYI: We're currently investigating a more elegant way to address such
> > > poison pill problems.  If you have feedback on that front, feel free to
> > > share it with us. :-)
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > This is the first time we are getting this weird exception.
> > > > After this the streams application crashes.
> > > >
> > > > The only workaround is to manually seek and commit the offset to a greater
> > > > number, and we need this manual intervention again and again.
> > > >
> > > > Any idea what is causing it and how we can circumvent it?
> > > >
> > > > Note this error happens both when a 0.10.2 client and when a 0.10.1.1
> > > > client connects to Kafka server 0.10.1.1.
> > > >
> > > > So this does not look like a version issue.
> > > >
> > > > Also, we have the following settings:
> > > > message.max.bytes=5000013
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > >
> > > > Everything else is default, and increasing the value of
> > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > >
> > > > Stack trace below.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> > > > java.lang.IllegalArgumentException: null
> > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >
> > >
> >
>

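The top frames of the stack trace (`Buffer.limit` called from `Utils.sizeDelimited`) are consistent with a length prefix that claims more bytes than the record actually holds, since `java.nio.Buffer.limit` throws `IllegalArgumentException` when the new limit exceeds the buffer's capacity. The sketch below reproduces that failure mode with plain `java.nio` only. `readSizeDelimited` and `record` are hypothetical helpers written for illustration under the assumption of a simple `[4-byte size][payload]` layout; they are not Kafka's source, and the sketch does not establish how the record in this thread became corrupted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SizeDelimitedSketch {

    // Builds a buffer laid out as [4-byte declared size][payload bytes].
    // The declared size is passed separately so a mismatch can be simulated.
    static ByteBuffer record(int declaredSize, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(declaredSize);
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    // Reads the size prefix at `start` and returns a view over that many bytes.
    static ByteBuffer readSizeDelimited(ByteBuffer buffer, int start) {
        int size = buffer.getInt(start);
        if (size < 0) {
            return null; // a negative size is treated here as "no value"
        }
        ByteBuffer view = buffer.duplicate();
        view.position(start + 4);
        view = view.slice();
        // Throws IllegalArgumentException when size exceeds the bytes available.
        view.limit(size);
        return view;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer good = record(payload.length, payload);
        System.out.println("well-formed: " + readSizeDelimited(good, 0).remaining() + " bytes");

        // Same payload, but the prefix claims 500 bytes.
        ByteBuffer corrupt = record(500, payload);
        try {
            readSizeDelimited(corrupt, 0);
        } catch (IllegalArgumentException e) {
            System.out.println("oversized prefix rejected: " + e);
        }
    }
}
```

In this sketch the exception is raised while slicing out the value bytes, before any user-supplied deserializer would run, which would explain why a try/catch inside `VDeserializer` never sees it.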