samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shekar Tippur <ctip...@gmail.com>
Subject Re: Samza and sliding window
Date Sat, 04 Jul 2015 07:52:33 GMT
Any takers on this please?

- Shekar

On Fri, Jul 3, 2015 at 9:46 AM, Shekar Tippur <ctippur@gmail.com> wrote:

> Any answer on how to get all the kv values and reinitialise the kv store?
>
> Had one more question on implementing sliding window.
>
> If i use a kv store like rocksdb, and I use yarn (say 3 node cluster), the
> job that it runs to aggregate gets distributed as well and I am guessing
> the aggregation numbers get skewed? Is that a right assessment?
>
> On Thu, Jul 2, 2015 at 5:47 PM, Shekar Tippur <ctippur@gmail.com> wrote:
>
>> Also, next.getValue() or next.getKey() does not yield anything.
>>
>>    KeyValueIterator<String, String> i = store.all();
>>
>>   while(i.hasNext()){
>>
>>        Entry <String, String> next = i.next();
>>
>>        log.info("Removed Key", next.getValue());
>>
>>   }
>>
>> On Thu, Jul 2, 2015 at 5:36 PM, Shekar Tippur <ctippur@gmail.com> wrote:
>>
>>> Yi,
>>>
>>> There is no exception. I want to do couple of things in the window.
>>>
>>> - Get all the keys and values and publish to another store (like
>>> graphite) as a list
>>> - Remove all entries.
>>>
>>> I can iterate thro the list later but I want to be able to get all kv
>>> values and delete all of them in an atomic operation.
>>>
>>> How do I do these operations on the kv store?
>>>
>>> - S
>>>
>>>
>>>
>>>
>>> On Thu, Jul 2, 2015 at 4:59 PM, Yi Pan <nickpan47@gmail.com> wrote:
>>>
>>>> Hi, Shekar,
>>>>
>>>> Sorry I was not able to follow up w/ you in time. It is great that you
>>>> have
>>>> found the configure problem and made it work!
>>>>
>>>> As for the exception on the iterator, could you send us the log w/ the
>>>> exception?
>>>>
>>>> Thanks!
>>>>
>>>> -Yi
>>>>
>>>> On Thu, Jul 2, 2015 at 4:36 PM, Shekar Tippur <ctippur@gmail.com>
>>>> wrote:
>>>>
>>>> > Yi,
>>>> >
>>>> > Looks like it is working now. There was a redundant line in the
>>>> config.
>>>> >
>>>> > I am able to initialize kv store and add values.
>>>> > In the window code, I am unable to retrieve them and mark them as 0.
>>>> >
>>>> > Here is my window code:
>>>> >
>>>> > public void window(MessageCollector collector,
>>>> >
>>>> >       TaskCoordinator coordinator) {
>>>> >
>>>> >   //store.delete(appName);
>>>> >
>>>> >   collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
>>>> eventsSeen));
>>>> >
>>>> >   KeyValueIterator<String, String> i = store.all();
>>>> >
>>>> >   while(i.hasNext()){
>>>> >
>>>> >    Entry <String, String> next = i.next();
>>>> >
>>>> >    log.info("Trying to remove Key", next.getKey());
>>>> >
>>>> >    //i.remove();
>>>> >
>>>> >
>>>> >
>>>> >   }
>>>> >
>>>> >   eventsSeen = 0;
>>>> >
>>>> >   i.close();
>>>> >
>>>> >
>>>> >
>>>> >   }
>>>> >
>>>> >
>>>> > How do I retrieve the key and is there a way to remove it? i.remove
>>>> throws
>>>> > an exception.
>>>> >
>>>> >
>>>> > - Shekar
>>>> >
>>>> > On Wed, Jul 1, 2015 at 7:25 PM, Shekar Tippur <ctippur@gmail.com>
>>>> wrote:
>>>> >
>>>> > > Yi,
>>>> > >
>>>> > > Here is my config file:
>>>> > > http://pastebin.com/Kf3C9E0h
>>>> > >
>>>> > > - S
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message