kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shannon Lloyd <shanl...@gmail.com>
Subject Re: How to cleanly shut down ConsumerConnector
Date Tue, 09 Dec 2014 05:17:34 GMT
Not explicitly. Some additional background might help. I'm running an
integration test using an embedded Kafka cluster and ZK quorum (ie all in
process). In my @Before method I fire up the cluster. In my @After method I
shut the cluster down. When I call shutdown on the connector during my
test, the test is actually erroring out and shutting down the cluster. BUT
even with a try catch(Throwable) around the shutdown call, nothing gets
caught, so something outside this call thread is throwing an error and
killing the test.
On 9 Dec 2014 14:45, "Jun Rao" <jun@confluent.io> wrote:

> kafka.network.Processor is on the broker. Are you killing the brokers as
> well?
> Thanks,
> Jun
> On Thu, Dec 4, 2014 at 5:33 PM, Shannon Lloyd <shanloid@gmail.com> wrote:
> > Hi,
> >
> > I am using the high-level consumer on 0.8.2-beta. I'm attempting to
> close a
> > ConsumerConnector (actually a handful of connectors), but am not having
> > much luck actually getting it to close cleanly. When I call shutdown on
> the
> > connector, I see an error in my application's log (these are always
> > IOExceptions in kafka.network.Processor - either Broken Pipe in
> > FileDispatcherImpl.write0 or else Connection reset by peer in
> > FileDispatcherImpl.read0), but the shutdown call itself does not return
> > until the socket.timeout.ms has expired (I've tested this by setting
> this
> > to all sorts of different values and confirmed that shutdown() always
> > returns after this timeout, but never before).
> >
> > I don't know if it matters, but my code that works with the connector is
> > running on a separate thread via an ExecutorService (essentially I'm
> > consuming with one thread per group/topic combination (yes, one thread
> for
> > all partitions within the topic)).
> >
> > FWIW, everything else seems to work fine - I can connect, set up the
> > KafkaStream, pull down messages etc. It's just the shutting down that
> > doesn't seem to be working. The reason I need this to work cleanly is
> that
> > my use case requires me to shut down specific connectors and re-create
> them
> > later, potentially numerous times during the running of my application. I
> > could potentially redesign things to keep each connector around after it
> is
> > no longer needed, cache it and re-use it later, but this still doesn't
> > solve the problem of how I eventually shut everything down cleanly.
> >
> > Thanks,
> > Shannon
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message