kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shrikant Patel <SPa...@pdxinc.com>
Subject Kafka on AWS - issue publishing from remote producer
Date Sat, 26 Sep 2015 05:38:13 GMT
I have Kafka 2.10-0.8.2.1 and zookeeper installed on the AWS EC2. The instance is working fine.

[kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1 --zookeeper localhost:2181 --describe
Topic:topic1    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

When producer running on my local machine (remote) tries to publish to topic1 it fails with
exception below. It does create the topic1 (If topic1 didn't exist), Also on kakfa log I do
see that producer was able to connect to kafka server.
[2015-09-26 01:24:26,874] INFO Closing socket connection to /X0X.23X.X4X.X3X. (kafka.network.Processor).
These indicate the local producer is able to connect to kafka instance on AWS.

If I run producer on Kafka server itself, its able to publish to topic1 without any issue.
I am not sure I understand what the issue with local producer. Any pointer will be helpful???

C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-console-producer.bat --broker-list
public-ip-address-of-kafka-ec2:9092 --topic topic1 --queue-enqueuetimeout-ms 10000 --request-timeout-ms
10000
[2015-09-25 23:08:00,146] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
12
[2015-09-25 23:08:02,977] WARN Failed to send producer request with correlation id 2 to broker
0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
       at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:03,653] WARN Failed to send producer request with correlation id 5 to broker
0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,296] WARN Failed to send producer request with correlation id 8 to broker
0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,954] WARN Failed to send producer request with correlation id 11 to broker
0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 with correlation
ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)


[2015-09-25 23:08:04,954] WARN Failed to send producer request with correlation id 11 to broker
0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 with correlation
ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Thanks,
Shri


________________________________
This message and its contents (to include attachments) are the property of National Health
Systems, Inc. and may contain confidential and proprietary information. This email and any
files transmitted with it are intended solely for the use of the individual or entity to whom
they are addressed. You are hereby notified that any unauthorized disclosure, copying, or
distribution of this message, or the taking of any unauthorized action based on information
contained herein is strictly prohibited. Unauthorized use of information contained herein
may subject you to civil and criminal prosecution and penalties. If you are not the intended
recipient, you should delete this message immediately and notify the sender immediately by
telephone or by replying to this transmission.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message