samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chinmay Soman <chinmay.cere...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Mon, 23 Mar 2015 16:33:48 GMT
Hey Ash,

Yeah - I've confirmed that System.out.println should result in a message
getting logged in the 'stdout' (at least while running in the local YARN
mode).

Not sure why you're not seeing the same behaviour. I'm assuming you've
already confirmed that you're seeing data in your input Kafka topic ? You
might want to double check this by running the console consumer from
whatever node is processing your job.

Other than that - we'll have to go back to debug mode to get more info.

On Sun, Mar 22, 2015 at 7:21 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> looks like somewhere between Friday and today some security settings may
> have changed - telneting into the kafka/zookeeper boxes was broker from the
> test samza host. So it *seems* to be running without any exceptions now.
>
> However, I'm still not seeing any output in any of the logs in
> deploy/yarn/logs/userlogs (and subdirectories).  I was assuming
> System.out.println(...) would stuff results into either application.  any
> thoughts on that?
>
> On Sun, Mar 22, 2015 at 1:53 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
> wrote:
>
> > It says UnresolvedAddressException.
> >
> > Can you check that you can telnet to 2181 and 9092 on that remote host ?
> > Also, you might try giving an IP address instead.
> >
> > On Sun, Mar 22, 2015 at 1:24 PM, Ash W Matheson <ash.matheson@gmail.com>
> > wrote:
> >
> > > OK, so the logs are much more detailed and useful, but it's not making
> > any
> > > more sense:
> > >
> > > 2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker
> > > id:0,host:redacted:9092 with correlation id 0 for 1 topic(s)
> Set(myTopic)
> > > 2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to
> > > redacted:9092 unsuccessful
> > > java.nio.channels.UnresolvedAddressException
> > >         at sun.nio.ch.Net.checkAddress(Net.java:127)
> > >         at
> > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
> > >         at
> > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > >         at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> > >         at
> > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> > >         at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > >         at
> > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > >         at
> > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
> > >         at
> > >
> > >
> >
> org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137)
> > >         at
> > >
> > >
> >
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58)
> > >         at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > >         at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> > >         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> > >         at
> > >
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> > >         at
> > > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> > >         at
> > >
> > >
> >
> org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58)
> > >         at
> > > org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108)
> > >         at
> > >
> org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127)
> > >         at
> > >
> > >
> >
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77)
> > >         at
> > > org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
> > >         at
> > > org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> > > 2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with
> > > correlation id 0 for topics [Set(redacted)] from broker
> > > [id:0,host:redacted,port:9092] failed
> > >
> > > So, it's not connecting ... which has me confused because I'm pushing
> > data
> > > (from that python script) using the machine I'm running this test on.
> > I'll
> > > do some more digging and report back the results.
> > >
> > > On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson <
> ash.matheson@gmail.com
> > >
> > > wrote:
> > >
> > > > Ignore that last email - was reading the page stupidly.
> > > >
> > > > On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson <
> > ash.matheson@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> Is there any easy way to do that without recompiling samza?  I'm
> > trying
> > > >> to localize that into the 'hello-samza' and looking at
> > > >>
> http://samza.apache.org/learn/documentation/latest/jobs/logging.html
> > > >> leads me to believe that I have to do this in the base samza project
> > > (not
> > > >> hello-samza).
> > > >>
> > > >> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson <
> > > ash.matheson@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Sure - I'll do that in a bit and send it up to pastebin.
> > > >>>
> > > >>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman <
> > > >>> chinmay.cerebro@gmail.com> wrote:
> > > >>>
> > > >>>> Can you please enable debug level logging and paste the log?
> > > >>>>
> > > >>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson <
> > > >>>> ash.matheson@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>> > No, it's behind some corporate stuff - I just redacted
it so I
> > could
> > > >>>> share
> > > >>>> > it up.
> > > >>>> >
> > > >>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman <
> > > >>>> chinmay.cerebro@gmail.com
> > > >>>> > >
> > > >>>> > wrote:
> > > >>>> >
> > > >>>> > > Just for sanity check, is the broker host 'redacted:9092'
or '
> > > >>>> > > redactedec:9092'.
> > > >>>> > >
> > > >>>> > > Just wanted to rule out any typos. Are the 2 above
hosts the
> > same
> > > ?
> > > >>>> > >
> > > >>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson
<
> > > >>>> ash.matheson@gmail.com
> > > >>>> > >
> > > >>>> > > wrote:
> > > >>>> > >
> > > >>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ
> > > >>>> > > >
> > > >>>> > > >
> > > >>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson
<
> > > >>>> > ash.matheson@gmail.com
> > > >>>> > > >
> > > >>>> > > > wrote:
> > > >>>> > > >
> > > >>>> > > > > Yep, first thing I checked (got bitten
by that earlier in
> > the
> > > >>>> week
> > > >>>> > with
> > > >>>> > > > no
> > > >>>> > > > > data actually in the topic).
> > > >>>> > > > >
> > > >>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay
Soman <
> > > >>>> > > > chinmay.cerebro@gmail.com
> > > >>>> > > > > > wrote:
> > > >>>> > > > >
> > > >>>> > > > >> Can you double check that you can
read data from your
> Kafka
> > > >>>> broker ?
> > > >>>> > > > >>
> > > >>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh
--describe
> --zookeeper
> > > >>>> > > > localhost:2181
> > > >>>> > > > >> --topic myTopic
> > > >>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh
> --zookeeper
> > > >>>> > > > localhost:2181
> > > >>>> > > > >> --topic myTopic --from-beginning
> > > >>>> > > > >>
> > > >>>> > > > >> I've seen cases where if the Kafka
broker isn't shutdown
> > > >>>> properly,
> > > >>>> > > > >> something like this happens.
> > > >>>> > > > >>
> > > >>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM,
Ash W Matheson <
> > > >>>> > > > ash.matheson@gmail.com>
> > > >>>> > > > >> wrote:
> > > >>>> > > > >>
> > > >>>> > > > >> > Hey all,
> > > >>>> > > > >> >
> > > >>>> > > > >> > Evaluating Samza currently and
am running into some odd
> > > >>>> issues.
> > > >>>> > > > >> >
> > > >>>> > > > >> > I'm currently working off the
'hello-samza' repo and
> > trying
> > > >>>> to
> > > >>>> > > parse a
> > > >>>> > > > >> > simple kafka topic that I've
produced through an
> extenal
> > > >>>> java app
> > > >>>> > > > >> (nothing
> > > >>>> > > > >> > other than a series of sentences)
and it's failing
> pretty
> > > >>>> hard for
> > > >>>> > > me.
> > > >>>> > > > >> The
> > > >>>> > > > >> > base 'hello-samza' set of apps
works fine, but as soon
> > as I
> > > >>>> change
> > > >>>> > > the
> > > >>>> > > > >> > configuration to look at a different
Kafka/zookeeper I
> > get
> > > >>>> the
> > > >>>> > > > >> following in
> > > >>>> > > > >> > the userlogs:
> > > >>>> > > > >> >
> > > >>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin
[WARN] Unable to
> > fetch
> > > >>>> last
> > > >>>> > > > offsets
> > > >>>> > > > >> > for streams [myTopic] due to
> kafka.common.KafkaException:
> > > >>>> fetching
> > > >>>> > > > topic
> > > >>>> > > > >> > metadata for topics [Set(myTopic)]
from broker
> > > >>>> > > > >> > [ArrayBuffer(id:0,host:redacted,port:9092)]
failed.
> > > Retrying.
> > > >>>> > > > >> >
> > > >>>> > > > >> >
> > > >>>> > > > >> > The modifications are pretty
straightforward.  In the
> > > >>>> > > > >> > Wikipedia-parser.properties,
I've changed the
> following:
> > > >>>> > > > >> > task.inputs=kafka.myTopic
> > > >>>> > > > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> > > >>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest
> > > >>>> > > > >> >
> systems.kafka.producer.metadata.broker.list=redacted:9092
> > > >>>> > > > >> >
> > > >>>> > > > >> > and in the actual java file
> > WikipediaParserStreamTask.java
> > > >>>> > > > >> >   public void process(IncomingMessageEnvelope
envelope,
> > > >>>> > > > MessageCollector
> > > >>>> > > > >> > collector, TaskCoordinator coordinator)
{
> > > >>>> > > > >> >     Map<String, Object>
jsonObject = (Map<String,
> > Object>)
> > > >>>> > > > >> > envelope.getMessage();
> > > >>>> > > > >> >     WikipediaFeedEvent event
= new
> > > >>>> WikipediaFeedEvent(jsonObject);
> > > >>>> > > > >> >
> > > >>>> > > > >> >     try {
> > > >>>> > > > >> >         System.out.println(event.getRawEvent());
> > > >>>> > > > >> >
> > > >>>> > > > >> > And then following the compile/extract/run
process
> > outlined
> > > >>>> in the
> > > >>>> > > > >> > hello-samza website.
> > > >>>> > > > >> >
> > > >>>> > > > >> > Any thoughts?  I've looked online
for any 'super
> simple'
> > > >>>> examples
> > > >>>> > of
> > > >>>> > > > >> > ingesting kafka in samza with
very little success.
> > > >>>> > > > >> >
> > > >>>> > > > >>
> > > >>>> > > > >>
> > > >>>> > > > >>
> > > >>>> > > > >> --
> > > >>>> > > > >> Thanks and regards
> > > >>>> > > > >>
> > > >>>> > > > >> Chinmay Soman
> > > >>>> > > > >>
> > > >>>> > > > >
> > > >>>> > > > >
> > > >>>> > > >
> > > >>>> > >
> > > >>>> > >
> > > >>>> > >
> > > >>>> > > --
> > > >>>> > > Thanks and regards
> > > >>>> > >
> > > >>>> > > Chinmay Soman
> > > >>>> > >
> > > >>>> >
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> Thanks and regards
> > > >>>>
> > > >>>> Chinmay Soman
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
> >
> >
> > --
> > Thanks and regards
> >
> > Chinmay Soman
> >
>



-- 
Thanks and regards

Chinmay Soman

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message