samza-dev mailing list archives

From Ash W Matheson <ash.mathe...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Tue, 24 Mar 2015 05:04:31 GMT
just to clarify, adding:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
systems.kafka.streams.myTopic.samza.msg.serde=string

to the property file and updating the java source to:
System.out.println((String)envelope.getMessage());

Did the trick. I've updated the pastebin with the appropriate change.
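The fix works because, with the string serde registered for myTopic, the container turns each message's raw bytes into a java.lang.String before process() sees it, so the (String) cast succeeds. Below is a rough standalone sketch of that byte-to-String step. It assumes the producer writes UTF-8. SimpleSerde and Utf8StringSerde are hypothetical names that only mirror the fromBytes/toBytes shape of Samza's Serde; this is not Samza's StringSerde source:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical interface mirroring the shape of a Samza-style serde:
// bytes from Kafka in, a Java object out (and the reverse for output).
interface SimpleSerde<T> {
    T fromBytes(byte[] bytes);
    byte[] toBytes(T object);
}

// Standalone sketch of a string serde; assumes the producer wrote UTF-8 text.
final class Utf8StringSerde implements SimpleSerde<String> {
    private final Charset charset;

    Utf8StringSerde() {
        this(StandardCharsets.UTF_8);
    }

    Utf8StringSerde(Charset charset) {
        this.charset = charset;
    }

    @Override
    public String fromBytes(byte[] bytes) {
        // Decode the raw message payload into a String.
        return new String(bytes, charset);
    }

    @Override
    public byte[] toBytes(String object) {
        // Encode a String back into bytes for an outgoing message.
        return object.getBytes(charset);
    }
}

final class StringSerdeDemo {
    public static void main(String[] args) {
        SimpleSerde<String> serde = new Utf8StringSerde();
        byte[] payload = "My first sentence".getBytes(StandardCharsets.UTF_8); // made-up sample
        Object message = serde.fromBytes(payload); // stands in for envelope.getMessage()
        System.out.println((String) message);
    }
}
```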

Now, through this whole process I've assumed that StreamTask is the
appropriate interface to implement.  Essentially, the end goal is to decode a
compressed bytestream Kafka event (from a different topic, of course) and
then feed it to a DB/TSDB/whatever.  I didn't think that I'd need to
create a separate job for this, since this stream should be able to feed
directly to the output entity.
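For the end goal above, the decode step could sit at the top of process(). Here is a minimal sketch. It assumes the payload is gzip (the message only says "compressed") and that the topic's serde leaves the message as a byte[]. GzipPayloadDecoder is a hypothetical name, and only JDK classes are used:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical decoder for a gzip-compressed Kafka payload. Gzip is an
// assumption; swap in whatever codec the producer actually uses.
final class GzipPayloadDecoder {
    private GzipPayloadDecoder() {}

    static byte[] decompress(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            // A corrupt or non-gzip payload ends up here.
            throw new UncheckedIOException("payload is not valid gzip", e);
        }
    }

    // Helper used only to build sample input for the demo below.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // gz is closed (and finished) by this point, so the output is complete.
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] wire = compress("cpu=0.42 host=a".getBytes(StandardCharsets.UTF_8)); // made-up sample
        String decoded = new String(decompress(wire), StandardCharsets.UTF_8);
        System.out.println(decoded);
    }
}
```

Inside a task, this would roughly become decompress((byte[]) envelope.getMessage()) followed by the DB/TSDB write. That path still rests on the same byte[] and gzip assumptions.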

Anyway, over the next couple of days I'll migrate this into something that
can live in the hello-samza ecosystem as a separate task. I've got the
start of a java based DataPusher built as well.

On Mon, Mar 23, 2015 at 9:40 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> Huzzah!  I ... have ... text showing!
>
> This has been enough of a trial that I think I'll convert this into a very
> simple sample project for the repo, if you guys are interested.
>
> Diff coming once I have it cleaned up into something less ugly.
>
> -Ash
>
> On Mon, Mar 23, 2015 at 9:27 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
> wrote:
>
>> > I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
>> > but that caused a separate exception.  However that was many, MANY attempts
>> > ago.
>>
>> This may not work because that setting applies the same serialization format
>> (json / string) to every stream on the kafka system, input and output alike.
>> In your case you're inputting strings and outputting JSON, so you might have
>> to set the serde for each stream explicitly.
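Chinmay's point can be modelled as a lookup order: a per-stream msg serde, when present, takes precedence over the system-wide one. That lets myTopic be read as string while an output stream such as wikipedia-edits keeps json, which matches the fix that finally worked at the top of this thread. SerdeLookupSketch is a hypothetical helper written for illustration, not Samza's config code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the lookup order discussed in this thread: a per-stream
// setting wins, otherwise the system-wide default applies.
final class SerdeLookupSketch {
    private SerdeLookupSketch() {}

    static Optional<String> msgSerdeFor(Map<String, String> config, String system, String stream) {
        String perStream = config.get("systems." + system + ".streams." + stream + ".samza.msg.serde");
        if (perStream != null) {
            return Optional.of(perStream);
        }
        return Optional.ofNullable(config.get("systems." + system + ".samza.msg.serde"));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("systems.kafka.samza.msg.serde", "json");                   // system default
        config.put("systems.kafka.streams.myTopic.samza.msg.serde", "string"); // input override
        System.out.println(msgSerdeFor(config, "kafka", "myTopic"));           // Optional[string]
        System.out.println(msgSerdeFor(config, "kafka", "wikipedia-edits"));   // Optional[json]
    }
}
```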
>>
>> On Mon, Mar 23, 2015 at 9:24 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
>> wrote:
>>
>> > Since you're producing String data to 'myTopic', can you try setting the
>> > string serialization in your config?
>> >
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > systems.kafka.streams.myTopic.samza.msg.serde=string
>> >
>> >
>> > On Mon, Mar 23, 2015 at 9:17 PM, Ash W Matheson <ash.matheson@gmail.com>
>> > wrote:
>> >
>> >> more info - new exception message:
>> >>
>> >> Exception in thread "main"
>> >> org.apache.samza.system.SystemConsumersException: Cannot deserialize an
>> >> incoming message.
>> >>
>> >> Updated the diff in pastebin with the changes.
>> >>
>> >> On Mon, Mar 23, 2015 at 8:41 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> wrote:
>> >>
>> >> > Gah!  Yeah, those were gone several revisions ago but didn't get nuked in
>> >> > the last iteration.
>> >> >
>> >> > OK, let me do a quick test to see if that was my problem all along.
>> >> >
>> >> > On Mon, Mar 23, 2015 at 8:38 PM, Navina Ramesh <
>> >> > nramesh@linkedin.com.invalid> wrote:
>> >> >
>> >> >> Hey Ash,
>> >> >> I was referring to the lines before the try block.
>> >> >>
>> >> >>     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >>     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >>
>> >> >>     try {
>> >> >>       System.out.println("[DWH] should see this");
>> >> >>       System.out.println(event.getRawEvent());
>> >> >> …
>> >> >>
>> >> >>
>> >> >> Did you remove those lines as well?
>> >> >>
>> >> >> Navina
>> >> >>
>> >> >> On 3/23/15, 8:31 PM, "Ash W Matheson" <ash.matheson@gmail.com> wrote:
>> >> >>
>> >> >> >Just looking at the diff I posted and it's:
>> >> >> >
>> >> >> >
>> >> >> >         try {
>> >> >> >   -       Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >   +       System.out.println("[DWH] should see this");
>> >> >> >   +       System.out.println(event.getRawEvent());
>> >> >> >   +       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >
>> >> >> >
>> >> >> >I've removed the Map and added two System.out.println calls. So no, there
>> >> >> >shouldn't be any reference to
>> >> >> >Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >in the source java file.
>> >> >> >
>> >> >> >
>> >> >> >On Mon, Mar 23, 2015 at 7:42 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >wrote:
>> >> >> >
>> >> >> >> I'm in transit right now but if memory serves me everything should be
>> >> >> >> commented out of that method except for the System.out.println call. I'll
>> >> >> >> be home shortly and can confirm.
>> >> >> >> On Mar 23, 2015 7:28 PM, "Navina Ramesh" <nramesh@linkedin.com.invalid>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >>> Hi Ash,
>> >> >> >>> I just ran wikipedia-parser with your patch. Looks like you have set the
>> >> >> >>> message serde correctly in the configs. However, the original code still
>> >> >> >>> converts it into a Map for consumption in the WikipediaFeedEvent.
>> >> >> >>> I am seeing the following (expected):
>> >> >> >>>
>> >> >> >>> 2015-03-23 19:17:49 SamzaContainerExceptionHandler [ERROR] Uncaught
>> >> >> >>> exception in thread (name=main). Exiting process now.
>> >> >> >>> java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
>> >> >> >>>  at samza.examples.wikipedia.task.WikipediaParserStreamTask.process(WikipediaParserStreamTask.java:38)
>> >> >> >>>  at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
>> >> >> >>>
>> >> >> >>> Did you make the changes to fix this error? Your patch doesn't seem to
>> >> >> >>> have that.
>> >> >> >>> Line 38: Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Lmk so I can investigate further.
>> >> >> >>>
>> >> >> >>> Cheers!
>> >> >> >>> Navina
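Navina's diagnosis is that line 38 casts envelope.getMessage() to Map<String, Object> before anything is printed, so once the string serde is in place the cast throws. The sketch below checks the runtime type first and does not depend on Samza. Inside process() the argument would be envelope.getMessage(). MessageInspector and describe are hypothetical names:

```java
import java.util.Collections;
import java.util.Map;

// Standalone sketch: inspect the deserialized message before casting, so a
// String arriving where a Map was expected is reported instead of throwing
// ClassCastException.
final class MessageInspector {
    private MessageInspector() {}

    static String describe(Object message) {
        if (message instanceof String) {
            // What a string serde hands to process().
            return "text: " + message;
        }
        if (message instanceof Map) {
            // What a json serde typically hands back for a JSON object.
            Map<?, ?> map = (Map<?, ?>) message;
            return "map with " + map.size() + " entries";
        }
        return "unexpected type: " + (message == null ? "null" : message.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(describe("A plain sentence"));
        System.out.println(describe(Collections.singletonMap("raw", "x")));
        System.out.println(describe(new byte[] {1, 2}));
    }
}
```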
>> >> >> >>>
>> >> >> >>> On 3/23/15, 6:43 PM, "Ash W Matheson" <ash.matheson@gmail.com> wrote:
>> >> >> >>>
>> >> >> >>> >If anyone's interested, I've posted a diff of the project here:
>> >> >> >>> >http://pastebin.com/6ZW6Y1Vu
>> >> >> >>> >and the python publisher here: http://pastebin.com/2NvTFDFx
>> >> >> >>> >
>> >> >> >>> >if you want to take a stab at it.
>> >> >> >>> >
>> >> >> >>> >On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >wrote:
>> >> >> >>> >
>> >> >> >>> >> Ok, so very simple test, all running on a local machine, not across
>> >> >> >>> >> networks and all in the hello-samza repo this time around.
>> >> >> >>> >>
>> >> >> >>> >> I've got the datapusher.py file set up to push data into localhost. One
>> >> >> >>> >> event per second.
>> >> >> >>> >> And a modified hello-samza where I've modified the
>> >> >> >>> >> WikipediaParserStreamTask.java class to simply read what's there.
>> >> >> >>> >>
>> >> >> >>> >> Running them both now and I'm seeing in the stderr files
>> >> >> >>> >> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
>> >> >> >>> >> following:
>> >> >> >>> >>
>> >> >> >>> >> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>> >> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >> >>> >>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >> >> >>> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >> >>> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >> >>> >>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>> >> >> >>> >>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >> >> >>> >>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>> >> >> >>> >>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>> >> >> >>> >>     at scala.collection.AbstractSet.map(Set.scala:47)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>> >> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>> >> >> >>> >>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>> >> >> >>> >>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>> >> >> >>> >>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>> >> >> >>> >>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>> >> >> >>> >>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>> >> >> >>> >>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>> >> >> >>> >> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>> >> >> >>> >>  at [Source: [B@5454d285; line: 1, column: 2]
>> >> >> >>> >>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>> >> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>> >> >> >>> >>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>> >> >> >>> >>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>> >> >> >>> >>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>> >> >> >>> >>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>> >> >> >>> >>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>> >> >> >>> >>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>> >> >> >>> >>
>> >> >> >>> >>
>> >> >> >>> >> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
>> >> >> >>> >> but that caused a separate exception.  However that was many, MANY attempts
>> >> >> >>> >> ago.
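The "Caused by" line explains the failure. The json serde was handed the bytes of a plain sentence, and its first character, 'M' (code 77), cannot begin any JSON value. The sketch below is a rough standalone check of that rule. JsonStartCheck is a hypothetical helper, not how Jackson parses. It only inspects the first non-whitespace character, so passing it does not mean the payload is valid JSON. The sample sentence is made up apart from its first letter:

```java
// Hypothetical helper: can this payload even begin a JSON value?
final class JsonStartCheck {
    private JsonStartCheck() {}

    static boolean couldStartJsonValue(String payload) {
        for (int i = 0; i < payload.length(); i++) {
            char c = payload.charAt(i);
            if (c == ' ' || c == '\t' || c == '\n' || c == '\r') {
                continue; // JSON allows leading whitespace
            }
            // object, array, string, number, true, false, null
            return c == '{' || c == '[' || c == '"' || c == '-'
                || (c >= '0' && c <= '9') || c == 't' || c == 'f' || c == 'n';
        }
        return false; // empty or all whitespace
    }

    public static void main(String[] args) {
        String sentence = "My sentence from datapusher.py";
        char first = sentence.charAt(0);
        System.out.println(first + " (code " + (int) first + "): "
            + couldStartJsonValue(sentence));                               // M (code 77): false
        System.out.println(couldStartJsonValue("{\"text\": \"My sentence\"}")); // true
    }
}
```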
>> >> >> >>> >>
>> >> >> >>> >> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >> wrote:
>> >> >> >>> >>
>> >> >> >>> >>> Ahh, I was going to add it to the run-class.sh script.
>> >> >> >>> >>>
>> >> >> >>> >>> Yeah, it's already there by default:
>> >> >> >>> >>>
>> >> >> >>> >>> # Metrics
>> >> >> >>> >>> metrics.reporters=snapshot,jmx
>> >> >> >>> >>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> >> >> >>> >>> metrics.reporter.snapshot.stream=kafka.metrics
>> >> >> >>> >>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>> >> >> >>> >>>
>> >> >> >>> >>> So, where would I see those metrics?
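With jmx in metrics.reporters, the JmxReporter should expose the job's metrics as MBeans inside each container's JVM. A JMX client such as jconsole attached to the container process is one place to see them. Separately, metrics.reporter.snapshot.stream=kafka.metrics sends periodic snapshots to the metrics topic on the kafka system.

The sketch below only shows the standard javax.management calls for listing MBean names, and it queries the JVM it runs in. Reading a container's MBeans remotely would go through a JMXConnector and the container's JMX service URL. That part is not shown and is an assumption to verify against your container logs. MBeanLister is a hypothetical name:

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Lists MBean names matching a pattern. Works against any
// MBeanServerConnection; the demo uses the local platform MBean server.
final class MBeanLister {
    private MBeanLister() {}

    static Set<String> names(MBeanServerConnection server, String pattern)
            throws IOException, MalformedObjectNameException {
        Set<String> result = new TreeSet<>();
        for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
            result.add(name.getCanonicalName());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        // "*:*" matches everything; the exact domain Samza registers its
        // metrics under is not assumed here.
        for (String name : names(local, "*:*")) {
            System.out.println(name);
        }
    }
}
```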
>> >> >> >>> >>>
>> >> >> >>> >>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >>> wrote:
>> >> >> >>> >>>
>> >> >> >>> >>>> read: I'm a C++ programmer looking at Java for the first time in 10 years
>> >> >> >>> >>>>
>> >> >> >>> >>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >>>> wrote:
>> >> >> >>> >>>>
>> >> >> >>> >>>>> I'm assuming I have Jmx defined ... where would that get set?
>> >> >> >>> >>>>>
>> >> >> >>> >>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <chinmay.cerebro@gmail.com>
>> >> >> >>> >>>>> wrote:
>> >> >> >>> >>>>>
>> >> >> >>> >>>>>> Hey Ash,
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>> Can you see your job metrics (if you have the JMX metrics defined) to see
>> >> >> >>> >>>>>> if your job is actually doing anything? My only guess at this point is that
>> >> >> >>> >>>>>> the process method is not being called because somehow there's no incoming
>> >> >> >>> >>>>>> data. I could be totally wrong, of course.
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >>>>>> wrote:
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > wikipedia-parser.properties==========================
>> >> >> >>> >>>>>> > task.inputs=kafka.myTopic
>> >> >> >>> >>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>> >> >> >>> >>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > WikipediaParserStreamTask.java =====================
>> >> >> >>> >>>>>> >   public void process(IncomingMessageEnvelope envelope,
>> >> >> >>> >>>>>> >       MessageCollector collector, TaskCoordinator coordinator) {
>> >> >> >>> >>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >> >>> >>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> >     try {
>> >> >> >>> >>>>>> >       System.out.println(event.getRawEvent());
>> >> >> >>> >>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>> >> >> >>> >>>>>> >       // parsedJsonObject.put("source", event.getSource());
>> >> >> >>> >>>>>> >       // parsedJsonObject.put("time", event.getTime());
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > as well as the aforementioned changes to the log4j.xml file.
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >>>>>> > wrote:
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>> > > yep, modified log4j.xml to look like this:
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> > >   <root>
>> >> >> >>> >>>>>> > >     <priority value="debug" />
>> >> >> >>> >>>>>> > >     <appender-ref ref="RollingAppender"/>
>> >> >> >>> >>>>>> > >     <appender-ref ref="jmx" />
>> >> >> >>> >>>>>> > >   </root>
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> > > Not sure what you mean by #2.
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing
>> >> >> >>> >>>>>> > > any output from System.out.println(...)
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>> >> >> >>> >>>>>> > > nsomasundaram@linkedin.com.invalid> wrote:
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> > >> Hey Ash,
>> >> >> >>> >>>>>> > >>     1. Did you happen to modify your log4j.xml?
>> >> >> >>> >>>>>> > >>     2. Can you print the class path that was printed when the job
>> >> >> >>> >>>>>> > >>        started? I am wondering if log4j was not loaded or not present
>> >> >> >>> >>>>>> > >>        in the path where it's looking for it. If you have been using
>> >> >> >>> >>>>>> > >>        hello-samza, it should have pulled it from Maven.
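Naveen's second question (which classpath the job actually started with, and whether log4j is on it) can also be checked from inside the JVM. The sketch below is standalone, and ClasspathCheck is a hypothetical name. It inspects whatever JVM runs it, so it only says something about the job if it runs inside the container, for example when called once from the task. The class name checked is log4j 1.x's Logger, which matches the log4j.xml setup discussed here:

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

// Prints the JVM classpath one entry per line and reports whether a class
// can be loaded without initializing it.
final class ClasspathCheck {
    private ClasspathCheck() {}

    static List<String> classpathEntries() {
        String cp = System.getProperty("java.class.path", "");
        return Arrays.asList(cp.split(File.pathSeparator));
    }

    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String entry : classpathEntries()) {
            System.out.println(entry);
        }
        System.out.println("log4j 1.x present: " + isLoadable("org.apache.log4j.Logger"));
    }
}
```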
>> >> >> >>> >>>>>> > >>
>> >> >> >>> >>>>>> > >> Thanks,
>> >> >> >>> >>>>>> > >> Naveen
>> >> >> >>> >>>>>> > >>
>> >> >> >>> >>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <ash.matheson@gmail.com>
>> >> >> >>> >>>>>> > >> wrote:
>> >> >> >>> >>>>>> > >>
>> >> >> >>> >>>>>> > >> > Hey all,
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
>> >> >> >>> >>>>>> > >> > simple kafka topic that I've produced through an external java app
>> >> >> >>> >>>>>> > >> > (nothing other than a series of sentences) and it's failing pretty hard
>> >> >> >>> >>>>>> > >> > for me. The base 'hello-samza' set of apps works fine, but as soon as I
>> >> >> >>> >>>>>> > >> > change the configuration to look at a different Kafka/zookeeper I get the
>> >> >> >>> >>>>>> > >> > following in the userlogs:
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
>> >> >> >>> >>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
>> >> >> >>> >>>>>> > >> > metadata for topics [Set(myTopic)] from broker
>> >> >> >>> >>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > The modifications are pretty straightforward.  In the
>> >> >> >>> >>>>>> > >> > Wikipedia-parser.properties, I've changed the following:
>> >> >> >>> >>>>>> > >> > task.inputs=kafka.myTopic
>> >> >> >>> >>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>> >> >> >>> >>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>> >> >> >>> >>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > and in the actual java file WikipediaParserStreamTask.java
>> >> >> >>> >>>>>> > >> >  public void process(IncomingMessageEnvelope envelope,
>> >> >> >>> >>>>>> > >> >      MessageCollector collector, TaskCoordinator coordinator) {
>> >> >> >>> >>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>> >> >> >>> >>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> >    try {
>> >> >> >>> >>>>>> > >> >      System.out.println(event.getRawEvent());
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > And then following the compile/extract/run process outlined on the
>> >> >> >>> >>>>>> > >> > hello-samza website.
>> >> >> >>> >>>>>> > >> >
>> >> >> >>> >>>>>> > >> > Any thoughts?  I've looked online for any 'super simple' examples of
>> >> >> >>> >>>>>> > >> > ingesting kafka in samza with very little success.
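For the original KafkaSystemAdmin warning, one quick first check (a hypothetical diagnostic, not something suggested in the thread) is whether the machine running the job can open a TCP connection to the broker host and port printed in the log line. A successful connection does not prove the broker is healthy. A failure is consistent with, though not proof of, DNS, routing, or firewall problems between the job and the broker. BrokerReachability is a hypothetical name, and only JDK classes are used:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: can this machine open a TCP connection to the
// broker host/port that appears in the KafkaSystemAdmin warning?
final class BrokerReachability {
    private BrokerReachability() {}

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // An unresolvable host surfaces here as UnknownHostException (an IOException).
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the host and port from the log line as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```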
>> >> >> >>> >>>>>> > >>
>> >> >> >>> >>>>>> > >>
>> >> >> >>> >>>>>> > >
>> >> >> >>> >>>>>> >
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>> --
>> >> >> >>> >>>>>> Thanks and regards
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>> Chinmay Soman
>> >> >> >>> >>>>>>
>> >> >> >>> >>>>>
>> >> >> >>> >>>>>
>> >> >> >>> >>>>
>> >> >> >>> >>>
>> >> >> >>> >>
>> >> >> >>>
>> >> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > Thanks and regards
>> >
>> > Chinmay Soman
>> >
>>
>>
>>
>> --
>> Thanks and regards
>>
>> Chinmay Soman
>>
>
>
