samza-dev mailing list archives

From Chinmay Soman <chinmay.cere...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Sun, 22 Mar 2015 20:53:46 GMT
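The log says UnresolvedAddressException, which means the broker hostname
could not be resolved to an IP address from where the Samza AppMaster runs.

Can you check that you can telnet to ports 2181 and 9092 on that remote host?
Also, you might try giving an IP address instead of the hostname.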
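
If telnet isn't handy, roughly the same check can be done from Java. This is
only a minimal sketch: the class name BrokerReachability and its two helper
methods are made up for illustration and are not part of Samza or Kafka. It
first asks whether the hostname resolves at all (an unresolved address is what
makes NIO throw UnresolvedAddressException) and then tries a plain TCP connect
with a timeout. Run it on the machine where the job runs and pass your broker
host as the first argument.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** Returns true if this JVM can resolve the host name to an IP address. */
    static boolean resolves(String host, int port) {
        // The constructor attempts resolution; on failure the address is
        // flagged as unresolved instead of throwing.
        return !new InetSocketAddress(host, port).isUnresolved();
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        InetSocketAddress address = new InetSocketAddress(host, port);
        if (address.isUnresolved()) {
            return false; // name lookup failed, no point in connecting
        }
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or otherwise unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: host is passed as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        // 2181 is the ZooKeeper port and 9092 the Kafka broker port from the config.
        for (int port : new int[] {2181, 9092}) {
            System.out.printf("%s:%d resolves=%b connects=%b%n",
                    host, port, resolves(host, port), canConnect(host, port, 3000));
        }
    }
}
```

If resolves comes back false, the problem is name resolution (DNS or
/etc/hosts) on that machine rather than Kafka itself, and using the IP address
in the properties file should get around it.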

On Sun, Mar 22, 2015 at 1:24 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> OK, so the logs are much more detailed and useful, but it's not making any
> more sense:
>
> 2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:redacted:9092 with correlation id 0 for 1 topic(s) Set(myTopic)
> 2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to redacted:9092 unsuccessful
> java.nio.channels.UnresolvedAddressException
>         at sun.nio.ch.Net.checkAddress(Net.java:127)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
>         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
>         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
>         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136)
>         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
>         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
>         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58)
>         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108)
>         at org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127)
>         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77)
>         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
>         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> 2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(redacted)] from broker [id:0,host:redacted,port:9092] failed
>
> So, it's not connecting ... which has me confused because I'm pushing data
> (from that python script) using the machine I'm running this test on.  I'll
> do some more digging and report back the results.
>
> On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > Ignore that last email - was reading the page stupidly.
> >
> > On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson <ash.matheson@gmail.com>
> > wrote:
> >
> >> Is there any easy way to do that without recompiling samza?  I'm trying
> >> to localize that into the 'hello-samza' and looking at
> >> http://samza.apache.org/learn/documentation/latest/jobs/logging.html
> >> leads me to believe that I have to do this in the base samza project
> >> (not hello-samza).
> >>
> >> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson <ash.matheson@gmail.com>
> >> wrote:
> >>
> >>> Sure - I'll do that in a bit and send it up to pastebin.
> >>>
> >>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman <
> >>> chinmay.cerebro@gmail.com> wrote:
> >>>
> >>>> Can you please enable debug level logging and paste the log?
> >>>>
> >>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson <ash.matheson@gmail.com>
> >>>> wrote:
> >>>>
> >>>> > No, it's behind some corporate stuff - I just redacted it so I could
> >>>> > share it up.
> >>>> >
> >>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> >>>> > wrote:
> >>>> >
> >>>> > > Just for sanity check, is the broker host 'redacted:9092' or
> >>>> > > 'redactedec:9092'.
> >>>> > >
> >>>> > > Just wanted to rule out any typos. Are the 2 above hosts the same?
> >>>> > >
> >>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson <ash.matheson@gmail.com>
> >>>> > > wrote:
> >>>> > >
> >>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ
> >>>> > > >
> >>>> > > >
> >>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson <ash.matheson@gmail.com>
> >>>> > > > wrote:
> >>>> > > >
> >>>> > > > > Yep, first thing I checked (got bitten by that earlier in the week
> >>>> > > > > with no data actually in the topic).
> >>>> > > > >
> >>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> >>>> > > > > wrote:
> >>>> > > > >
> >>>> > > > >> Can you double check that you can read data from your Kafka broker?
> >>>> > > > >>
> >>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
> >>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
> >>>> > > > >>
> >>>> > > > >> I've seen cases where if the Kafka broker isn't shut down properly,
> >>>> > > > >> something like this happens.
> >>>> > > > >>
> >>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson <ash.matheson@gmail.com>
> >>>> > > > >> wrote:
> >>>> > > > >>
> >>>> > > > >> > Hey all,
> >>>> > > > >> >
> >>>> > > > >> > Evaluating Samza currently and am running into some odd issues.
> >>>> > > > >> >
> >>>> > > > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> >>>> > > > >> > simple kafka topic that I've produced through an external java app
> >>>> > > > >> > (nothing other than a series of sentences) and it's failing pretty
> >>>> > > > >> > hard for me. The base 'hello-samza' set of apps works fine, but as
> >>>> > > > >> > soon as I change the configuration to look at a different
> >>>> > > > >> > Kafka/zookeeper I get the following in the userlogs:
> >>>> > > > >> >
> >>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> >>>> > > > >> >
> >>>> > > > >> >
> >>>> > > > >> > The modifications are pretty straightforward.  In the
> >>>> > > > >> > Wikipedia-parser.properties, I've changed the following:
> >>>> > > > >> > task.inputs=kafka.myTopic
> >>>> > > > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest
> >>>> > > > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> >>>> > > > >> >
> >>>> > > > >> > and in the actual java file WikipediaParserStreamTask.java
> >>>> > > > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >>>> > > > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >>>> > > > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >>>> > > > >> >
> >>>> > > > >> >     try {
> >>>> > > > >> >         System.out.println(event.getRawEvent());
> >>>> > > > >> >
> >>>> > > > >> > And then following the compile/extract/run process outlined in the
> >>>> > > > >> > hello-samza website.
> >>>> > > > >> >
> >>>> > > > >> > Any thoughts?  I've looked online for any 'super simple' examples of
> >>>> > > > >> > ingesting kafka in samza with very little success.
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >>
> >>>> > > > >> --
> >>>> > > > >> Thanks and regards
> >>>> > > > >>
> >>>> > > > >> Chinmay Soman
> >>>> > > > >>
> >>>> > > > >
> >>>> > > > >
> >>>> > > >
> >>>> > >
> >>>> > >
> >>>> > >
> >>>> > > --
> >>>> > > Thanks and regards
> >>>> > >
> >>>> > > Chinmay Soman
> >>>> > >
> >>>> >
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Thanks and regards
> >>>>
> >>>> Chinmay Soman
> >>>>
> >>>
> >>>
> >>
> >
>



-- 
Thanks and regards

Chinmay Soman
