samza-dev mailing list archives

From Ash W Matheson <ash.mathe...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Mon, 23 Mar 2015 16:37:34 GMT
Yeah, to make sure that I could get at the topic on the server, I re-ran the
producer and the Kafka consumer to check I hadn't done something stupid; I
can produce and consume data at will. So I'm nuking the 'hello-samza' folder
and restarting from scratch, just to make sure I haven't messed something up
in the process.

I'll get back to the group when I have more info.

-Ash

On Mon, Mar 23, 2015 at 9:33 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
wrote:

> Hey Ash,
>
> Yeah - I've confirmed that System.out.println should result in a message
> getting logged in the 'stdout' (at least while running in the local YARN
> mode).
>
> Not sure why you're not seeing the same behaviour. I'm assuming you've
> already confirmed that you're seeing data in your input Kafka topic? You
> might want to double check this by running the console consumer from
> whatever node is processing your job.
>
> Other than that - we'll have to go back to debug mode to get more info.
>
> On Sun, Mar 22, 2015 at 7:21 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > Looks like somewhere between Friday and today some security settings may
> > have changed: telnetting into the Kafka/ZooKeeper boxes from the test
> > Samza host was broken. So it *seems* to be running without any exceptions
> > now.
> >
> > However, I'm still not seeing any output in any of the logs in
> > deploy/yarn/logs/userlogs (and subdirectories). I was assuming
> > System.out.println(...) would send its output to one of the application
> > logs. Any thoughts on that?
> >
> > On Sun, Mar 22, 2015 at 1:53 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > wrote:
> >
> > > It says UnresolvedAddressException.
> > >
> > > Can you check that you can telnet to ports 2181 and 9092 on that remote
> > > host? Also, you might try giving an IP address instead of the hostname.
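The telnet check can also be done from a JVM, which is closer to what the Samza job itself sees. This minimal JDK-only sketch (the class PortCheck and its Status values are hypothetical) first resolves the hostname and reports UNRESOLVED if the lookup fails, which is the situation behind an UnresolvedAddressException, and otherwise attempts a TCP connect with a timeout. The host arguments are placeholders; run it on the node that actually hosts the job.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Outcome of probing a single host:port pair. */
    public enum Status { OK, UNRESOLVED, UNREACHABLE }

    /** Resolves host, then tries a TCP connect, roughly what "telnet host port" tests. */
    public static Status probe(String host, int port, int timeoutMillis) {
        // This constructor performs the DNS lookup immediately.
        InetSocketAddress addr = new InetSocketAddress(host, port);
        if (addr.isUnresolved()) {
            // A SocketChannel connect to such an address throws UnresolvedAddressException.
            return Status.UNRESOLVED;
        }
        try (Socket socket = new Socket()) {
            socket.connect(addr, timeoutMillis);
            return Status.OK;
        } catch (IOException e) {
            // Refused, timed out, or otherwise blocked (e.g. by a firewall rule).
            return Status.UNREACHABLE;
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java PortCheck <zookeeper-host> <broker-host>
        String zkHost = args.length > 0 ? args[0] : "localhost";
        String brokerHost = args.length > 1 ? args[1] : "localhost";
        System.out.println(zkHost + ":2181 -> " + probe(zkHost, 2181, 3000));
        System.out.println(brokerHost + ":9092 -> " + probe(brokerHost, 9092, 3000));
    }
}
```

Separating UNRESOLVED from UNREACHABLE distinguishes a name-resolution problem (try the IP address) from a network or firewall problem.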
> > >
> > > On Sun, Mar 22, 2015 at 1:24 PM, Ash W Matheson <ash.matheson@gmail.com>
> > > wrote:
> > >
> > > > OK, so the logs are much more detailed and useful, but it's not making
> > > > any more sense:
> > > >
> > > > 2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:redacted:9092 with correlation id 0 for 1 topic(s) Set(myTopic)
> > > > 2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to redacted:9092 unsuccessful
> > > > java.nio.channels.UnresolvedAddressException
> > > >         at sun.nio.ch.Net.checkAddress(Net.java:127)
> > > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> > > >         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> > > >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> > > >         at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> > > >         at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137)
> > > >         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136)
> > > >         at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> > > >         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
> > > >         at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58)
> > > >         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > > >         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > > >         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> > > >         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> > > >         at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58)
> > > >         at org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108)
> > > >         at org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127)
> > > >         at org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77)
> > > >         at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
> > > >         at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> > > > 2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(redacted)] from broker [id:0,host:redacted,port:9092] failed
> > > >
> > > > So, it's not connecting... which has me confused, because I'm pushing
> > > > data (from that python script) using the machine I'm running this test
> > > > on. I'll do some more digging and report back the results.
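The top frames of the trace (Net.checkAddress called from SocketChannelImpl.connect) suggest the connection was never actually attempted: the JVM rejected the address because the hostname had not been resolved to an IP, which fits Chinmay's suggestion to try an IP address. The JDK-only sketch below (hypothetical class UnresolvedDemo) reproduces the same exception by connecting a SocketChannel to an address built with InetSocketAddress.createUnresolved, which skips DNS entirely. It illustrates the JDK behaviour only; it is not Kafka's own code path.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

public class UnresolvedDemo {

    /** Returns true if connecting to addr throws UnresolvedAddressException. */
    public static boolean throwsUnresolved(InetSocketAddress addr) throws IOException {
        try (SocketChannel channel = SocketChannel.open()) {
            channel.connect(addr);
            return false;
        } catch (UnresolvedAddressException e) {
            // Unchecked (an IllegalArgumentException subclass), raised before any network I/O.
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        // createUnresolved() performs no lookup, mimicking a hostname the JVM could not resolve.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("redacted", 9092);
        System.out.println("isUnresolved = " + addr.isUnresolved());
        System.out.println("throws UnresolvedAddressException = " + throwsUnresolved(addr));
    }
}
```

Running main prints `isUnresolved = true` followed by `throws UnresolvedAddressException = true`. If the same hostname resolves fine on one machine but not on the node running the job, the exception above is the result.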
> > > >
> > > > On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > wrote:
> > > >
> > > > > Ignore that last email - was reading the page stupidly.
> > > > >
> > > > > On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Is there any easy way to do that without recompiling Samza? I'm trying
> > > > >> to keep the change local to 'hello-samza', but reading
> > > > >> http://samza.apache.org/learn/documentation/latest/jobs/logging.html
> > > > >> leads me to believe that I have to do this in the base Samza project
> > > > >> (not hello-samza).
> > > > >>
> > > > >> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Sure - I'll do that in a bit and send it up to pastebin.
> > > > >>>
> > > > >>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Can you please enable debug-level logging and paste the log?
> > > > >>>>
> > > > >>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> > No, it's behind some corporate stuff; I just redacted it so I could
> > > > >>>> > share it.
> > > > >>>> >
> > > > >>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > > > >>>> > wrote:
> > > > >>>> >
> > > > >>>> > > Just as a sanity check: is the broker host 'redacted:9092' or
> > > > >>>> > > 'redactedec:9092'?
> > > > >>>> > >
> > > > >>>> > > Just wanted to rule out any typos. Are the two hosts above the same?
> > > > >>>> > >
> > > > >>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > >>>> > > wrote:
> > > > >>>> > >
> > > > >>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ
> > > > >>>> > > >
> > > > >>>> > > >
> > > > >>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > >>>> > > > wrote:
> > > > >>>> > > >
> > > > >>>> > > > > Yep, first thing I checked (got bitten by that earlier in the week,
> > > > >>>> > > > > with no data actually in the topic).
> > > > >>>> > > > >
> > > > >>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
> > > > >>>> > > > > wrote:
> > > > >>>> > > > >
> > > > >>>> > > > >> Can you double check that you can read data from your Kafka broker?
> > > > >>>> > > > >>
> > > > >>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic myTopic
> > > > >>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic myTopic --from-beginning
> > > > >>>> > > > >>
> > > > >>>> > > > >> I've seen cases where something like this happens if the Kafka
> > > > >>>> > > > >> broker isn't shut down properly.
> > > > >>>> > > > >>
> > > > >>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson <ash.matheson@gmail.com>
> > > > >>>> > > > >> wrote:
> > > > >>>> > > > >>
> > > > >>>> > > > >> > Hey all,
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > I'm currently evaluating Samza and am running into some odd issues.
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > I'm working off the 'hello-samza' repo and trying to parse a simple
> > > > >>>> > > > >> > Kafka topic that I've produced through an external Java app (nothing
> > > > >>>> > > > >> > other than a series of sentences), and it's failing pretty hard for
> > > > >>>> > > > >> > me. The base 'hello-samza' set of apps works fine, but as soon as I
> > > > >>>> > > > >> > change the configuration to look at a different Kafka/ZooKeeper, I
> > > > >>>> > > > >> > get the following in the userlogs:
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets for streams [myTopic] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(myTopic)] from broker [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> > > > >>>> > > > >> >
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > The modifications are pretty straightforward. In the
> > > > >>>> > > > >> > Wikipedia-parser.properties, I've changed the following:
> > > > >>>> > > > >> > task.inputs=kafka.myTopic
> > > > >>>> > > > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> > > > >>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest
> > > > >>>> > > > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > and in the actual Java file, WikipediaParserStreamTask.java:
> > > > >>>> > > > >> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> > > > >>>> > > > >> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > > > >>>> > > > >> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> > > > >>>> > > > >> >
> > > > >>>> > > > >> >     try {
> > > > >>>> > > > >> >       // Print the raw event so it shows up in the container's stdout log.
> > > > >>>> > > > >> >       System.out.println(event.getRawEvent());
> > > > >>>> > > > >> >       // ... (remainder of the method not included in this message)
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > Then I followed the compile/extract/run process outlined on the
> > > > >>>> > > > >> > hello-samza website.
> > > > >>>> > > > >> >
> > > > >>>> > > > >> > Any thoughts? I've looked online for any 'super simple' examples of
> > > > >>>> > > > >> > ingesting Kafka in Samza, with very little success.
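Since every error in this thread concerns which host the job is trying to reach, it can help to pull the exact host:port pairs out of the properties quoted above; that also makes a typo such as 'redacted' vs 'redactedec' easy to spot. This is a minimal sketch using java.util.Properties; the class ConfigHosts and its hostPorts helper are hypothetical, and it assumes the ZooKeeper connect string may end in a chroot path (as in redacted:2181/), which it strips before splitting on commas.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ConfigHosts {

    /**
     * Splits a comma-separated "host:port" list into entries. Anything from the
     * first '/' onward (a ZooKeeper chroot such as "redacted:2181/") is dropped.
     */
    public static List<String> hostPorts(String value) {
        int slash = value.indexOf('/');
        String withoutChroot = slash >= 0 ? value.substring(0, slash) : value;
        List<String> result = new ArrayList<>();
        for (String part : withoutChroot.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // The four properties from the original message (hostnames redacted there too).
        String config = String.join("\n",
                "task.inputs=kafka.myTopic",
                "systems.kafka.consumer.zookeeper.connect=redacted:2181/",
                "systems.kafka.consumer.auto.offset.reset=smallest",
                "systems.kafka.producer.metadata.broker.list=redacted:9092");
        Properties props = new Properties();
        props.load(new StringReader(config));
        for (String key : new String[] {
                "systems.kafka.consumer.zookeeper.connect",
                "systems.kafka.producer.metadata.broker.list"}) {
            System.out.println(key + " -> " + hostPorts(props.getProperty(key, "")));
        }
    }
}
```

With the redacted values this prints `[redacted:2181]` for the ZooKeeper key and `[redacted:9092]` for the broker list. Each printed host is a name that must resolve on whichever node runs the job.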
> > > > >>>> > > > >> >
> > > > >>>> > > > >>
> > > > >>>> > > > >>
> > > > >>>> > > > >>
> > > > >>>> > > > >> --
> > > > >>>> > > > >> Thanks and regards
> > > > >>>> > > > >>
> > > > >>>> > > > >> Chinmay Soman
> > > > >>>> > > > >>
> > > > >>>> > > > >
> > > > >>>> > > > >
> > > > >>>> > > >
> > > > >>>> > >
> > > > >>>> > >
> > > > >>>> > >
> > > > >>>> > >
> > > > >>>> >
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > >
> > >
> > >
> > >
> >
>
>
>
>
