storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/2] storm git commit: STORM-2449 Ensure same key appears only once in State iterator
Date Tue, 20 Jun 2017 11:34:20 GMT
Repository: storm
Updated Branches:
  refs/heads/master b59e49b2d -> 6fe323d27


STORM-2449 Ensure same key appears only once in State iterator

* also don't expose TOMBSTONE keys


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdf63ce9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdf63ce9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdf63ce9

Branch: refs/heads/master
Commit: fdf63ce9a619ee24b52435e8f5945304912ad3ea
Parents: 38e997e
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Fri Jun 16 15:39:45 2017 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jun 16 15:39:45 2017 +0900

----------------------------------------------------------------------
 .../redis/state/RedisKeyValueStateIterator.java | 131 ++++++---
 .../state/RedisKeyValueStateIteratorTest.java   | 264 +++++++++++++++++++
 2 files changed, 360 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fdf63ce9/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
index d3f9d2f..748e852 100644
--- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
+++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueStateIterator.java
@@ -15,19 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.redis.state;
 
-import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 
 import java.util.AbstractMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
 import org.apache.storm.redis.utils.RedisEncoder;
-import org.apache.storm.state.DefaultStateSerializer;
 import org.apache.storm.state.Serializer;
 
 import redis.clients.jedis.JedisCommands;
@@ -35,43 +38,82 @@ import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
 /**
- * An iterator over {@link RedisKeyValueState}
+ * An iterator over {@link RedisKeyValueState}.
  */
 public class RedisKeyValueStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {
 
     private final String namespace;
-    private final Iterator<Map.Entry<String, String>> pendingPrepareIterator;
-    private final Iterator<Map.Entry<String, String>> pendingCommitIterator;
+    private final PeekingIterator<Map.Entry<String, String>> pendingPrepareIterator;
+    private final PeekingIterator<Map.Entry<String, String>> pendingCommitIterator;
     private final RedisEncoder<K, V> decoder;
     private final JedisCommandsInstanceContainer jedisContainer;
     private final ScanParams scanParams;
-    private Iterator<Map.Entry<String, String>> pendingIterator;
+    private final Set<String> providedKeys;
+
+    private PeekingIterator<Map.Entry<String, String>> cachedResultIterator;
     private String cursor;
-    private List<Map.Entry<String, String>> cachedResult;
-    private int readPosition;
+    private boolean firstLoad = true;
+    private PeekingIterator<Map.Entry<String, String>> pendingIterator;
 
-    public RedisKeyValueStateIterator(String namespace, JedisCommandsInstanceContainer jedisContainer, Iterator<Map.Entry<String, String>> pendingPrepareIterator, Iterator<Map.Entry<String, String>> pendingCommitIterator, int chunkSize, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    /**
+     * Constructor.
+     *
+     * @param namespace The namespace of State
+     * @param jedisContainer The instance of JedisCommandsInstanceContainer
+     * @param pendingPrepareIterator The iterator of pendingPrepare
+     * @param pendingCommitIterator The iterator of pendingCommit
+     * @param chunkSize The size of chunk to get entries from Redis
+     * @param keySerializer The serializer of key
+     * @param valueSerializer The serializer of value
+     */
+    public RedisKeyValueStateIterator(String namespace, JedisCommandsInstanceContainer jedisContainer,
+                                      Iterator<Map.Entry<String, String>> pendingPrepareIterator,
+                                      Iterator<Map.Entry<String, String>> pendingCommitIterator,
+                                      int chunkSize, Serializer<K> keySerializer,
+                                      Serializer<V> valueSerializer) {
         this.namespace = namespace;
-        this.pendingPrepareIterator = pendingPrepareIterator;
-        this.pendingCommitIterator = pendingCommitIterator;
+        this.pendingPrepareIterator = Iterators.peekingIterator(pendingPrepareIterator);
+        this.pendingCommitIterator = Iterators.peekingIterator(pendingCommitIterator);
         this.jedisContainer = jedisContainer;
         this.decoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
         this.scanParams = new ScanParams().count(chunkSize);
         this.cursor = ScanParams.SCAN_POINTER_START;
+        this.providedKeys = new HashSet<>();
     }
 
     @Override
     public boolean hasNext() {
-        if (pendingPrepareIterator != null && pendingPrepareIterator.hasNext()) {
+        if (seekToAvailableEntry(pendingPrepareIterator)) {
             pendingIterator = pendingPrepareIterator;
             return true;
-        } else if (pendingCommitIterator != null && pendingCommitIterator.hasNext()) {
+        }
+
+        if (seekToAvailableEntry(pendingCommitIterator)) {
             pendingIterator = pendingCommitIterator;
             return true;
-        } else {
-            pendingIterator = null;
-            return !cursor.equals("0");
         }
+
+        if (firstLoad) {
+            // load the first part of entries
+            loadChunkFromRedis();
+            firstLoad = false;
+        }
+
+        while (true) {
+            if (seekToAvailableEntry(cachedResultIterator)) {
+                pendingIterator = cachedResultIterator;
+                return true;
+            }
+
+            if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
+                break;
+            }
+
+            loadChunkFromRedis();
+        }
+
+        pendingIterator = null;
+        return false;
     }
 
     @Override
@@ -79,27 +121,11 @@ public class RedisKeyValueStateIterator<K, V> implements Iterator<Map.Entry<K, V
         if (!hasNext()) {
             throw new NoSuchElementException();
         }
-        Map.Entry<String, String> redisKeyValue = null;
-        if (pendingIterator != null) {
-            redisKeyValue = pendingIterator.next();
-        } else {
-            if (cachedResult == null || readPosition >= cachedResult.size()) {
-                JedisCommands commands = null;
-                try {
-                    commands = jedisContainer.getInstance();
-                    ScanResult<Map.Entry<String, String>> scanResult = commands.hscan(namespace, cursor, scanParams);
-                    cachedResult = scanResult.getResult();
-                    cursor = scanResult.getStringCursor();
-                    readPosition = 0;
-                } finally {
-                    jedisContainer.returnInstance(commands);
-                }
-            }
-            redisKeyValue = cachedResult.get(readPosition);
-            readPosition += 1;
-        }
+        Map.Entry<String, String> redisKeyValue = pendingIterator.next();
         K key = decoder.decodeKey(redisKeyValue.getKey());
         V value = decoder.decodeValue(redisKeyValue.getValue());
+
+        providedKeys.add(redisKeyValue.getKey());
         return new AbstractMap.SimpleEntry(key, value);
     }
 
@@ -107,4 +133,39 @@ public class RedisKeyValueStateIterator<K, V> implements Iterator<Map.Entry<K, V
     public void remove() {
         throw new UnsupportedOperationException();
     }
+
+    private boolean seekToAvailableEntry(PeekingIterator<Map.Entry<String, String>> iterator) {
+        if (iterator != null) {
+            while (iterator.hasNext()) {
+                Map.Entry<String, String> entry = iterator.peek();
+                if (!providedKeys.contains(entry.getKey())) {
+                    if (entry.getValue().equals(RedisEncoder.TOMBSTONE)) {
+                        providedKeys.add(entry.getKey());
+                    } else {
+                        return true;
+                    }
+                }
+
+                iterator.next();
+            }
+        }
+
+        return false;
+    }
+
+    private void loadChunkFromRedis() {
+        JedisCommands commands = null;
+        try {
+            commands = jedisContainer.getInstance();
+            ScanResult<Map.Entry<String, String>> scanResult = commands.hscan(namespace, cursor, scanParams);
+            List<Map.Entry<String, String>> result = scanResult.getResult();
+            if (result != null) {
+                cachedResultIterator = Iterators.peekingIterator(result.iterator());
+            }
+            cursor = scanResult.getStringCursor();
+        } finally {
+            jedisContainer.returnInstance(commands);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdf63ce9/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
new file mode 100644
index 0000000..ddf4e19
--- /dev/null
+++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateIteratorTest.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.redis.state;
+
+import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
+import org.apache.storm.redis.utils.RedisEncoder;
+import org.apache.storm.state.DefaultStateSerializer;
+import org.apache.storm.state.Serializer;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanParams;
+import redis.clients.jedis.ScanResult;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for RedisKeyValueStateIterator.
+ */
+public class RedisKeyValueStateIteratorTest {
+
+    private String namespace;
+    private JedisCommandsInstanceContainer mockContainer;
+    private Jedis mockJedis;
+    private int chunkSize = 1000;
+    private Serializer<String> keySerializer = new DefaultStateSerializer<>();
+    private Serializer<String> valueSerializer = new DefaultStateSerializer<>();
+    private RedisEncoder<String, String> encoder;
+
+    @Before
+    public void setUp() {
+        namespace = "namespace";
+        mockContainer = mock(JedisCommandsInstanceContainer.class);
+        mockJedis = mock(Jedis.class);
+        when(mockContainer.getInstance()).thenReturn(mockJedis);
+
+        encoder = new RedisEncoder<>(keySerializer, valueSerializer);
+    }
+
+    @Test
+    public void testGetEntriesFromPendingPrepare() {
+        Map<String, String> pendingPrepare = new TreeMap<>();
+        putEncodedKeyValueToMap(pendingPrepare, "key0", "value0");
+        putTombstoneToMap(pendingPrepare, "key1");
+        putEncodedKeyValueToMap(pendingPrepare, "key2", "value2");
+
+        Map<String, String> pendingCommit = new TreeMap<>();
+
+        ScanResult<Map.Entry<String, String>> scanResult = new ScanResult<Map.Entry<String, String>>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>());
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class))).thenReturn(scanResult);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        assertNextEntry(kvIterator, "key0", "value0");
+
+        // key1 shouldn't be in the iterator since it's marked as deleted
+
+        assertNextEntry(kvIterator, "key2", "value2");
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    @Test
+    public void testGetEntriesFromPendingCommit() {
+        Map<String, String> pendingPrepare = new TreeMap<>();
+
+        Map<String, String> pendingCommit = new TreeMap<>();
+        putEncodedKeyValueToMap(pendingCommit, "key0", "value0");
+        putTombstoneToMap(pendingCommit, "key1");
+        putEncodedKeyValueToMap(pendingCommit, "key2", "value2");
+
+        ScanResult<Map.Entry<String, String>> scanResult = new ScanResult<Map.Entry<String, String>>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>());
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class))).thenReturn(scanResult);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        assertNextEntry(kvIterator, "key0", "value0");
+
+        // key1 shouldn't be in the iterator since it's marked as deleted
+
+        assertNextEntry(kvIterator, "key2", "value2");
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    @Test
+    public void testGetEntriesFromFirstPartOfChunkInRedis() {
+        // pendingPrepare has no entries
+        Map<String, String> pendingPrepare = new TreeMap<>();
+
+        // pendingCommit has no entries
+        Map<String, String> pendingCommit = new TreeMap<>();
+
+        // Redis has a chunk but no more
+        Map<String, String> chunkMap = new TreeMap<>();
+        putEncodedKeyValueToMap(chunkMap, "key0", "value0");
+        putEncodedKeyValueToMap(chunkMap, "key2", "value2");
+
+        ScanResult<Map.Entry<String, String>> scanResultFirst = new ScanResult<>(
+                "12345", new ArrayList<>(chunkMap.entrySet()));
+        ScanResult<Map.Entry<String, String>> scanResultSecond = new ScanResult<>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>());
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class)))
+                .thenReturn(scanResultFirst, scanResultSecond);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        assertNextEntry(kvIterator, "key0", "value0");
+
+        // key1 was never stored, so it shouldn't be in the iterator
+
+        assertNextEntry(kvIterator, "key2", "value2");
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    @Test
+    public void testGetEntriesFromThirdPartOfChunkInRedis() {
+        // pendingPrepare has no entries
+        Map<String, String> pendingPrepare = new TreeMap<>();
+
+        // pendingCommit has no entries
+        Map<String, String> pendingCommit = new TreeMap<>();
+
+        // Redis has three chunks, of which only the last one has entries
+        Map<String, String> chunkMap = new TreeMap<>();
+        putEncodedKeyValueToMap(chunkMap, "key0", "value0");
+        putEncodedKeyValueToMap(chunkMap, "key2", "value2");
+
+        ScanResult<Map.Entry<String, String>> scanResultFirst = new ScanResult<>(
+                "12345", new ArrayList<>());
+        ScanResult<Map.Entry<String, String>> scanResultSecond = new ScanResult<>(
+                "23456", new ArrayList<>());
+        ScanResult<Map.Entry<String, String>> scanResultThird = new ScanResult<>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>(chunkMap.entrySet()));
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class)))
+                .thenReturn(scanResultFirst, scanResultSecond, scanResultThird);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        assertNextEntry(kvIterator, "key0", "value0");
+
+        // key1 was never stored, so it shouldn't be in the iterator
+
+        assertNextEntry(kvIterator, "key2", "value2");
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    @Test
+    public void testGetEntriesRemovingDuplicationKeys() {
+        Map<String, String> pendingPrepare = new TreeMap<>();
+        putEncodedKeyValueToMap(pendingPrepare, "key0", "value0");
+        putTombstoneToMap(pendingPrepare, "key1");
+
+        Map<String, String> pendingCommit = new TreeMap<>();
+        putEncodedKeyValueToMap(pendingCommit, "key1", "value1");
+        putEncodedKeyValueToMap(pendingCommit, "key2", "value2");
+
+        Map<String, String> chunkMap = new TreeMap<>();
+        putEncodedKeyValueToMap(chunkMap, "key2", "value2");
+        putEncodedKeyValueToMap(chunkMap, "key3", "value3");
+
+        Map<String, String> chunkMap2 = new TreeMap<>();
+        putEncodedKeyValueToMap(chunkMap2, "key3", "value3");
+        putEncodedKeyValueToMap(chunkMap2, "key4", "value4");
+
+        ScanResult<Map.Entry<String, String>> scanResultFirst = new ScanResult<>(
+                "12345", new ArrayList<>(chunkMap.entrySet()));
+        ScanResult<Map.Entry<String, String>> scanResultSecond = new ScanResult<>(
+                "23456", new ArrayList<>(chunkMap2.entrySet()));
+        ScanResult<Map.Entry<String, String>> scanResultThird = new ScanResult<>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>());
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class)))
+                .thenReturn(scanResultFirst, scanResultSecond, scanResultThird);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        // keys shouldn't appear twice
+
+        assertNextEntry(kvIterator, "key0", "value0");
+
+        // key1 shouldn't be in iterator since it's marked as deleted
+
+        assertNextEntry(kvIterator, "key2", "value2");
+        assertNextEntry(kvIterator, "key3", "value3");
+        assertNextEntry(kvIterator, "key4", "value4");
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    @Test
+    public void testGetEntryNotAvailable() {
+        Map<String, String> pendingPrepare = new TreeMap<>();
+
+        Map<String, String> pendingCommit = new TreeMap<>();
+
+        ScanResult<Map.Entry<String, String>> scanResult = new ScanResult<>(
+                ScanParams.SCAN_POINTER_START, new ArrayList<>());
+        when(mockJedis.hscan(eq(namespace), anyString(), any(ScanParams.class)))
+                .thenReturn(scanResult);
+
+        RedisKeyValueStateIterator<String, String> kvIterator =
+                new RedisKeyValueStateIterator<>(namespace, mockContainer, pendingPrepare.entrySet().iterator(),
+                        pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer);
+
+        assertFalse(kvIterator.hasNext());
+    }
+
+    private void assertNextEntry(RedisKeyValueStateIterator<String, String> kvIterator, String expectedKey,
+                                 String expectedValue) {
+        assertTrue(kvIterator.hasNext());
+        Map.Entry<String, String> entry = kvIterator.next();
+        assertEquals(expectedKey, entry.getKey());
+        assertEquals(expectedValue, entry.getValue());
+    }
+
+    private void putEncodedKeyValueToMap(Map<String, String> map, String key, String value) {
+        map.put(encoder.encodeKey(key), encoder.encodeValue(value));
+    }
+
+    private void putTombstoneToMap(Map<String, String> map, String key) {
+        map.put(encoder.encodeKey(key), RedisEncoder.TOMBSTONE);
+    }
+}
\ No newline at end of file


Mime
View raw message