kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: weird SerializationException when consumer is fetching and parsing record in streams application
Date Thu, 30 Mar 2017 16:04:31 GMT
Not that I'm aware of.

On Thu, 30 Mar 2017 at 16:00 Sachin Mittal <sjmittal@gmail.com> wrote:

> Damian,
> Is there any way I can just dump out the contents at a given offset
> from a given log segment file?
>
> I am not sure how DumpLogSegment
> <https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment>
> helps. I already know the log segment file where that message is. The
> thing is, the file is 1 GB and there is no easy way to inspect it and
> actually see what the payload is.
>
> Thanks
> Sachin
>
>
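For reference, a minimal sketch of a DumpLogSegments invocation (the log directory and segment file name below are placeholders, not taken from this thread); with --print-data-log the tool prints each record's offset and payload:

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
      --files /var/kafka-logs/advice-stream-6/00000000000045000000.log \
      --print-data-log --deep-iteration

The output could then be searched for the offset in question, assuming the tool can read past the corrupt record.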
> On Thu, Mar 30, 2017 at 7:18 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
> > Sachin,
> >
> > Not sure if this will help, but you might want to try running
> > https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
> > on the partition that is causing you problems.
> >
> > Thanks
> > Damian
> >
> > On Thu, 30 Mar 2017 at 14:29 Michael Noll <michael@confluent.io> wrote:
> >
> > > Sachin,
> > >
> > > there's a JIRA that seems related to what you're seeing:
> > > https://issues.apache.org/jira/browse/KAFKA-4740
> > >
> > > Perhaps you could check the above and report back?
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > On Thu, Mar 30, 2017 at 3:23 PM, Michael Noll <michael@confluent.io>
> > > wrote:
> > >
> > > > Hmm, I re-read the stack trace again. It does look like the value side
> > > > is the culprit (as Sachin suggested earlier).
> > > >
> > > > -Michael
> > > >
> > > >
> > > > On Thu, Mar 30, 2017 at 3:18 PM, Michael Noll <michael@confluent.io>
> > > > wrote:
> > > >
> > > >> Sachin,
> > > >>
> > > >> you have this line:
> > > >>
> > > >> > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >>
> > > >> Could the problem be that it's not the record values causing the
> > > >> problem -- because your value deserializer does try-catch any such
> > > >> errors -- but that the record *keys* are malformed?  The built-in
> > > >> `Serdes.String()` does not try-catch deserialization errors, and from a
> > > >> quick look at the source it seems that the `Fetcher` class
> > > >> (clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java)
> > > >> is throwing your error above ("Error deserializing key/value for
> > > >> partition..."), and the Fetcher is swallowing the more specific
> > > >> SerializationException of `Serdes.String()` (but it will include the
> > > >> original exception/Throwable in its own SerializationException).
> > > >>
> > > >> -Michael
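A minimal sketch of a key-side guard along the lines Michael describes: wrap the built-in StringDeserializer so that a malformed key yields null instead of an exception (the class name is illustrative, not from the thread; and note Sachin's point below that this cannot help when the error is thrown inside the Fetcher before any deserializer is invoked):

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SafeStringDeserializer implements Deserializer<String> {
        private final StringDeserializer inner = new StringDeserializer();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            inner.configure(configs, isKey);
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            try {
                return inner.deserialize(topic, data);
            } catch (Exception e) {
                // Treat a malformed key as null rather than failing the poll.
                return null;
            }
        }

        @Override
        public void close() {
            inner.close();
        }
    }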
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Mar 30, 2017 at 2:52 PM, Sachin Mittal <sjmittal@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> My streams application already runs in debug mode.
> > > >>> Also, I have checked the code around these lines:
> > > >>>
> > > >>>   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>>
> > > >>> I don't see any log statement that would give me more information.
> > > >>>
> > > >>> https://github.com/apache/kafka/blob/0.10.2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L867
> > > >>>
> > > >>> The issue is happening at this line, and perhaps handling the
> > > >>> exception there and setting the value to null would be a better option.
> > > >>> Yes, on the client side nothing can be done, because the exception
> > > >>> happens before this.valueDeserializer.deserialize can be called.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Thu, Mar 30, 2017 at 4:28 PM, Damian Guy <damian.guy@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > The suggestions in that FAQ won't help as it is too late, i.e., the
> > > >>> > message has already been received into Streams.
> > > >>> > You could create a simple app that uses the Consumer, seeks to the
> > > >>> > offset, and tries to read the message. If you did this in debug mode
> > > >>> > you might find out some more information.
> > > >>> >
> > > >>> >
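A minimal sketch of such a standalone consumer, using the partition and offset from the stack trace at the bottom of this thread and a placeholder broker address (ByteArrayDeserializer avoids any custom deserialization; the Fetcher-level parse error, if that is the cause, would still reproduce under a debugger):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "offset-inspector");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("advice-stream", 6);
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, 45153795L); // the failing offset
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(10000)) {
                    // Print only sizes; the raw bytes can be dumped if this succeeds.
                    System.out.printf("offset=%d keyBytes=%d valueBytes=%d%n",
                            rec.offset(),
                            rec.key() == null ? -1 : rec.key().length,
                            rec.value() == null ? -1 : rec.value().length);
                }
            }
        }
    }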
> > > >>> > On Thu, 30 Mar 2017 at 11:50 Sachin Mittal <sjmittal@gmail.com> wrote:
> > > >>> >
> > > >>> > > Well, I tried to read that offset via kafka-console-consumer.sh
> > > >>> > > too, and it fails with the same error.
> > > >>> > >
> > > >>> > > So I was wondering if I can apply any of the suggestions from
> > > >>> > > http://docs.confluent.io/3.2.0/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > > >>> > >
> > > >>> > > If there is any other way just to get the contents of that
> > > >>> > > message, it would be helpful.
> > > >>> > >
> > > >>> > > Thanks
> > > >>> > > Sachin
> > > >>> > >
> > > >>> > >
> > > >>> > > On Thu, Mar 30, 2017 at 4:11 PM, Damian Guy <damian.guy@gmail.com> wrote:
> > > >>> > >
> > > >>> > > > Hi Sachin,
> > > >>> > > >
> > > >>> > > > Have you tried firing up a consumer (non-streams), seeking to that
> > > >>> > > > offset on the topic, and seeing what the message is? That might be
> > > >>> > > > easier to debug. Like you say, it is failing in the consumer.
> > > >>> > > > Thanks,
> > > >>> > > > Damian
> > > >>> > > >
> > > >>> > > > On Thu, 30 Mar 2017 at 10:35 Sachin Mittal <sjmittal@gmail.com> wrote:
> > > >>> > > >
> > > >>> > > > > I think I am following the third option.
> > > >>> > > > >
> > > >>> > > > > My pipeline is:
> > > >>> > > > >
> > > >>> > > > > serde = Serdes.serdeFrom(new VSerializer(), new VDeserializer());
> > > >>> > > > >
> > > >>> > > > > builder.stream(Serdes.String(), serde, "advice-stream")
> > > >>> > > > >   .filter(new Predicate<String, V>() { ... })
> > > >>> > > > >   .groupByKey()
> > > >>> > > > >   .aggregate(new Initializer<V1>() { ... },
> > > >>> > > > >              new Aggregator<String, V, V1>() { ... }, windows, supplier)
> > > >>> > > > >   .mapValues(new ValueMapper<V1, V2>() { ... })
> > > >>> > > > >   .foreach(new ForeachAction<Windowed<String>, V2>() { ... });
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > and in VDeserializer (implements Deserializer<V>) I am doing
> > > >>> > > > > something like this:
> > > >>> > > > >
> > > >>> > > > >     public V deserialize(String paramString, byte[] paramArrayOfByte) {
> > > >>> > > > >         if (paramArrayOfByte == null) { return null; }
> > > >>> > > > >         V data = null;
> > > >>> > > > >         try {
> > > >>> > > > >             data = objectMapper.readValue(paramArrayOfByte,
> > > >>> > > > >                     new TypeReference<V>() {});
> > > >>> > > > >         } catch (Exception e) {
> > > >>> > > > >             // swallow the error so a poison pill yields null instead of crashing
> > > >>> > > > >             e.printStackTrace();
> > > >>> > > > >         }
> > > >>> > > > >         return data;
> > > >>> > > > >     }
> > > >>> > > > >
> > > >>> > > > > So I am catching any exception that may happen when
> > > >>> > > > > deserializing the data.
> > > >>> > > > >
> > > >>> > > > > This is what the third option suggests (if I am not mistaken).
> > > >>> > > > >
> > > >>> > > > > Please let me know, given the pipeline, which option would be
> > > >>> > > > > best and how we can incorporate it.
> > > >>> > > > >
> > > >>> > > > > Also note: the exception is happening when reading from the
> > > >>> > > > > source topic "advice-stream", so it looks like the flow is not
> > > >>> > > > > reaching the pipeline at all for us to handle. It is terminating
> > > >>> > > > > right at the consumer poll.
> > > >>> > > > >
> > > >>> > > > > Thanks
> > > >>> > > > > Sachin
> > > >>> > > > >
> > > >>> > > > >
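For what it's worth, the FAQ's third option is usually paired with a filter that drops the nulls the deserializer returns, so a poison pill never reaches the aggregation; a sketch against the pipeline above (illustrative only, and, as noted, it cannot help when the failure happens inside the consumer poll itself):

    builder.stream(Serdes.String(), serde, "advice-stream")
      // drop records the value deserializer could not parse (it returned null)
      .filter(new Predicate<String, V>() {
          @Override
          public boolean test(String key, V value) {
              return value != null;
          }
      })
      ...  // rest of the pipeline as above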
> > > >>> > > > > On Thu, Mar 30, 2017 at 2:22 PM, Michael Noll <michael@confluent.io> wrote:
> > > >>> > > > >
> > > >>> > > > > > Could this be a corrupted message ("poison pill") in your topic?
> > > >>> > > > > >
> > > >>> > > > > > If so, take a look at
> > > >>> > > > > > http://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-messages
> > > >>> > > > > >
> > > >>> > > > > > FYI: we're currently investigating a more elegant way to address
> > > >>> > > > > > such poison pill problems.  If you have feedback on that front,
> > > >>> > > > > > feel free to share it with us. :-)
> > > >>> > > > > >
> > > >>> > > > > > -Michael
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > On Wed, Mar 29, 2017 at 10:07 PM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > >>> > > > > >
> > > >>> > > > > > > Hi,
> > > >>> > > > > > > This is the first time we are getting this weird exception.
> > > >>> > > > > > > After this, the stream crashes.
> > > >>> > > > > > >
> > > >>> > > > > > > The only workaround is to manually seek and commit the offset
> > > >>> > > > > > > to a greater number, and we keep needing this manual
> > > >>> > > > > > > intervention again and again.
> > > >>> > > > > > >
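A minimal sketch of that manual intervention (broker address and group id are placeholders; the group id must match the streams application.id, and the streams application should be stopped while the offset is committed):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class SkipBadOffset {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "my-streams-app-id");       // placeholder: the application.id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("advice-stream", 6);
                consumer.assign(Collections.singletonList(tp));
                // commit one past the corrupt record so the next poll skips it
                consumer.commitSync(Collections.singletonMap(tp,
                        new OffsetAndMetadata(45153795L + 1)));
            }
        }
    }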
> > > >>> > > > > > > Any idea what is causing it and how we can circumvent it?
> > > >>> > > > > > >
> > > >>> > > > > > > Note this error happens in both cases, whether the 0.10.2
> > > >>> > > > > > > client or the 0.10.1.1 client connects to the 0.10.1.1 Kafka
> > > >>> > > > > > > server.
> > > >>> > > > > > >
> > > >>> > > > > > > So this does not look like a version issue.
> > > >>> > > > > > >
> > > >>> > > > > > > Also we have the following settings:
> > > >>> > > > > > > message.max.bytes=5000013
> > > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5048576"
> > > >>> > > > > > > ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5048576"
> > > >>> > > > > > >
> > > >>> > > > > > > The rest is all default, and increasing the value for
> > > >>> > > > > > > ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG did not help.
> > > >>> > > > > > >
> > > >>> > > > > > > Stack trace below.
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks
> > > >>> > > > > > > Sachin
> > > >>> > > > > > >
> > > >>> > > > > > >
> > > >>> > > > > > > org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition advice-stream-6 at offset 45153795
> > > >>> > > > > > > java.lang.IllegalArgumentException: null
> > > >>> > > > > > >   at java.nio.Buffer.limit(Buffer.java:275) ~[na:1.8.0_122-ea]
> > > >>> > > > > > >   at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:791) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.common.record.Record.value(Record.java:268) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:867) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) ~[kafka-clients-0.10.2.0.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >>> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
>
