samza-dev mailing list archives

From Ash W Matheson <ash.mathe...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Mon, 23 Mar 2015 02:21:12 GMT
Looks like somewhere between Friday and today some security settings may
have changed - telnetting into the kafka/zookeeper boxes was broken from the
test samza host. So it *seems* to be running without any exceptions now.

However, I'm still not seeing any output in any of the logs in
deploy/yarn/logs/userlogs (and subdirectories).  I was assuming
System.out.println(...) would stuff results into either application.  Any
thoughts on that?
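
On the stdout question, a hedged sketch rather than an answer from the Samza
docs: YARN normally redirects each container's standard output to a file named
stdout in that container's log directory, so println output would typically
land there rather than in the log4j-managed files. Assuming that layout (the
file name and the default root below are assumptions, and FindContainerStdout
is a hypothetical helper, not part of Samza or hello-samza), something like
this lists the non-empty stdout files under the userlogs tree:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: not part of Samza, YARN, or hello-samza.
public class FindContainerStdout {

    // Walks the tree under root and returns every non-empty regular file
    // whose name equals fileName (for example "stdout").
    public static List<Path> findNonEmpty(Path root, String fileName) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().equals(fileName))
                .filter(p -> p.toFile().length() > 0)
                .sorted()
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed default: the userlogs directory mentioned in this thread.
        Path root = Paths.get(args.length > 0 ? args[0] : "deploy/yarn/logs/userlogs");
        for (Path p : findNonEmpty(root, "stdout")) {
            System.out.println(p + " (" + Files.size(p) + " bytes)");
        }
    }
}
```

If nothing turns up, the task may simply not have processed a message yet, so
an empty result does not by itself mean the println output went elsewhere.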

On Sun, Mar 22, 2015 at 1:53 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
wrote:

> It says UnresolvedAddressException.
>
> Can you check that you can telnet to 2181 and 9092 on that remote host ?
> Also, you might try giving an IP address instead.
>
> On Sun, Mar 22, 2015 at 1:24 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > OK, so the logs are much more detailed and useful, but it's not making any
> > more sense:
> >
> > 2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker
> > id:0,host:redacted:9092 with correlation id 0 for 1 topic(s) Set(myTopic)
> > 2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to
> > redacted:9092 unsuccessful
> > java.nio.channels.UnresolvedAddressException
> >         at sun.nio.ch.Net.checkAddress(Net.java:127)
> >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> >         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> >         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> >         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> >         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137)
> >         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136)
> >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> >         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
> >         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58)
> >         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >         at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58)
> >         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108)
> >         at org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127)
> >         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77)
> >         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
> >         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> > 2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with
> > correlation id 0 for topics [Set(redacted)] from broker
> > [id:0,host:redacted,port:9092] failed
> >
> > So, it's not connecting ... which has me confused because I'm pushing data
> > (from that python script) using the machine I'm running this test on. I'll
> > do some more digging and report back the results.
> >
> > On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson <ash.matheson@gmail.com>
> > wrote:
> >
> > > Ignore that last email - was reading the page stupidly.
> > >
> > > On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > wrote:
> > >
> > >> Is there any easy way to do that without recompiling samza?  I'm trying
> > >> to localize that into the 'hello-samza' and looking at
> > >> http://samza.apache.org/learn/documentation/latest/jobs/logging.html
> > >> leads me to believe that I have to do this in the base samza project (not
> > >> hello-samza).
> > >>
> > >> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson <ash.matheson@gmail.com>
> > >> wrote:
> > >>
> > >>> Sure - I'll do that in a bit and send it up to pastebin.
> > >>>
> > >>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman <
> > >>> chinmay.cerebro@gmail.com> wrote:
> > >>>
> > >>>> Can you please enable debug level logging and paste the log?
> > >>>>
> > >>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson <
> > >>>> ash.matheson@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>> > No, it's behind some corporate stuff - I just redacted it so I could share
> > >>>> > it up.
> > >>>> >
> > >>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > >>>> > wrote:
> > >>>> >
> > >>>> > > Just for sanity check, is the broker host 'redacted:9092' or
> > >>>> > > 'redactedec:9092'.
> > >>>> > >
> > >>>> > > Just wanted to rule out any typos. Are the 2 above hosts the same ?
> > >>>> > >
> > >>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson <ash.matheson@gmail.com>
> > >>>> > > wrote:
> > >>>> > >
> > >>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ
> > >>>> > > >
> > >>>> > > >
> > >>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson <ash.matheson@gmail.com>
> > >>>> > > > wrote:
> > >>>> > > >
> > >>>> > > > > Yep, first thing I checked (got bitten by that earlier in the week with no
> > >>>> > > > > data actually in the topic).
> > >>>> > > > >
> > >>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > >>>> > > > > wrote:
> > >>>> > > > >
> > >>>> > > > >> Can you double check that you can read data from your Kafka broker ?
> > >>>> > > > >>
> > >>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
> > >>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
> > >>>> > > > >>
> > >>>> > > > >> I've seen cases where if the Kafka broker isn't shutdown properly,
> > >>>> > > > >> something like this happens.
> > >>>> > > > >>
> > >>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson <ash.matheson@gmail.com>
> > >>>> > > > >> wrote:
> > >>>> > > > >>
> > >>>> > > > >> > Hey all,
> > >>>> > > > >> >
> > >>>> > > > >> > Evaluating Samza currently and am running into some odd issues.
> > >>>> > > > >> >
> > >>>> > > > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> > >>>> > > > >> > simple kafka topic that I've produced through an external java app (nothing
> > >>>> > > > >> > other than a series of sentences) and it's failing pretty hard for me. The
> > >>>> > > > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
> > >>>> > > > >> > configuration to look at a different Kafka/zookeeper I get the following in
> > >>>> > > > >> > the userlogs:
> > >>>> > > > >> >
> > >>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
> > >>>> > > > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
> > >>>> > > > >> > metadata for topics [Set(myTopic)] from broker
> > >>>> > > > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> > >>>> > > > >> >
> > >>>> > > > >> >
> > >>>> > > > >> > The modifications are pretty straightforward.  In the
> > >>>> > > > >> > Wikipedia-parser.properties, I've changed the following:
> > >>>> > > > >> > task.inputs=kafka.myTopic
> > >>>> > > > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> > >>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest
> > >>>> > > > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> > >>>> > > > >> >
> > >>>> > > > >> > and in the actual java file WikipediaParserStreamTask.java
> > >>>> > > > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> > >>>> > > > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > >>>> > > > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> > >>>> > > > >> >
> > >>>> > > > >> >     try {
> > >>>> > > > >> >         System.out.println(event.getRawEvent());
> > >>>> > > > >> >
> > >>>> > > > >> > And then following the compile/extract/run process outlined in the
> > >>>> > > > >> > hello-samza website.
> > >>>> > > > >> >
> > >>>> > > > >> > Any thoughts?  I've looked online for any 'super simple' examples of
> > >>>> > > > >> > ingesting kafka in samza with very little success.
> > >>>> > > > >> >
> > >>>> > > > >>
> > >>>> > > > >>
> > >>>> > > > >>
> > >>>> > > > >> --
> > >>>> > > > >> Thanks and regards
> > >>>> > > > >>
> > >>>> > > > >> Chinmay Soman
> > >>>> > > > >>
> > >>>> > > > >
> > >>>> > > > >
> > >>>> > > >
> > >>>> > >
> > >>>> > >
> > >>>> > >
> > >>>> > > --
> > >>>> > > Thanks and regards
> > >>>> > >
> > >>>> > > Chinmay Soman
> > >>>> > >
> > >>>> >
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> Thanks and regards
> > >>>>
> > >>>> Chinmay Soman
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>
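
For reference, the UnresolvedAddressException quoted above is what Java NIO
throws when a channel is asked to connect to an address whose host name did not
resolve, so it can help to test resolution and reachability from the same host
that runs the Samza containers. A minimal sketch, assuming the ZooKeeper and
Kafka ports 2181 and 9092 mentioned in the thread; BrokerReachability is a
hypothetical helper, not part of Samza or Kafka:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: not part of Samza or Kafka.
public class BrokerReachability {

    // True if the host name can be resolved to an IP address from this machine.
    public static boolean resolves(String host) {
        return !new InetSocketAddress(host, 0).isUnresolved();
    }

    // True if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        InetSocketAddress address = new InetSocketAddress(host, port);
        if (address.isUnresolved()) {
            return false; // the case that surfaces as UnresolvedAddressException in NIO code
        }
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or blocked by a firewall
        }
    }

    public static void main(String[] args) {
        // Pass the real broker host as the first argument; localhost is only a default.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println(host + " resolves: " + resolves(host));
        for (int port : new int[] {2181, 9092}) {
            System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
        }
    }
}
```

If the name does not resolve, trying the IP address in
systems.kafka.consumer.zookeeper.connect and
systems.kafka.producer.metadata.broker.list, as suggested earlier in the
thread, is a quick way to separate a DNS problem from a firewall problem.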
