kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Damian Guy <damian....@gmail.com>
Subject Re: weird SerializationException when consumer is fetching and parsing record in streams application
Date Thu, 30 Mar 2017 10:58:39 GMT
The suggestions in that FAQ won't help as it is too late, i.e., the message
has already been received into Streams.
You could create a simple app that uses the Consumer, seeks to the offset,
and tries to read the message. If you did this in debug mode you might find
out some more information.


On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmittal@gmail.com> wrote:

> Well I try to read that offset via kafka-console-consumer.sh too and it
> fails with same error.
>
> So was wondering if I can apply any of the suggestion as per
>
> http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
>
> If there is any other was just to get the contents of that message it would
> be helpful.
>
> Thanks
> Sachin
>
>
> On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > Have you tried firing up a consumer (non-streams), seeking to that offset
> > on the topic and seeing what the message is? Might be easier to debug?
> Like
> > you say, it is failing in the consumer.
> > Thanks,
> > Damian
> >
> > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmittal@gmail.com> wrote:
> >
> > > I think I am following the third option.
> > >
> > > My pipeline is:
> > >
> > > serde= Serdes.serdeFrom(new VSerializer(), new VDeserializer ());
> > >
> > > builder.stream(Serdes.String(), serde, "advice-stream")
> > >   .filter(new Predicate<String, V>() { ...})
> > >   .groupByKey()
> > >   .aggregate(new Initializer<V1>() {...}, new Aggregator<String, V,
> V1>()
> > > {...}, windows, supplier)
> > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > >   .foreach(new ForeachAction<Windowed<String>, V2>() {... });
> > >
> > >
> > > and In VDeserializer (implements Deserializer<V>) I am doing something
> > like
> > > this:
> > >
> > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > >         if (paramArrayOfByte == null) { return null;}
> > >         V data = null;
> > >         try {
> > >             data = objectMapper.readValue(paramArrayOfByte, new
> > > TypeReference<V>() {});
> > >         } catch (Exception e) {
> > >             e.printStackTrace();
> > >         }
> > >         return data;
> > >     }
> > >
> > > So I am catching any exception that may happen when deserializing the
> > data.
> > >
> > > This is what third option suggest (if I am not mistaken).
> > >
> > > Please let me know given the pipeline we which option would be best and
> > how
> > > can we incorporate that in our pipeline.
> > >
> > > Also not exception is happening when reading from source topic which is
> > > "advice-stream", so looks like flow is not going to pipeline at all for
> > us
> > > to handle. It is terminating right at consumer poll.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <michael@confluent.io>
> > > wrote:
> > >
> > > > Could this be a corrupted message ("poison pill") in your topic?
> > > >
> > > > If so, take a look at
> > > > http://docs.confluent.io/current/streams/faq.html#
> > > >
> > > handling-corrupted-records-and-deserialization-errors-
> > poison-pill-messages
> > > >
> > > > FYI: We're currently investigating a more elegant way to address such
> > > > poison pill problems.  If you have feedback on that front, feel free
> to
> > > > share it with us. :-)
> > > >
> > > > -Michael
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > This is for first time we are getting a weird exception.
> > > > > After this the streams caches.
> > > > >
> > > > > Only work around is to manually seek and commit offset to a greater
> > > > number
> > > > > and we are needing this manual intervention again and again.
> > > > >
> > > > > Any idea what is causing it and how can we circumvent this.
> > > > >
> > > > > Note this error happens in both cases when 10.2 client or 10.1.1
> > client
> > > > > connect to kafka server 10.1.1
> > > > >
> > > > > So this does not looks like version issue.
> > > > >
> > > > > Also we have following setting
> > > > > message.max.bytes=5000013
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > > >
> > > > > Rest is all default and also increasing the value for
> > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > > >
> > > > > Stack trace below.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > org.apache.kafka.common.errors.SerializationException: Error
> > > > deserializing
> > > > > key/value for partition advice-stream-6 at offset 45153795
> > > > > java.lang.IllegalArgumentException: null
> > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.
> > java:791)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > parseRecord(Fetcher.java:867)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:
> > na]
> > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.
> > > > > fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > KafkaConsumer.java:995)
> > > > > ~[kafka-clients-0.10.2.0.jar:na]
> > > > >   at org.apache.kafka.streams.processor.internals.
> > StreamThread.runLoop(
> > > > > StreamThread.java:592)
> > > > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > > >   at org.apache.kafka.streams.processor.internals.
> > > > > StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-
> > > > > SNAPSHOT.jar:na]
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message