samza-dev mailing list archives

From Tommy Becker <tobec...@Tivo.com>
Subject Not safe to access KV stores from InitiableTask.init()
Date Mon, 16 Feb 2015 19:17:53 GMT
I need to do some initial processing of the entries in my KV store on startup, before processing
messages. I put the code into my task's init() method, and although it worked with an empty
KV store/changelog, once there are entries in it, it bombs with a rather obscure exception:

java.util.NoSuchElementException: key not found: TaskName-Partition 3
at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
...
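Reading the trace bottom-up shows why even a plain get() fails: on a cache miss, CachedStore.get inserts the value into its LinkedHashMap, the insertion triggers removeEldestEntry, and eviction flushes dirty entries through LoggedStore.putAll, which sends them to the changelog via TaskInstanceCollector.send and SystemProducers.send. The Java sketch below models that chain with simplified, hypothetical stand-in classes (EvictionFlushDemo, ChangelogProducer, LoggedStore, CachedStore, readDuringInit); it is not Samza's code. It assumes the startup code has left at least one dirty entry in the cache before reading, and its "producer not started" check only plays the role of the lookup that fails in SystemProducers.send.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the call chain in the trace; not Samza's code.
public class EvictionFlushDemo {
    // Stand-in for the changelog producer: sending before start() fails,
    // playing the role of the lookup that fails in SystemProducers.send.
    static class ChangelogProducer {
        private boolean started = false;
        final List<String> sent = new ArrayList<>();
        void start() { started = true; }

        void send(String key, String value) {
            if (!started) {
                throw new IllegalStateException("producer not started; cannot send " + key);
            }
            sent.add(key + "=" + value);
        }
    }

    // Stand-in for LoggedStore: every write updates the data and goes to the changelog.
    static class LoggedStore {
        final Map<String, String> data = new HashMap<>();
        private final ChangelogProducer changelog;

        LoggedStore(ChangelogProducer changelog) { this.changelog = changelog; }
        String get(String key) { return data.get(key); }
        void putAll(Map<String, String> entries) {
            for (Map.Entry<String, String> e : entries.entrySet()) {
                data.put(e.getKey(), e.getValue());
                changelog.send(e.getKey(), e.getValue());
            }
        }
    }

    // Stand-in for CachedStore: an access-ordered LRU cache that buffers dirty writes
    // and flushes them downstream whenever an insertion overflows the cache.
    static class CachedStore {
        private final LoggedStore store;
        private final Map<String, String> dirty = new LinkedHashMap<>();
        private final LinkedHashMap<String, String> cache;

        CachedStore(LoggedStore store, int capacity) {
            this.store = store;
            this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    if (size() <= capacity) return false;
                    flush(); // eviction pushes pending writes downstream first
                    return true;
                }
            };
        }

        void put(String key, String value) {
            dirty.put(key, value);
            cache.put(key, value);
        }

        String get(String key) {
            String value = cache.get(key);
            if (value == null && (value = store.get(key)) != null) {
                cache.put(key, value); // a read miss inserts, which may trigger eviction
            }
            return value;
        }

        void flush() {
            if (dirty.isEmpty()) return;
            store.putAll(new LinkedHashMap<>(dirty));
            dirty.clear();
        }
    }

    // Simulates startup code that writes one entry and then reads two others.
    static String readDuringInit(boolean producerStarted) {
        ChangelogProducer producer = new ChangelogProducer();
        LoggedStore logged = new LoggedStore(producer);
        logged.data.put("a", "1"); // pre-existing entries, e.g. restored from the changelog
        logged.data.put("b", "2");
        logged.data.put("c", "3");
        if (producerStarted) producer.start();
        CachedStore cached = new CachedStore(logged, 2);
        cached.put("a", "10"); // leaves a dirty entry in the cache
        cached.get("b");       // miss; the cache now holds 2 entries
        try {
            cached.get("c");   // miss; a 3rd entry forces eviction -> flush -> send
            return "ok, sent=" + producer.sent;
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }
}
```

In this model, readDuringInit(false) returns "failed: producer not started; cannot send a", while readDuringInit(true) returns "ok, sent=[a=10]": the read itself is identical in both cases, and only the producer's state decides whether the eviction-triggered flush succeeds.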

After some investigation, I see that it's actually not safe to interact from init() with anything
that may produce messages, since startTask is called before startProducers in SamzaContainer.run.
In retrospect, I guess that is why a MessageCollector is not passed to init(), but of course
writes to the KV store result in messages being sent to the changelog :/  My question is whether
this is intended behavior (could we not simply initialize producers before tasks?) and, if so,
what an alternative might be for my use case. As things stand, it seems all I can do is add an
"initProcessingDone" flag to my task and check it every time a message comes in.
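The "initProcessingDone" workaround can be sketched roughly as follows: init() only keeps a reference to the store, and the startup pass runs at the top of the first process() call, by which point, per the ordering described above, the producers should already be started. DeferredInitTask, the Map-based store, and the String message type are hypothetical stand-ins so the sketch compiles on its own; in an actual Samza task the store would presumably be obtained from the TaskContext passed to init(), and process() would take Samza's envelope, collector, and coordinator arguments. The reset-to-zero startup work is likewise only a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of deferring startup work until the first message arrives.
// The types are simplified stand-ins, not Samza's interfaces.
public class DeferredInitTask {
    private Map<String, Integer> store;          // stand-in for the task's KV store
    private boolean initProcessingDone = false;  // the flag proposed in the message
    final List<String> log = new ArrayList<>();

    // Analogous to init(): only keep references, never write to the store here.
    public void init(Map<String, Integer> store) {
        this.store = store;
    }

    // Analogous to process(): the flag is checked on every incoming message.
    public void process(String message) {
        if (!initProcessingDone) {
            initialPass();
            initProcessingDone = true;
        }
        store.merge(message, 1, Integer::sum);
        log.add("processed " + message);
    }

    // Placeholder startup work: reset every existing count to zero.
    private void initialPass() {
        for (Map.Entry<String, Integer> e : store.entrySet()) {
            e.setValue(0);
        }
        log.add("initial pass over " + store.size() + " entries");
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new TreeMap<>();
        store.put("x", 5);
        store.put("y", 7);
        DeferredInitTask task = new DeferredInitTask();
        task.init(store);
        task.process("x");
        task.process("z");
        System.out.println(task.log);  // [initial pass over 2 entries, processed x, processed z]
        System.out.println(store);     // {x=1, y=0, z=1}
    }
}
```

The cost of this approach is one boolean check per message, and the startup pass delays the first message rather than the container start.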

