samza-dev mailing list archives

From Shekar Tippur <ctip...@gmail.com>
Subject Re: Samza and sliding window
Date Wed, 01 Jul 2015 22:41:35 GMT
Yan,

Yes, I do have it.

- Shekar

On Wed, Jul 1, 2015 at 3:09 PM, Yan Fang <yanfang724@gmail.com> wrote:

> Do you have
>
> serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
>
> in your config file?
>
>
> Fang, Yan
> yanfang724@gmail.com
>
> On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur <ctippur@gmail.com> wrote:
>
> > Yi/Milinda,
> >
> > I am trying to initialize a KV store. I have the following properties
> > defined:
> >
> > stores.store-name.key.serde=json
> > stores.store-name.msg.serde=json
> > stores.store-name.changelog=argos.windowchangelog
> >
> > How do I define a key serde? I am getting this exception:
> >
> > Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
> >     at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
> >     at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
> >     at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
> >     at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> >     at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
> >     at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> >     at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
> >     at scala.collection.SetLike$class.map(SetLike.scala:93)
> >     at scala.collection.AbstractSet.map(Set.scala:47)
> >     at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
> >     at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
> >     at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
> >     at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
> >     at org.apache.samza.job.JobRunner.main(JobRunner.scala)
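Yan's question points at a consistency requirement in the config: the value of a store's key.serde and msg.serde is a serde name that must also be registered under serializers.registry.<name>.class. The following is a hypothetical pre-flight check, not part of Samza and not a reproduction of Samza's own validation, that scans a job config (represented here as a plain Map<String, String>) for stores declared via stores.<name>.factory and reports any store whose key or message serde is missing or unregistered. The class and method names are illustrative assumptions. One thing such a check can surface, for example, is a factory declared under a different store name than the serdes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical pre-flight check, NOT part of Samza: it only mirrors the naming conventions
// stores.<store>.factory, stores.<store>.{key,msg}.serde and serializers.registry.<serde>.class.
final class StoreConfigCheck {
    private static final String PREFIX = "stores.";
    private static final String FACTORY_SUFFIX = ".factory";

    /** Returns a human-readable problem description for each misconfigured store. */
    static List<String> findProblems(Map<String, String> config) {
        // Collect store names from keys of the form stores.<store>.factory (sorted for stable output).
        TreeSet<String> stores = new TreeSet<>();
        for (String key : config.keySet()) {
            if (key.startsWith(PREFIX) && key.endsWith(FACTORY_SUFFIX)
                    && key.length() > PREFIX.length() + FACTORY_SUFFIX.length()) {
                stores.add(key.substring(PREFIX.length(), key.length() - FACTORY_SUFFIX.length()));
            }
        }
        List<String> problems = new ArrayList<>();
        for (String store : stores) {
            for (String kind : new String[] {"key", "msg"}) {
                String serde = config.get(PREFIX + store + "." + kind + ".serde");
                if (serde == null) {
                    problems.add("store '" + store + "' has no " + kind + ".serde");
                } else if (!config.containsKey("serializers.registry." + serde + ".class")) {
                    problems.add("store '" + store + "' uses unregistered serde '" + serde + "'");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> config = new TreeMap<>();
        // Example of a name mismatch: the factory is declared for "window-store",
        // but the serdes are declared for "store-name".
        config.put("stores.window-store.factory",
                "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory");
        config.put("stores.store-name.key.serde", "json");
        config.put("stores.store-name.msg.serde", "json");
        config.put("serializers.registry.json.class",
                "org.apache.samza.serializers.JsonSerdeFactory");
        for (String problem : findProblems(config)) {
            System.out.println(problem);
        }
    }
}
```

Run against the example config in main, it reports that 'window-store' has no key.serde and no msg.serde, even though a json serde is registered and used by 'store-name'.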
> >
> > On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctippur@gmail.com> wrote:
> >
> > > Yi,
> > >
> > > My use case is more of the latter. Your explanation makes sense now. I
> > > was also looking into Milinda's wiki, which has a section on a Kafka
> > > SimplePartitioner; that approach is simple enough as well.
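The SimplePartitioner from Milinda's wiki is not reproduced in this thread. As a hypothetical illustration of the idea behind key-based partitioning, the sketch below maps an application name to a partition with a stable hash, so every message for the same application lands in the same partition and therefore reaches the same Samza task and its local KV store. The name KeyHashPartitioner is an assumption, and String.hashCode is used only for illustration; it is not necessarily the hash your Kafka producer's partitioner uses.

```java
// Hypothetical key-based partitioner used for illustration only; it is not Milinda's
// SimplePartitioner and not Kafka's default partitioner.
final class KeyHashPartitioner {
    /** Maps a key to a partition in [0, numPartitions); equal keys always map to the same partition. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit instead of using Math.abs: Math.abs(Integer.MIN_VALUE) is still negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Messages keyed by application name: both "app-a" messages go to the same partition.
        for (String app : new String[] {"app-a", "app-b", "app-a"}) {
            System.out.println(app + " -> partition " + partitionFor(app, 4));
        }
    }
}
```

The important property is consistency, not the particular hash: as long as the producer always sends a given application name to the same partition, the per-application counters in the steps below never need to be merged across tasks.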
> > >
> > > Thanks for all the inputs. Let me see what I come up with while
> > > implementing it.
> > >
> > > - Shekar
> > >
> > > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpan47@gmail.com> wrote:
> > >
> > >> Hi, Shekar,
> > >>
> > >> First, I would like to clarify what you mean by sliding window. Is it
> > >> defined as windows of size N with an advance step of 1 (which means
> > >> that windows overlap and each input message contributes to counts in
> > >> multiple windows)? Or windows of size N with an advance step of N
> > >> (i.e., non-overlapping "tumbling" windows, where each incoming message
> > >> contributes to only one counter in a single window)?
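To make the distinction concrete, here is a small sketch, with a hypothetical helper name and time measured in arbitrary integer units, that lists every window [start, start + size) containing a timestamp t when window starts are multiples of step. With step 1 a message falls into up to N windows; with step N it falls into exactly one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: which windows [start, start + size) contain timestamp t,
// when window starts are 0, step, 2*step, ...? Assumes t >= 0, size > 0, step > 0.
final class WindowAssignment {
    static List<Long> windowStartsFor(long t, long size, long step) {
        if (t < 0 || size <= 0 || step <= 0) {
            throw new IllegalArgumentException("expected t >= 0, size > 0, step > 0");
        }
        List<Long> starts = new ArrayList<>();
        // Walk backwards from the latest window start <= t while the window still covers t.
        for (long start = (t / step) * step; start >= 0 && start > t - size; start -= step) {
            starts.add(start);
        }
        Collections.reverse(starts); // ascending order
        return starts;
    }

    public static void main(String[] args) {
        long t = 7, size = 5;
        System.out.println("step 1 (overlapping): " + windowStartsFor(t, size, 1));
        System.out.println("step 5 (tumbling):    " + windowStartsFor(t, size, size));
    }
}
```

For t = 7 and size 5, the first call returns [3, 4, 5, 6, 7] (five overlapping windows) and the second returns [5] (a single window), which is why the second case needs only one counter per key.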
> > >>
> > >> If your use case falls into the first category, you will need something
> > >> more sophisticated, as discussed in SAMZA-552. If your use case is the
> > >> second one, there could be a simpler version of SAMZA-552 that you can
> > >> go with:
> > >>
> > >> 1) Initialize a KV store that uses the application name as the key.
> > >> 2) For each incoming message, look up the window counter for the
> > >>    message, keyed by its application name.
> > >> 3) Update the counter and write the new value back to the KV store
> > >>    under the application name.
> > >> 4) Every 5 minutes, when the window() method is triggered, reset all
> > >>    counters to zero. (This can also be done lazily: keep the last reset
> > >>    timestamp in each KV-store record, keyed by application name, and
> > >>    reset the counter to zero the next time that application's counter
> > >>    is updated.)
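The four steps above, using the lazy-reset variant of step 4, can be sketched as follows. This is a minimal illustration, not Samza code: a HashMap stands in for the KV store (in a real StreamTask you would use the key-value store configured for the job instead), the 5-minute window is expressed in milliseconds, and the class, record, and method names are hypothetical. Each record keeps the start of the window it counts for, which plays the role of the "last reset timestamp"; when a message arrives in a later window, the counter restarts from zero instead of being cleared by window().

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of a tumbling-window counter per application name with lazy reset.
// A HashMap stands in for the job's KV store; all names here are hypothetical.
final class TumblingAppCounter {
    /** Value kept per application: the window it belongs to and the count within it. */
    static final class Record {
        final long windowStart;
        final long count;

        Record(long windowStart, long count) {
            this.windowStart = windowStart;
            this.count = count;
        }
    }

    private final Map<String, Record> store = new HashMap<>(); // step 1: keyed by application name
    private final long windowMillis;

    TumblingAppCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    private long windowStartFor(long timestampMillis) {
        return timestampMillis - (timestampMillis % windowMillis); // assumes timestampMillis >= 0
    }

    /** Counts one message for appName at timestampMillis and returns the updated count. */
    long increment(String appName, long timestampMillis) {
        long windowStart = windowStartFor(timestampMillis);
        Record previous = store.get(appName); // step 2: look up by application name
        // Step 4 (lazy): a record from an earlier window is treated as a zero count.
        long base = (previous == null || previous.windowStart != windowStart) ? 0 : previous.count;
        store.put(appName, new Record(windowStart, base + 1)); // step 3: update and write back
        return base + 1;
    }

    /** Count for appName in the window containing timestampMillis (0 if absent or stale). */
    long countFor(String appName, long timestampMillis) {
        Record r = store.get(appName);
        return (r != null && r.windowStart == windowStartFor(timestampMillis)) ? r.count : 0;
    }

    public static void main(String[] args) {
        TumblingAppCounter counter = new TumblingAppCounter(5 * 60 * 1000L); // 5-minute windows
        counter.increment("app-a", 1_000L);
        counter.increment("app-a", 299_999L);
        counter.increment("app-b", 10_000L);
        System.out.println("app-a, first window: " + counter.countFor("app-a", 299_999L));
        counter.increment("app-a", 300_000L); // first message of the next window resets app-a
        System.out.println("app-a, second window: " + counter.countFor("app-a", 300_000L));
        System.out.println("app-b, second window: " + counter.countFor("app-b", 300_000L));
    }
}
```

Storing the window start instead of clearing every key in window() avoids scanning the whole store every 5 minutes; the trade-off is that stale records stay in the store until their application sends another message.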
> > >>
> > >> Hope that makes sense.
> > >>
> > >> -Yi
> > >>
> > >> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctippur@gmail.com> wrote:
> > >>
> > >> > Benjamin,
> > >> >
> > >> > Thanks for the explanation. We don't have any specific partitioning
> > >> > scheme yet. We just have 2 topics, raw and processed, and we use the
> > >> > default partitioning scheme.
> > >> > Can you share a code snippet so I can understand it better?
> > >> >
> > >> > - Shekar
> > >> >
> > >>
> > >
> > >
> >
>
