samza-dev mailing list archives

From Shekar Tippur <ctip...@gmail.com>
Subject Re: Samza as a Caching layer
Date Tue, 02 Sep 2014 18:36:41 GMT
Chris,

Got some time to play around a bit more.
I tried editing
samza-wikipedia/src/main/java/samza/examples/wikipedia/task/WikipediaFeedStreamTask.java
to add a logger info statement that taps the incoming message, but I don't
see the messages being printed to the log file.

Is this the right place to start?

package samza.examples.wikipedia.task;

import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// WikipediaFeedEvent lives here in the hello-samza source layout.
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;

public class WikipediaFeedStreamTask implements StreamTask {

  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "wikipedia-raw");

  private static final Logger log = LoggerFactory.getLogger(WikipediaFeedStreamTask.class);

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // Convert the incoming feed event to a map, logging the raw message first.
    Map<String, Object> outgoingMap = WikipediaFeedEvent.toMap((WikipediaFeedEvent) envelope.getMessage());
    log.info(envelope.getMessage().toString());

    // Forward the map to the wikipedia-raw topic on the kafka system.
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
  }
}
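
For reference, whether that log.info() call reaches the container log at all
depends on the log4j configuration bundled into the job package; if no
appender is wired up at INFO level, the statement is silently dropped. A
minimal log4j.xml along these lines (an illustrative sketch, not necessarily
the exact file hello-samza ships) would send it to stdout:

  <?xml version="1.0" encoding="UTF-8"?>
  <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
  <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <!-- Print all INFO-and-above messages to the console. -->
    <appender name="console" class="org.apache.log4j.ConsoleAppender">
      <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d %-5p %c - %m%n" />
      </layout>
    </appender>
    <root>
      <priority value="info" />
      <appender-ref ref="console" />
    </root>
  </log4j:configuration>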


On Mon, Aug 25, 2014 at 9:01 AM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey Shekar,
>
> Your thought process is on the right track. It's probably best to start
> with hello-samza, and modify it to get what you want. To start with,
> you'll want to:
>
> 1. Write a simple StreamTask that does something trivial, like printing
> the messages it receives.
> 2. Write a configuration for the job that consumes from just the stream
> (alerts from different sources); a minimal sketch follows below.
> 3. Run this to make sure you've got it working.
> 4. Now add your table join. This can be done either via a change-data
> capture (CDC) stream or via a remote DB call.
>
> That should get you to a point where you've got your job up and running.
> From there, you could create your own Maven project, and migrate your code
> over accordingly.
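>
> A minimal sketch of what the step-2 configuration might look like, modeled
> on the hello-samza properties files (the job name, task class, and topic
> names here are illustrative assumptions, not prescriptive):
>
>   # alert-printer.properties -- illustrative names throughout
>   job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>   job.name=alert-printer
>
>   # The StreamTask to run, and the stream it consumes.
>   task.class=samza.examples.alerts.task.AlertPrinterStreamTask
>   task.inputs=kafka.alerts-raw
>
>   # Kafka system and serde wiring.
>   systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>   systems.kafka.samza.msg.serde=json
>   serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>   systems.kafka.consumer.zookeeper.connect=localhost:2181/
>   systems.kafka.producer.metadata.broker.list=localhost:9092
>
>   # Where YARN can find the job package.
>   yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz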
>
> Cheers,
> Chris
>
> On 8/24/14 1:42 AM, "Shekar Tippur" <ctippur@gmail.com> wrote:
>
> >Chris,
> >
> >I have gone through the documentation and decided that the option that is
> >most suitable for me is the stream-table join.
> >
> >I see the following things:
> >
> >1. Point samza to a table (database)
> >2. Point Samza to a stream - Alert stream from different sources
> >3. Join key like a hostname
> >
> >I have Hello Samza working. To extend it to meet my needs, I am not sure
> >where to start (does this need more code changes, configuration changes,
> >or both)?
> >
> >I have gone through
> >http://samza.incubator.apache.org/learn/documentation/latest/api/overview.html
> >
> >Is my thought process on the right track? Can you please point me in the
> >right direction?
> >
> >- Shekar
> >
> >
> >
> >On Thu, Aug 21, 2014 at 1:08 PM, Shekar Tippur <ctippur@gmail.com> wrote:
> >
> >> Chris,
> >>
> >> This is a perfectly good answer. I will start poking more into option #4.
> >>
> >> - Shekar
> >>
> >>
> >> On Thu, Aug 21, 2014 at 1:05 PM, Chris Riccomini <
> >> criccomini@linkedin.com.invalid> wrote:
> >>
> >>> Hey Shekar,
> >>>
> >>> Your two options are really (3) or (4), then. You can either run some
> >>> external DB that holds the data set, and query it from a StreamTask,
> >>> or you can use Samza's state store feature to push the data into a
> >>> stream that you can then store in a partitioned key-value store
> >>> alongside your StreamTasks. There is some documentation here about the
> >>> state store approach:
> >>>
> >>> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >>>
> >>> (4) is going to require more up-front effort from you, since you'll
> >>> have to understand how Kafka's partitioning model works, and set up
> >>> some pipeline to push the updates for your state. In the long run, I
> >>> believe it's the better approach, though. Local lookups on a key-value
> >>> store should be faster than doing remote RPC calls to a DB for every
> >>> message.
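> >>>
> >>> To make the state-store idea concrete, a task doing the join against a
> >>> local key-value store might look roughly like this. This is a sketch
> >>> only: the store name "ip-geo", the topic names, and the message layout
> >>> are illustrative assumptions, not hello-samza code.
> >>>
> >>>   import java.util.Map;
> >>>
> >>>   import org.apache.samza.config.Config;
> >>>   import org.apache.samza.storage.kv.KeyValueStore;
> >>>   import org.apache.samza.system.IncomingMessageEnvelope;
> >>>   import org.apache.samza.system.OutgoingMessageEnvelope;
> >>>   import org.apache.samza.system.SystemStream;
> >>>   import org.apache.samza.task.*;
> >>>
> >>>   public class PageViewJoinTask implements StreamTask, InitableTask {
> >>>
> >>>     private static final SystemStream OUTPUT =
> >>>         new SystemStream("kafka", "PageViewEventsWithGeo");
> >>>
> >>>     // Partitioned local store, fed from a log-compacted IPGeo topic.
> >>>     private KeyValueStore<String, Map<String, Object>> ipGeoStore;
> >>>
> >>>     @Override
> >>>     @SuppressWarnings("unchecked")
> >>>     public void init(Config config, TaskContext context) {
> >>>       ipGeoStore = (KeyValueStore<String, Map<String, Object>>)
> >>>           context.getStore("ip-geo");
> >>>     }
> >>>
> >>>     @Override
> >>>     @SuppressWarnings("unchecked")
> >>>     public void process(IncomingMessageEnvelope envelope,
> >>>         MessageCollector collector, TaskCoordinator coordinator) {
> >>>       String stream = envelope.getSystemStreamPartition().getStream();
> >>>       if ("ip-geo".equals(stream)) {
> >>>         // State update: remember the geo record for this IP address.
> >>>         ipGeoStore.put((String) envelope.getKey(),
> >>>             (Map<String, Object>) envelope.getMessage());
> >>>       } else {
> >>>         // Page view: enrich it with a local lookup, no remote RPC.
> >>>         Map<String, Object> pageView =
> >>>             (Map<String, Object>) envelope.getMessage();
> >>>         pageView.put("geo", ipGeoStore.get((String) pageView.get("ip")));
> >>>         collector.send(new OutgoingMessageEnvelope(OUTPUT, pageView));
> >>>       }
> >>>     }
> >>>   }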
> >>>
> >>> I'm sorry I can't give you a more definitive answer. It's really about
> >>> trade-offs.
> >>>
> >>> Cheers,
> >>> Chris
> >>>
> >>> On 8/21/14 12:22 PM, "Shekar Tippur" <ctippur@gmail.com> wrote:
> >>>
> >>> >Chris,
> >>> >
> >>> >A big thanks for a swift response. The data set is huge and the
> >>> >updates come in bursts.
> >>> >What do you suggest?
> >>> >
> >>> >- Shekar
> >>> >
> >>> >
> >>> >On Thu, Aug 21, 2014 at 11:05 AM, Chris Riccomini <
> >>> >criccomini@linkedin.com.invalid> wrote:
> >>> >
> >>> >> Hey Shekar,
> >>> >>
> >>> >> This is feasible, and you are on the right thought process.
> >>> >>
> >>> >> For the sake of discussion, I'm going to pretend that you have a
> >>> >> Kafka topic called "PageViewEvent", which has just the IP address
> >>> >> that was used to view a page. These messages will be logged every
> >>> >> time a page view happens. I'm also going to pretend that you have
> >>> >> some state called "IPGeo" (e.g. the MaxMind data set). In this
> >>> >> example, we'll want to join the long/lat geo information from IPGeo
> >>> >> to the PageViewEvent, and send it to a new topic:
> >>> >> PageViewEventsWithGeo.
> >>> >>
> >>> >> You have several options on how to implement this example.
> >>> >>
> >>> >> 1. If your joining data set (IPGeo) is relatively small and changes
> >>> >> infrequently, you can just pack it up in your jar or .tgz file, and
> >>> >> open it in every StreamTask.
> >>> >> 2. If your data set is small, but changes somewhat frequently, you
> >>> >> can throw the data set on some HTTP/HDFS/S3 server somewhere, and
> >>> >> have your StreamTask refresh it periodically by re-downloading it.
> >>> >> 3. You can do remote RPC calls for the IPGeo data on every page view
> >>> >> event by querying some remote service or DB (e.g. Cassandra).
> >>> >> 4. You can use Samza's state feature to send your IPGeo data as a
> >>> >> series of messages to a log-compacted Kafka topic
> >>> >> (https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction),
> >>> >> and configure your Samza job to read this topic as a bootstrap stream
> >>> >> (http://samza.incubator.apache.org/learn/documentation/0.7.0/container/streams.html).
> >>> >>
> >>> >> For (4), you'd have to partition the IPGeo state topic according to
> >>> >> the same key as PageViewEvent. If PageViewEvent were partitioned by,
> >>> >> say, member ID, but you want your IPGeo state topic to be
> >>> >> partitioned by IP address, then you'd have to have an upstream job
> >>> >> that re-partitions PageViewEvent into some new topic by IP address.
> >>> >> This new topic will have to have the same number of partitions as
> >>> >> the IPGeo state topic (if IPGeo has 8 partitions, then the new
> >>> >> PageViewEventRepartitioned topic needs 8 as well). This will cause
> >>> >> your PageViewEventRepartitioned topic and your IPGeo state topic to
> >>> >> be aligned, such that the StreamTask that gets page views for IP
> >>> >> address X will also have the IPGeo information for IP address X.
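> >>> >>
> >>> >> Roughly, the extra configuration for (4) would look something like
> >>> >> this (a sketch only; the store and topic names are made up for the
> >>> >> example, and the string/json serdes are assumed to be registered
> >>> >> under serializers.registry.* elsewhere in the file):
> >>> >>
> >>> >>   # Local key-value store that holds the IPGeo data.
> >>> >>   stores.ip-geo.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
> >>> >>   stores.ip-geo.key.serde=string
> >>> >>   stores.ip-geo.msg.serde=json
> >>> >>   stores.ip-geo.changelog=kafka.ip-geo-changelog
> >>> >>
> >>> >>   # Consume both the repartitioned page views and the IPGeo topic,
> >>> >>   # and read the IPGeo topic fully before processing anything else.
> >>> >>   task.inputs=kafka.PageViewEventRepartitioned,kafka.IPGeo
> >>> >>   systems.kafka.streams.IPGeo.samza.bootstrap=true
> >>> >>   systems.kafka.streams.IPGeo.samza.offset.default=oldest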
> >>> >>
> >>> >> Which strategy you pick is really up to you. :) (4) is the most
> >>> >> complicated, but also the most flexible and the most operationally
> >>> >> sound. (1) is the easiest if it fits your needs.
> >>> >>
> >>> >> Cheers,
> >>> >> Chris
> >>> >>
> >>> >> On 8/21/14 10:15 AM, "Shekar Tippur" <ctippur@gmail.com> wrote:
> >>> >>
> >>> >> >Hello,
> >>> >> >
> >>> >> >I am new to Samza. I have just installed Hello Samza and got it
> >>> >> >working.
> >>> >> >
> >>> >> >Here is the use case for which I am trying to use Samza:
> >>> >> >
> >>> >> >
> >>> >> >1. Cache the contextual information, which contains more
> >>> >> >information about the hostname or IP address, using
> >>> >> >Samza/Yarn/Kafka
> >>> >> >2. Collect alert and metric events, which contain either a hostname
> >>> >> >or an IP address
> >>> >> >3. Append the contextual information to the alert or metric and
> >>> >> >insert it into a Kafka queue from which other subscribers read off
> >>> >> >of.
> >>> >> >
> >>> >> >Can you please shed some light on
> >>> >> >
> >>> >> >1. Is this feasible?
> >>> >> >2. Am I on the right thought process?
> >>> >> >3. How do I start?
> >>> >> >
> >>> >> >I now have 1 & 2 working disparately. I need to integrate them.
> >>> >> >
> >>> >> >Appreciate any input.
> >>> >> >
> >>> >> >- Shekar
> >>> >>
> >>> >>
> >>>
> >>>
> >>
>
>
