samza-dev mailing list archives

From Chinmay Soman <chinmay.cere...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Tue, 24 Mar 2015 17:45:07 GMT
Hey Ash,

Yeah, I think more examples would be great! Feel free to open an RB.

About the config: it can be a bit of an overhead in general and takes
some getting used to. I realized that once I started writing Samza jobs
myself. I'm fairly new to Samza too, and I kept making silly mistakes
that are difficult to validate early on. It's one of those things that
is very difficult to get right (because everything has its pros and
cons). Hopefully I can finish up my patch addressing exactly that
problem: config validation.
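
As a rough illustration of the kind of early check meant here (this is
not the actual patch; the class and method names are hypothetical, and
it assumes every serde name used in a `samza.msg.serde` setting has to
be registered under `serializers.registry.<name>.class`, as turned out
to be the case in this thread), something like this could flag the
mistake before the job is submitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper, not part of Samza: a sketch of one possible config check.
class SerdeConfigCheck {
    static final String REGISTRY_PREFIX = "serializers.registry.";
    static final String SERDE_SUFFIX = ".samza.msg.serde";

    /** Returns one message per serde name that is referenced but not registered. */
    static List<String> findUnregisteredSerdes(Properties config) {
        List<String> problems = new ArrayList<>();
        // Sort the keys so the report order is stable between runs.
        for (String key : new TreeSet<>(config.stringPropertyNames())) {
            if (!key.startsWith("systems.") || !key.endsWith(SERDE_SUFFIX)) {
                continue; // not a system-level or stream-level message serde setting
            }
            String serdeName = config.getProperty(key).trim();
            String registryKey = REGISTRY_PREFIX + serdeName + ".class";
            if (config.getProperty(registryKey) == null) {
                problems.add(key + "=" + serdeName + " but " + registryKey + " is not set");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("systems.kafka.streams.myTopic.samza.msg.serde", "string");
        // The registry entry is missing at this point, so one problem is reported.
        System.out.println(findUnregisteredSerdes(config));

        config.setProperty("serializers.registry.string.class",
                "org.apache.samza.serializers.StringSerdeFactory");
        // With the factory registered, the list is empty.
        System.out.println(findUnregisteredSerdes(config));
    }
}
```

It only covers message serdes; key serdes and anything else in the
config would need their own checks.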

On Mon, Mar 23, 2015 at 10:04 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> just to clarify, adding:
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> systems.kafka.streams.myTopic.samza.msg.serde=string
>
> to the property file and updating the java source to:
> System.out.println((String)envelope.getMessage());
>
> Did the trick. I've updated the pastebin with the appropriate change.
>
> Now, through this whole process I've assumed that StreamTask is the
> appropriate class to derive from.  Essentially, the end goal is to decode a
> compressed bytestream Kafka event (from a different topic, of course) and
> then feed it to a DB/TSDB/whatever.  I didn't think that I'd need to
> generate a Job for this and that this stream should be able to feed
> directly to the output entity.
>
> Anyway, over the next couple of days I'll migrate this into something that
> can live in the hello-samza ecosystem as a separate task. I've got the
> start of a java based DataPusher built as well.
>
> On Mon, Mar 23, 2015 at 9:40 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > Huzzah!  I ... have ... text showing!
> >
> > This has been enough of a trial that I think I'll convert this into a
> very
> > simple sample project for the repo, if you guys are interested.
> >
> > Diff coming once I have it cleaned up into something less ugly.
> >
> > -Ash
> >
> > On Mon, Mar 23, 2015 at 9:27 PM, Chinmay Soman <
> chinmay.cerebro@gmail.com>
> > wrote:
> >
> >> >I changed the systems.kafka.samza.msg.serde=json to 'string' a while
> >> back,
> >> but that caused a separate exception.  However that was many, MANY
> >> attempts
> >> ago.
> >>
> >> This may not work because that will set all serialization formats (input
> >> and output) to json / string. In your case you're inputting string and
> >> outputting json. So you might have to set that explicitly.
> >>
> >> On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <
> chinmay.cerebro@gmail.com
> >> >
> >> wrote:
> >>
> >> > Since you're producing String data to 'myTopic', can you try setting
> the
> >> > string serialization in your config ?
> >> >
> >> >
> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >
> >> > systems.kafka.streams.myTopic.samza.msg.serde=string
> >> >
> >> >
> >> > On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <
> ash.matheson@gmail.com
> >> >
> >> > wrote:
> >> >
> >> >> more info - new exception message:
> >> >>
> >> >> Exception in thread "main"
> >> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize
> an
> >> >> incoming message.
> >> >>
> >> >> Updated the diff in pastebin with the changes.
> >> >>
> >> >> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <
> >> ash.matheson@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > Gah!  Yeah, those were gone several revisions ago but didn't get
> >> nuked
> >> >> in
> >> >> > the last iteration.
> >> >> >
> >> >> > OK, let me do a quick test to see if that was my problem all along.
> >> >> >
> >> >> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <
> >> >> > nramesh@linkedin.com.invalid> wrote:
> >> >> >
> >> >> >> Hey Ash,
> >> >> >> I was referring to the lines before the try block.
> >> >> >>
> >> >> >> Map<String, Object> jsonObject = (Map<String, Object>)
> >> >> >> envelope.getMessage();
> >> >> >>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> >> >> >>
> >> >> >>     try {
> >> >> >>       System.out.println("[DWH] should see this");
> >> >> >>       System.out.println(event.getRawEvent());
> >> >> >> …
> >> >> >>
> >> >> >>
> >> >> >> Did you remove those lines as well?
> >> >> >>
> >> >> >> Navina
> >> >> >>
> >> >> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.matheson@gmail.com>
> >> wrote:
> >> >> >>
> >> >> >> >Just looking at the diff I posted and it's:
> >> >> >> >
> >> >> >> >
> >> >> >> >   1.      try {
> >> >> >> >   2. -      Map<String, Object> parsedJsonObject =
> >> >> >> >parse(event.getRawEvent(
> >> >> >> >   ));
> >> >> >> >   3. +      System.out.println("[DWH] should see this");
> >> >> >> >   4. +      System.out.println(event.getRawEvent());
> >> >> >> >   5. +      // Map<String, Object> parsedJsonObject = parse(
> >> >> >> >   event.getRawEvent());
> >> >> >> >
> >> >> >> >
> >> >> >> >I've removed the Map and added two System.out.println calls.  So
> >> no,
> >> >> >> there
> >> >> >> >shouldn't be any reference to
> >> >> >> >Map<String, Object> parsedJsonObject =
> parse(event.getRawEvent());
> >> >> >> >in the source java file.
> >> >> >> >
> >> >> >> >
> >> >> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <
> >> >> ash.matheson@gmail.com>
> >> >> >> >wrote:
> >> >> >> >
> >> >> >> >> I'm in transit right now but if memory serves me everything
> >> should
> >> >> be
> >> >> >> >> commented out of that method except for the System.out.println
> >> call.
> >> >> >> >>I'll
> >> >> >> >> be home shortly and can confirm.
> >> >> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh"
> >> >> <nramesh@linkedin.com.invalid
> >> >> >> >
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >>> Hi Ash,
> >> >> >> >>> I just ran wikipedia-parser with your patch. Looks like you
> have
> >> >> set
> >> >> >> >>>the
> >> >> >> >>> message serde correctly in the configs. However, the original
> >> code
> >> >> >> >>>still
> >> >> >> >>> converts it into a Map for consumption in the
> >> WikipediaFeedEvent.
> >> >> >> >>> I am seeing the following (expected):
> >> >> >> >>>
> >> >> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> >> >> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
> >> >> >> >>>  at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
> >> >> >> >>>  at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> >> >> >> >>>
> >> >> >> >>> Did you make the changes to fix this error? Your patch doesn't
> >> >> seem to
> >> >> >> >>> have that.
> >> >> >> >>> Line 38 Map<String, Object> jsonObject = (Map<String, Object>)
> >> >> >> >>> envelope.getMessage();
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Lmk so I can investigate further.
> >> >> >> >>>
> >> >> >> >>> Cheers!
> >> >> >> >>> Navina
> >> >> >> >>>
> >> >> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.matheson@gmail.com
> >
> >> >> wrote:
> >> >> >> >>>
> >> >> >> >>> >If anyone's interested, I've posted a diff of the project
> here:
> >> >> >> >>> >http://pastebin.com/6ZW6Y1Vu
> >> >> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
> >> >> >> >>> >
> >> >> >> >>> >if you want to take a stab at it.
> >> >> >> >>> >
> >> >> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson
> >> >> >> >>><ash.matheson@gmail.com>
> >> >> >> >>> >wrote:
> >> >> >> >>> >
> >> >> >> >>> >> Ok, so very simple test, all running on a local machine,
> not
> >> >> across
> >> >> >> >>> >> networks and all in the hello-samza repo this time around.
> >> >> >> >>> >>
> >> >> >> >>> >> I've got the datapusher.py file set up to push data into
> >> >> localhost.
> >> >> >> >>>One
> >> >> >> >>> >> event per second.
> >> >> >> >>> >> And a modified hello-samza where I've modified the
> >> >> >> >>> >> WikipediaParserStreamTask.java class to simply read what's
> >> >> there.
> >> >> >> >>> >>
> >> >> >> >>> >> Running them both now and I'm seeing in the stderr files
> >> >> >> >>> >>
> >> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr)
> >> >> >> >>>the
> >> >> >> >>> >> following:
> >> >> >> >>> >>
> >> >> >> >>> >> Exception in thread "main"
> >> >> >> >>> >> org.apache.samza.system.SystemConsumersException: Cannot
> >> >> >> >>>deserialize an
> >> >> >> >>> >> incoming message.
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:2
> >> >> >> >>>>>93)
> >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.org
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >> >>>$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(Syste
> >> >> >> >>>>>mCo
> >> >> >> >>> >>nsumers.scala:276)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(Syste
> >> >> >> >>>>>mCo
> >> >> >> >>> >>nsumers.scala:276)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
> >> >> >> >>>>>sca
> >> >> >> >>> >>la:244)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.
> >> >> >> >>>>>sca
> >> >> >> >>> >>la:244)
> >> >> >> >>> >>     at
> >> >> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> >> >> >>> >>     at
> >> >> >> >>>scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >> >> >> >>> >>     at
> >> >> >> >>>
> >> scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >> >>>scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.s
> >> >> >> >>>>>cal
> >> >> >> >>> >>a:47)
> >> >> >> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
> >> >> >> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:
> >> >> >> >>>>>276
> >> >> >> >>> >>)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:2
> >> >> >> >>>>>13)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply
> >> >> >> >>>>>(Ru
> >> >> >> >>> >>nLoop.scala:81)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply
> >> >> >> >>>>>(Ru
> >> >> >> >>> >>nLoop.scala:81)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >>
> >>>org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >> >> >> >>> >>     at
> >> >> >>
> >>>org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.sc
> >> >> >> >>>>>ala
> >> >> >> >>> >>:80)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >>
> >>>org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> >> >> >> >>> >>     at
> >> >> >>
> >>>org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> >> >> >> >>> >>     at
> >> >> org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> >> >> >> >>> >>     at
> >> org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >>
> >>>org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.sca
> >> >> >> >>>>>la:
> >> >> >> >>> >>108)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> >> >> >> >>> >>     at
> >> >> >> >>>
> >> >>
> >>org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> >> >> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException:
> >> Unexpected
> >> >> >> >>> character
> >> >> >> >>> >> ('M' (code 77)): expected a valid value (number, String,
> >> array,
> >> >> >> >>>object,
> >> >> >> >>> >> 'true', 'false' or 'null')
> >> >> >> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >> >>>org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParse
> >> >> >> >>>>>rMi
> >> >> >> >>> >>nimalBase.java:385)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(
> >> >> >> >>>>>Jso
> >> >> >> >>> >>nParserMinimalBase.java:306)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8
> >> >> >> >>>>>Str
> >> >> >> >>> >>eamParser.java:1581)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8S
> >> >> >> >>>>>tre
> >> >> >> >>> >>amParser.java:436)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.
> >> >> >> >>>>>jav
> >> >> >> >>> >>a:322)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.jav
> >> >> >> >>>>>a:2
> >> >> >> >>> >>432)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.ja
> >> >> >> >>>>>va:
> >> >> >> >>> >>2389)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >>
> >> >>
> >>
> >>>org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> >> >> >> >>> >>     at
> >> >> >> >>>
> >> >>
> >>org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala
> >> >> >> >>>>>:11
> >> >> >> >>> >>5)
> >> >> >> >>> >>     at
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:2
> >> >> >> >>>>>90)
> >> >> >> >>> >>
> >> >> >> >>> >>
> >> >> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to
> 'string'
> >> a
> >> >> >> while
> >> >> >> >>> >>back,
> >> >> >> >>> >> but that caused a separate exception.  However that was
> many,
> >> >> MANY
> >> >> >> >>> >>attempts
> >> >> >> >>> >> ago.
> >> >> >> >>> >>
> >> >> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
> >> >> >> >>> ash.matheson@gmail.com>
> >> >> >> >>> >> wrote:
> >> >> >> >>> >>
> >> >> >> >>> >>> Ahh, I was going to add it to the run-class.sh script.
> >> >> >> >>> >>>
> >> >> >> >>> >>> Yeah, it's already there by default:
> >> >> >> >>> >>>
> >> >> >> >>> >>>
> >> >> >> >>> >>> # Metrics
> >> >> >> >>> >>> metrics.reporters=snapshot,jmx
> >> >> >> >>> >>>
> >> >> >> >>> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>>metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.Met
> >> >> >> >>>>>>ric
> >> >> >> >>> >>>sSnapshotReporterFactory
> >> >> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
> >> >> >> >>> >>>
> >> >> >> >>> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >>
> >>>>>>metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxRepor
> >> >> >> >>>>>>ter
> >> >> >> >>> >>>Factory
> >> >> >> >>> >>>
> >> >> >> >>> >>> So, where would I see those metrics?
> >> >> >> >>> >>>
> >> >> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
> >> >> >> >>> >>><ash.matheson@gmail.com>
> >> >> >> >>> >>> wrote:
> >> >> >> >>> >>>
> >> >> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first
> >> time
> >> >> in
> >> >> >> >>>> 10
> >> >> >> >>> >>>> years
> >> >> >> >>> >>>>
> >> >> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson
> >> >> >> >>> >>>><ash.matheson@gmail.com>
> >> >> >> >>> >>>> wrote:
> >> >> >> >>> >>>>
> >> >> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get
> >> set?
> >> >> >> >>> >>>>>
> >> >> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
> >> >> >> >>> >>>>> chinmay.cerebro@gmail.com> wrote:
> >> >> >> >>> >>>>>
> >> >> >> >>> >>>>>> Hey Ash,
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>> Can you see your job metrics (if you have the Jmx
> metrics
> >> >> >> >>>defined)
> >> >> >> >>> >>>>>>to
> >> >> >> >>> >>>>>> see
> >> >> >> >>> >>>>>> if your job is actually doing anything ? My only guess
> at
> >> >> this
> >> >> >> >>> point
> >> >> >> >>> >>>>>> is the
> >> >> >> >>> >>>>>> process method is not being called because somehow
> >> there's
> >> >> no
> >> >> >> >>> >>>>>>incoming
> >> >> >> >>> >>>>>> data. I could be totally wrong of course.
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
> >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> >> >> >> >>> >>>>>> wrote:
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>> > Just to be clear, here's what's changed from the
> >> default
> >> >> >> >>> >>>>>>hello-samza
> >> >> >> >>> >>>>>> repo:
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > wikipedia-parser.properties==========================
> >> >> >> >>> >>>>>> > task.inputs=kafka.myTopic
> >> >> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=
> >> >> >> >>> >>>>>> > ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> >> >> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > WikipediaParserStreamTask.java =====================
> >> >> >> >>> >>>>>> >   public void process(IncomingMessageEnvelope
> envelope,
> >> >> >> >>> >>>>>> MessageCollector
> >> >> >> >>> >>>>>> > collector, TaskCoordinator coordinator) {
> >> >> >> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String,
> >> Object>)
> >> >> >> >>> >>>>>> > envelope.getMessage();
> >> >> >> >>> >>>>>> >     WikipediaFeedEvent event = new
> >> >> >> >>> WikipediaFeedEvent(jsonObject);
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> >     try {
> >> >> >> >>> >>>>>> >       System.out.println(event.getRawEvent());
> >> >> >> >>> >>>>>> >       // Map<String, Object> parsedJsonObject =
> >> >> >> >>> >>>>>> parse(event.getRawEvent());
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> >       // parsedJsonObject.put("channel",
> >> >> event.getChannel());
> >> >> >> >>> >>>>>> >       // parsedJsonObject.put("source",
> >> >> event.getSource());
> >> >> >> >>> >>>>>> >       // parsedJsonObject.put("time",
> event.getTime());
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> >       // collector.send(new
> OutgoingMessageEnvelope(new
> >> >> >> >>> >>>>>> > SystemStream("kafka", "wikipedia-edits"),
> >> >> parsedJsonObject));
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > as well as the aforementioned changes to the
> log4j.xml
> >> >> file.
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing
> >> more
> >> >> than
> >> >> >> >>>a
> >> >> >> >>> >>>>>> sentence.
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
> >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> >> >> >> >>> >>>>>> > wrote:
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>> > > yep, modified log4j.xml to look like this:
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> > >   <root>
> >> >> >> >>> >>>>>> > >     <priority value="debug" />
> >> >> >> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
> >> >> >> >>> >>>>>> > >     <appender-ref ref="jmx" />
> >> >> >> >>> >>>>>> > >   </root>
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> > > Not sure what you mean by #2.
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> > > However, I'm running now, not seeing any
> exceptions,
> >> but
> >> >> >> >>>still
> >> >> >> >>> >>>>>>not
> >> >> >> >>> >>>>>> seeing
> >> >> >> >>> >>>>>> > > any output from System.out.println(...)
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen
> >> Somasundaram <
> >> >> >> >>> >>>>>> > > nsomasundaram@linkedin.com.invalid> wrote:
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> > >> Hey Ash,
> >> >> >> >>> >>>>>> > >>                1. Did you happen to modify your
> >> >> log4j.xml
> >> >> >> ?
> >> >> >> >>> >>>>>> > >>                2. Can you print the class path
> that
> >> was
> >> >> >> >>> printed
> >> >> >> >>> >>>>>> when the
> >> >> >> >>> >>>>>> > >> job started ? I am wondering if log4j was not
> >> loaded or
> >> >> >> not
> >> >> >> >>> >>>>>> present in
> >> >> >> >>> >>>>>> > the
> >> >> >> >>> >>>>>> > >> path where it's looking for. If you have been
> using
> >> >> hello
> >> >> >> >>> >>>>>>samza,
> >> >> >> >>> >>>>>> it
> >> >> >> >>> >>>>>> > should
> >> >> >> >>> >>>>>> > >> have pulled it from Maven.
> >> >> >> >>> >>>>>> > >>
> >> >> >> >>> >>>>>> > >> Thanks,
> >> >> >> >>> >>>>>> > >> Naveen
> >> >> >> >>> >>>>>> > >>
> >> >> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
> >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> >> >> >> >>> >>>>>> > >> wrote:
> >> >> >> >>> >>>>>> > >>
> >> >> >> >>> >>>>>> > >> > Hey all,
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > Evaluating Samza currently and am running into
> >> some
> >> >> odd
> >> >> >> >>> >>>>>>issues.
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo
> >> and
> >> >> >> >>>trying
> >> >> >> >>> >>>>>>to
> >> >> >> >>> >>>>>> parse a
> >> >> >> >>> >>>>>> > >> > simple kafka topic that I've produced through an
> >> >> external
> >> >> >> >>> java
> >> >> >> >>> >>>>>> app
> >> >> >> >>> >>>>>> > >> (nothing
> >> >> >> >>> >>>>>> > >> > other than a series of sentences) and it's
> failing
> >> >> >> pretty
> >> >> >> >>> >>>>>>hard
> >> >> >> >>> >>>>>> for me.
> >> >> >> >>> >>>>>> > >> The
> >> >> >> >>> >>>>>> > >> > base 'hello-samza' set of apps works fine, but
> as
> >> >> soon
> >> >> >> >>>as I
> >> >> >> >>> >>>>>> change the
> >> >> >> >>> >>>>>> > >> > configuration to look at a different
> >> Kafka/zookeeper
> >> >> I
> >> >> >> >>>get
> >> >> >> >>> >>>>>>the
> >> >> >> >>> >>>>>> > >> following in
> >> >> >> >>> >>>>>> > >> > the userlogs:
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN]
> >> Unable to
> >> >> >> >>>fetch
> >> >> >> >>> >>>>>>last
> >> >> >> >>> >>>>>> > offsets
> >> >> >> >>> >>>>>> > >> > for streams [myTopic] due to
> >> >> >> kafka.common.KafkaException:
> >> >> >> >>> >>>>>> fetching
> >> >> >> >>> >>>>>> > topic
> >> >> >> >>> >>>>>> > >> > metadata for topics [Set(myTopic)] from broker
> >> >> >> >>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)]
> >> failed.
> >> >> >> >>> Retrying.
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > The modifications are pretty straightforward.
> In
> >> the
> >> >> >> >>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the
> >> >> following:
> >> >> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
> >> >> >> >>> >>>>>> > >> >
> >> >> systems.kafka.consumer.zookeeper.connect=redacted:2181/
> >> >> >> >>> >>>>>> > >> >
> systems.kafka.consumer.auto.offset.reset=smallest
> >> >> >> >>> >>>>>> > >> >
> >> >> >> systems.kafka.producer.metadata.broker.list=redacted:9092
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > and in the actual java file
> >> >> >> >>>WikipediaParserStreamTask.java
> >> >> >> >>> >>>>>> > >> >  public void process(IncomingMessageEnvelope
> >> >> envelope,
> >> >> >> >>> >>>>>> > MessageCollector
> >> >> >> >>> >>>>>> > >> > collector, TaskCoordinator coordinator) {
> >> >> >> >>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String,
> >> >> >> Object>)
> >> >> >> >>> >>>>>> > >> > envelope.getMessage();
> >> >> >> >>> >>>>>> > >> >    WikipediaFeedEvent event = new
> >> >> >> >>> >>>>>> WikipediaFeedEvent(jsonObject);
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> >    try {
> >> >> >> >>> >>>>>> > >> >        System.out.println(event.getRawEvent());
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > And then following the compile/extract/run
> process
> >> >> >> >>>outlined
> >> >> >> >>> >>>>>>in
> >> >> >> >>> >>>>>> the
> >> >> >> >>> >>>>>> > >> > hello-samza website.
> >> >> >> >>> >>>>>> > >> >
> >> >> >> >>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super
> >> >> simple'
> >> >> >> >>> >>>>>> examples of
> >> >> >> >>> >>>>>> > >> > ingesting kafka in samza with very little
> success.
> >> >> >> >>> >>>>>> > >>
> >> >> >> >>> >>>>>> > >>
> >> >> >> >>> >>>>>> > >
> >> >> >> >>> >>>>>> >
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>> --
> >> >> >> >>> >>>>>> Thanks and regards
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>> Chinmay Soman
> >> >> >> >>> >>>>>>
> >> >> >> >>> >>>>>
> >> >> >> >>> >>>>>
> >> >> >> >>> >>>>
> >> >> >> >>> >>>
> >> >> >> >>> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks and regards
> >> >
> >> > Chinmay Soman
> >> >
> >>
> >>
> >>
> >> --
> >> Thanks and regards
> >>
> >> Chinmay Soman
> >>
> >
> >
>



-- 
Thanks and regards

Chinmay Soman
