samza-dev mailing list archives

From Chris Riccomini <criccom...@apache.org>
Subject Re: Not safe to access KV stores from InitiableTask.init()
Date Mon, 16 Feb 2015 19:39:43 GMT
Hey Tommy,

This sounds broken. Let me have a look and see if there's an easy fix. I
*think* reordering should work, but I just want to make sure.

Could you open a JIRA and set the fixed version to 0.9.0? I'll take a look
today/tomorrow. If you want to test out reordering it, please share any
findings. :)

Cheers,
Chris

On Monday, February 16, 2015, Tommy Becker <tobecker@tivo.com> wrote:

> I need to do some initial processing of the entries in my KV store on
> startup, before processing messages.  I put the code into my task's init()
> method, and although it worked with an empty KV store/changelog, once I have
> entries in there it bombs with a rather obscure exception:
>
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
>   at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
>   at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
>   at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
>   at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
>   at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
>   at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
>   at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
>   at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
>   at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
> ...
>
> After some investigation I see that it's actually not safe to interact
> with anything that is going to potentially produce messages from init(),
> since startTask is called before startProducers in SamzaContainer.run.  In
> retrospect I guess that is why a MessageCollector is not passed to init()
> but of course writes to the KV store result in messages being sent to the
> changelog :/  I guess my question is whether or not this is intended
> behavior (could we not simply initialize producers before tasks) and if so,
> what an alternative might be for my use case.  As it is currently it seems
> like all I can do is add an "initProcessingDone" flag to my task and check
> it every time a message comes in.
>
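
The flag-based workaround described in the quoted message could look roughly
like the sketch below. It is only an illustration, not code from Samza or from
the thread: LazyInitTask, runInitialProcessing() and the clamp-to-zero cleanup
are hypothetical names and behavior, and a plain java.util.Map stands in for
the KV store so the sketch runs without Samza on the classpath. In a real task
the check would sit at the top of process(), where producers have already been
started, and init() would do no store writes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Samza task. A plain Map replaces the KV store
// (assumption), so nothing here depends on the Samza API.
class LazyInitTask {
    private final Map<String, Integer> store;
    private boolean initProcessingDone = false;
    private int initRuns = 0;

    LazyInitTask(Map<String, Integer> store) {
        // Plays the role of init(): keep a reference, but do not touch the store yet.
        this.store = store;
    }

    // Plays the role of process(): runs the deferred startup work exactly once,
    // on the first message, then handles the message itself.
    void process(String key, int value) {
        if (!initProcessingDone) {
            runInitialProcessing();
            initProcessingDone = true;
        }
        store.merge(key, value, Integer::sum);
    }

    // Hypothetical startup work: clamp any negative stored value to zero.
    private void runInitialProcessing() {
        initRuns++;
        store.replaceAll((k, v) -> Math.max(v, 0));
    }

    int getInitRuns() {
        return initRuns;
    }
}
```

The cost is one boolean check per message. The startup work is also delayed
until the first message arrives, which may matter if a partition stays idle.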
