samza-dev mailing list archives

From Yan Fang <yanfang...@gmail.com>
Subject Re: Samza as a Caching layer
Date Fri, 05 Sep 2014 20:36:50 GMT
I was also thinking of having fluentd push directly to Samza, but I don't know
how to implement that. I'm not sure whether adding a Kafka layer between
fluentd and Samza is the only option.

Does anyone else have a better idea?

Thanks,

Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108


On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <ctippur@gmail.com> wrote:

> Yan,
>
> Won't that add an additional hop? It did occur to me earlier, but I was not
> sure it is the right way to go if we have a stringent, SLA-driven system
> depending on it.
>
> - Shekar
>
>
> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <yanfang724@gmail.com> wrote:
>
> > If you already put the events into Kafka, you can have the Samza job
> > consume that Kafka topic, just as the wikipedia-parser job in hello-samza
> > consumes the Kafka topic wikipedia-raw (see its config file).
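> >
> > As a rough sketch of what to look for there (recalled from the hello-samza
> > checkout, so compare it against your own copy; the ZooKeeper address is a
> > local-setup assumption):
> >
> > ```properties
> > # deploy/samza/config/wikipedia-parser.properties (excerpt)
> > # The job reads the topic wikipedia-raw through the system named "kafka".
> > task.inputs=kafka.wikipedia-raw
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > # Assumption: ZooKeeper runs locally, as in the hello-samza setup.
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > ```
> >
> > For the fluentd case, the input would presumably be declared the same way,
> > pointed at whatever topic fluentd writes to, for example
> > task.inputs=kafka.fluentd-events (a hypothetical topic name).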
> >
> > Thanks,
> >
> > Fang, Yan
> > yanfang724@gmail.com
> > +1 (206) 849-4108
> >
> >
> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <ctippur@gmail.com> wrote:
> >
> > > Awesome .. This works. Thanks a lot.
> > >
> > > Now off to my next step.
> > > I want to point to an incoming stream of events. These events are routed
> > > via fluentd, so fluentd acts as a routing layer that pushes the events
> > > to Kafka. Since it is a push and not a pull, any pointers on how to push
> > > them to Samza? I'm guessing I need a listener on Samza to collect them?
> > >
> > > - Shekar
> > >
> > >
> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <yanfang724@gmail.com> wrote:
> > >
> > > > Aha, yes, we are almost there. I think I made a mistake in the previous
> > > > email.
> > > >
> > > > 1. Modify wikipedia-parser.properties, NOT wikipedia-feed.properties.
> > > > 2. Run:
> > > >
> > > > deploy/samza/bin/run-job.sh \
> > > >   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
> > > >   --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
> > > >
> > > > (NOT wikipedia-feed.properties)
> > > >
> > > > Then you should see the messages in the Kafka topic wikipedia-edits.
> > > >
> > > > Thanks. Let me know if you have any luck. :)
> > > >
> > > > Cheers,
> > > >
> > > > Fang, Yan
> > > > yanfang724@gmail.com
> > > > +1 (206) 849-4108
> > > >
> > > >
> > > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur <ctippur@gmail.com> wrote:
> > > >
> > > > > > Just tried #3. Changed this property in wikipedia-feed.properties:
> > > > > >
> > > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
> > > > > >
> > > > > > Then ran:
> > > > > >
> > > > > > deploy/samza/bin/run-job.sh \
> > > > > >   --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
> > > > > >   --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
> > > > > >
> > > > > > I don't see any of the debug messages that I added to the feed or the
> > > > > > parser file. I do see messages on the kafka-consumer.
> > > > > >
> > > > > > However, the feed job died with the message below:
> > > > >
> > > > >
> > > > > > Exception in thread "ThreadJob" java.lang.RuntimeException: Trying to unlisten to a channel that has no listeners in it.
> > > > > >     at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
> > > > > >     at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
> > > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > > >     at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
> > > > > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > > > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > > > >     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > > > >     at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
> > > > > >     at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
> > > > > >     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
> > > > > >     at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
> > > > >
> > > > > - Shekar
> > > > >
> > > >
> > >
> >
>
