kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka High Level Consumer Connector shuts down after 10 seconds
Date Mon, 24 Mar 2014 20:35:27 GMT
Hi Gufran,

Does the consumer has any messages to consume during these 10 seconds?
There is also another config called "socket.timeout.ms", and if that amount
of time has elapsed when there is no data coming back through the socket,
the consumer will close automatically. From your logs it seems the consumer
just shutdown itself without any errors thrown, so I am wondering if this
would be the cause.

Guozhang


On Mon, Mar 24, 2014 at 12:44 PM, Gufran Pathan
<Gufran.Pathan@mu-sigma.com>wrote:

>  Hi,
>
>
>
> I'm facing an issue exactly similar to the one issued by someone else a
> few days ago (see below for the previous thread transcript).
>
>
>
> I'm using a High Level Consumer java program to consume messages. The
> consumer ends after 10 seconds (exactly the same time as faced by the other
> user). I've tried increasing the "zookeeper.session.timeout.ms" to
> "40000" and the  "zookeeper.sync.time.ms" to "20000" but still no
> difference. A fellow user suggested that the issue is due to GC and it
> should be tuned. Any other thoughts?
>
>
>
> Attaching the logs and the Consumer code herewith.
>
>
>
> Here's the *log info I get from the zookeeper side*:
>
>
>
> *[2014-03-24 23:45:58,371] INFO Accepted socket connection from
> /172.25.2.122:55327 <http://172.25.2.122:55327>
> (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:45:58,375] INFO Client attempting to establish new session
> at /172.25.2.122:55327 <http://172.25.2.122:55327>
> (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:45:58,379] INFO Established session 0x144f5342bc60007 with
> negotiated timeout 40000 for client /172.25.2.122:55327
> <http://172.25.2.122:55327> (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:46:09,129] INFO Processed session termination for
> sessionid: 0x144f5342bc60007
> (org.apache.zookeeper.server.PrepRequestProcessor)*
>
> *[2014-03-24 23:46:09,138] WARN EndOfStreamException: Unable to read
> additional data from client sessionid 0x144f5342bc60007, likely client has
> closed socket (org.apache.zookeeper.server.NIOServerCnxn)*
>
> *[2014-03-24 23:46:09,139] INFO Closed socket connection for client
> /172.25.2.122:55327 <http://172.25.2.122:55327> which had sessionid
> 0x144f5342bc60007 (org.apache.zookeeper.server.NIOServerCnxn)*
>
>
>
> *The Consumer logs*:
>
>
>
> *INFO  2014-03-24 23:45:58,413 [main] kafka.utils.VerifiableProperties  -
> Verifying properties*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property auto.commit.interval.ms <http://auto.commit.interval.ms> is
> overridden to 1000*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property group.id <http://group.id> is overridden to group1*
>
> *INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.connect is overridden to 172.25.1.94:2181
> <http://172.25.1.94:2181>*
>
> *INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.session.timeout.ms <http://zookeeper.session.timeout.ms>
> is overridden to 40000*
>
> *INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  -
> Property zookeeper.sync.time.ms <http://zookeeper.sync.time.ms> is
> overridden to 20000*
>
> *INFO  2014-03-24 23:45:58,485 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Connecting to zookeeper instance
> at 172.25.1.94:2181 <http://172.25.1.94:2181>*
>
> *INFO  2014-03-24 23:45:58,493 [ZkClient-EventThread-9-172.25.1.94:2181]
> org.I0Itec.zkclient.ZkEventThread  - Starting ZkClient event thread.*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010
> 17:14 GMT*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:host.name <http://host.name>=LAPSZ0914.mu-sigma.local*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.version=1.7.0_51*
>
> *INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.vendor=Oracle Corporation*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.home=C:\Program Files\Java\jre7*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client
> environment:java.class.path=C:\Users\*****\NewWorkSpace\KafkaNew\target\classes;D:\Modules\sl4j\slf4j-1.7.6\slf4j-log4j12-1.7.6.jar;C:\Users\*****\.m2\repository\org\apache\kafka\kafka_2.9.1\0.8.0-beta1\kafka_2.9.1-0.8.0-beta1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-library\2.9.1\scala-library-2.9.1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-compiler\2.9.1\scala-compiler-2.9.1.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\*****\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\*****\.m2\repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\*****\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\*****\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\*****\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\*****\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\*****\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.library.path=C:\Program
> Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program
> Files/Java/jre7/bin/client;C:/Program Files/Java/jre7/bin;C:/Program
> Files/Java/jre7/lib/i386;C:\Python27\;C:\Python27\Scripts;C:\Program
> Files\Common Files\Microsoft Shared\Microsoft Online Services;C:\Program
> Files\RSA SecurID Token Common;C:\Program Files\Intel\iCLS
> Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program
> Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program
> Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program
> Files\Intel\OpenCL SDK\2.0\bin\x86;c:\Program Files\Microsoft SQL
> Server\100\Tools\Binn\VSShell\Common7\IDE\;c:\Program Files\Microsoft SQL
> Server\100\Tools\Binn\;c:\Program Files\Microsoft SQL
> Server\100\DTS\Binn\;C:\Program Files\SAS\SharedFiles\Formats;C:\Program
> Files\Java\jdk1.7.0_51\bin;C:\Program
> Files\apache-maven-3.0.5\bin;C:\Program Files\Git\cmd;
> D:\Modules\Storm\apache-storm-0.9.1-incubating\bin;C:\Program
> Files\GnuWin32\bin;C:\Program
> Files\sbt\\bin;C:\Users\*****\AppData\Roaming\Python\Scripts;C:\Program
> Files\Apache Software Foundation\apache-maven-3.1.1\bin;D:\Installation
> Deck\Software-2\Eclipse\eclipse-jee-juno-win32\eclipse;;.*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.io.tmpdir=C:\Users\GUFRAN~1.PAT\AppData\Local\Temp\*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:java.compiler=<NA>*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.name <http://os.name>=Windows 7*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.arch=x86*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:os.version=6.1*
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.name <http://user.name>=******
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.home=C:\Users\******
>
> *INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  -
> Client environment:user.dir=C:\Users\*****\NewWorkSpace\KafkaNew*
>
> *INFO  2014-03-24 23:45:58,501 [main] org.apache.zookeeper.ZooKeeper  -
> Initiating client connection, connectString=172.25.1.94:2181
> <http://172.25.1.94:2181> sessionTimeout=40000
> watcher=org.I0Itec.zkclient.ZkClient@1d08c1b*
>
> *INFO  2014-03-24 23:45:58,520 [main-SendThread()]
> org.apache.zookeeper.ClientCnxn  - Opening socket connection to server
> /172.25.1.94:2181 <http://172.25.1.94:2181>*
>
> *INFO  2014-03-24 23:45:58,526 [main-SendThread(vm.centos.com:2181
> <http://vm.centos.com:2181>)] org.apache.zookeeper.ClientCnxn  - Socket
> connection established to vm.centos.com/172.25.1.94:2181
> <http://vm.centos.com/172.25.1.94:2181>, initiating session*
>
> *INFO  2014-03-24 23:45:58,538 [main-SendThread(vm.centos.com:2181
> <http://vm.centos.com:2181>)] org.apache.zookeeper.ClientCnxn  - Session
> establishment complete on server vm.centos.com/172.25.1.94:2181
> <http://vm.centos.com/172.25.1.94:2181>, sessionid = 0x144f5342bc60007,
> negotiated timeout = 40000*
>
> *INFO  2014-03-24 23:45:58,540 [main-EventThread]
> org.I0Itec.zkclient.ZkClient  - zookeeper state changed (SyncConnected)*
>
> *INFO  2014-03-24 23:45:58,559 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], starting auto committer every
> 1000 ms*
>
> *INFO  2014-03-24 23:45:58,656 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], begin registering consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a in ZK*
>
> *INFO  2014-03-24 23:45:58,680 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], end registering consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a in ZK*
>
> *INFO  2014-03-24 23:45:58,682
> [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], starting watcher executor thread
> for consumer group1_LAPSZ0914-1395684958481-7fc24c0a*
>
> *INFO  2014-03-24 23:45:58,710 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], begin rebalancing consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a try #0*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Verifying properties*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property metadata.broker.list is overridden to vm:9092*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property request.timeout.ms <http://request.timeout.ms> is overridden to
> 30000*
>
> *INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  -
> Property client.id <http://client.id> is overridden to group1*
>
> *INFO  2014-03-24 23:45:58,888 [main] kafka.client.ClientUtils$  -
> Fetching metadata from broker id:0,host:vm,port:9092 with correlation id 0
> for 1 topic(s) Set(partitioned)*
>
> *INFO  2014-03-24 23:45:58,893 [main] kafka.producer.SyncProducer  -
> Connected to vm:9092 for producing*
>
> *INFO  2014-03-24 23:45:58,914 [main] kafka.producer.SyncProducer  -
> Disconnecting from vm:9092*
>
> *INFO  2014-03-24 23:45:58,924 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping leader finder thread*
>
> *INFO  2014-03-24 23:45:58,925 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping all fetchers*
>
> *INFO  2014-03-24 23:45:58,926 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] All connections stopped*
>
> *INFO  2014-03-24 23:45:58,927 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared all relevant queues for
> this fetcher*
>
> *INFO  2014-03-24 23:45:58,928 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Cleared the data chunks in all
> the consumer message iterators*
>
> *INFO  2014-03-24 23:45:58,928 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Committing all offsets after
> clearing the fetcher queues*
>
> *INFO  2014-03-24 23:45:58,929 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Releasing partition ownership*
>
> *INFO  2014-03-24 23:45:58,932 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a rebalancing the following
> partitions: ArrayBuffer(0, 1, 2, 3, 4) for topic partitioned with
> consumers: List(group1_LAPSZ0914-1395684958481-7fc24c0a-0,
> group1_LAPSZ0914-1395684958481-7fc24c0a-1,
> group1_LAPSZ0914-1395684958481-7fc24c0a-2,
> group1_LAPSZ0914-1395684958481-7fc24c0a-3,
> group1_LAPSZ0914-1395684958481-7fc24c0a-4)*
>
> *INFO  2014-03-24 23:45:58,934 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-4 attempting to claim partition 4*
>
> *INFO  2014-03-24 23:45:58,939 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-3 attempting to claim partition 3*
>
> *INFO  2014-03-24 23:45:58,942 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-1 attempting to claim partition 1*
>
> *INFO  2014-03-24 23:45:58,949 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-2 attempting to claim partition 2*
>
> *INFO  2014-03-24 23:45:58,953 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-0 attempting to claim partition 0*
>
> *INFO  2014-03-24 23:45:58,967 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-1 successfully owned partition 1
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,970 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-2 successfully owned partition 2
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,973 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-0 successfully owned partition 0
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,977 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-4 successfully owned partition 4
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,980 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a],
> group1_LAPSZ0914-1395684958481-7fc24c0a-3 successfully owned partition 3
> for topic partitioned*
>
> *INFO  2014-03-24 23:45:58,981 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Updating the cache*
>
> *INFO  2014-03-24 23:45:58,984 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], Consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a selected partitions :
> partitioned:0: fetched offset = 1939: consumed offset = 1939,partitioned:1:
> fetched offset = 1969: consumed offset = 1969,partitioned:2: fetched offset
> = 30917: consumed offset = 30917,partitioned:3: fetched offset = 52147:
> consumed offset = 52147,partitioned:4: fetched offset = 1876: consumed
> offset = 1876*
>
> *INFO  2014-03-24 23:45:58,988 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], end rebalancing consumer
> group1_LAPSZ0914-1395684958481-7fc24c0a try #0*
>
> *INFO  2014-03-24 23:45:58,989
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Starting *
>
> * INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Verifying properties*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property metadata.broker.list is
> overridden to vm:9092*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property request.timeout.ms
> <http://request.timeout.ms> is overridden to 30000*
>
> *INFO  2014-03-24 23:45:59,000
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.utils.VerifiableProperties  - Property client.id <http://client.id>
> is overridden to group1*
>
> *INFO  2014-03-24 23:45:59,001
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.client.ClientUtils$  - Fetching metadata from broker
> id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s)
> Set(partitioned)*
>
> *INFO  2014-03-24 23:45:59,004
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.producer.SyncProducer  - Connected to vm:9092 for producing*
>
> *INFO  2014-03-24 23:45:59,043
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.producer.SyncProducer  - Disconnecting from vm:9092*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,2], initOffset 30917 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,3], initOffset 52147 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,1], initOffset 1969 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,0], initOffset 1939 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,056
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Adding fetcher for partition
> [partitioned,4], initOffset 1876 to broker 0 with fetcherId 0*
>
> *INFO  2014-03-24 23:45:59,058
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Starting *
>
> * INFO  2014-03-24 23:46:08,991 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shutting
> down*
>
> *INFO  2014-03-24 23:46:08,992 [main] kafka.utils.KafkaScheduler  -
> Forcing shutdown of Kafka scheduler*
>
> *INFO  2014-03-24 23:46:08,992 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping leader finder thread*
>
> *INFO  2014-03-24 23:46:08,992 [main]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutting
> down*
>
> *INFO  2014-03-24 23:46:08,993
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Stopped *
>
> * INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutdown
> completed*
>
> *INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] Stopping all fetchers*
>
> *INFO  2014-03-24 23:46:08,993 [main]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Shutting down*
>
> *INFO  2014-03-24 23:46:09,257
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.SimpleConsumer  - Reconnect due to socket error: *
>
> * java.nio.channels.ClosedByInterruptException*
>
> *                at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)*
>
> *                at sun.nio.ch.SocketChannelImpl.read(Unknown Source)*
>
> *                at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(Unknown Source)*
>
> *                at sun.nio.ch.ChannelInputStream.read(Unknown Source)*
>
> *                at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)*
>
> *                at kafka.utils.Utils$.read(Utils.scala:394)*
>
> *                at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)*
>
> *                at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)*
>
> *                at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)*
>
> *                at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)*
>
> *                at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)*
>
> *                at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)*
>
> *                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)*
>
> *                at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)*
>
> *                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)*
>
> *                at
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)*
>
> *                at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)*
>
> *                at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)*
>
> *                at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)*
>
> *INFO  2014-03-24 23:46:09,259
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Stopped *
>
> * INFO  2014-03-24 23:46:09,259 [main]
> kafka.consumer.ConsumerFetcherThread  -
> [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
> Shutdown completed*
>
> *INFO  2014-03-24 23:46:09,259 [main]
> kafka.consumer.ConsumerFetcherManager  -
> [ConsumerFetcherManager-1395684958545] All connections stopped*
>
> *INFO  2014-03-24 23:46:09,283 [ZkClient-EventThread-9-172.25.1.94:2181]
> org.I0Itec.zkclient.ZkEventThread  - Terminate ZkClient event thread.*
>
> *INFO  2014-03-24 23:46:09,293 [main] org.apache.zookeeper.ZooKeeper  -
> Session: 0x144f5342bc60007 closed*
>
> *INFO  2014-03-24 23:46:09,294 [main]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], ZKConsumerConnector shut down
> completed*
>
> *INFO  2014-03-24 23:46:09,694
> [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor]
> kafka.consumer.ZookeeperConsumerConnector  -
> [group1_LAPSZ0914-1395684958481-7fc24c0a], stopping watcher executor thread
> for consumer group1_LAPSZ0914-1395684958481-7fc24c0a*
>
>
>
> *My Consumer code*:
>
>
>
> *import kafka.consumer.ConsumerConfig;*
>
> *import kafka.consumer.KafkaStream;*
>
> *import kafka.javaapi.consumer.ConsumerConnector;*
>
>  *import java.util.HashMap;*
>
> *import java.util.List;*
>
> *import java.util.Map;*
>
> *import java.util.Properties;*
>
> *import java.util.concurrent.ExecutorService;*
>
> *import java.util.concurrent.Executors;*
>
>
>
> *public class TestConsumer {*
>
> *                private final ConsumerConnector consumer;*
>
> *                    private final String topic;*
>
> *                    private  ExecutorService executor;*
>
>
>
> *                    public TestConsumer(String a_zookeeper, String
> a_groupId, String a_topic) {*
>
> *                        consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(*
>
> *                                createConsumerConfig(a_zookeeper,
> a_groupId));*
>
> *                        this.topic = a_topic;*
>
> *                    }*
>
>
>
> *                    public void shutdown() {*
>
> *                        if (consumer != null) consumer.shutdown();*
>
> *                        if (executor != null) executor.shutdown();*
>
> *                    }*
>
>
>
> *                    public void run(int a_numThreads) {*
>
> *                        Map<String, Integer> topicCountMap = new
> HashMap<String, Integer>();*
>
> *                        topicCountMap.put(topic, new
> Integer(a_numThreads));*
>
> *                        Map<String, List<KafkaStream<byte[], byte[]>>>
> consumerMap = consumer.createMessageStreams(topicCountMap);*
>
> *                        List<KafkaStream<byte[], byte[]>> streams =
> consumerMap.get(topic);*
>
>
>
> *                        // now launch all the threads*
>
> *                        //*
>
> *                        executor =
> Executors.newFixedThreadPool(a_numThreads);*
>
>
>
> *                        // now create an object to consume the messages*
>
> *                        //*
>
> *                        int threadNumber = 0;*
>
> *                        for (final KafkaStream stream : streams) {*
>
> *                            executor.submit(new ConsumerTest(stream,
> threadNumber));*
>
> *                            threadNumber++;*
>
> *                        }*
>
> *                    }*
>
>
>
> *                    private static ConsumerConfig
> createConsumerConfig(String a_zookeeper, String a_groupId) {*
>
> *                        Properties props = new Properties();*
>
> *                        props.put("zookeeper.connect", a_zookeeper);*
>
> *                        props.put("group.id <http://group.id>",
> a_groupId);*
>
> *                        props.put("zookeeper.session.timeout.ms
> <http://zookeeper.session.timeout.ms>", "40000");*
>
> *                        props.put("zookeeper.sync.time.ms
> <http://zookeeper.sync.time.ms>", "20000");*
>
> *                        props.put("auto.commit.interval.ms
> <http://auto.commit.interval.ms>", "1000");*
>
>
>
> *                        return new ConsumerConfig(props);*
>
> *                    }*
>
>
>
> *                    public static void main(String[] args) {*
>
> *                        String zooKeeper = "172.25.1.94:2181
> <http://172.25.1.94:2181>";*
>
> *                        String groupId = "group1";*
>
> *                        String topic = "partitioned";*
>
> *                        int threads = 5;*
>
>
>
> *                        TestConsumer example = new
> TestConsumer(zooKeeper, groupId, topic);*
>
> *                        example.run(threads);*
>
>
>
> *                        try {*
>
> *                            Thread.sleep(10000);*
>
> *                        } catch (InterruptedException ie) {*
>
>
>
> *                        }*
>
> *                        example.shutdown();*
>
> *                    }*
>
>
>
>
>
> *}*
>
>
>
>
>
> *From*
>
> Neha Narkhede <neha.narkh...@gmail.com>
>
> *Subject*
>
> Re: Kafka High Level Consumer Connector shuts down after 10 seconds
>
> *Date*
>
> Mon, 10 Mar 2014 16:48:31 GMT
>
> Session termination can happen either when client or zookeeper process
>
> pauses (due to GC) or when the client process terminates. A sustainable
>
> solution is to tune GC settings. For now, you can try increasing the
>
> zookeeper.session.timeout.ms.
>
>
>
>
>
>
>
>
>
> On Sun, Mar 9, 2014 at 3:44 PM, Ameya Bhagat <ameya.bhagat@gmail.com>
> wrote:
>
>
>
> > I am using a high level consumer as described at:
>
> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> >
>
> > I am noticing that my consumer does not run forever and ends after some
>
> > time (< 15s). At the zookeeper side, I see the following:
>
> >
>
> > INFO Processed session termination for sessionid: 0x144a4854325004d
>
> > (org.apache.zookeeper.server.PrepRequestProcessor)
>
> > INFO Closed socket connection for client /127.0.0.1:59899 which had
>
> > sessionid 0x144a4854325004d (org.apache.zookeeper.server.NIOServerCnxn)
>
> >
>
> > I am using default configurations. How do I make my consumer listen
>
> > forever?
>
> >
>
> > Thanks
>
> > Ameya
>
> >
>
>
>
>
>
> --
>
> Thanks & Regards,
>
> Gufran Pathan *| *+91-9566811502 *| *www.mu-sigma.com *|*
>
>
>  Disclaimer: http://www.mu-sigma.com/disclaimer.html
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message