flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
Date Thu, 12 Jul 2018 13:46:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541667#comment-16541667 ]

ASF GitHub Bot commented on FLINK-9804:
---------------------------------------

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6306
  
    @aljoscha Thanks for the quick review. I will address your comments while merging.


> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
> 	final int namespace1ElementsNum = 1000;
> 	final int namespace2ElementsNum = 1000;
> 	String fieldName = "get-keys-test";
> 	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
> 	try {
> 		final String ns1 = "ns1";
> 		MapState<String, Integer> keyedState1 = backend.getPartitionedState(
> 			ns1,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
> 		);
> 		for (int key = 0; key < namespace1ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState1.put("he", key * 2);
> 			keyedState1.put("ho", key * 2);
> 		}
> 		final String ns2 = "ns2";
> 		MapState<String, Integer> keyedState2 = backend.getPartitionedState(
> 			ns2,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
> 		);
> 		for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState2.put("he", key * 2);
> 			keyedState2.put("ho", key * 2);
> 		}
> 		// valid for namespace1
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 		// valid for namespace2
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 	}
> 	finally {
> 		IOUtils.closeQuietly(backend);
> 		backend.dispose();
> 	}
> }
> {code}
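
The issue text does not say why the test fails. One plausible reading, stated here as an assumption and not as something this message confirms, is that a map state kept in an ordered key-value store holds one stored entry per map entry, so a key scan that emits one key per stored entry returns each key once per map entry. The standalone sketch below does not use Flink or RocksDB; the `key|namespace|userKey` layout and the names `compositeKey`, `buildStore`, `naiveKeys`, and `distinctKeys` are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: a TreeMap stands in for an ordered key-value store.
final class MapStateKeysSketch {

    // Hypothetical composite key layout: "<key>|<namespace>|<userKey>".
    static String compositeKey(int key, String namespace, String userKey) {
        return String.format("%05d|%s|%s", key, namespace, userKey);
    }

    // Keys 0..2 go to "ns1" and keys 3..5 to "ns2", each with map entries "he" and "ho".
    static TreeMap<String, Integer> buildStore() {
        TreeMap<String, Integer> store = new TreeMap<>();
        for (int key = 0; key < 6; key++) {
            String namespace = key < 3 ? "ns1" : "ns2";
            store.put(compositeKey(key, namespace, "he"), key * 2);
            store.put(compositeKey(key, namespace, "ho"), key * 2);
        }
        return store;
    }

    // Naive scan: emits one key per stored entry, so keys repeat per map entry.
    static List<Integer> naiveKeys(TreeMap<String, Integer> store, String namespace) {
        return store.keySet().stream()
            .map(composite -> composite.split("\\|"))
            .filter(parts -> parts[1].equals(namespace))
            .map(parts -> Integer.parseInt(parts[0]))
            .collect(Collectors.toList());
    }

    // Same scan with duplicates dropped, which is what the test in the issue expects.
    static List<Integer> distinctKeys(TreeMap<String, Integer> store, String namespace) {
        return naiveKeys(store, namespace).stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> store = buildStore();
        System.out.println(naiveKeys(store, "ns1"));    // prints [0, 0, 1, 1, 2, 2]
        System.out.println(distinctKeys(store, "ns1")); // prints [0, 1, 2]
    }
}
```

In this toy model the naive scan would trip the final `assertFalse(actualIterator.hasNext())` style of check, because more keys come back than were written. Whether the actual fix in the pull request deduplicates in this way is not stated in this message.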



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
