samza-dev mailing list archives

From Jagadish Venkatraman <jagadish1...@gmail.com>
Subject Re: Samza not consuming
Date Fri, 18 Mar 2016 05:26:05 GMT
Hi David,

Appreciate the feedback. Currently, Samza is single-threaded: the process()
callback executes on the same thread that consumes events, so a call that
blocks inside process() stalls the whole container. You can take thread dumps
(repeatedly, over a period of time) and see where the threads are stuck to
debug this kind of issue. In general, it is a good idea to put a timeout on
any remote calls you make.
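[Archive editor's note: a minimal sketch of the timeout Jagadish suggests, wrapping a remote call in a Future so a hung service cannot block process() forever. The class, executor, and timeout values are illustrative and not part of Samza's API.]

```java
import java.util.concurrent.*;

public class TimedRemoteCall {
    // Dedicated executor for remote calls made from process().
    private static final ExecutorService POOL = Executors.newSingleThreadExecutor();

    // Runs the call, giving up after timeoutMs; returns null on timeout
    // so the task thread is never blocked indefinitely.
    static String callWithTimeout(Callable<String> call, long timeoutMs) throws Exception {
        Future<String> future = POOL.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the hung call
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        String fast = callWithTimeout(() -> "ok", 1000);
        String slow = callWithTimeout(() -> { Thread.sleep(5_000); return "late"; }, 100);
        System.out.println(fast + " / " + slow); // prints "ok / null"
        POOL.shutdownNow();
    }
}
```

With this pattern, a hung external service surfaces as a null (or an exception) that the task can handle, instead of a stuck thread that only a jstack dump reveals.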

We're building multi-threaded support into Samza (SAMZA-863), including
asynchronous processing (to support remote calls and async IO). Please take a
look at the JIRA and leave feedback on the proposed API in the ticket, so we
can better align the design and implementation with use cases like yours.

Thanks,
Jagadish



On Thu, Mar 17, 2016 at 7:56 PM, David Yu <david.yu@optimizely.com> wrote:

> Looks like this has nothing to do with checkpointing. Our Samza job has an
> issue communicating with an external service, which left a particular
> process() call waiting indefinitely. And it doesn't look like Samza has a
> way to time out a processing cycle.
>
> On Thu, Mar 17, 2016 at 5:42 PM, David Yu <david.yu@optimizely.com> wrote:
>
> > Strangely, I was not able to get checkpoint value for one particular
> > partition. Could this cause the job to be stuck?
> >
> > On Thu, Mar 17, 2016 at 5:23 PM, David Yu <david.yu@optimizely.com>
> > wrote:
> >
> >> Hi, I want to resurface this thread because I'm still facing issues with
> >> our Samza job not receiving events.
> >>
> >> Our Samza job metric "SamzaContainerMetrics.process-calls" dropped to
> >> zero today again. So did "SamzaContainerMetrics.process-envelopes" (of
> >> course). The current topic offset and task checkpoint revealed that
> >> everything looks good:
> >>
> >> Topic partition 18 offset (as of now) = *488986*
> >> Current checkpoint for taskname Partition 18:
> >> tasknames.Partition 18.systems.kafka.streams.nogoalids.partitions.18 = *474222*
> >>
> >> Even after redeployment of the job, everything still seemed stuck :(
> >>
> >> Any ideas that could help me debug this will be appreciated.
> >>
> >>
> >> On Wed, Mar 16, 2016 at 4:19 PM, David Yu <david.yu@optimizely.com>
> >> wrote:
> >>
> >>> No, instead, I updated the checkpoint topic with the "upcoming" offsets.
> >>> (I should have done a check before that, though.)
> >>>
> >>> So a related question: if I delete the checkpoint topic from Kafka, that
> >>> would essentially clear all the offset info, and Samza would be able to
> >>> recreate the topic using the default offsets (e.g. the smallest). Is that
> >>> correct? Just want to find an easy way to do a "reprocess all" kind of
> >>> operation.
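[Archive editor's note: deleting the checkpoint topic does work, but the same "reprocess all" effect can be had per stream through configuration alone. A hedged sketch follows: the stream name nogoalids is taken from this thread, and the property names should be verified against your Samza version's configuration reference.]

```
# Ignore the stored checkpoint for this stream on the next deployment...
systems.kafka.streams.nogoalids.samza.reset.offset=true
# ...and fall back to the oldest retained offset, i.e. reprocess everything.
systems.kafka.streams.nogoalids.samza.offset.default=oldest
```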
> >>>
> >>> Thanks.
> >>>
> >>> On Wed, Mar 16, 2016 at 3:25 PM, Navina Ramesh
> >>> <nramesh@linkedin.com.invalid> wrote:
> >>>
> >>>> Strange. I am unable to comment on the behavior because I don't know
> >>>> what your checkpoints looked like in the checkpoint topic.
> >>>>
> >>>> Did you try reading the checkpoint topic log?
> >>>>
> >>>> If you are setting
> >>>> systems.kafka.streams.nogoalids.samza.reset.offset = true, you are
> >>>> essentially ignoring checkpoints for that stream. Do verify that you
> >>>> are reading from the correct offset in the stream :)
> >>>>
> >>>> Thanks!
> >>>> Navina
> >>>>
> >>>> On Wed, Mar 16, 2016 at 3:16 PM, David Yu <david.yu@optimizely.com>
> >>>> wrote:
> >>>>
> >>>> > Finally seeing events flowing again.
> >>>> >
> >>>> > Yes, the "systems.kafka.consumer.auto.offset.reset" option is probably
> >>>> > not a factor here. And yes, I am using checkpointing (Kafka). Not sure
> >>>> > if the offsets are messed up. But I was able to use
> >>>> > "systems.kafka.streams.nogoalids.samza.reset.offset=true" to reset the
> >>>> > offsets to the newest ones. After that, events started coming. Still,
> >>>> > it is unclear to me how things got stuck in the first place.
> >>>> >
> >>>> > On Wed, Mar 16, 2016 at 2:31 PM, Navina Ramesh
> >>>> > <nramesh@linkedin.com.invalid> wrote:
> >>>> >
> >>>> > > Hi David,
> >>>> > > The configuration you have tweaked
> >>>> > > (systems.kafka.consumer.auto.offset.reset) is honored only when one
> >>>> > > of the following conditions holds:
> >>>> > > * the topic doesn't exist
> >>>> > > * the checkpoint is older than the maximum message history retained
> >>>> > >   by the brokers
> >>>> > >
> >>>> > > So, my questions are:
> >>>> > > Are you using checkpointing? If you are, you can read the checkpoint
> >>>> > > topic to see the offset that is being used to fetch data.
> >>>> > >
> >>>> > > If you are not using checkpoints, then Samza uses
> >>>> > > systems.kafka.samza.offset.default to decide whether to start
> >>>> > > reading from the earliest (oldest data) or upcoming (newest data)
> >>>> > > offset in the stream.
> >>>> > >
> >>>> > > This could explain from where your job is trying to consume, and you
> >>>> > > can cross-check with the broker.
> >>>> > > For the purpose of debugging, you can print a debug line in the
> >>>> > > process() method with the offset of the message you are processing
> >>>> > > (message.getOffset). Please remember to remove the debug line after
> >>>> > > troubleshooting; otherwise you risk filling up your logs.
> >>>> > >
> >>>> > > Let me know if you have more questions.
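[Archive editor's note: a minimal sketch of the debug line Navina describes. Samza's IncomingMessageEnvelope is stubbed here as a plain class so the snippet stands alone; in a real StreamTask you would call envelope.getOffset() inside process() and log the result at DEBUG level.]

```java
// Stub standing in for org.apache.samza.system.IncomingMessageEnvelope.
class Envelope {
    private final String offset;
    Envelope(String offset) { this.offset = offset; }
    String getOffset() { return offset; }
}

public class OffsetDebug {
    // What the suggested debug line would emit for each processed message.
    static String debugLine(Envelope envelope) {
        return "process() handling offset=" + envelope.getOffset();
    }

    public static void main(String[] args) {
        // Using the checkpoint offset quoted earlier in this thread.
        System.out.println(debugLine(new Envelope("474222")));
        // prints "process() handling offset=474222"
    }
}
```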
> >>>> > >
> >>>> > > Thanks!
> >>>> > > Navina
> >>>> > >
> >>>> > > On Wed, Mar 16, 2016 at 2:12 PM, David Yu <david.yu@optimizely.com>
> >>>> > > wrote:
> >>>> > >
> >>>> > > > I'm trying to debug our Samza job, which seems to be stuck
> >>>> > > > consuming from our Kafka stream.
> >>>> > > >
> >>>> > > > Every time I redeploy the job, only the same handful of events get
> >>>> > > > consumed, and then no more events get processed. I manually
> >>>> > > > checked to make sure the input stream is live and flowing. I also
> >>>> > > > tried both of the following:
> >>>> > > >
> >>>> > > > systems.kafka.consumer.auto.offset.reset=largest
> >>>> > > > systems.kafka.consumer.auto.offset.reset=smallest
> >>>> > > >
> >>>> > > > I'm also seeing the following from the log:
> >>>> > > >
> >>>> > > > ... partitionMetadata={Partition
> >>>> > > > [partition=0]=SystemStreamPartitionMetadata [oldestOffset=144907,
> >>>> > > > newestOffset=202708, upcomingOffset=202709], Partition
> >>>> > > > [partition=5]=SystemStreamPartitionMetadata [oldestOffset=140618,
> >>>> > > > newestOffset=200521, upcomingOffset=200522], ...
> >>>> > > >
> >>>> > > > Not sure what other ways I could diagnose this problem. Any
> >>>> > > > suggestion is appreciated.
> >>>> > > >
> >>>> > >
> >>>> > >
> >>>> > >
> >>>> > > --
> >>>> > > Navina R.
> >>>> > >
> >>>> >
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Navina R.
> >>>>
> >>>
> >>>
> >>
> >
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
