storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nathanm...@apache.org
Subject [1/2] git commit: adds capability to remove keys from memorymapstate (maintains proper opaque semantics)
Date Tue, 25 Mar 2014 01:54:50 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master d5dee0ef5 -> c621a6c1b


adds capability to remove keys from memorymapstate (maintains proper opaque semantics)


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/69594018
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/69594018
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/69594018

Branch: refs/heads/master
Commit: 69594018082333f6dc168a3e5e07ee2eb9a75c8f
Parents: a2be522
Author: nathanmarz <nathan@nathanmarz.com>
Authored: Mon Mar 17 14:07:45 2014 -0700
Committer: nathanmarz <nathan@nathanmarz.com>
Committed: Mon Mar 17 14:07:45 2014 -0700

----------------------------------------------------------------------
 .../jvm/storm/trident/state/map/OpaqueMap.java  |  6 +++-
 .../trident/state/map/RemovableMapState.java    |  8 +++++
 .../storm/trident/testing/MemoryMapState.java   | 27 +++++++++++++++-
 .../test/clj/storm/trident/state_test.clj       | 33 +++++++++++++++++++-
 4 files changed, 71 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java b/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
index cd6766d..12f3083 100644
--- a/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
+++ b/storm-core/src/jvm/storm/trident/state/map/OpaqueMap.java
@@ -43,7 +43,11 @@ public class OpaqueMap<T> implements MapState<T> {
         for(CachedBatchReadsMap.RetVal<OpaqueValue> retval: curr) {
             OpaqueValue val = retval.val;
             if(val!=null) {
-                ret.add((T) val.get(_currTx));
+                if(retval.cached) {
+                    ret.add((T) val.getCurr());
+                } else {
+                    ret.add((T) val.get(_currTx));
+                }
             } else {
                 ret.add(null);
             }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java b/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
new file mode 100644
index 0000000..cf34f05
--- /dev/null
+++ b/storm-core/src/jvm/storm/trident/state/map/RemovableMapState.java
@@ -0,0 +1,8 @@
+package storm.trident.state.map;
+
+import java.util.List;
+import storm.trident.state.State;
+
+public interface RemovableMapState<T> extends State {
+    void multiRemove(List<List<Object>> keys);
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
index 5df99f7..fd38900 100644
--- a/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
+++ b/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java
@@ -30,10 +30,13 @@ import storm.trident.state.ValueUpdater;
 import storm.trident.state.map.*;
 import storm.trident.state.snapshot.Snapshottable;
 
-public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection,
MapState<T> {
+public class MemoryMapState<T> implements Snapshottable<T>, ITupleCollection,
MapState<T>, RemovableMapState<T> {
 
     MemoryMapStateBacking<OpaqueValue> _backing;
     SnapshottableMap<T> _delegate;
+    List<List<Object>> _removed = new ArrayList();
+    Long _currTx = null;
+
 
     public MemoryMapState(String id) {
         _backing = new MemoryMapStateBacking(id);
@@ -54,6 +57,11 @@ public class MemoryMapState<T> implements Snapshottable<T>,
ITupleCollection, Ma
 
     public void beginCommit(Long txid) {
         _delegate.beginCommit(txid);
+        if(txid==null || !txid.equals(_currTx)) {
+            _backing.multiRemove(_removed);
+        }
+        _removed = new ArrayList();
+        _currTx = txid;
     }
 
     public void commit(Long txid) {
@@ -76,6 +84,17 @@ public class MemoryMapState<T> implements Snapshottable<T>,
ITupleCollection, Ma
         return _delegate.multiGet(keys);
     }
 
+    @Override
+    public void multiRemove(List<List<Object>> keys) {
+        List nulls = new ArrayList();
+        for(int i=0; i<keys.size(); i++) {
+            nulls.add(null);
+        }
+        // first just set the keys to null, then flag to remove them at beginning of next
commit when we know the current and last value are both null
+        multiPut(keys, nulls);
+        _removed.addAll(keys);
+    }
+
     public static class Factory implements StateFactory {
 
         String _id;
@@ -106,6 +125,12 @@ public class MemoryMapState<T> implements Snapshottable<T>,
ITupleCollection, Ma
             this.db = (Map<List<Object>, T>) _dbs.get(id);
         }
 
+        public void multiRemove(List<List<Object>> keys) {
+            for(List<Object> key: keys) {
+                db.remove(key);
+            }
+        }
+
         @Override
         public List<T> multiGet(List<List<Object>> keys) {
             List<T> ret = new ArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/69594018/storm-core/test/clj/storm/trident/state_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/storm/trident/state_test.clj b/storm-core/test/clj/storm/trident/state_test.clj
index 6e091e3..63e38ca 100644
--- a/storm-core/test/clj/storm/trident/state_test.clj
+++ b/storm-core/test/clj/storm/trident/state_test.clj
@@ -20,10 +20,16 @@
   (:import [storm.trident.state OpaqueValue])
   (:import [storm.trident.state CombinerValueUpdater])
   (:import [storm.trident.state.map TransactionalMap OpaqueMap])
-  (:import [storm.trident.testing MemoryBackingMap])
+  (:import [storm.trident.testing MemoryBackingMap MemoryMapState])
   (:use [storm.trident testing])
   (:use [backtype.storm util]))
 
+(defn single-remove [map key]
+  (-> map (.multiRemove [[key]])))
+
+(defn single-put [map key val]
+  (-> map (.multiPut [[key]] [val])))
+
 (defn single-get [map key]
   (-> map (.multiGet [[key]]) first))
 
@@ -61,7 +67,9 @@
     (is (= nil (single-get map "a")))
     ;; tests that intra-batch caching works
     (is (= 1 (single-update map "a" 1)))
+    (is (= 1 (single-get map "a")))
     (is (= 3 (single-update map "a" 2)))
+    (is (= 3 (single-get map "a")))
     (.commit map 1)
     (.beginCommit map 1)
     (is (= nil (single-get map "a")))
@@ -94,3 +102,26 @@
     (is (= 7 (single-update map "a" 1)))
     (.commit map 2)
     ))
+
+
+(deftest test-memory-map-state-remove
+  (let [map (MemoryMapState. (uuid))]
+    (.beginCommit map 1)
+    (single-put map "a" 1)
+    (single-put map "b" 2)
+    (.commit map 1)
+    (.beginCommit map 2)
+    (single-remove map "a")
+    (is (nil? (single-get map "a")))
+    (is (= 2 (single-get map "b")))
+    (.commit map 2)
+    (.beginCommit map 2)
+    (is (= 1 (single-get map "a")))
+    (is (= 2 (single-get map "b")))
+    (single-remove map "a")
+    (.commit map 2)
+    (.beginCommit map 3)
+    (is (nil? (single-get map "a")))
+    (is (= 2 (single-get map "b")))    
+    (.commit map 3)
+    ))


Mime
View raw message