samza-dev mailing list archives

From Job-Selina Wu <swucaree...@gmail.com>
Subject Re: kafka producer failed
Date Mon, 27 Jul 2015 05:53:45 GMT
Hi, Yi:

    You are right. After I added producer.close(), the bug was fixed.

Thanks a lot!

Selina
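
A minimal sketch of the fix confirmed above, assuming (as Yi suggested) that the HTTP handler created a new producer for every request. It is not the original handler code, which was not posted in this thread. `StubProducer`, `handleLeaky`, `handleClosing` and `openAfter` are hypothetical names; `StubProducer` is a stand-in for kafka.javaapi.producer.Producer so that the example runs without a broker, and it only counts open instances the way each real producer would hold at least one socket to the broker.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ProducerCloseDemo {

    /** Hypothetical stand-in for kafka.javaapi.producer.Producer. */
    static final class StubProducer {
        static final AtomicInteger OPEN = new AtomicInteger();

        StubProducer() {
            OPEN.incrementAndGet();   // simulates opening a connection
        }

        void send(String message) {
            // A real producer would write the message to the broker here.
        }

        void close() {
            OPEN.decrementAndGet();   // simulates releasing the connection
        }
    }

    /** Leaky handler: a new producer per request that is never closed. */
    static void handleLeaky(String message) {
        StubProducer producer = new StubProducer();
        producer.send(message);
    }

    /** Fixed handler: the producer is closed even if send() throws. */
    static void handleClosing(String message) {
        StubProducer producer = new StubProducer();
        try {
            producer.send(message);
        } finally {
            producer.close();
        }
    }

    /** Simulates the given number of requests and returns how many producers are still open. */
    static int openAfter(int requests, boolean close) {
        StubProducer.OPEN.set(0);
        for (int i = 0; i < requests; i++) {
            if (close) {
                handleClosing("msg-" + i);
            } else {
                handleLeaky("msg-" + i);
            }
        }
        return StubProducer.OPEN.get();
    }

    public static void main(String[] args) {
        System.out.println("open without close(): " + openAfter(1000, false));
        System.out.println("open with close():    " + openAfter(1000, true));
    }
}
```

With the stand-in, 1000 simulated requests leave 1000 producers open in the leaky handler and 0 in the closing one. Another option is to create one producer when the server starts and reuse it for every request, closing it at shutdown, provided the producer class in use is safe to share between threads.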

On Sun, Jul 26, 2015 at 10:28 PM, Yi Pan <nickpan47@gmail.com> wrote:

> Hi, Selina,
>
> Did you forget to close your Kafka producer in your Http servlet? If you
> create a new Kafka producer in each Http request and do not close the
> producer, that might cause this problem.
>
> -Yi
>
> On Sun, Jul 26, 2015 at 9:25 PM, Job-Selina Wu <swucareer99@gmail.com>
> wrote:
>
> > Hi, Yan:
> >
> >     The kafka.log shows *java.io.IOException: Too many open files in system*
> >
> > Some details are listed below.
> >
> > Sincerely,
> > Selina
> >
> > [2015-07-26 21:07:01,500] INFO Verifying properties
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,545] INFO Property broker.id is overridden to 0
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,545] INFO Property log.cleaner.enable is
> > overridden to false (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,545] INFO Property log.dirs is overridden to
> > /tmp/kafka-logs (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,545] INFO Property
> > log.retention.check.interval.ms is overridden to 300000
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,546] INFO Property log.retention.hours is
> > overridden to 168 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,546] INFO Property log.segment.bytes is
> > overridden to 1073741824 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,546] INFO Property num.io.threads is overridden
> > to 8 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,547] INFO Property num.network.threads is
> > overridden to 3 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,547] INFO Property num.partitions is overridden
> > to 1 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,547] INFO Property
> > num.recovery.threads.per.data.dir is overridden to 1
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,547] INFO Property port is overridden to 9092
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,548] INFO Property socket.receive.buffer.bytes is
> > overridden to 102400 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,548] INFO Property socket.request.max.bytes is
> > overridden to 104857600 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,548] INFO Property socket.send.buffer.bytes is
> > overridden to 102400 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,548] INFO Property zookeeper.connect is
> > overridden to localhost:2181 (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,548] INFO Property
> > zookeeper.connection.timeout.ms is overridden to 6000
> > (kafka.utils.VerifiableProperties)
> > [2015-07-26 21:07:01,607] INFO [Kafka Server 0], starting
> > (kafka.server.KafkaServer)
> > [2015-07-26 21:07:01,608] INFO [Kafka Server 0], Connecting to
> > zookeeper on localhost:2181 (kafka.server.KafkaServer)
> > [2015-07-26 21:07:01,619] INFO Starting ZkClient event thread.
> > (org.I0Itec.zkclient.ZkEventThread)
> > [2015-07-26 21:07:01,631] INFO Client
> > environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09
> > GMT (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client
> > environment:host.name=selinas-mbp.attlocal.net
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client
> > environment:java.version=1.8.0_45 (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client environment:java.vendor=Oracle
> > Corporation (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client
> > environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/jre
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client
> >
> >
> environment:java.class.path=:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../core/build/dependant-libs-2.10.4*/*.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../examples/build/libs//kafka-examples*.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../clients/build/libs/kafka-clients*.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/jopt-simple-3.2.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka-clients-0.8.2.1.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka_2.10-0.8.2.1-javadoc.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka_2.10-0.8.2.1-scaladoc.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka_2.10-0.8.2.1-sources.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka_2.10-0.8.2.1-test.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/kafka_2.10-0.8.2.1.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/log4j-1.2.16.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/lz4-1.2.0.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/metrics-core-2.2.0.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/scala-library-2.10.4.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/slf4j-api-1.7.6.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/slf4j-log4j12-1.6.1.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/snappy-java-1.1.1.6.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/zkclient-0.3.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/bin/../libs/zookeeper-3.4.6.jar:/Users/selina/IdeaProjects/samza-Demo/deploy/kafka/
bin/../core/build/libs/kafka_2.10*.jar
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,631] INFO Client
> > environment:java.library.path=/Users/selina/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client
> > environment:java.io.tmpdir=/var/folders/pj/f6r7cdgs7nv8lr03z7bbnbdc0000gn/T/
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client environment:java.compiler=<NA>
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client environment:os.name=Mac OS X
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client environment:os.arch=x86_64
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client environment:os.version=10.10.3
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client environment:user.name=selina
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client
> > environment:user.home=/Users/selina (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,634] INFO Client
> > environment:user.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/kafka
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,635] INFO Initiating client connection,
> > connectString=localhost:2181 sessionTimeout=6000
> > watcher=org.I0Itec.zkclient.ZkClient@4d49af10
> > (org.apache.zookeeper.ZooKeeper)
> > [2015-07-26 21:07:01,661] INFO Opening socket connection to server
> > localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using
> > SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> > [2015-07-26 21:07:01,742] INFO Socket connection established to
> > localhost/0:0:0:0:0:0:0:1:2181, initiating session
> > (org.apache.zookeeper.ClientCnxn)
> > [2015-07-26 21:07:01,815] INFO Session establishment complete on
> > server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14ecdb12e690000,
> > negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> > [2015-07-26 21:07:01,816] INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)
> > [2015-07-26 21:07:02,515] INFO Loading logs. (kafka.log.LogManager)
> > [2015-07-26 21:07:02,585] INFO Completed load of log
> > __samza_checkpoint_ver_1_for_demo-parser5_1-0 with log end offset 3
> > (kafka.log.Log)
> > [2015-07-26 21:07:02,594] INFO Completed load of log
> > __samza_checkpoint_ver_1_for_demo-parser6_1-0 with log end offset 12
> > (kafka.log.Log)
> > [2015-07-26 21:07:02,599] INFO Completed load of log
> > __samza_checkpoint_ver_1_for_demo-parser7_1-0 with log end offset 4
> > (kafka.log.Log)
> > [2015-07-26 21:07:02,604] INFO Completed load of log
> > __samza_checkpoint_ver_1_for_wikipedia-parser_1-0 with log end offset
> > 29 (kafka.log.Log)
> > [2015-07-26 21:07:02,608] INFO Completed load of log
> > __samza_checkpoint_ver_1_for_wikipedia-stats_1-0 with log end offset
> > 18 (kafka.log.Log)
> > [2015-07-26 21:07:02,616] INFO Completed load of log demo-duplicate-0
> > with log end offset 19931 (kafka.log.Log)
> > [2015-07-26 21:07:02,621] INFO Completed load of log http-demo-0 with
> > log end offset 23048 (kafka.log.Log)
> > [2015-07-26 21:07:02,627] INFO Completed load of log metrics-0 with
> > log end offset 135 (kafka.log.Log)
> > [2015-07-26 21:07:02,632] INFO Completed load of log wikipedia-edits-0
> > with log end offset 1822 (kafka.log.Log)
> > [2015-07-26 21:07:02,638] INFO Completed load of log wikipedia-raw-0
> > with log end offset 2731 (kafka.log.Log)
> > [2015-07-26 21:07:02,643] INFO Completed load of log wikipedia-stats-0
> > with log end offset 97 (kafka.log.Log)
> > [2015-07-26 21:07:02,649] INFO Completed load of log
> > wikipedia-stats-changelog-0 with log end offset 1795 (kafka.log.Log)
> > [2015-07-26 21:07:02,652] INFO Logs loading complete.
> > (kafka.log.LogManager)
> > [2015-07-26 21:07:02,652] INFO Starting log cleanup with a period of
> > 300000 ms. (kafka.log.LogManager)
> > [2015-07-26 21:07:02,655] INFO Starting log flusher with a default
> > period of 9223372036854775807 ms. (kafka.log.LogManager)
> > [2015-07-26 21:07:02,680] INFO Awaiting socket connections on
> > 0.0.0.0:9092. (kafka.network.Acceptor)
> > [2015-07-26 21:07:02,680] INFO [Socket Server on Broker 0], Started
> > (kafka.network.SocketServer)
> > [2015-07-26 21:07:02,745] INFO Will not load MX4J, mx4j-tools.jar is
> > not in the classpath (kafka.utils.Mx4jLoader$)
> > [2015-07-26 21:07:02,773] INFO 0 successfully elected as leader
> > (kafka.server.ZookeeperLeaderElector)
> > [2015-07-26 21:07:03,115] INFO New leader is 0
> > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> > [2015-07-26 21:07:03,133] INFO Registered broker 0 at path
> > /brokers/ids/0 with address selinas-mbp.attlocal.net:9092.
> > (kafka.utils.ZkUtils$)
> > [2015-07-26 21:07:03,154] INFO [Kafka Server 0], started
> > (kafka.server.KafkaServer)
> > [2015-07-26 21:07:03,285] INFO [ReplicaFetcherManager on broker 0]
> > Removed fetcher for partitions
> > [__samza_checkpoint_ver_1_for_demo-parser5_1,0],[wikipedia-raw,0],[__samza_checkpoint_ver_1_for_demo-parser6_1,0],[http-demo,0],[__samza_checkpoint_ver_1_for_demo-parser7_1,0],[__samza_checkpoint_ver_1_for_wikipedia-stats_1,0],[wikipedia-edits,0],[wikipedia-stats,0],[wikipedia-stats-changelog,0],[demo-duplicate,0],[metrics,0],[__samza_checkpoint_ver_1_for_wikipedia-parser_1,0]
> > (kafka.server.ReplicaFetcherManager)
> > [2015-07-26 21:07:03,361] INFO [ReplicaFetcherManager on broker 0]
> > Removed fetcher for partitions
> > [__samza_checkpoint_ver_1_for_demo-parser5_1,0],[wikipedia-raw,0],[__samza_checkpoint_ver_1_for_demo-parser6_1,0],[http-demo,0],[__samza_checkpoint_ver_1_for_demo-parser7_1,0],[__samza_checkpoint_ver_1_for_wikipedia-stats_1,0],[wikipedia-edits,0],[wikipedia-stats,0],[wikipedia-stats-changelog,0],[demo-duplicate,0],[metrics,0],[__samza_checkpoint_ver_1_for_wikipedia-parser_1,0]
> > (kafka.server.ReplicaFetcherManager)
> > [2015-07-26 21:07:24,770] INFO Topic creation
> > {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$)
> > [2015-07-26 21:07:24,772] INFO [KafkaApi-0] Auto creation of topic
> > http-demo-Json with 1 partitions and replication factor 1 is
> > successful! (kafka.server.KafkaApis)
> > [2015-07-26 21:07:24,795] INFO [ReplicaFetcherManager on broker 0]
> > Removed fetcher for partitions [http-demo-Json,0]
> > (kafka.server.ReplicaFetcherManager)
> > [2015-07-26 21:07:24,800] INFO Completed load of log http-demo-Json-0
> > with log end offset 0 (kafka.log.Log)
> > [2015-07-26 21:07:24,803] INFO Created log for partition
> > [http-demo-Json,0] in /tmp/kafka-logs with properties
> > {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000,
> > segment.bytes -> 1073741824, flush.ms -> 9223372036854775807,
> > delete.retention.ms -> 86400000, index.interval.bytes -> 4096,
> > retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy ->
> > delete, unclean.leader.election.enable -> true, segment.ms ->
> > 604800000, max.message.bytes -> 1000012, flush.messages ->
> > 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms ->
> > 604800000, segment.jitter.ms -> 0}. (kafka.log.LogManager)
> > [2015-07-26 21:07:24,804] WARN Partition [http-demo-Json,0] on broker
> > 0: No checkpointed highwatermark is found for partition
> > [http-demo-Json,0] (kafka.cluster.Partition)
> > [2015-07-26 21:07:24,806] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:07:24,833] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > ......
> > ......
> >
> > *[2015-07-26 21:08:04,444] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)*
> > [2015-07-26 21:08:04,457] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,469] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,487] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > *java.io.IOException: Too many open files in system*
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:04,590] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:04,590] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:04,696] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,708] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,720] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,833] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:04,848] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:05,172] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:05,173] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:05,278] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > [2015-07-26 21:08:05,293] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > ......
> > [2015-07-26 21:08:08,185] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:08,198] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:08,210] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:08,224] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> >
> > .....
> > .....
> > [2015-07-26 21:08:08,762] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> >
> > [2015-07-26 21:08:09,725] INFO Closing socket connection to
> > /127.0.0.1. (kafka.network.Processor)
> > [2015-07-26 21:08:09,737] ERROR Error while accepting connection
> > (kafka.network.Acceptor)
> > java.io.IOException: Too many open files in system
> >    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> >    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> >    at kafka.network.Acceptor.accept(SocketServer.scala:270)
> >    at kafka.network.Acceptor.run(SocketServer.scala:225)
> >    at java.lang.Thread.run(Thread.java:745)
> > .....
> >
> > On Sun, Jul 26, 2015 at 1:59 PM, Yan Fang <yanfang724@gmail.com> wrote:
> >
> > > You may check the Kafka.log to see what's inside
> > >
> > > Yan Fang
> > >
> > > > On Jul 26, 2015, at 2:01 AM, Job-Selina Wu <swucareer99@gmail.com> wrote:
> > > >
> > > > The exception is below:
> > > >
> > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > at kafka.producer.Producer.send(Producer.scala:77)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > at http.server.HttpDemoHandler.doDemo(HttpDemoHandler.java:71)
> > > > at http.server.HttpDemoHandler.handle(HttpDemoHandler.java:32)
> > > > at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
> > > > at org.eclipse.jetty.server.Server.handle(Server.java:498)
> > > > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:265)
> > > > at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:243)
> > > > at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
> > > > at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
> > > > at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > On Sun, Jul 26, 2015 at 12:42 AM, Job-Selina Wu <swucareer99@gmail.com> wrote:
> > > >
> > > >> Hi, Yan:
> > > >>
> > > >>      My HTTP server sends messages to Kafka.
> > > >>
> > > >> The server.log at deploy/kafka/logs/server.log shows:
> > > >>
> > > >> [2015-07-26 00:33:51,910] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > >> [2015-07-26 00:33:51,984] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > >> [2015-07-26 00:33:52,011] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
> > > >>
> > > >> .....
> > > >>
> > > >>
> > > >> Your help is highly appreciated.
> > > >>
> > > >> Sincerely,
> > > >>
> > > >> Selina
> > > >>
> > > >>
> > > >>> On Sun, Jul 26, 2015 at 12:01 AM, Yan Fang <yanfang724@gmail.com> wrote:
> > > >>>
> > > >>> You are giving the Kafka code and the Samza log, which does not make
> > > >>> sense actually...
> > > >>>
> > > >>> Fang, Yan
> > > >>> yanfang724@gmail.com
> > > >>>
> > > >>> On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu <swucareer99@gmail.com> wrote:
> > > >>>
> > > >>>> Hi, Yi, Navina and Benjamin:
> > > >>>>
> > > >>>>    Thanks a lot for spending your time helping me with this issue.
> > > >>>>
> > > >>>>    The configuration is below. Do you think it could be a
> > > >>>> configuration problem?
> > > >>>> I tried both props.put("request.required.acks", "0"); and
> > > >>>> props.put("request.required.acks", "1"); neither worked.
> > > >>>>
> > > >>>> --------
> > > >>>> Properties props = new Properties();
> > > >>>>
> > > >>>>    private final Producer<String, String> producer;
> > > >>>>
> > > >>>>    public KafkaProducer() {
> > > >>>>        //BOOTSTRAP.SERVERS
> > > >>>>        props.put("metadata.broker.list", "localhost:9092");
> > > >>>>        props.put("bootstrap.servers", "localhost:9092 ");
> > > >>>>        props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >>>>        props.put("partitioner.class", "com.kafka.SimplePartitioner");
> > > >>>>        props.put("request.required.acks", "0");
> > > >>>>
> > > >>>>        ProducerConfig config = new ProducerConfig(props);
> > > >>>>
> > > >>>>        producer = new Producer<String, String>(config);
> > > >>>>    }
> > > >>>>
> > > >>>> --------------
> > > >>>>
> > > >>>>     The exceptions from the logs are listed below.
> > > >>>>
> > > >>>> Your help is highly appreciated.
> > > >>>>
> > > >>>> Sincerely,
> > > >>>> Selina Wu
> > > >>>>
> > > >>>>
> > > >>>> Exceptions in the log
> > > >>>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000001/samza-application-master.log
> > > >>>>
> > > >>>> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
> > > >>>> directory
> > > >>>> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
> > > >>>>   at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
> > > >>>>   at org.apache.hadoop.util.Shell.<clinit>(Shell.java:290)
> > > >>>>   at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
> > > >>>>   at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:517)
> > > >>>>   at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
> > > >>>>   at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> > > >>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
> > > >>>> machine. So not using it.
> > > >>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
> > > >>>> 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
> > > >>>> 127.0.0.1:8032
> > > >>>> 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
> > > >>>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
> > > >>>> INITED
> > > >>>> 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
> > > >>>> /127.0.0.1:8032
> > > >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> > > >>>> org.apache.hadoop.metrics2.lib.MutableRate
> > > >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> > > >>>> with annotation
> > > >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> > > >>>> always=false, type=DEFAULT, value=[Rate of successful kerberos logins
> > > >>>> and latency (milliseconds)], valueName=Time)
> > > >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> > > >>>> org.apache.hadoop.metrics2.lib.MutableRate
> > > >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> > > >>>> with annotation
> > > >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> > > >>>> always=false, type=DEFAULT, value=[Rate of failed kerberos logins and
> > > >>>> latency (milliseconds)], valueName=Time)
> > > >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> > > >>>> org.apache.hadoop.metrics2.lib.MutableRate
> > > >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups
> > > >>>> with annotation
> > > >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> > > >>>> always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
> > > >>>> 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and
> > > >>>> group related metrics
> > > >>>> 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration
> > > >>>> not found, setting default realm to empty
> > > >>>> 2015-07-25 22:03:52 Groups [DEBUG]  Creating new Groups object
> > > >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the
> > > >>>> custom-built native-hadoop library...
> > > >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load
> > > >>>> native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in
> > > >>>> java.library.path
> > > >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG]
> > > >>>> java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
> > > >>>> 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load
> > > >>>> native-hadoop library for your platform... using builtin-java classes
> > > >>>> where applicable....
> > > >>>>
> > > >>>>
> > > >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to
> > > >>>> validate topic __samza_checkpoint_ver_1_for_demo-parser7_1:
> > > >>>> *kafka.common.LeaderNotAvailableException. Retrying.*
> > > >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail:
> > > >>>> kafka.common.LeaderNotAvailableException
> > > >>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > > >>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> > > >>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > >>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> > > >>>>   at java.lang.Class.newInstance(Class.java:442)
> > > >>>>   at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:84)
> > > >>>>   at org.apache.samza.util.KafkaUtil$.maybeThrowException(KafkaUtil.scala:63)
> > > >>>>   at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:389)
> > > >>>>   at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:386)
> > > >>>>   at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> > > >>>>   at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.validateTopic(KafkaCheckpointManager.scala:385)
> > > >>>>   at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.start(KafkaCheckpointManager.scala:336)
> > > >>>>   at org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:127)
> > > >>>>   at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:55)
> > > >>>>   at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:72)
> > > >>>>   at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:93)
> > > >>>>   at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> > > >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Verifying properties
> > > >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property client.id is
> > > >>>> overridden to samza_checkpoint_manager-demo_parser7-1-1437887032962-0
> > > >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
> > > >>>> metadata.broker.list is overridden to localhost:9092
> > > >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
> > > >>>> request.timeout.ms is overridden to 30000
> > > >>>> 2015-07-25 22:03:53 ClientUtils$ [INFO] Fetching metadata from broker
> > > >>>> id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s)
> > > >>>> Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
> > > >>>> 2015-07-25 22:03:53 BlockingChannel [DEBUG] Created socket with
> > > >>>> SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 408300 (requested
> > > >>>> -1), SO_SNDBUF = 114324 (requested 102400), connectTimeoutMs = 30000.
> > > >>>> 2015-07-25 22:03:53 SyncProducer [INFO] Connected to localhost:9092
> > > >>>> for producing
> > > >>>> 2015-07-25 22:03:53 SyncProducer [INFO] Disconnecting from
> > > >>>> localhost:9092
> > > >>>> 2015-07-25 22:03:53 ClientUtils$ [DEBUG] Successfully fetched metadata
> > > >>>> for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
> > > >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [INFO] Successfully
> > > >>>> validated checkpoint topic
> > > >>>> __samza_checkpoint_ver_1_for_demo-parser7_1.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Exception in the log
> > > >>>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000002/stderr
> > > >>>>
> > > >>>> 2015-07-25 22:06:03 HttpDemoParserStreamTask [INFO] key=123:
> > > >>>> message=timestamp=06-20-2015 id=123 ip=22.231.113.69
> browser=Chrome
> > > >>>> postalCode=95131 url=http://sample1.com language=ENG
> > > mobileBrand=Apple
> > > >>>> *count=3860*
> > > >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [INFO] Reconnect
> due
> > to
> > > >>>> socket error: java.io.EOFException: Received -1 when reading from
> > > >>>> channel, socket has likely been closed.
> > > >>>> 2015-07-25 22:06:04 Selector [WARN]
> > > >>>> *Error in I/O with localhost/127.0.0.1
> > > >>>> java.io.EOFException*
> > > >>>>   at
> > > >>>
> > >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> > > >>>>   at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> > > >>>>   at
> > > >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > >>>>   at java.lang.Thread.run(Thread.java:745)
> > > >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG]
> Disconnecting
> > > >>>> from selinas-mbp.attlocal.net:9092
> > > >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> > > >>>> Selinas-MBP.attlocal.net/192.168.1.227
> > > >>>> java.io.EOFException
> > > >>>>   at
> > > >>>
> > >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> > > >>>>   at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> > > >>>>   at
> > > >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > >>>>   at java.lang.Thread.run(Thread.java:745)
> > > >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG]
> Disconnecting
> > > >>>> from selinas-mbp.attlocal.net:9092
> > > >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> > > >>>> Selinas-MBP.attlocal.net/192.168.1.227
> > > >>>> java.io.EOFException
> > > >>>>   at
> > > >>>
> > >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> > > >>>>   at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> > > >>>>   at
> > > >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > >>>>   at java.lang.Thread.run(Thread.java:745)
> > > >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with localhost/
> > > >>> 127.0.0.1
> > > >>>> java.io.EOFException
> > > >>>>   at
> > > >>>
> > >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> > > >>>>   at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> > > >>>>   at
> > > >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > >>>>   at java.lang.Thread.run(Thread.java:745)
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> > > >>>> request to node 0
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> > > >>>> request to node 0
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node
> 0
> > > >>>> for sending metadata request in the next iteration
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node
> 0
> > > >>>> for sending metadata request in the next iteration
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
> > > >>>> node 0 at selinas-mbp.attlocal.net:9092.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
> > > >>>> node 0 at selinas-mbp.attlocal.net:9092.
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> > > >>>> request to node 0
> > > >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> > > >>>> request to node 0
> > > >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> > > >>>> selinas-mbp.attlocal.net/192.168.1.227
> > > >>>> java.net.ConnectException: Connection refused
> > > >>>>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > > >>>>   at
> > > >>>>
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > >>>>   at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> > > >>>>   at
> > > >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > >>>>   at
> > > >>>>
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > >>>>   at java.lang.Thread.run(Thread.java:745)
> > > >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG]
> Disconnecting
> > > >>>> from selinas-mbp.attlocal.net:9092
> > > >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O
> > > >>>>
> > > >>>>
> > > >>>>> On Fri, Jul 24, 2015 at 5:03 PM, Benjamin Black <b@b3k.us>
> wrote:
> > > >>>>>
> > > >>>>> what are the log messages from the kafka brokers? these look like
> > > >>> client
> > > >>>>> messages indicating a broker problem.
> > > >>>>>
> > > >>>>> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucareer99@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi, Yi:
> > > >>>>>>
> > > >>>>>>      I am wondering whether the problem can be fixed by the
> > > >>>>>> "max.message.size" parameter in kafka.producer.ProducerConfig for the
> > > >>>>>> topic size?
> > > >>>>>>
> > > >>>>>>      My HTTP server sends messages to Kafka. The last message shown
> > > >>>>>> on the console is
> > > >>>>>> "message=timestamp=06-20-2015 id=678 ip=22.231.113.68
> > browser=Safari
> > > >>>>>> postalCode=95066 url=http://sample2.com language=ENG
> > > >>> mobileBrand=Apple
> > > >>>>>> count=4269"
> > > >>>>>>
> > > >>>>>> However, Kafka got an exception starting from the 4244th message.
> > > >>>>>> The error is below, and Kafka does not accept any new messages after
> > > >>>>>> this.
> > > >>>>>>
> > > >>>>>> "[2015-07-24 12:46:11,078] WARN
> > > >>>
> > >
> >
> [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread],
> > > >>>>>> Failed to find leader for Set([http-demo,0])
> > > >>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > > >>>>>> kafka.common.KafkaException: fetching topic metadata for topics
> > > >>>>>> [Set(http-demo)] from broker
> > > >>>>> [ArrayBuffer(id:0,host:10.1.10.173,port:9092)]
> > > >>>>>> failed
> > > >>>>>> at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > > >>>>>> at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > > >>>>>> at
> > > >>>
> > >
> >
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > > >>>>>> at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > >>>>>> Caused by: java.nio.channels.ClosedChannelException
> > > >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > > >>>>>> at
> > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > > >>>>>> at
> > > >>>
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > > >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > > >>>>>> at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > > >>>>>> ... 3 more
> > > >>>>>> [2015-07-24 12:46:11,287] WARN Fetching topic metadata with
> > > >>> correlation
> > > >>>>> id
> > > >>>>>> 21 for topics [Set(http-demo)] from broker
> > > >>>>>> [id:0,host:10.1.10.173,port:9092] failed
> > (kafka.client.ClientUtils$)
> > > >>>>>> java.nio.channels.ClosedChannelException
> > > >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > > >>>>>> at
> > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > > >>>>>> at
> > > >>>
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > > >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > > >>>>>> at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > > >>>>>> at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > > >>>>>> at
> > > >>>
> > >
> >
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > > >>>>>> at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> After the error:
> > > >>>>>> I can list the topic, and it is correct, but I cannot show its content
> > > >>>>>> from the command line
> > > >>>>>>
> > > >>>>>> Selinas-MacBook-Pro:samza-Demo selina$
> > > >>> deploy/kafka/bin/kafka-topics.sh
> > > >>>>>> --list --zookeeper localhost:2181
> > > >>>>>> http-demo
> > > >>>>>> Selinas-MacBook-Pro:samza-Demo selina$
> > > >>>>> deploy/kafka/bin/kafka-console-consumer.sh
> > > >>>>>> --zookeeper localhost:2181 --from-beginning --topic http-demo
> > > >>>>>> [2015-07-24 12:47:38,730] WARN
> > > >>>
> > >
> >
> [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87],
> > > >>>>>> no brokers found when trying to rebalance.
> > > >>>>>> (kafka.consumer.ZookeeperConsumerConnector)
> > > >>>>>>
> > > >>>>>> Attached are my Kafka properties for the server and producer.
> > > >>>>>>
> > > >>>>>> Your help is highly appreciated
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Sincerely,
> > > >>>>>> Selina
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpan47@gmail.com>
> > > >>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi, Selina,
> > > >>>>>>>
> > > >>>>>>> Your question is not clear.
> > > >>>>>>> {quote}
> > > >>>>>>> When messages were sent to Kafka by KafkaProducer, it always failed
> > > >>>>>>> once there were more than 3000 - 4000 messages.
> > > >>>>>>> {quote}
> > > >>>>>>>
> > > >>>>>>> What's failing? The error stack shows errors on the consumer side, but
> > > >>>>>>> you were referring to failures to produce to Kafka. Could you be more
> > > >>>>>>> specific about your failure scenario?
> > > >>>>>>>
> > > >>>>>>> -Yi
> > > >>>>>>>
> > > >>>>>>> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucareer99@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi,
> > > >>>>>>>>
> > > >>>>>>>>    When messages were sent to Kafka by KafkaProducer, it always
> > > >>>>>>>> failed once there were more than 3000 - 4000 messages. The error is
> > > >>>>>>>> shown below.
> > > >>>>>>>> I am wondering whether there is a topic size I need to set in the
> > > >>>>>>>> Samza configuration?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> [2015-07-23 17:30:03,792] WARN
> > > >>>
> > >
> >
> [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
> > > >>>>>>>> Failed to find leader for Set([http-demo,0])
> > > >>>>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > > >>>>>>>> kafka.common.KafkaException: fetching topic metadata for
> topics
> > > >>>>>>>> [Set(http-demo)] from broker [ArrayBuffer()] failed
> > > >>>>>>>>        at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > > >>>>>>>>        at
> > > >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > > >>>>>>>>        at
> > > >>>
> > >
> >
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > > >>>>>>>>        at
> > > >>>>>>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > >>>>>>>> ^CConsumed 4327 messages
> > > >>>>>>>>
> > > >>>>>>>> Your reply and comment will be highly appreciated.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Sincerely,
> > > >>>>>>>> Selina
> > > >>
> > > >>
> > >
> >
>
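
For anyone reading the thread later: the failure after a few thousand
messages is consistent with one producer being created per HTTP request and
never closed, which is what the producer.close() fix at the top of this
message addresses. Below is a minimal sketch of that lifecycle issue. It
deliberately does not use the Kafka client: FakeProducer, leakyRequests and
sharedProducer are hypothetical names made up for illustration, and the
open-instance counter only stands in for the sockets a real producer would
hold.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerLifecycleSketch {

    /** Hypothetical stand-in for a Kafka producer; each open instance represents held sockets. */
    static final class FakeProducer implements AutoCloseable {
        private final AtomicInteger openCount;
        private boolean closed;

        FakeProducer(AtomicInteger openCount) {
            this.openCount = openCount;
            openCount.incrementAndGet();
        }

        void send(String message) {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
            // A real producer would hand the message to the broker here.
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                openCount.decrementAndGet();
            }
        }
    }

    /** Leaky pattern: a new producer per request, never closed. Returns producers still open. */
    public static int leakyRequests(int requests) {
        AtomicInteger open = new AtomicInteger();
        for (int i = 0; i < requests; i++) {
            FakeProducer producer = new FakeProducer(open);
            producer.send("count=" + i);
        }
        return open.get();
    }

    /** Fixed pattern: one producer shared by all requests, closed once at shutdown. */
    public static int sharedProducer(int requests) {
        AtomicInteger open = new AtomicInteger();
        try (FakeProducer producer = new FakeProducer(open)) {
            for (int i = 0; i < requests; i++) {
                producer.send("count=" + i);
            }
        }
        return open.get();
    }

    public static void main(String[] args) {
        System.out.println("leaky, still open:  " + leakyRequests(4000));
        System.out.println("shared, still open: " + sharedProducer(4000));
    }
}
```

In a servlet, the shared instance would typically be created in init() and
closed in destroy(). Calling producer.close() at the end of each request, as
in the fix reported above, also releases the connections, at the cost of
reconnecting on every request.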
