kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ankit tyagi <ankittyagi.mn...@gmail.com>
Subject Re: Potential socket leak in kafka sync producer
Date Fri, 30 Jan 2015 07:38:01 GMT
I have shared object histogram after and before gc on gist
https://gist.github.com/ankit1987/f4a04a1350fdd609096d

On Fri, Jan 30, 2015 at 12:43 PM, Jaikiran Pai <jai.forums2013@gmail.com>
wrote:

> What kind of a (managed) component is that which has the @PreDestroy?
> Looking at the previous snippet you added, it looks like you are creating
> the Producer in some method? If  you are going to close the producer in a
> @PreDestroy of the component, then you should be creating the producer in
> the @PostConstruct of the same component, so that you have proper lifecycle
> management of those resources.
>
>
> -Jaikiran
>
> On Friday 30 January 2015 12:20 PM, ankit tyagi wrote:
>
>> Hi,
>>
>> I am closing my producer at the time of shutting down my application.
>>
>> @PreDestroy
>>      public void stop()
>>      {
>>          LOG.info("Stopping Kafka Producer for topic: {}", myTopic);
>>          if (myProducer != null) {
>>              myProducer.close();
>>          }
>>      }
>>
>>
>>
>> On Fri, Jan 30, 2015 at 11:22 AM, Manikumar Reddy <kumar@nmsworks.co.in>
>> wrote:
>>
>>  Hope you are closing the producers. can you share the attachment through
>>> gist/patebin
>>>
>>> On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi <
>>> ankittyagi.mnnit@gmail.com>
>>> wrote:
>>>
>>>  Hi Jaikiran,
>>>>
>>>> I am using ubuntu and was able to reproduce on redhat too. Please find
>>>>
>>> the
>>>
>>>> more information below.
>>>>
>>>>
>>>> *DISTRIB_ID=Ubuntu*
>>>> *DISTRIB_RELEASE=12.04*
>>>> *DISTRIB_CODENAME=precise*
>>>> *DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"*
>>>>
>>>> *java version "1.7.0_72"*
>>>>
>>>> This is happening on client side. Output of lsof was showing that
>>>> maximum
>>>> fd were FIFO and anon. But after GC FD count was reduced significantly.
>>>>
>>>> Below is my Client Code which i am using for publishing message.
>>>>
>>>>
>>>> * private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;*
>>>>
>>>> * myProducer =            new Producer<>(new
>>>> ProducerConfig(myProducerProperties));*
>>>>
>>>> *   public void send(*
>>>> *        List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>>
>>>> msgs)*
>>>> *    {*
>>>> *        myProducer.send(msgs);*
>>>> *    }*
>>>>
>>>>
>>>> we are using sync producer. I am attaching object histo before
>>>>
>>> GC(histo_1)
>>>
>>>> and after GC(histo_2) in my application.
>>>>
>>>> On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai <jai.forums2013@gmail.com
>>>> >
>>>> wrote:
>>>>
>>>>  Which operating system are you on and what Java version? Depending on
>>>>>
>>>> the
>>>
>>>> OS, you could get tools (like lsof) to show which file descriptors are
>>>>> being held on to. Is it the client JVM which ends up with these leaks?
>>>>>
>>>>> Also, would it be possible to post a snippet of your application code
>>>>> which shows how you are using the Kafka APIs?
>>>>>
>>>>> -Jaikiran
>>>>> On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
>>>>>
>>>>>  Hi,
>>>>>>
>>>>>> Currently we are using sync producer client of 0.8.1 version in our
>>>>>> production box . we are getting the following exception while
>>>>>>
>>>>> publishing
>>>
>>>> kafka message
>>>>>>
>>>>>> *[2015-01-29
>>>>>> 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89]
>>>>>>
>>>>> Fetching
>>>>
>>>>> topic metadata with correlation id 10808 for topics [Set(*
>>>>>> *kafka_topic_coms_FD_test1)] from broker
>>>>>>
>>>>> [id:0,host:localhost,port:9092]
>>>
>>>> failed*
>>>>>> *java.net.ConnectException: Connection refused*
>>>>>> *        at sun.nio.ch.Net.connect0(Native Method)*
>>>>>> *        at sun.nio.ch.Net.connect(Net.java:465)*
>>>>>> *        at sun.nio.ch.Net.connect(Net.java:457)*
>>>>>> *        at
>>>>>> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)*
>>>>>>           at
>>>>>>
>>>>> kafka.network.BlockingChannel.connect(BlockingChannel.scala:
>>>
>>>> 57)
>>>>>>           at
>>>>>>
>>>>> kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>>>
>>>>           at
>>>>>>
>>>>>>  kafka.producer.SyncProducer.getOrMakeConnection(
>>> SyncProducer.scala:156)
>>>
>>>>           at
>>>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$
>>>>>> doSend(SyncProducer.scala:68)
>>>>>>           at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>>>>>>           at
>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>>>>>>           at
>>>>>> kafka.producer.BrokerPartitionInfo.updateInfo(
>>>>>> BrokerPartitionInfo.scala:82)
>>>>>>
>>>>>>
>>>>>> we are using dynamic thread pool to publish message to kafka. My
>>>>>> observation is when after keep alive time when threads in my executor
>>>>>>
>>>>> gets
>>>>
>>>>> destroyed, somehow file descriptor is not getting cleared but when i
>>>>>>
>>>>> did
>>>
>>>> explicitly ran the full gc, fd count got reduced by a signification
>>>>>>
>>>>> amout.
>>>>
>>>>>
>>>>>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message