kafka-users mailing list archives

From ankit tyagi <ankittyagi.mn...@gmail.com>
Subject Re: Potential socket leak in kafka sync producer
Date Fri, 30 Jan 2015 09:29:53 GMT
 Jaikiran,

I have already investigated this and confirmed that it is Kafka related. I
wrote a small application that does nothing but publish messages to Kafka.
The FD leak occurs when I use a dynamic thread pool, that is, one whose
maxPoolSize is much larger than its corePoolSize, and I publish each batch
of messages only after all the threads have been destroyed after
keepAliveSeconds. I suspect that when the threads are destroyed, their file
handles are somehow not released. When I trigger an explicit GC, the
descriptor count drops by a significant amount, because the resources of
those destroyed threads get cleaned up.


We hit this problem on our production box, where the soft and hard file
descriptor limits were both 50000. To reproduce the issue on my local
machine, I reduced the hard limit to 6000 and used 1000 threads to send
messages to Kafka (the topic had 100 partitions with a replication factor
of 1).
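
For reference, the reproduction setup described above can be sketched roughly
as follows. This is a minimal sketch, not the actual application: the class
name FdLeakProbe and the helpers openFdCount and runBatch are made up for
illustration, the publish task is an empty placeholder where the real code
would call myProducer.send(msgs), and it assumes Java 8+ on a Unix-like JVM
where com.sun.management.UnixOperatingSystemMXBean is available.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FdLeakProbe {

    /** Open descriptor count of this JVM, or -1 if the platform does not expose it. */
    static long openFdCount() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1;
    }

    /**
     * Runs one batch on a "dynamic" pool (corePoolSize = 0, large maximum),
     * then waits until every idle worker has timed out and been destroyed.
     * Returns the number of worker threads still alive (0 unless interrupted).
     */
    static int runBatch(int maxThreads, long keepAliveMillis, Runnable publish) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch done = new CountDownLatch(maxThreads);
        for (int i = 0; i < maxThreads; i++) {
            pool.execute(() -> {
                try {
                    publish.run();
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await();
            // Workers exit once they have been idle for keepAliveMillis.
            while (pool.getPoolSize() > 0) {
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int remaining = pool.getPoolSize();
        pool.shutdown();
        return remaining;
    }

    public static void main(String[] args) {
        System.out.println("open FDs at start:    " + openFdCount());
        // Placeholder task; the real application would call myProducer.send(msgs) here.
        runBatch(1000, 1000, () -> { });
        System.out.println("open FDs after batch: " + openFdCount());
        System.gc(); // explicit GC request, as in the observation above
        System.out.println("open FDs after GC:    " + openFdCount());
    }
}
```

Comparing the three counts (together with lsof output) shows whether
descriptors outlive the destroyed threads and are only released after a GC.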





On Fri, Jan 30, 2015 at 2:14 PM, Jaikiran Pai <jai.forums2013@gmail.com>
wrote:

> Looking at that heap dump, this is probably a database connection/resource
> leak (298 connections?) rather than anything to do with Kafka. Have you
> investigated whether there's any DB resource leak in the application and
> ruled out that part?
>
> -Jaikiran
>
>
> On Friday 30 January 2015 01:08 PM, ankit tyagi wrote:
>
>> I have shared object histogram after and before gc on gist
>> https://gist.github.com/ankit1987/f4a04a1350fdd609096d
>>
>> On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <jai.forums2013@gmail.com>
>> wrote:
>>
>>> What kind of a (managed) component is that which has the @PreDestroy?
>>> Looking at the previous snippet you added, it looks like you are creating
>>> the Producer in some method? If you are going to close the producer in a
>>> @PreDestroy of the component, then you should be creating the producer in
>>> the @PostConstruct of the same component, so that you have proper
>>> lifecycle management of those resources.
>>>
>>>
>>> -Jaikiran
>>>
>>> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>>>
>>>  Hi,
>>>>
>>>> I am closing my producer at the time of shutting down my application.
>>>>
>>>> @PreDestroy
>>>>       public void stop()
>>>>       {
>>>>           LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>>>           if (myProducer != null) {
>>>>               myProducer.close();
>>>>           }
>>>>       }
>>>>
>>>>
>>>>
>>>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <kumar@nmsworks.co.in>
>>>> wrote:
>>>>
>>>>> Hope you are closing the producers. Can you share the attachment
>>>>> through gist/pastebin?
>>>>>
>>>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <ankittyagi.mnnit@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jaikiran,
>>>>>>
>>>>>> I am using Ubuntu and was able to reproduce this on Red Hat too. Please
>>>>>> find more information below.
>>>>>>
>>>>>> DISTRIB_ID=Ubuntu
>>>>>> DISTRIB_RELEASE=12.04
>>>>>> DISTRIB_CODENAME=precise
>>>>>> DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"
>>>>>>
>>>>>> java version "1.7.0_72"
>>>>>>
>>>>>> This is happening on the client side. The lsof output showed that
>>>>>> most of the FDs were FIFO and anon, but after a GC the FD count
>>>>>> dropped significantly.
>>>>>>
>>>>>> Below is the client code I am using to publish messages.
>>>>>>
>>>>>>
>>>>>> private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;
>>>>>>
>>>>>> myProducer = new Producer<>(new ProducerConfig(myProducerProperties));
>>>>>>
>>>>>> public void send(
>>>>>>         List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
>>>>>> {
>>>>>>     myProducer.send(msgs);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> We are using the sync producer. I am attaching object histograms from
>>>>>> my application before GC (histo_1) and after GC (histo_2).
>>>>>>
>>>>>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <jai.forums2013@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Which operating system are you on and what Java version? Depending on
>>>>>>> the OS, you could get tools (like lsof) to show which file descriptors
>>>>>>> are being held on to. Is it the client JVM which ends up with these
>>>>>>> leaks?
>>>>>>>
>>>>>>> Also, would it be possible to post a snippet of your application code
>>>>>>> which shows how you are using the Kafka APIs?
>>>>>>>
>>>>>>> -Jaikiran
>>>>>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Currently we are using the sync producer client, version 0.8.1, on
>>>>>>>> our production box. We are getting the following exception while
>>>>>>>> publishing Kafka messages:
>>>>>>>>
>>>>>>>> [2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>>>>>>> Fetching topic metadata with correlation id 10808 for topics
>>>>>>>> [Set(kafka_topic_coms_FD_test1)] from broker
>>>>>>>> [id:0,host:localhost,port:9092] failed
>>>>>>>> java.net.ConnectException: Connection refused
>>>>>>>>         at sun.nio.ch.Net.connect0(Native Method)
>>>>>>>>         at sun.nio.ch.Net.connect(Net.java:465)
>>>>>>>>         at sun.nio.ch.Net.connect(Net.java:457)
>>>>>>>>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>>>>>>>>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>>>>>>>         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>>>>>>         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>>>>>>>>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>>>>>>>>         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>>>>>>>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>>>>>>>         at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>>>
>>>>>>>>
>>>>>>>> We are using a dynamic thread pool to publish messages to Kafka. My
>>>>>>>> observation is that when the threads in my executor are destroyed
>>>>>>>> after the keep-alive time, the file descriptors are somehow not
>>>>>>>> released, but when I explicitly ran a full GC, the FD count dropped
>>>>>>>> by a significant amount.
>>>>>>>>
>
