kafka-users mailing list archives

From Gufran Pathan <Gufran.Pat...@mu-sigma.com>
Subject Kafka High Level Consumer Connector shuts down after 10 seconds
Date Mon, 24 Mar 2014 19:44:01 GMT
Hi,

I'm facing the same issue that another user reported a few days ago (see below for the
previous thread transcript).

I'm using a High Level Consumer Java program to consume messages. The consumer ends after
10 seconds, about the same time the other user reported. I've tried increasing
"zookeeper.session.timeout.ms" to "40000" and "zookeeper.sync.time.ms" to "20000", but it
made no difference. A fellow user suggested that the issue is caused by GC pauses and that
GC should be tuned. Any other thoughts?

The logs and the consumer code follow.

Here's the log info I get from the zookeeper side:

[2014-03-24 23:45:58,371] INFO Accepted socket connection from /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-03-24 23:45:58,375] INFO Client attempting to establish new session at /172.25.2.122:55327
(org.apache.zookeeper.server.NIOServerCnxn)
[2014-03-24 23:45:58,379] INFO Established session 0x144f5342bc60007 with negotiated timeout
40000 for client /172.25.2.122:55327 (org.apache.zookeeper.server.NIOServerCnxn)
[2014-03-24 23:46:09,129] INFO Processed session termination for sessionid: 0x144f5342bc60007
(org.apache.zookeeper.server.PrepRequestProcessor)
[2014-03-24 23:46:09,138] WARN EndOfStreamException: Unable to read additional data from client
sessionid 0x144f5342bc60007, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn)
[2014-03-24 23:46:09,139] INFO Closed socket connection for client /172.25.2.122:55327 which
had sessionid 0x144f5342bc60007 (org.apache.zookeeper.server.NIOServerCnxn)

The Consumer logs:

INFO  2014-03-24 23:45:58,413 [main] kafka.utils.VerifiableProperties  - Verifying properties
INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  - Property auto.commit.interval.ms
is overridden to 1000
INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  - Property group.id
is overridden to group1
INFO  2014-03-24 23:45:58,435 [main] kafka.utils.VerifiableProperties  - Property zookeeper.connect
is overridden to 172.25.1.94:2181
INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  - Property zookeeper.session.timeout.ms
is overridden to 40000
INFO  2014-03-24 23:45:58,436 [main] kafka.utils.VerifiableProperties  - Property zookeeper.sync.time.ms
is overridden to 20000
INFO  2014-03-24 23:45:58,485 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Connecting to zookeeper instance at 172.25.1.94:2181
INFO  2014-03-24 23:45:58,493 [ZkClient-EventThread-9-172.25.1.94:2181] org.I0Itec.zkclient.ZkEventThread
 - Starting ZkClient event thread.
INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  - Client environment:zookeeper.version=3.3.1-942149,
built on 05/07/2010 17:14 GMT
INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  - Client environment:host.name=LAPSZ0914.mu-sigma.local
INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.7.0_51
INFO  2014-03-24 23:45:58,499 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle
Corporation
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.home=C:\Program
Files\Java\jre7
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.class.path=C:\Users\*****\NewWorkSpace\KafkaNew\target\classes;D:\Modules\sl4j\slf4j-1.7.6\slf4j-log4j12-1.7.6.jar;C:\Users\*****\.m2\repository\org\apache\kafka\kafka_2.9.1\0.8.0-beta1\kafka_2.9.1-0.8.0-beta1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-library\2.9.1\scala-library-2.9.1.jar;C:\Users\*****\.m2\repository\org\scala-lang\scala-compiler\2.9.1\scala-compiler-2.9.1.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;C:\Users\*****\.m2\repository\com\yammer\metrics\metrics-annotation\2.2.0\metrics-annotation-2.2.0.jar;C:\Users\*****\.m2\repository\com\101tec\zkclient\0.3\zkclient-0.3.jar;C:\Users\*****\.m2\repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\*****\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\*****\.m2\repository\log4j\log4j\1.2.14\log4j-1.2.14.jar;C:\Users\*****\.m2\repository\net\sf\jopt-simple\jopt-simple\3.2\jopt-simple-3.2.jar;C:\Users\*****\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar;C:\Users\*****\.m2\repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.library.path=C:\Program
Files\Java\jre7\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files/Java/jre7/bin/client;C:/Program
Files/Java/jre7/bin;C:/Program Files/Java/jre7/lib/i386;C:\Python27\;C:\Python27\Scripts;C:\Program
Files\Common Files\Microsoft Shared\Microsoft Online Services;C:\Program Files\RSA SecurID
Token Common;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program
Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management
Engine Components\IPT;C:\Program Files\Intel\OpenCL SDK\2.0\bin\x86;c:\Program Files\Microsoft
SQL Server\100\Tools\Binn\VSShell\Common7\IDE\;c:\Program Files\Microsoft SQL Server\100\Tools\Binn\;c:\Program
Files\Microsoft SQL Server\100\DTS\Binn\;C:\Program Files\SAS\SharedFiles\Formats;C:\Program
Files\Java\jdk1.7.0_51\bin;C:\Program Files\apache-maven-3.0.5\bin;C:\Program Files\Git\cmd;
D:\Modules\Storm\apache-storm-0.9.1-incubating\bin;C:\Program Files\GnuWin32\bin;C:\Program
Files\sbt\\bin;C:\Users\*****\AppData\Roaming\Python\Scripts;C:\Program Files\Apache Software
Foundation\apache-maven-3.1.1\bin;D:\Installation Deck\Software-2\Eclipse\eclipse-jee-juno-win32\eclipse;;.
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.io.tmpdir=C:\Users\GUFRAN~1.PAT\AppData\Local\Temp\
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:java.compiler=<NA>
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:os.name=Windows
7
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:os.arch=x86
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:os.version=6.1
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:user.name=*****
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:user.home=C:\Users\*****
INFO  2014-03-24 23:45:58,500 [main] org.apache.zookeeper.ZooKeeper  - Client environment:user.dir=C:\Users\*****\NewWorkSpace\KafkaNew
INFO  2014-03-24 23:45:58,501 [main] org.apache.zookeeper.ZooKeeper  - Initiating client connection,
connectString=172.25.1.94:2181 sessionTimeout=40000 watcher=org.I0Itec.zkclient.ZkClient@1d08c1b
INFO  2014-03-24 23:45:58,520 [main-SendThread()] org.apache.zookeeper.ClientCnxn  - Opening
socket connection to server /172.25.1.94:2181
INFO  2014-03-24 23:45:58,526 [main-SendThread(vm.centos.com:2181)] org.apache.zookeeper.ClientCnxn
 - Socket connection established to vm.centos.com/172.25.1.94:2181, initiating session
INFO  2014-03-24 23:45:58,538 [main-SendThread(vm.centos.com:2181)] org.apache.zookeeper.ClientCnxn
 - Session establishment complete on server vm.centos.com/172.25.1.94:2181, sessionid = 0x144f5342bc60007,
negotiated timeout = 40000
INFO  2014-03-24 23:45:58,540 [main-EventThread] org.I0Itec.zkclient.ZkClient  - zookeeper
state changed (SyncConnected)
INFO  2014-03-24 23:45:58,559 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
starting auto committer every 1000 ms
INFO  2014-03-24 23:45:58,656 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
begin registering consumer group1_LAPSZ0914-1395684958481-7fc24c0a in ZK
INFO  2014-03-24 23:45:58,680 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
end registering consumer group1_LAPSZ0914-1395684958481-7fc24c0a in ZK
INFO  2014-03-24 23:45:58,682 [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] kafka.consumer.ZookeeperConsumerConnector
 - [group1_LAPSZ0914-1395684958481-7fc24c0a], starting watcher executor thread for consumer
group1_LAPSZ0914-1395684958481-7fc24c0a
INFO  2014-03-24 23:45:58,710 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
begin rebalancing consumer group1_LAPSZ0914-1395684958481-7fc24c0a try #0
INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  - Verifying properties
INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  - Property metadata.broker.list
is overridden to vm:9092
INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  - Property request.timeout.ms
is overridden to 30000
INFO  2014-03-24 23:45:58,867 [main] kafka.utils.VerifiableProperties  - Property client.id
is overridden to group1
INFO  2014-03-24 23:45:58,888 [main] kafka.client.ClientUtils$  - Fetching metadata from broker
id:0,host:vm,port:9092 with correlation id 0 for 1 topic(s) Set(partitioned)
INFO  2014-03-24 23:45:58,893 [main] kafka.producer.SyncProducer  - Connected to vm:9092 for
producing
INFO  2014-03-24 23:45:58,914 [main] kafka.producer.SyncProducer  - Disconnecting from vm:9092
INFO  2014-03-24 23:45:58,924 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
Stopping leader finder thread
INFO  2014-03-24 23:45:58,925 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
Stopping all fetchers
INFO  2014-03-24 23:45:58,926 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
All connections stopped
INFO  2014-03-24 23:45:58,927 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Cleared all relevant queues for this fetcher
INFO  2014-03-24 23:45:58,928 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Cleared the data chunks in all the consumer message iterators
INFO  2014-03-24 23:45:58,928 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Committing all offsets after clearing the fetcher queues
INFO  2014-03-24 23:45:58,929 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Releasing partition ownership
INFO  2014-03-24 23:45:58,932 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Consumer group1_LAPSZ0914-1395684958481-7fc24c0a rebalancing the following partitions: ArrayBuffer(0,
1, 2, 3, 4) for topic partitioned with consumers: List(group1_LAPSZ0914-1395684958481-7fc24c0a-0,
group1_LAPSZ0914-1395684958481-7fc24c0a-1, group1_LAPSZ0914-1395684958481-7fc24c0a-2, group1_LAPSZ0914-1395684958481-7fc24c0a-3,
group1_LAPSZ0914-1395684958481-7fc24c0a-4)
INFO  2014-03-24 23:45:58,934 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-4 attempting to claim partition 4
INFO  2014-03-24 23:45:58,939 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-3 attempting to claim partition 3
INFO  2014-03-24 23:45:58,942 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-1 attempting to claim partition 1
INFO  2014-03-24 23:45:58,949 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-2 attempting to claim partition 2
INFO  2014-03-24 23:45:58,953 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-0 attempting to claim partition 0
INFO  2014-03-24 23:45:58,967 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-1 successfully owned partition 1 for topic partitioned
INFO  2014-03-24 23:45:58,970 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-2 successfully owned partition 2 for topic partitioned
INFO  2014-03-24 23:45:58,973 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-0 successfully owned partition 0 for topic partitioned
INFO  2014-03-24 23:45:58,977 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-4 successfully owned partition 4 for topic partitioned
INFO  2014-03-24 23:45:58,980 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
group1_LAPSZ0914-1395684958481-7fc24c0a-3 successfully owned partition 3 for topic partitioned
INFO  2014-03-24 23:45:58,981 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Updating the cache
INFO  2014-03-24 23:45:58,984 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
Consumer group1_LAPSZ0914-1395684958481-7fc24c0a selected partitions : partitioned:0: fetched
offset = 1939: consumed offset = 1939,partitioned:1: fetched offset = 1969: consumed offset
= 1969,partitioned:2: fetched offset = 30917: consumed offset = 30917,partitioned:3: fetched
offset = 52147: consumed offset = 52147,partitioned:4: fetched offset = 1876: consumed offset
= 1876
INFO  2014-03-24 23:45:58,988 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
end rebalancing consumer group1_LAPSZ0914-1395684958481-7fc24c0a try #0
INFO  2014-03-24 23:45:58,989 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread],
Starting
INFO  2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.utils.VerifiableProperties  - Verifying properties
INFO  2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.utils.VerifiableProperties  - Property metadata.broker.list is overridden to vm:9092
INFO  2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.utils.VerifiableProperties  - Property request.timeout.ms is overridden to 30000
INFO  2014-03-24 23:45:59,000 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.utils.VerifiableProperties  - Property client.id is overridden to group1
INFO  2014-03-24 23:45:59,001 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.client.ClientUtils$  - Fetching metadata from broker id:0,host:vm,port:9092 with correlation
id 0 for 1 topic(s) Set(partitioned)
INFO  2014-03-24 23:45:59,004 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.producer.SyncProducer  - Connected to vm:9092 for producing
INFO  2014-03-24 23:45:59,043 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.producer.SyncProducer  - Disconnecting from vm:9092
INFO  2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545] Adding fetcher
for partition [partitioned,2], initOffset 30917 to broker 0 with fetcherId 0
INFO  2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545] Adding fetcher
for partition [partitioned,3], initOffset 52147 to broker 0 with fetcherId 0
INFO  2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545] Adding fetcher
for partition [partitioned,1], initOffset 1969 to broker 0 with fetcherId 0
INFO  2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545] Adding fetcher
for partition [partitioned,0], initOffset 1939 to broker 0 with fetcherId 0
INFO  2014-03-24 23:45:59,056 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545] Adding fetcher
for partition [partitioned,4], initOffset 1876 to broker 0 with fetcherId 0
INFO  2014-03-24 23:45:59,058 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
Starting
INFO  2014-03-24 23:46:08,991 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
ZKConsumerConnector shutting down
INFO  2014-03-24 23:46:08,992 [main] kafka.utils.KafkaScheduler  - Forcing shutdown of Kafka
scheduler
INFO  2014-03-24 23:46:08,992 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
Stopping leader finder thread
INFO  2014-03-24 23:46:08,992 [main] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread
 - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutting down
INFO  2014-03-24 23:46:08,993 [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread]
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread  - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread],
Stopped
INFO  2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread
 - [group1_LAPSZ0914-1395684958481-7fc24c0a-leader-finder-thread], Shutdown completed
INFO  2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
Stopping all fetchers
INFO  2014-03-24 23:46:08,993 [main] kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
Shutting down
INFO  2014-03-24 23:46:09,257 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
 java.nio.channels.ClosedByInterruptException
                at java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
                at sun.nio.ch.SocketChannelImpl.read(Unknown Source)
                at sun.nio.ch.SocketAdaptor$SocketInputStream.read(Unknown Source)
                at sun.nio.ch.ChannelInputStream.read(Unknown Source)
                at java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
                at kafka.utils.Utils$.read(Utils.scala:394)
                at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
                at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
                at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
                at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
                at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
                at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
                at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
                at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
                at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
                at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
                at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
                at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
INFO  2014-03-24 23:46:09,259 [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0]
kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
Stopped
INFO  2014-03-24 23:46:09,259 [main] kafka.consumer.ConsumerFetcherThread  - [ConsumerFetcherThread-group1_LAPSZ0914-1395684958481-7fc24c0a-0-0],
Shutdown completed
INFO  2014-03-24 23:46:09,259 [main] kafka.consumer.ConsumerFetcherManager  - [ConsumerFetcherManager-1395684958545]
All connections stopped
INFO  2014-03-24 23:46:09,283 [ZkClient-EventThread-9-172.25.1.94:2181] org.I0Itec.zkclient.ZkEventThread
 - Terminate ZkClient event thread.
INFO  2014-03-24 23:46:09,293 [main] org.apache.zookeeper.ZooKeeper  - Session: 0x144f5342bc60007
closed
INFO  2014-03-24 23:46:09,294 [main] kafka.consumer.ZookeeperConsumerConnector  - [group1_LAPSZ0914-1395684958481-7fc24c0a],
ZKConsumerConnector shut down completed
INFO  2014-03-24 23:46:09,694 [group1_LAPSZ0914-1395684958481-7fc24c0a_watcher_executor] kafka.consumer.ZookeeperConsumerConnector
 - [group1_LAPSZ0914-1395684958481-7fc24c0a], stopping watcher executor thread for consumer
group1_LAPSZ0914-1395684958481-7fc24c0a

My Consumer code:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public TestConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "20000");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = "172.25.1.94:2181";
        String groupId = "group1";
        String topic = "partitioned";
        int threads = 5;

        TestConsumer example = new TestConsumer(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
        example.shutdown();
    }
}
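
One possible reading of the code and logs above, offered as an assumption rather than a
confirmed diagnosis: main() sleeps for 10000 ms and then calls example.shutdown(), and the
"[main] ... ZKConsumerConnector shutting down" entry appears about 10 seconds after the
rebalance ends. If that is the cause, a consumer meant to run indefinitely would block until
it is asked to stop instead of sleeping for a fixed time. The sketch below shows that pattern
with plain JDK classes only; RunUntilStopped and its methods are hypothetical names, and the
Runnable stands in for a call such as example.shutdown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: blocks the calling thread until stop() is invoked,
// and runs the supplied cleanup exactly once.
class RunUntilStopped {
    private final CountDownLatch stopped = new CountDownLatch(1);
    private final AtomicBoolean cleanupStarted = new AtomicBoolean(false);
    private final Runnable cleanup;

    RunUntilStopped(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    // Runs cleanup on the first call only, then releases any thread in awaitStop().
    void stop() {
        if (cleanupStarted.compareAndSet(false, true)) {
            try {
                cleanup.run();
            } finally {
                stopped.countDown();
            }
        }
    }

    // Blocks until stop() has finished; returns false if the wait was interrupted.
    boolean awaitStop() {
        try {
            stopped.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // The Runnable is a stand-in (assumption) for a call like example.shutdown().
        RunUntilStopped runner = new RunUntilStopped(
                () -> System.out.println("consumer shut down"));

        // In a real program the stop request comes from outside, e.g. Ctrl+C.
        Runtime.getRuntime().addShutdownHook(new Thread(runner::stop));

        // For this demo only: simulate the external stop request so the example ends.
        new Thread(runner::stop).start();

        runner.awaitStop();
    }
}
```

Without the demo-only thread, main() would stay blocked in awaitStop() until the JVM receives
a termination signal, at which point the shutdown hook runs the cleanup.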


From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Kafka High Level Consumer Connector shuts down after 10 seconds
Date Mon, 10 Mar 2014 16:48:31 GMT

Session termination can happen either when the client or ZooKeeper process
pauses (due to GC) or when the client process terminates. A sustainable
solution is to tune the GC settings. For now, you can try increasing
zookeeper.session.timeout.ms.
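
As a rough sketch of that interim suggestion, the timeout can be raised through the consumer
properties. The property names below are the ones that appear in this thread; the class name,
the helper method, and the host, group, and timeout values are illustrative assumptions. Only
java.util.Properties is used, so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer properties with a chosen
// ZooKeeper session timeout.
class SessionTimeoutProps {
    static Properties build(String zookeeper, String groupId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // Values are stored as strings, matching the consumer code in this thread.
        props.put("zookeeper.session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only.
        Properties props = build("localhost:2181", "group1", 40000);
        System.out.println(props.getProperty("zookeeper.session.timeout.ms"));
    }
}
```

The resulting Properties object would then be passed to kafka.consumer.ConsumerConfig, as in
the consumer code earlier in this message.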




On Sun, Mar 9, 2014 at 3:44 PM, Ameya Bhagat <ameya.bhagat@gmail.com> wrote:

> I am using a high level consumer as described at:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> I am noticing that my consumer does not run forever and ends after some
> time (< 15s). At the zookeeper side, I see the following:
>
> INFO Processed session termination for sessionid: 0x144a4854325004d
> (org.apache.zookeeper.server.PrepRequestProcessor)
> INFO Closed socket connection for client /127.0.0.1:59899 which had
> sessionid 0x144a4854325004d (org.apache.zookeeper.server.NIOServerCnxn)
>
> I am using default configurations. How do I make my consumer listen
> forever?
>
> Thanks
> Ameya
>



--
Thanks & Regards,
Gufran Pathan | +91-9566811502 | www.mu-sigma.com

