samza-dev mailing list archives

From Ash W Matheson <ash.mathe...@gmail.com>
Subject Re: New to Samza/Yarn and having Kafka issues
Date Tue, 24 Mar 2015 01:43:15 GMT
If anyone's interested and wants to take a stab at it, I've posted a diff of
the project here: http://pastebin.com/6ZW6Y1Vu
and the Python publisher here: http://pastebin.com/2NvTFDFx

On Mon, Mar 23, 2015 at 6:04 PM, Ash W Matheson <ash.matheson@gmail.com>
wrote:

> Ok, so very simple test, all running on a local machine, not across
> networks and all in the hello-samza repo this time around.
>
> I've got the datapusher.py file set up to push data into localhost, one
> event per second, and a hello-samza checkout where I've modified the
> WikipediaParserStreamTask.java class to simply read what's there.
>
> Running them both now and I'm seeing in the stderr files
> (deploy/yarn/logs/userlogs/application_XXXXX/container_YYYY/stderr) the
> following:
>
> Exception in thread "main" org.apache.samza.system.SystemConsumersException: Cannot deserialize an incoming message.
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:293)
>     at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>     at scala.collection.SetLike$class.map(SetLike.scala:93)
>     at scala.collection.AbstractSet.map(Set.scala:47)
>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('M' (code 77)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
>  at [Source: [B@5454d285; line: 1, column: 2]
>     at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
>     at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
>     at org.codehaus.jackson.impl.Utf8StreamParser._handleUnexpectedValue(Utf8StreamParser.java:1581)
>     at org.codehaus.jackson.impl.Utf8StreamParser._nextTokenNotInObject(Utf8StreamParser.java:436)
>     at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:322)
>     at org.codehaus.jackson.map.ObjectMapper._initForReading(ObjectMapper.java:2432)
>     at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2389)
>     at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
>     at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
>     at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
>     at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
>
>
> I changed the systems.kafka.samza.msg.serde=json to 'string' a while back,
> but that caused a separate exception.  However that was many, MANY attempts
> ago.
>
> On Mon, Mar 23, 2015 at 5:23 PM, Ash W Matheson <ash.matheson@gmail.com>
> wrote:
>
>> Ahh, I was going to add it to the run-class.sh script.
>>
>> Yeah, it's already there by default:
>>
>>
>> # Metrics
>> metrics.reporters=snapshot,jmx
>>
>> metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
>> metrics.reporter.snapshot.stream=kafka.metrics
>>
>> metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory
>>
>> So, where would I see those metrics?
>>
>> On Mon, Mar 23, 2015 at 5:15 PM, Ash W Matheson <ash.matheson@gmail.com>
>> wrote:
>>
>>> read: I'm a C++ programmer looking at Java for the first time in > 10
>>> years
>>>
>>> On Mon, Mar 23, 2015 at 5:13 PM, Ash W Matheson <ash.matheson@gmail.com>
>>> wrote:
>>>
>>>> I'm assuming I have Jmx defined ... where would that get set?
>>>>
>>>> On Mon, Mar 23, 2015 at 5:08 PM, Chinmay Soman <
>>>> chinmay.cerebro@gmail.com> wrote:
>>>>
>>>>> Hey Ash,
>>>>>
>>>>> Can you see your job metrics (if you have the Jmx metrics defined) to see
>>>>> if your job is actually doing anything? My only guess at this point is the
>>>>> process method is not being called because somehow there's no incoming
>>>>> data. I could be totally wrong of course.
>>>>>
>>>>> On Mon, Mar 23, 2015 at 4:28 PM, Ash W Matheson <
>>>>> ash.matheson@gmail.com>
>>>>> wrote:
>>>>>
>>>>> > Just to be clear, here's what's changed from the default hello-samza repo:
>>>>> >
>>>>> > wikipedia-parser.properties==========================
>>>>> > task.inputs=kafka.myTopic
>>>>> > systems.kafka.consumer.zookeeper.connect=ec2-xxx-xxx-xxx-xxx.compute-1.amazonaws.com:2181/
>>>>> > systems.kafka.consumer.auto.offset.reset=smallest
>>>>> >
>>>>> > WikipediaParserStreamTask.java =====================
>>>>> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>> >     Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>> >     WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>> >
>>>>> >     try {
>>>>> >       System.out.println(event.getRawEvent());
>>>>> >       // Map<String, Object> parsedJsonObject = parse(event.getRawEvent());
>>>>> >
>>>>> >       // parsedJsonObject.put("channel", event.getChannel());
>>>>> >       // parsedJsonObject.put("source", event.getSource());
>>>>> >       // parsedJsonObject.put("time", event.getTime());
>>>>> >
>>>>> >       // collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wikipedia-edits"), parsedJsonObject));
>>>>> >
>>>>> > as well as the aforementioned changes to the log4j.xml file.
>>>>> >
>>>>> > The data pushed into the 'myTopic' topic is nothing more than a sentence.
>>>>> >
>>>>> >
>>>>> > On Mon, Mar 23, 2015 at 4:16 PM, Ash W Matheson <
>>>>> ash.matheson@gmail.com>
>>>>> > wrote:
>>>>> >
>>>>> > > yep, modified log4j.xml to look like this:
>>>>> > >
>>>>> > >   <root>
>>>>> > >     <priority value="debug" />
>>>>> > >     <appender-ref ref="RollingAppender"/>
>>>>> > >     <appender-ref ref="jmx" />
>>>>> > >   </root>
>>>>> > >
>>>>> > > Not sure what you mean by #2.
>>>>> > >
>>>>> > > However, I'm running now, not seeing any exceptions, but still not seeing
>>>>> > > any output from System.out.println(...)
>>>>> > >
>>>>> > > On Mon, Mar 23, 2015 at 11:29 AM, Naveen Somasundaram <
>>>>> > > nsomasundaram@linkedin.com.invalid> wrote:
>>>>> > >
>>>>> > >> Hey Ash,
>>>>> > >>                1. Did you happen to modify your log4j.xml?
>>>>> > >>                2. Can you print the class path that was printed when the
>>>>> > >> job started? I am wondering if log4j was not loaded or is not present in the
>>>>> > >> path where it’s being looked for. If you have been using hello-samza, it should
>>>>> > >> have been pulled from Maven.
>>>>> > >>
>>>>> > >> Thanks,
>>>>> > >> Naveen
>>>>> > >>
>>>>> > >> On Mar 22, 2015, at 10:35 AM, Ash W Matheson <
>>>>> ash.matheson@gmail.com>
>>>>> > >> wrote:
>>>>> > >>
>>>>> > >> > Hey all,
>>>>> > >> >
>>>>> > >> > Evaluating Samza currently and am running into some odd issues.
>>>>> > >> >
>>>>> > >> > I'm currently working off the 'hello-samza' repo and trying to parse a
>>>>> > >> > simple Kafka topic that I've produced through an external Java app (nothing
>>>>> > >> > other than a series of sentences) and it's failing pretty hard for me. The
>>>>> > >> > base 'hello-samza' set of apps works fine, but as soon as I change the
>>>>> > >> > configuration to look at a different Kafka/ZooKeeper I get the following in
>>>>> > >> > the userlogs:
>>>>> > >> >
>>>>> > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to fetch last offsets
>>>>> > >> > for streams [myTopic] due to kafka.common.KafkaException: fetching topic
>>>>> > >> > metadata for topics [Set(myTopic)] from broker
>>>>> > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. Retrying.
>>>>> > >> >
>>>>> > >> >
>>>>> > >> > The modifications are pretty straightforward.  In
>>>>> > >> > wikipedia-parser.properties, I've changed the following:
>>>>> > >> > task.inputs=kafka.myTopic
>>>>> > >> > systems.kafka.consumer.zookeeper.connect=redacted:2181/
>>>>> > >> > systems.kafka.consumer.auto.offset.reset=smallest
>>>>> > >> > systems.kafka.producer.metadata.broker.list=redacted:9092
>>>>> > >> >
>>>>> > >> > and in the actual Java file WikipediaParserStreamTask.java
>>>>> > >> >  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>>>>> > >> >    Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();
>>>>> > >> >    WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>>>>> > >> >
>>>>> > >> >    try {
>>>>> > >> >        System.out.println(event.getRawEvent());
>>>>> > >> >
>>>>> > >> > And then following the compile/extract/run process outlined on the
>>>>> > >> > hello-samza website.
>>>>> > >> >
>>>>> > >> > Any thoughts?  I've looked online for any 'super simple' examples of
>>>>> > >> > ingesting Kafka in Samza with very little success.
>>>>> > >>
>>>>> > >>
>>>>> > >
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks and regards
>>>>>
>>>>> Chinmay Soman
>>>>>
>>>>
>>>>
>>>
>>
>
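
A note on the JsonParseException quoted above: the trace goes through JsonSerde.fromBytes, and the thread says the topic holds nothing more than plain sentences, so the unexpected character 'M' would be consistent with a bare sentence reaching a JSON deserializer. The sketch below reproduces that mismatch in plain Python with only the standard json module. It is not the datapusher.py from the thread; the "raw" key, the sample sentence, and the helper names encode_event and try_decode are hypothetical and chosen only for illustration.

```python
import json


def encode_event(sentence: str) -> bytes:
    """Wrap a plain sentence in a JSON object and return UTF-8 bytes.

    The "raw" key is a hypothetical field name, not taken from the thread.
    """
    return json.dumps({"raw": sentence}).encode("utf-8")


def try_decode(payload: bytes):
    """Mimic a JSON deserializer: parsed value, or None if not valid JSON."""
    try:
        return json.loads(payload.decode("utf-8"))
    except json.JSONDecodeError:
        return None


# A bare sentence is not a JSON value, so decoding it fails.
plain = "My first test sentence".encode("utf-8")

# The same sentence wrapped in a JSON object decodes to a dict.
wrapped = encode_event("My first test sentence")

print(try_decode(plain))
print(try_decode(wrapped))
```

Publishing JSON-encoded bytes is one way to line the producer up with a json serde. The other route mentioned in the thread, switching the serde to 'string', would presumably require the task to treat envelope.getMessage() as a String rather than casting it to a Map.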
