samza-dev mailing list archives

From Yan Fang <yanfang...@gmail.com>
Subject Re: kafka producer failed
Date Sun, 26 Jul 2015 07:01:52 GMT
You are giving us the Kafka producer code together with the Samza log,
which does not actually make sense...

Fang, Yan
yanfang724@gmail.com

On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu <swucareer99@gmail.com>
wrote:

> Hi, Yi, Navina and Benjamin:
>
>     Thanks a lot for spending your time to help me with this issue.
>
>     The configuration is below. Do you think it could be a configuration
> problem?
> I tried both props.put("request.required.acks", "0"); and
> props.put("request.required.acks", "1"); neither worked.
>
> --------
> Properties props = new Properties();
>
>     private final Producer<String, String> producer;
>
>     public KafkaProducer() {
>         //BOOTSTRAP.SERVERS
>         props.put("metadata.broker.list", "localhost:9092");
>         props.put("bootstrap.servers", "localhost:9092 ");
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("partitioner.class", "com.kafka.SimplePartitioner");
>         props.put("request.required.acks", "0");
>
>         ProducerConfig config = new ProducerConfig(props);
>
>         producer = new Producer<String, String>(config);
>     }
>
> --------------
>
>      The exceptions from the logs are listed below.
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina Wu
>
>
> Exceptions in the log:
>
> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000001/samza-application-master.log
>
> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
> directory
> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
>    at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
>    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:290)
>    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>    at
> org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:517)
>    at
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
>    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
> machine. So not using it.
> 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
> 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
> 127.0.0.1:8032
> 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
> INITED
> 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
> /127.0.0.1:8032
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[Rate of successful kerberos logins
> and latency (milliseconds)], valueName=Time)
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[Rate of failed kerberos logins and
> latency (milliseconds)], valueName=Time)
> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
> org.apache.hadoop.metrics2.lib.MutableRate
> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups
> with annotation
> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
> always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
> 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and
> group related metrics
> 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration
> not found, setting default realm to empty
> 2015-07-25 22:03:52 Groups [DEBUG]  Creating new Groups object
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the
> custom-built native-hadoop library...
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load
> native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in
> java.library.path
> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG]
>
> java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
> 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load
> native-hadoop library for your platform... using builtin-java classes
> where applicable....
>
>
> 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to
> validate topic __samza_checkpoint_ver_1_for_demo-parser7_1:
> *kafka.common.LeaderNotAvailableException. Retrying.*
> 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail:
> kafka.common.LeaderNotAvailableException
>    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>    at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>    at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>    at java.lang.Class.newInstance(Class.java:442)
>    at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:84)
>    at
> org.apache.samza.util.KafkaUtil$.maybeThrowException(KafkaUtil.scala:63)
>    at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:389)
>    at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:386)
>    at
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>    at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.validateTopic(KafkaCheckpointManager.scala:385)
>    at
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.start(KafkaCheckpointManager.scala:336)
>    at
> org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:127)
>    at
> org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:55)
>    at
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:72)
>    at
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:93)
>    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> 2015-07-25 22:03:53 VerifiableProperties [INFO] Verifying properties
> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property client.id is
> overridden to samza_checkpoint_manager-demo_parser7-1-1437887032962-0
> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
> metadata.broker.list is overridden to localhost:9092
> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
> request.timeout.ms is overridden to 30000
> 2015-07-25 22:03:53 ClientUtils$ [INFO] Fetching metadata from broker
> id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s)
> Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
> 2015-07-25 22:03:53 BlockingChannel [DEBUG] Created socket with
> SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 408300 (requested
> -1), SO_SNDBUF = 114324 (requested 102400), connectTimeoutMs = 30000.
> 2015-07-25 22:03:53 SyncProducer [INFO] Connected to localhost:9092
> for producing
> 2015-07-25 22:03:53 SyncProducer [INFO] Disconnecting from localhost:9092
> 2015-07-25 22:03:53 ClientUtils$ [DEBUG] Successfully fetched metadata
> for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
> 2015-07-25 22:03:53 KafkaCheckpointManager [INFO] Successfully
> validated checkpoint topic
> __samza_checkpoint_ver_1_for_demo-parser7_1.
>
>
>
> Exceptions in the log:
>
> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000002/stderr
>
> 2015-07-25 22:06:03 HttpDemoParserStreamTask [INFO] key=123:
> message=timestamp=06-20-2015 id=123 ip=22.231.113.69 browser=Chrome
> postalCode=95131 url=http://sample1.com language=ENG mobileBrand=Apple
> *count=3860*
> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [INFO] Reconnect due to
> socket error: java.io.EOFException: Received -1 when reading from
> channel, socket has likely been closed.
> 2015-07-25 22:06:04 Selector [WARN]
> *Error in I/O with localhost/127.0.0.1
> java.io.EOFException*
>    at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>    at java.lang.Thread.run(Thread.java:745)
> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
> from selinas-mbp.attlocal.net:9092
> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> Selinas-MBP.attlocal.net/192.168.1.227
> java.io.EOFException
>    at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>    at java.lang.Thread.run(Thread.java:745)
> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
> from selinas-mbp.attlocal.net:9092
> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> Selinas-MBP.attlocal.net/192.168.1.227
> java.io.EOFException
>    at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>    at java.lang.Thread.run(Thread.java:745)
> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with localhost/127.0.0.1
> java.io.EOFException
>    at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>    at java.lang.Thread.run(Thread.java:745)
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> request to node 0
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> request to node 0
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0
> for sending metadata request in the next iteration
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0
> for sending metadata request in the next iteration
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
> node 0 at selinas-mbp.attlocal.net:9092.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
> node 0 at selinas-mbp.attlocal.net:9092.
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> request to node 0
> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
> request to node 0
> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
> selinas-mbp.attlocal.net/192.168.1.227
> java.net.ConnectException: Connection refused
>    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>    at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
>    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>    at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>    at java.lang.Thread.run(Thread.java:745)
> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
> from selinas-mbp.attlocal.net:9092
> 2015-07-25 22:06:04 Selector [WARN] Error in I/O
>
>
> On Fri, Jul 24, 2015 at 5:03 PM, Benjamin Black <b@b3k.us> wrote:
>
> > what are the log messages from the kafka brokers? these look like client
> > messages indicating a broker problem.
> >
> > On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucareer99@gmail.com>
> > wrote:
> >
> > > Hi, Yi:
> > >
> > >       I am wondering whether the problem can be fixed with the
> > > "max.message.size" parameter of kafka.producer.ProducerConfig for the
> > > topic size?
> > >
> > >       My HTTP server sends messages to Kafka. The last message shown on the
> > > console is
> > > "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari
> > > postalCode=95066 url=http://sample2.com language=ENG mobileBrand=Apple
> > > count=4269"
> > >
> > > However, Kafka got an exception at the 4244th message.
> > > The error is below, and Kafka does not accept any new messages after this.
> > >
> > > "[2015-07-24 12:46:11,078] WARN
> > > [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread],
> > > Failed to find leader for Set([http-demo,0])
> > > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(http-demo)] from broker [ArrayBuffer(id:0,host:10.1.10.173,port:9092)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > > at
> > > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > Caused by: java.nio.channels.ClosedChannelException
> > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > > at
> > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > > ... 3 more
> > > [2015-07-24 12:46:11,287] WARN Fetching topic metadata with correlation id
> > > 21 for topics [Set(http-demo)] from broker
> > > [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
> > > java.nio.channels.ClosedChannelException
> > > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> > > at
> > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > > at
> > > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
> > >
> > >
> > > After the error:
> > > Listing the topics works, but I cannot show the topic content from the
> > > command line.
> > >
> > > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-topics.sh
> > > --list --zookeeper localhost:2181
> > > http-demo
> > > Selinas-MacBook-Pro:samza-Demo selina$ deploy/kafka/bin/kafka-console-consumer.sh
> > > --zookeeper localhost:2181 --from-beginning --topic http-demo
> > > [2015-07-24 12:47:38,730] WARN
> > > [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87],
> > > no brokers found when trying to rebalance.
> > > (kafka.consumer.ZookeeperConsumerConnector)
> > >
> > > Attached are my Kafka properties for the server and producer.
> > >
> > > Your help is highly appreciated
> > >
> > >
> > > Sincerely,
> > > Selina
> > >
> > >
> > >
> > > On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpan47@gmail.com> wrote:
> > >
> > >> Hi, Selina,
> > >>
> > >> Your question is not clear.
> > >> {quote}
> > >> When messages were sent to Kafka by KafkaProducer, it always failed
> > >> once there were more than 3000 - 4000 messages.
> > >> {quote}
> > >>
> > >> What's failing? The error stack shows errors on the consumer side, but
> > >> you were referring to failures to produce to Kafka. Could you be more
> > >> specific about what your failure scenario is?
> > >>
> > >> -Yi
> > >>
> > >> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <swucareer99@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> >     When messages were sent to Kafka by KafkaProducer, it always
> > >> > failed once there were more than 3000 - 4000 messages. The error is
> > >> > shown below.
> > >> > I am wondering whether there is any topic size I need to set in the
> > >> > Samza configuration?
> > >> >
> > >> >
> > >> > [2015-07-23 17:30:03,792] WARN
> > >> > [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
> > >> > Failed to find leader for Set([http-demo,0])
> > >> > (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > >> > kafka.common.KafkaException: fetching topic metadata for topics
> > >> > [Set(http-demo)] from broker [ArrayBuffer()] failed
> > >> >         at
> > >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> > >> >         at
> > >> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> > >> >         at
> > >> > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> > >> >         at
> > >> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >> > ^CConsumed 4327 messages
> > >> >
> > >> > Your reply and comment will be highly appreciated.
> > >> >
> > >> >
> > >> > Sincerely,
> > >> > Selina
> > >> >
> > >>
> > >
> > >
> >
>
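
The producer settings quoted above combine metadata.broker.list,
serializer.class and request.required.acks, which are read by the old Scala
producer (kafka.producer.ProducerConfig), with bootstrap.servers, which
belongs to the newer Java producer
(org.apache.kafka.clients.producer.KafkaProducer). The bootstrap.servers value
also ends in a space. Nothing in the thread establishes that either oddity
caused the failure. As a minimal sketch, a small pre-flight check could flag
both before the producer is built. ProducerConfigCheck and its key lists are
hypothetical illustrations, not part of Kafka or Samza, and the key lists are
deliberately incomplete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** Hypothetical helper (not part of Kafka or Samza) that flags suspicious producer settings. */
final class ProducerConfigCheck {
    // A few keys read by the old Scala producer (kafka.producer.ProducerConfig).
    static final Set<String> OLD_KEYS = new HashSet<>(Arrays.asList(
            "metadata.broker.list", "serializer.class", "request.required.acks"));

    // A few keys read by the newer Java producer
    // (org.apache.kafka.clients.producer.KafkaProducer).
    static final Set<String> NEW_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers", "key.serializer", "value.serializer", "acks"));

    /** Returns human-readable warnings; an empty list means nothing was flagged. */
    static List<String> check(Properties props) {
        List<String> warnings = new ArrayList<>();
        boolean usesOld = false;
        boolean usesNew = false;
        for (String key : props.stringPropertyNames()) {
            usesOld |= OLD_KEYS.contains(key);
            usesNew |= NEW_KEYS.contains(key);
            String value = props.getProperty(key);
            // Leading or trailing whitespace is easy to miss in a host:port string.
            if (!value.equals(value.trim())) {
                warnings.add("whitespace around value of " + key);
            }
        }
        if (usesOld && usesNew) {
            warnings.add("mixes old and new producer keys");
        }
        return warnings;
    }
}
```

Run against the five settings from the quoted constructor, this sketch would
report the whitespace in bootstrap.servers and the mixed key families. It does
not say which producer API the job should use; that depends on the Kafka
client version on the classpath.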

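The container log quoted above ends with "Connection refused" against port
9092, and the console consumer reports "no brokers found when trying to
rebalance", which fits the suggestion in the thread to look at the broker
rather than the client. A minimal sketch of a plain TCP reachability check
follows, using only java.net. BrokerProbe is a hypothetical name, and the host
and port would be whatever the broker advertises (localhost:9092 in the thread).
A successful connect only shows that something is listening on that port, not
that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP listener accepts connections at host:port. */
final class BrokerProbe {
    /** Returns true if a TCP connection can be opened within timeoutMs milliseconds. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts and unresolvable hosts.
            return false;
        }
    }
}
```

Calling BrokerProbe.canConnect("localhost", 9092, 1000) before and after the
failure would show whether the broker process stopped listening, which is the
point at which the broker's own logs become the next place to look.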