samza-dev mailing list archives

From Chinmay Soman <chinmay.cere...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Tue, 24 Mar 2015 00:08:30 GMT
Hey Ash,

Can you check your job metrics (if you have the JMX metrics reporter
defined) to see whether your job is actually doing anything? My only guess
at this point is that the process method is not being called because,
somehow, there's no incoming data. I could be totally wrong, of course.
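
For reference, here is a minimal sketch of what "looking at the JMX metrics" can mean in plain Java. It only lists the MBeans registered in the JVM it runs in, using the standard javax.management API. The class name JmxMetricsProbe and the helper listMBeanNames are hypothetical, and the exact object names a Samza job registers are not shown in this thread, so any pattern such as "org.apache.samza.*:*" is an assumption to adjust after looking at the full listing.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Samza or hello-samza.
public class JmxMetricsProbe {

    /**
     * Returns the sorted canonical names of every MBean in this JVM whose
     * ObjectName matches the given pattern (for example "*:*" for all).
     */
    public static List<String> listMBeanNames(String pattern)
            throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        return server.queryNames(new ObjectName(pattern), null).stream()
                .map(ObjectName::getCanonicalName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // Assumption: pass a narrower pattern (e.g. "org.apache.samza.*:*")
        // once you know which domain the job's metrics are registered under.
        String pattern = args.length > 0 ? args[0] : "*:*";
        for (String name : listMBeanNames(pattern)) {
            System.out.println(name);
        }
    }
}
```

Because this inspects the local JVM only, it would have to run inside the container process to be useful. Attaching from outside (jconsole, or a JMXConnector) needs the container's JMX service URL, which is not given in this thread. If per-task counters show up but stay at zero, that would be consistent with process never being called.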

On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> Just to be clear, here's what's changed from the default hello-samza repo:
>
> wikipedia-parser.properties==========================
> task.inputs=kafka.myTopic
> systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> systems.kafka.consumer.auto.offset.reset=smallest
>
> WikipediaParserStreamTask.java =====================
>   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>
>     try {
>       System.out.println(event.getRawEvent());
>       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>
>       // parsedJsonObject.put("channel", event.getChannel());
>       // parsedJsonObject.put("source", event.getSource());
>       // parsedJsonObject.put("time", event.getTime());
>
>       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>
> as well as the aforementioned changes to the log4j.xml file.
>
> The data pushed into the 'myTopic' topic is nothing more than a sentence.
>
>
> On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > yep, modified log4j.xml to look like this:
> >
> >   <root>
> >     <priority value="debug" />
> >     <appender-ref ref="RollingAppender"/>
> >     <appender-ref ref="jmx" />
> >   </root>
> >
> > Not sure what you mean by #2.
> >
> > However, I'm running now, not seeing any exceptions, but still not seeing
> > any output from System.out.println(...)
> >
> > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
> > nsomasundaram@linkedin.com.invalid> wrote:
> >
> >> Hey Ash,
> >>                1. Did you happen to modify your log4j.xml?
> >>                2. Can you print the class path that was printed when the
> >> job started? I am wondering if log4j was not loaded or is not present in
> >> the path where it’s being looked for. If you have been using hello-samza,
> >> it should have pulled it from Maven.
> >>
> >> Thanks,
> >> Naveen
> >>
> >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.matheson@gmail.com>
> >> wrote:
> >>
> >> > Hey all,
> >> >
> >> > Evaluating Samza currently and am running into some odd issues.
> >> >
> >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> >> > simple kafka topic that I've produced through an external java app
> >> > (nothing other than a series of sentences) and it's failing pretty hard
> >> > for me. The base 'hello-samza' set of apps works fine, but as soon as I
> >> > change the configuration to look at a different Kafka/zookeeper I get
> >> > the following in the userlogs:
> >> >
> >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
> >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
> >> > metadata for topics [Set(myTopic)] from broker
> >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> >> >
> >> >
> >> > The modifications are pretty straightforward.  In
> >> > wikipedia-parser.properties, I've changed the following:
> >> > task.inputs=kafka.myTopic
> >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >> > systems.kafka.consumer.auto.offset.reset=smallest
> >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> >> >
> >> > and in the actual java file WikipediaParserStreamTask.java
> >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
> >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >> >
> >> >    try {
> >> >        System.out.println(event.getRawEvent());
> >> >
> >> > I then followed the compile/extract/run process outlined on the
> >> > hello-samza website.
> >> >
> >> > Any thoughts?  I've looked online for any 'super simple' examples of
> >> > ingesting kafka in samza with very little success.
> >>
> >>
> >
>



-- 
Thanks and regards

Chinmay Soman
