samza-dev mailing list archives

From Ash W Matheson <ash.mathe...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Tue, 24 Mar 2015 19:42:25 GMT
How do y'all feel about mixing languages?  My sentence producer (as you've
seen) is written in Python.  It wouldn't take long to convert it, but it
seems like a lot of extra effort for little gain.

If it's not a big deal, where in the hello-samza project should it live?
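
For reference, here's a rough, untested sketch of what a Java port of the sentence producer might look like, using the 0.8.2-era KafkaProducer API. The topic name, broker address, and sample sentence are all placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SentenceProducer {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    try {
      while (true) {
        // One plain-text sentence per second, mirroring the Python script.
        producer.send(new ProducerRecord<String, String>("myTopic",
            "the quick brown fox jumps over the lazy dog"));
        Thread.sleep(1000);
      }
    } finally {
      producer.close();
    }
  }
}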

On Tue, Mar 24, 2015 at 10:45 AM, Chinmay Soman <chinmay.cerebro@gmail.com>
wrote:

> Hey Ash,
>
> Yeah, I think more examples would be great! Feel free to open an RB.
>
> About the config: it can be a bit of an overhead in general and takes some
> getting used to. I realized that after I started writing Samza jobs myself.
> I'm still fairly new to Samza, and I kept making silly mistakes that are
> hard to catch early on. It's one of those things that is very difficult to
> get right (because everything has its pros and cons). Hopefully I can finish
> up my patch that addresses exactly that problem - config validation.
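>
> Purely to illustrate the kind of early check I mean (this is not the actual
> patch, just a sketch against the standard Samza config keys):
>
> import java.util.Map;
>
> public class SerdeConfigCheck {
>   // Fail fast if an input stream in task.inputs points at a serde that was
>   // never registered under serializers.registry.<name>.class.
>   public static void check(Map<String, String> config) {
>     String inputs = config.get("task.inputs");
>     if (inputs == null || inputs.isEmpty()) {
>       throw new IllegalArgumentException("task.inputs is not set");
>     }
>     for (String input : inputs.split(",")) {
>       String[] parts = input.trim().split("\\.", 2);   // "<system>.<stream>"
>       String system = parts[0];
>       String stream = parts.length > 1 ? parts[1] : "";
>       String serde = config.get("systems." + system + ".streams." + stream + ".samza.msg.serde");
>       if (serde == null) {
>         serde = config.get("systems." + system + ".samza.msg.serde");
>       }
>       if (serde != null && config.get("serializers.registry." + serde + ".class") == null) {
>         throw new IllegalArgumentException("input " + input + " uses serde '" + serde
>             + "' but there is no serializers.registry." + serde + ".class entry");
>       }
>     }
>   }
> }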
>
> On Mon, Mar 23, 2015 at 10:04 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
> > just to clarify, adding:
> >
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > systems.kafka.streams.myTopic.samza.msg.serde=string
> >
> > to the property file and updating the java source to:
> > System.out.println((String)envelope.getMessage());
> >
> > Did the trick. I've updated the pastebin with the appropriate change.
> >
> > Now, through this whole process I've assumed that StreamTask is the
> > appropriate class to derive from.  Essentially, the end goal is to decode a
> > compressed bytestream Kafka event (from a different topic, of course) and
> > then feed it to a DB/TSDB/whatever.  I didn't think that I'd need to
> > generate a Job for this and that this stream should be able to feed
> > directly to the output entity.
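> >
> > For that decode step, here's a rough sketch of the kind of task I have in
> > mind (assuming a gzip payload and Samza's byte serde on the input stream;
> > the println stand-in for the DB write is just a placeholder):
> >
> > import java.io.ByteArrayInputStream;
> > import java.io.ByteArrayOutputStream;
> > import java.util.zip.GZIPInputStream;
> >
> > import org.apache.samza.system.IncomingMessageEnvelope;
> > import org.apache.samza.task.MessageCollector;
> > import org.apache.samza.task.StreamTask;
> > import org.apache.samza.task.TaskCoordinator;
> >
> > public class CompressedFeedStreamTask implements StreamTask {
> >   @Override
> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
> >       TaskCoordinator coordinator) throws Exception {
> >     // With the input stream configured to use the byte serde, the message
> >     // arrives as the raw byte[] that was written to Kafka.
> >     byte[] compressed = (byte[]) envelope.getMessage();
> >
> >     // Assuming gzip here; swap in whatever codec the producer actually uses.
> >     GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
> >     ByteArrayOutputStream out = new ByteArrayOutputStream();
> >     byte[] buf = new byte[4096];
> >     for (int n = in.read(buf); n != -1; n = in.read(buf)) {
> >       out.write(buf, 0, n);
> >     }
> >     String payload = new String(out.toByteArray(), "UTF-8");
> >
> >     // Placeholder for the eventual DB/TSDB write.
> >     System.out.println(payload);
> >   }
> > }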
> >
> > Anyway, over the next couple of days I'll migrate this into something that
> > can live in the hello-samza ecosystem as a separate task. I've got the
> > start of a java based DataPusher built as well.
> >
> > On Mon, Mar 23, 2015 at 9:40 PM, Ash W Matheson <ash.matheson@gmail.com>
> > wrote:
> >
> > > Huzzah!  I ... have ... text showing!
> > >
> > > This has been enough of a trial that I think I'll convert this into a
> > > very simple sample project for the repo, if you guys are interested.
> > >
> > > Diff coming once I have it cleaned up into something less ugly.
> > >
> > > -Ash
> > >
> > > On Mon, Mar 23, 2015 at 9:27 PM, Chinmay Soman <
> > chinmay.cerebro@gmail.com>
> > > wrote:
> > >
> > >> >I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> > >> >but that caused a separate exception.  However that was many, MANY attempts
> > >> >ago.
> > >>
> > >> This may not work because that will set all serialization formats (input
> > >> and output) to json / string. In your case you're inputting string and
> > >> outputting json. So you might have to set that explicitly.
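> > >>
> > >> For example (sketch only; the json serde registration is my guess at what
> > >> hello-samza already ships with, and the output topic is the stock
> > >> wikipedia-edits one):
> > >>
> > >> # input: plain strings from myTopic
> > >> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >> systems.kafka.streams.myTopic.samza.msg.serde=string
> > >>
> > >> # output: JSON maps sent to wikipedia-edits
> > >> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> > >> systems.kafka.streams.wikipedia-edits.samza.msg.serde=json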
> > >>
> > >> On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <
> > chinmay.cerebro@gmail.com
> > >> >
> > >> wrote:
> > >>
> > >> > Since you're producing String data to 'myTopic', can you try setting the
> > >> > string serialization in your config ?
> > >> >
> > >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> > >> > systems.kafka.streams.myTopic.samza.msg.serde=string
> > >> >
> > >> >
> > >> > On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <
> > ash.matheson@gmail.com
> > >> >
> > >> > wrote:
> > >> >
> > >> >> more info - new exception message:
> > >> >>
> > >> >> Exception in thread "main"
> > >> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
> > >> >> incoming message.
> > >> >>
> > >> >> Updated the diff in pastebin with the changes.
> > >> >>
> > >> >> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <
> > >> ash.matheson@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >> > Gah!  Yeah, those were gone several revisions ago but didn't get nuked
> > >> >> > in the last iteration.
> > >> >> >
> > >> >> > OK, let me do a quick test to see if that was my problem all along.
> > >> >> >
> > >> >> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <
> > >> >> > nramesh@linkedin.com.invalid> wrote:
> > >> >> >
> > >> >> >> Hey Ash,
> > >> >> >> I was referring to the lines before the try block.
> > >> >> >>
> > >> >> >> Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > >> >> >>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> > >> >> >>
> > >> >> >>     try {
> > >> >> >>       System.out.println("[DWH] should see this");
> > >> >> >>       System.out.println(event.getRawEvent());
> > >> >> >> …
> > >> >> >>
> > >> >> >>
> > >> >> >> Did you remove those lines as well?
> > >> >> >>
> > >> >> >> Navina
> > >> >> >>
> > >> >> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.matheson@gmail.com>
> > >> wrote:
> > >> >> >>
> > >> >> >> >Just looking at the diff I posted and it's:
> > >> >> >> >
> > >> >> >> >   1.      try {
> > >> >> >> >   2. -      Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> > >> >> >> >   3. +      System.out.println("[DWH] should see this");
> > >> >> >> >   4. +      System.out.println(event.getRawEvent());
> > >> >> >> >   5. +      // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> > >> >> >> >
> > >> >> >> >I've removed the Map and added two System.out.println calls.  So no,
> > >> >> >> >there shouldn't be any reference to
> > >> >> >> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> > >> >> >> >in the source java file.
> > >> >> >> >
> > >> >> >> >
> > >> >> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <
> > >> >> ash.matheson@gmail.com>
> > >> >> >> >wrote:
> > >> >> >> >
> > >> >> >> >> I'm in transit right now but if memory serves me everything should be
> > >> >> >> >> commented out of that method except for the System.out.println call.
> > >> >> >> >> I'll be home shortly and can confirm.
> > >> >> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh"
> > >> >> <nramesh@linkedin.com.invalid
> > >> >> >> >
> > >> >> >> >> wrote:
> > >> >> >> >>
> > >> >> >> >>> Hi Ash,
> > >> >> >> >>> I just ran wikipedia-parser with your patch. Looks like you have set the
> > >> >> >> >>> message serde correctly in the configs. However, the original code still
> > >> >> >> >>> converts it into a Map for consumption in the WikipediaFeedEvent.
> > >> >> >> >>> I am seeing the following (expected):
> > >> >> >> >>>
> > >> >> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
> > >> >> >> >>> exception in thread (name=main). Exiting process now.
> > >> >> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
> > >> >> >> >>>  at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
> > >> >> >> >>>  at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> > >> >> >> >>>
> > >> >> >> >>> Did you make the changes to fix this error? Your patch doesn't seem to
> > >> >> >> >>> have that.
> > >> >> >> >>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > >> >> >> >>>
> > >> >> >> >>> Lmk so I can investigate further.
> > >> >> >> >>>
> > >> >> >> >>> Cheers!
> > >> >> >> >>> Navina
> > >> >> >> >>>
> > >> >> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <
> ash.matheson@gmail.com
> > >
> > >> >> wrote:
> > >> >> >> >>>
> > >> >> >> >>> >If anyone's interested, I've posted a diff of the project here:
> > >> >> >> >>> >http://pastebin.com/6ZW6Y1Vu
> > >> >> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
> > >> >> >> >>> >
> > >> >> >> >>> >if you want to take a stab at it.
> > >> >> >> >>> >
> > >> >> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson
> > >> >> >> >>><ash.matheson@gmail.com>
> > >> >> >> >>> >wrote:
> > >> >> >> >>> >
> > >> >> >> >>> >> Ok, so very simple test, all running on a local machine, not across
> > >> >> >> >>> >> networks and all in the hello-samza repo this time around.
> > >> >> >> >>> >>
> > >> >> >> >>> >> I've got the datapusher.py file set up to push data into localhost. One
> > >> >> >> >>> >> event per second.
> > >> >> >> >>> >> And a modified hello-samza where I've modified the
> > >> >> >> >>> >> WikipediaParserStreamTask.java class to simply read what's there.
> > >> >> >> >>> >>
> > >> >> >> >>> >> Running them both now and I'm seeing in the stderr files
> > >> >> >> >>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
> > >> >> >> >>> >> following:
> > >> >> >> >>> >>
> > >> >> >> >>> >> Exception in thread "main"
> > >> >> >> >>> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
> > >> >> >> >>> >> incoming message.
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
> > >> >> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >> >> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > >> >> >> >>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >> >> >> >>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >> >> >> >>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
> > >> >> >> >>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > >> >> >> >>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> > >> >> >> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
> > >> >> >> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
> > >> >> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
> > >> >> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
> > >> >> >> >>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
> > >> >> >> >>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
> > >> >> >> >>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
> > >> >> >> >>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
> > >> >> >> >>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > >> >> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character
> > >> >> >> >>> >> ('M' (code 77)): expected a valid value (number, String, array, object,
> > >> >> >> >>> >> 'true', 'false' or 'null')
> > >> >> >> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
> > >> >> >> >>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
> > >> >> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
> > >> >> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
> > >> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
> > >> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
> > >> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
> > >> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
> > >> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
> > >> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
> > >> >> >> >>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
> > >> >> >> >>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
> > >> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
> > >> >> >> >>> >>
> > >> >> >> >>> >>
> > >> >> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> > >> >> >> >>> >> but that caused a separate exception.  However that was many, MANY
> > >> >> >> >>> >> attempts ago.
> > >> >> >> >>> >>
> > >> >> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <
> > >> >> >> >>> ash.matheson@gmail.com>
> > >> >> >> >>> >> wrote:
> > >> >> >> >>> >>
> > >> >> >> >>> >>> Ahh, I was going to add it to the run-class.sh script.
> > >> >> >> >>> >>>
> > >> >> >> >>> >>> Yeah, it's already there by default:
> > >> >> >> >>> >>>
> > >> >> >> >>> >>> # Metrics
> > >> >> >> >>> >>> metrics.reporters=snapshot,jmx
> > >> >> >> >>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
> > >> >> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
> > >> >> >> >>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
> > >> >> >> >>> >>>
> > >> >> >> >>> >>> So, where would I see those metrics?
> > >> >> >> >>> >>>
> > >> >> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson
> > >> >> >> >>> >>><ash.matheson@gmail.com>
> > >> >> >> >>> >>> wrote:
> > >> >> >> >>> >>>
> > >> >> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first time in 10
> > >> >> >> >>> >>>> years
> > >> >> >> >>> >>>>
> > >> >> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.matheson@gmail.com>
> > >> >> >> >>> >>>> wrote:
> > >> >> >> >>> >>>>
> > >> >> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
> > >> >> >> >>> >>>>>
> > >> >> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
> > >> >> >> >>> >>>>> chinmay.cerebro@gmail.com> wrote:
> > >> >> >> >>> >>>>>
> > >> >> >> >>> >>>>>> Hey Ash,
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to
> > >> >> >> >>> >>>>>> see if your job is actually doing anything ? My only guess at this
> > >> >> >> >>> >>>>>> point is the process method is not being called because somehow there's
> > >> >> >> >>> >>>>>> no incoming data. I could be totally wrong of course.
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
> > >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> > >> >> >> >>> >>>>>> wrote:
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > wikipedia-parser.properties==========================
> > >> >> >> >>> >>>>>> > task.inputs=kafka.myTopic
> > >> >> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
> > >> >> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > WikipediaParserStreamTask.java =====================
> > >> >> >> >>> >>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector
> > >> >> >> >>> >>>>>> > collector, TaskCoordinator coordinator) {
> > >> >> >> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > >> >> >> >>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> >     try {
> > >> >> >> >>> >>>>>> >       System.out.println(event.getRawEvent());
> > >> >> >> >>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
> > >> >> >> >>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
> > >> >> >> >>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new
> > >> >> >> >>> >>>>>> > SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
> > >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> > >> >> >> >>> >>>>>> > wrote:
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>> > > yep, modified log4j.xml to look like this:
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> > >   <root>
> > >> >> >> >>> >>>>>> > >     <priority value="debug" />
> > >> >> >> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
> > >> >> >> >>> >>>>>> > >     <appender-ref ref="jmx" />
> > >> >> >> >>> >>>>>> > >   </root>
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> > > Not sure what you mean by #2.
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not
> > >> >> >> >>> >>>>>> > > seeing any output from System.out.println(...)
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen
> > >> Somasundaram <
> > >> >> >> >>> >>>>>> > > nsomasundaram@linkedin.com.invalid> wrote:
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> > >> Hey Ash,
> > >> >> >> >>> >>>>>> > >>                1. Did you happen to modify your log4j.xml ?
> > >> >> >> >>> >>>>>> > >>                2. Can you print the class path that was printed when the
> > >> >> >> >>> >>>>>> > >> job started ? I am wondering if log4j was not loaded or not present in
> > >> >> >> >>> >>>>>> > >> the path where it's looking for. If you have been using hello samza, it
> > >> >> >> >>> >>>>>> > >> should have pulled it from Maven.
> > >> >> >> >>> >>>>>> > >>
> > >> >> >> >>> >>>>>> > >> Thanks,
> > >> >> >> >>> >>>>>> > >> Naveen
> > >> >> >> >>> >>>>>> > >>
> > >> >> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
> > >> >> >> >>> >>>>>> ash.matheson@gmail.com>
> > >> >> >> >>> >>>>>> > >> wrote:
> > >> >> >> >>> >>>>>> > >>
> > >> >> >> >>> >>>>>> > >> > Hey all,
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
> > >> >> >> >>> >>>>>> > >> > simple kafka topic that I've produced through an external java app (nothing
> > >> >> >> >>> >>>>>> > >> > other than a series of sentences) and it's failing pretty hard for me. The
> > >> >> >> >>> >>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
> > >> >> >> >>> >>>>>> > >> > configuration to look at a different Kafka/zookeeper I get the following in
> > >> >> >> >>> >>>>>> > >> > the userlogs:
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
> > >> >> >> >>> >>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
> > >> >> >> >>> >>>>>> > >> > metadata for topics [Set(myTopic)] from broker
> > >> >> >> >>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > The modifications are pretty straightforward.  In the
> > >> >> >> >>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
> > >> >> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
> > >> >> >> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
> > >> >> >> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
> > >> >> >> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
> > >> >> >> >>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector
> > >> >> >> >>> >>>>>> > >> > collector, TaskCoordinator coordinator) {
> > >> >> >> >>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
> > >> >> >> >>> >>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> >    try {
> > >> >> >> >>> >>>>>> > >> >      System.out.println(event.getRawEvent());
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > And then following the compile/extract/run process outlined in the
> > >> >> >> >>> >>>>>> > >> > hello-samza website.
> > >> >> >> >>> >>>>>> > >> >
> > >> >> >> >>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple' examples of
> > >> >> >> >>> >>>>>> > >> > ingesting kafka in samza with very little success.
> > >> >> >> >>> >>>>>> > >>
> > >> >> >> >>> >>>>>> > >>
> > >> >> >> >>> >>>>>> > >
> > >> >> >> >>> >>>>>> >
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>> --
> > >> >> >> >>> >>>>>> Thanks and regards
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>> Chinmay Soman
> > >> >> >> >>> >>>>>>
> > >> >> >> >>> >>>>>
> > >> >> >> >>> >>>>>
> > >> >> >> >>> >>>>
> > >> >> >> >>> >>>
> > >> >> >> >>> >>
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >>
> > >> >> >>
> > >> >> >
> > >> >>
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > Thanks and regards
> > >> >
> > >> > Chinmay Soman
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Thanks and regards
> > >>
> > >> Chinmay Soman
> > >>
> > >
> > >
> >
>
>
>
> --
> Thanks and regards
>
> Chinmay Soman
>
