samza-dev mailing list archives

From Chris Riccomini <criccom...@linkedin.com.INVALID>
Subject Re: Samza as a Caching layer
Date Tue, 09 Sep 2014 22:15:30 GMT
Hey Shekar,

Your understanding of the jobs is correct.

The WikipediaFeedEvent class is just a convenience class so that
StreamTask code doesn't have to deal with poking around inside the JSON
structure. Instead, it gets WikipediaFeedEvent objects, which means the
code can just call WikipediaFeedEvent.getChannel, etc.
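
To make that concrete, here is a minimal sketch of a parser-style StreamTask
(illustrative only, not the actual hello-samza source; the class name, the
printed output, and the assumption that WikipediaFeedEvent is a nested class
of WikipediaFeed are mine):

  import java.util.Map;

  import org.apache.samza.system.IncomingMessageEnvelope;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.StreamTask;
  import org.apache.samza.task.TaskCoordinator;

  import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;

  public class ExampleWikipediaParserTask implements StreamTask {
    @Override
    @SuppressWarnings("unchecked")
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
      // The wikipedia-raw message arrives as a deserialized JSON map.
      Map<String, Object> jsonObject = (Map<String, Object>) envelope.getMessage();

      // Wrap it once; downstream code never touches the JSON keys directly.
      WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
      System.out.println("Edit on channel: " + event.getChannel());
    }
  }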

Cheers,
Chris

On 9/9/14 2:58 PM, "Shekar Tippur" <ctippur@gmail.com> wrote:

>More Questions ..
>
>Please correct me if I am wrong, as I am trying to unravel hello-samza:
>
>http://samza.incubator.apache.org/learn/tutorials/0.7.0/run-hello-samza-without-internet.html
>
>1. wikipedia-feed.properties - deploys a Samza job which listens to the
>wikipedia API, receives the feed in realtime, and produces the feed to the
>Kafka topic wikipedia-raw.
>2. wikipedia-parser - This pulls the messages from wikipedia-raw and
>registers to an event that checks the integrity of the incoming message.
>Why do we need to register this to an event?
>
>WikipediaFeedEvent event = new WikipediaFeedEvent(jsonObject);
>
>
>- Shekar
>
>On Mon, Sep 8, 2014 at 10:54 AM, Chris Riccomini <
>criccomini@linkedin.com.invalid> wrote:
>
>> Hey Shekar,
>>
>> Sure. If your input stream has 8 partitions and is partitioned by "ip
>> address", then your state stream must also have 8 partitions and be
>> partitioned by "ip address". This is to guarantee that the StreamTask that
>> receives a message from the stream will have the state required to do the
>> table join in its local store.
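>>
>> As a rough illustration of that co-partitioning (stream, store, and serde
>> names here are invented; the property names follow the 0.7.0 state
>> management docs), the job config could look something like:
>>
>>   # Both topics have 8 partitions and are keyed by ip address, so the
>>   # task that receives an event for an ip also holds that ip's state.
>>   task.inputs=kafka.events-by-ip,kafka.ip-metadata
>>
>>   # Local key-value store used for the stream-table join.
>>   stores.ip-metadata-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
>>   stores.ip-metadata-store.key.serde=string
>>   stores.ip-metadata-store.msg.serde=json
>>   stores.ip-metadata-store.changelog=kafka.ip-metadata-store-changelog
>>
>>   # Serdes referenced above.
>>   serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>   serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory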
>>
>> Cheers,
>> Chris
>>
>> On 9/8/14 10:51 AM, "Shekar Tippur" <ctippur@gmail.com> wrote:
>>
>> >Chris -
>> >Can you please elaborate on
>> >
>> >"Also note that if you take this approach, your state
>> >must be partitioned in the same way as your input stream."
>> >
>> >- Shekar
>> >
>> >On Mon, Sep 8, 2014 at 10:29 AM, Chris Riccomini <
>> >criccomini@linkedin.com.invalid> wrote:
>> >
>> >> Hey Shekar,
>> >>
>> >> You can either run some external DB that holds the data set, and you
>> >> can query it from a StreamTask, or you can use Samza's state store
>> >> feature to push data into a stream that you can then store in a
>> >> partitioned key-value store along with your StreamTasks. There is some
>> >> documentation here about the state store approach:
>> >>
>> >> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
>> >>
>> >> Putting your data into a Kafka stream is going to require more up front
>> >> effort from you, since you'll have to understand how Kafka's
>> >> partitioning model works, and set up some pipeline to push the updates
>> >> for your state. In the long run, I believe it's the better approach,
>> >> though. Local lookups on a key-value store should be faster than doing
>> >> remote RPC calls to a DB for every message. Also note that if you take
>> >> this approach, your state must be partitioned in the same way as your
>> >> input stream.
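>> >>
>> >> To sketch what that looks like in code (purely illustrative; the store,
>> >> stream, and field names are made up, and this assumes a local store
>> >> configured as described in the state-management docs), a task could
>> >> consume both the event stream and the state stream and join locally:
>> >>
>> >>   import java.util.Map;
>> >>
>> >>   import org.apache.samza.config.Config;
>> >>   import org.apache.samza.storage.kv.KeyValueStore;
>> >>   import org.apache.samza.system.IncomingMessageEnvelope;
>> >>   import org.apache.samza.system.OutgoingMessageEnvelope;
>> >>   import org.apache.samza.system.SystemStream;
>> >>   import org.apache.samza.task.InitableTask;
>> >>   import org.apache.samza.task.MessageCollector;
>> >>   import org.apache.samza.task.StreamTask;
>> >>   import org.apache.samza.task.TaskContext;
>> >>   import org.apache.samza.task.TaskCoordinator;
>> >>
>> >>   public class ExampleEnrichmentTask implements StreamTask, InitableTask {
>> >>     private static final SystemStream OUTPUT =
>> >>         new SystemStream("kafka", "events-enriched");
>> >>
>> >>     private KeyValueStore<String, Map<String, Object>> store;
>> >>
>> >>     @Override
>> >>     @SuppressWarnings("unchecked")
>> >>     public void init(Config config, TaskContext context) {
>> >>       store = (KeyValueStore<String, Map<String, Object>>)
>> >>           context.getStore("ip-metadata-store");
>> >>     }
>> >>
>> >>     @Override
>> >>     @SuppressWarnings("unchecked")
>> >>     public void process(IncomingMessageEnvelope envelope,
>> >>                         MessageCollector collector,
>> >>                         TaskCoordinator coordinator) {
>> >>       String stream = envelope.getSystemStreamPartition().getStream();
>> >>       if ("ip-metadata".equals(stream)) {
>> >>         // "Table" side: keep the local copy of the metadata current.
>> >>         store.put((String) envelope.getKey(),
>> >>                   (Map<String, Object>) envelope.getMessage());
>> >>       } else {
>> >>         // "Stream" side: enrich the event from the local store.
>> >>         Map<String, Object> event = (Map<String, Object>) envelope.getMessage();
>> >>         Map<String, Object> metadata = store.get((String) event.get("ip"));
>> >>         if (metadata != null) {
>> >>           event.putAll(metadata);
>> >>         }
>> >>         collector.send(new OutgoingMessageEnvelope(OUTPUT, event));
>> >>       }
>> >>     }
>> >>   }
>> >>
>> >> Because both streams are partitioned by ip address, store.get() is
>> >> always a local lookup rather than a remote call.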
>> >>
>> >> I'm sorry I can't give you a more definitive answer. It's really about
>> >> trade-offs.
>> >>
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 9/8/14 10:17 AM, "Shekar Tippur" <ctippur@gmail.com> wrote:
>> >>
>> >> >Hello,
>> >> >
>> >> >I am able to read messages off of a new kafka queue now.
>> >> >The next task is to enrich the data with more information. The data
>> >> >that is flowing in has an ip address or host name. I have a redis
>> >> >cache where there is more contextual information (like owner of the
>> >> >alert, SLA, etc). The data in redis does not change often.
>> >> >This pretty much becomes a stream-table join.
>> >> >I can also dump the same data to a different kafka queue and make it a
>> >> >stream-stream join as well.
>> >> >
>> >> >What do you guys recommend?
>> >> >
>> >> >- Shekar
>> >> >
>> >> >On Fri, Sep 5, 2014 at 4:08 PM, Chris Riccomini <
>> >> >criccomini@linkedin.com.invalid> wrote:
>> >> >
>> >> >> Hey Guys,
>> >> >>
>> >> >> I don't know a whole lot about Fluentd, but if you don't want to do
>> >> >> this flow:
>> >> >>
>> >> >>   Fluentd -> Kafka -> Samza
>> >> >>
>> >> >> Then the alternative is:
>> >> >>
>> >> >>   Fluentd -> Samza
>> >> >>
>> >> >> The "direct" approach (no Kafka) is going to be pretty labor
>> >> >> intensive to build. You'd have to:
>> >> >>
>> >> >> 1. Implement a FluentdSystemConsumer for Samza (a rough skeleton is
>> >> >> sketched at the end of this message).
>> >> >> 2. Write a Fluentd data output plugin, which sends to the
>> >> >> FluentdSystemConsumer.
>> >> >> 3. Figure out a way for the Fluentd data output plugin to "discover"
>> >> >> where the Samza FluentdSystemConsumer is located (since
>> >> >> SamzaContainers are deployed to dynamic hosts in YARN, and move
>> >> >> around a lot).
>> >> >> 4. Implement a bare-bones FluentdSystemAdmin and FluentdSystemFactory
>> >> >> class (similar to the WikipediaSystemFactory in hello-samza).
>> >> >> 5. Decide on some partitioning model that makes sense for Fluentd.
>> >> >> Maybe one partition = one host? Not sure how Fluentd works here.
>> >> >>
>> >> >> My instinct is that it's going to be *far* better to use the first
>> >> >> approach (pipe the Fluentd events into Kafka). This will give you all
>> >> >> of the semantics that Kafka provides (e.g. ordering within a
>> >> >> partition, rewinding streams, durability, etc).
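>> >> >>
>> >> >> That said, if you did go the direct route, step 1 above might look
>> >> >> very roughly like the skeleton below (entirely hypothetical; it just
>> >> >> follows the WikipediaConsumer pattern of extending BlockingEnvelopeMap,
>> >> >> and the Fluentd-facing listener and onFluentdEvent() hook are
>> >> >> invented):
>> >> >>
>> >> >>   import org.apache.samza.system.IncomingMessageEnvelope;
>> >> >>   import org.apache.samza.system.SystemStreamPartition;
>> >> >>   import org.apache.samza.util.BlockingEnvelopeMap;
>> >> >>
>> >> >>   public class FluentdSystemConsumer extends BlockingEnvelopeMap {
>> >> >>     @Override
>> >> >>     public void start() {
>> >> >>       // Start whatever endpoint the Fluentd output plugin sends to.
>> >> >>     }
>> >> >>
>> >> >>     @Override
>> >> >>     public void stop() {
>> >> >>       // Shut that endpoint down.
>> >> >>     }
>> >> >>
>> >> >>     // Called by the (hypothetical) listener for each Fluentd record.
>> >> >>     public void onFluentdEvent(SystemStreamPartition ssp, Object event)
>> >> >>         throws InterruptedException {
>> >> >>       // Hand the message to Samza; offset and key are left null here.
>> >> >>       put(ssp, new IncomingMessageEnvelope(ssp, null, null, event));
>> >> >>     }
>> >> >>   }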
>> >> >>
>> >> >> Cheers,
>> >> >> Chris
>> >> >>
>> >> >> On 9/5/14 1:36 PM, "Yan Fang" <yanfang724@gmail.com> wrote:
>> >> >>
>> >> >> >also was thinking of having fluentd push to Samza. But don't know
>> >> >> >how to implement this. Not sure if adding a kafka layer between
>> >> >> >Samza and fluentd is the only option.
>> >> >> >
>> >> >> >Do other guys have better ideas?
>> >> >> >
>> >> >> >Thanks,
>> >> >> >
>> >> >> >Fang, Yan
>> >> >> >yanfang724@gmail.com
>> >> >> >+1 (206) 849-4108
>> >> >> >
>> >> >> >
>> >> >> >On Fri, Sep 5, 2014 at 12:09 PM, Shekar Tippur <ctippur@gmail.com>
>> >> >> >wrote:
>> >> >> >
>> >> >> >> Yan,
>> >> >> >>
>> >> >> >> Won't it add an additional hop? It did occur to me earlier, but I
>> >> >> >> was not sure if this is the right way to go if we have a stringent
>> >> >> >> SLA-driven system depending on it.
>> >> >> >>
>> >> >> >> - Shekar
>> >> >> >>
>> >> >> >>
>> >> >> >> On Fri, Sep 5, 2014 at 10:55 AM, Yan Fang <yanfang724@gmail.com>
>> >> >> >> wrote:
>> >> >> >>
>> >> >> >> > If you already put the events into kafka, you can make Samza
>> >> >> >> > accept the kafka topic, like the wikipedia-parser project in
>> >> >> >> > hello-samza accepts the kafka topic wikipedia-raw (see the
>> >> >> >> > config file).
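>> >> >> >> >
>> >> >> >> > For example (a sketch of the relevant lines only; the topic and
>> >> >> >> > class names are placeholders, and the usual job factory, serde,
>> >> >> >> > and kafka consumer/producer settings from the example configs
>> >> >> >> > still apply):
>> >> >> >> >
>> >> >> >> >   # Point the job at your own kafka topic, wikipedia-parser style.
>> >> >> >> >   task.inputs=kafka.your-topic
>> >> >> >> >   task.class=com.example.YourStreamTask
>> >> >> >> >   systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory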
>> >> >> >> >
>> >> >> >> > Thanks,
>> >> >> >> >
>> >> >> >> > Fang, Yan
>> >> >> >> > yanfang724@gmail.com
>> >> >> >> > +1 (206) 849-4108
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > On Fri, Sep 5, 2014 at 8:48 AM, Shekar Tippur <ctippur@gmail.com>
>> >> >> >> > wrote:
>> >> >> >> >
>> >> >> >> > > Awesome .. This works. Thanks a lot.
>> >> >> >> > >
>> >> >> >> > > Now off to my next step.
>> >> >> >> > > I want to point to an incoming stream of events. These events
>> >> >> >> > > are routed via fluentd. So, fluentd acts as a routing layer
>> >> >> >> > > where it pushes the events to kafka. Since it is a push and not
>> >> >> >> > > a pull, any pointers on how to push it to samza? Guessing I
>> >> >> >> > > need a listener on Samza to collect this?
>> >> >> >> > >
>> >> >> >> > > - Shekar
>> >> >> >> > >
>> >> >> >> > >
>> >> >> >> > > On Fri, Sep 5, 2014 at 1:03 AM, Yan Fang <yanfang724@gmail.com>
>> >> >> >> > > wrote:
>> >> >> >> > >
>> >> >> >> > > > Aha, yes, we are almost there. I think I made a mistake in
>> >> >> >> > > > the previous email.
>> >> >> >> > > >
>> >> >> >> > > > 1. modify wikipedia-parser.properties, NOT
>> >> >> >> > > > wikipedia-feed.properties
>> >> >> >> > > > 2. run deploy/samza/bin/run-job.sh
>> >> >> >> > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> >> >> > > > --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
>> >> >> >> > > > (NOT wikipedia-feed.properties)
>> >> >> >> > > >
>> >> >> >> > > > Then you should see the messages in the kafka topic,
>> >> >> >> > > > wikipedia-edits
>> >> >> >> > > >
>> >> >> >> > > > Thanks. Let me know if you have any luck. :)
>> >> >> >> > > >
>> >> >> >> > > > Cheers,
>> >> >> >> > > >
>> >> >> >> > > > Fang, Yan
>> >> >> >> > > > yanfang724@gmail.com
>> >> >> >> > > > +1 (206) 849-4108
>> >> >> >> > > >
>> >> >> >> > > >
>> >> >> >> > > > On Thu, Sep 4, 2014 at 11:19 PM, Shekar Tippur
>> >> >> >> > > > <ctippur@gmail.com> wrote:
>> >> >> >> > > >
>> >> >> >> > > > > Just tried #3. Changed the property file
>> >> >> >> > > > > wikipedia-feed.properties:
>> >> >> >> > > > >
>> >> >> >> > > > > job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> >> >> >> > > > >
>> >> >> >> > > > > Ran ..
>> >> >> >> > > > >
>> >> >> >> > > > > deploy/samza/bin/run-job.sh
>> >> >> >> > > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
>> >> >> >> > > > > --config-path=file:///home/ctippur/hello-samza/deploy/samza/config/wikipedia-feed.properties
>> >> >> >> > > > >
>> >> >> >> > > > > I don't see any debug messages that I added to the feed or
>> >> >> >> > > > > the parser file..
>> >> >> >> > > > > I see messages on the kafka-consumer ..
>> >> >> >> > > > >
>> >> >> >> > > > > However the feed job died with the below message
>> >> >> >> > > > >
>> >> >> >> > > > >
>> >> >> >> > > > > Exception in thread "ThreadJob" java.lang.RuntimeException:
>> >> >> >> > > > > Trying to unlisten to a channel that has no listeners in it.
>> >> >> >> > > > >   at samza.examples.wikipedia.system.WikipediaFeed.unlisten(WikipediaFeed.java:98)
>> >> >> >> > > > >   at samza.examples.wikipedia.system.WikipediaConsumer.stop(WikipediaConsumer.java:72)
>> >> >> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> >> >> > > > >   at org.apache.samza.system.SystemConsumers$$anonfun$stop$2.apply(SystemConsumers.scala:152)
>> >> >> >> > > > >   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >> >> >> > > > >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >> >> >> > > > >   at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>> >> >> >> > > > >   at org.apache.samza.system.SystemConsumers.stop(SystemConsumers.scala:152)
>> >> >> >> > > > >   at org.apache.samza.container.SamzaContainer.shutdownConsumers(SamzaContainer.scala:587)
>> >> >> >> > > > >   at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:512)
>> >> >> >> > > > >   at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
>> >> >> >> > > > >
>> >> >> >> > > > > - Shekar
>> >> >> >> > > > >
>> >> >> >> > > >
>> >> >> >> > >
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >> >>
>> >>
>> >>
>>
>>

