samza-dev mailing list archives

From Yan Fang <yanfang...@gmail.com>
Subject Re: Samza and sliding window
Date Wed, 01 Jul 2015 22:09:00 GMT
Do you have

serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

in your config file?
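
For context, a minimal sketch of how the registry entry and the store properties quoted below would sit together in one config file. The store name "store-name" and the changelog stream are taken from the quoted message; the factory line is an assumption based on the RocksDbKeyValueStorageEngineFactory class that appears in the stack trace, and the serde name "json" has to match in all three places.

```properties
# Register a serde under the name "json" (the name is arbitrary but must match below)
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Store definition; "store-name" must be the same name the task asks for
# (factory line is an assumption, inferred from the stack trace)
stores.store-name.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.store-name.key.serde=json
stores.store-name.msg.serde=json
stores.store-name.changelog=argos.windowchangelog
```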


Fang, Yan
yanfang724@gmail.com

On Wed, Jul 1, 2015 at 2:59 PM, Shekar Tippur <ctippur@gmail.com> wrote:

> Yi/Milinda,
>
> I am trying to initialize a kv store. I have the following properties
> defined:
>
> stores.store-name.key.serde=json
> stores.store-name.msg.serde=json
> stores.store-name.changelog=argos.windowchangelog
>
> How do I define a key serde? I am getting this exception:
>
> Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
>   at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
>   at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
>   at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>   at scala.collection.SetLike$class.map(SetLike.scala:93)
>   at scala.collection.AbstractSet.map(Set.scala:47)
>   at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
>   at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
>   at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
>   at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
>   at org.apache.samza.job.JobRunner.main(JobRunner.scala)
>
> On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctippur@gmail.com> wrote:
>
> > Yi,
> >
> > My use case is more of the latter. Your explanation makes sense now. I was
> > also looking into Milinda's wiki. She has a section for Kafka
> > partition SimplePartitioner, which is simple enough as well.
> >
> > Thanks for all the inputs. Let me see what I come up with while
> > implementing it.
> >
> > - Shekar
> >
> > On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpan47@gmail.com> wrote:
> >
> >> Hi, Shekar,
> >>
> >> First, I would like to clarify what you meant by sliding window: is it
> >> defined as windows with size N and advance step size of 1 (which means
> >> that windows overlap and each input message would contribute to multiple
> >> counts in different windows)? Or windows with size N and advance step
> >> size of N (i.e. each incoming message only contributes to one counter in
> >> a single window)?
> >>
> >> If your use case falls into the first category, you will need something
> >> more sophisticated as discussed in SAMZA-552. If your use case is the
> >> second one, there could be a simpler version of SAMZA-552 that you can
> >> go with:
> >>
> >> 1) Initialize a KV-store that uses the application name as the key
> >> 2) For each incoming message, look up the window that the message
> >> belongs to by the application name
> >> 3) Update the counter and update the value in the KV-store based on the
> >> application name
> >> 4) Every 5 min when the window() method is triggered, set all counters to
> >> zero (this can be done in a lazy way as well, by keeping the last reset
> >> timestamp in the record in the KV-store, keyed by application name.
> >> Then, resetting the counter to zero can be done the next time the
> >> application counter is updated)
> >>
> >> Hope that makes sense.
> >>
> >> -Yi
> >>
> >> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctippur@gmail.com>
> >> wrote:
> >>
> >> > Benjamin,
> >> >
> >> > Thanks for the explanation. We don't have any specific partition scheme
> >> > as yet. We just have 2 topics - raw and processed - and we use the
> >> > default partitioning scheme.
> >> > Can you share any code snippet so I can understand it better?
> >> >
> >> > - Shekar
> >> >
> >>
> >
> >
>
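
Yi's steps 1) to 4) quoted above, in their lazy-reset form, can be sketched roughly as follows. This is not Samza code and not from the thread: a plain HashMap stands in for the KV store, and the names TumblingCounter, WindowCount, increment and current are hypothetical. Instead of a last-reset timestamp, the record keeps the start of the window it belongs to, which serves the same purpose: a count left over from an older window is treated as zero the next time that application is updated.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record stored per application name (the "value" in the KV store).
final class WindowCount {
    final long windowStart; // start (ms) of the window this count belongs to
    final long count;

    WindowCount(long windowStart, long count) {
        this.windowStart = windowStart;
        this.count = count;
    }
}

// Tumbling-window counter: windows of size N that advance by N, so each
// message contributes to exactly one counter.
final class TumblingCounter {
    private final long windowMs;
    // Stand-in for the KV store, keyed by application name (assumption).
    private final Map<String, WindowCount> store = new HashMap<>();

    TumblingCounter(long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.windowMs = windowMs;
    }

    // Aligns a timestamp to the start of its window.
    private long windowStartOf(long nowMs) {
        return nowMs - Math.floorMod(nowMs, windowMs);
    }

    // Steps 2 and 3: look up the record by application name, lazily reset it
    // if it belongs to an older window, then increment and write it back.
    long increment(String app, long nowMs) {
        long start = windowStartOf(nowMs);
        WindowCount prev = store.get(app);
        long base = (prev != null && prev.windowStart == start) ? prev.count : 0L;
        WindowCount next = new WindowCount(start, base + 1);
        store.put(app, next);
        return next.count;
    }

    // Reads the count for the window containing nowMs; stale records read as 0.
    long current(String app, long nowMs) {
        WindowCount rec = store.get(app);
        return (rec != null && rec.windowStart == windowStartOf(nowMs)) ? rec.count : 0L;
    }
}
```

With a 5 minute window (300000 ms), two calls to increment for the same application inside one window accumulate, and the first call in the next window starts again from one. In a real job the map would be replaced by the configured store, and the eager variant would clear the counters in window() instead.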
