flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9887) Ensure serializer upgrades work with timer service remake
Date Thu, 02 Aug 2018 09:48:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566556#comment-16566556
] 

ASF GitHub Bot commented on FLINK-9887:
---------------------------------------

tzulitai commented on a change in pull request #6467: [FLINK-9887] Ensure serializer upgrades
work with timer service remake
URL: https://github.com/apache/flink/pull/6467#discussion_r207164541
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ##########
 @@ -1093,23 +1131,123 @@ public void testSerializerPresenceOnRestore() throws Exception {
 			backend.setCurrentKey(2);
 			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
 
-			KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
-				682375462379L,
-				3,
-				streamFactory,
-				CheckpointOptions.forCheckpointWithDefaultLocation()));
+			KeyedStateHandle snapshot2 = runSnapshot(
+				backend.snapshot(
+					682375462379L,
+					3,
+					streamFactory,
+					CheckpointOptions.forCheckpointWithDefaultLocation()),
+				sharedStateRegistry);
 
-			snapshot2.registerSharedStates(sharedStateRegistry);
 			snapshot1.discardState();
 		} finally {
 			backend.dispose();
 		}
 	}
 
+	@Test
+	public void testPriorityQueueSerializerUpdates() throws Exception {
+
+		final String stateName = "test";
+		final CheckpointStreamFactory streamFactory = createStreamFactory();
+		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+		AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			TypeSerializer<InternalPriorityQueueTestBase.TestElement> serializer =
+				InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
+
+			KeyGroupedInternalPriorityQueue<InternalPriorityQueueTestBase.TestElement> priorityQueue
=
+				keyedBackend.create(stateName, serializer);
+
+			priorityQueue.add(new InternalPriorityQueueTestBase.TestElement(42L, 0L));
+
+			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
+				keyedBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+			KeyedStateHandle keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
+
+			keyedBackend.dispose();
+
+			// test restore with a modified but compatible serializer ---------------------------
+
+			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
+
+			serializer = new ModifiedTestElementSerializer();
+
+			priorityQueue = keyedBackend.create(stateName, serializer);
+
+			final InternalPriorityQueueTestBase.TestElement checkElement =
+				new InternalPriorityQueueTestBase.TestElement(4711L, 1L);
+			priorityQueue.add(checkElement);
+
+			snapshot = keyedBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+			keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
+
+			keyedBackend.dispose();
+
+			// test that the modified serializer was actually used ---------------------------
+
+			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
+			priorityQueue = keyedBackend.create(stateName, serializer);
+
+			priorityQueue.poll();
+
+			ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
+			DataOutputViewStreamWrapper outWrapper = new DataOutputViewStreamWrapper(out);
+			serializer.serialize(checkElement, outWrapper);
+			InternalPriorityQueueTestBase.TestElement expected =
+				serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(out.toByteArray())));
+
+			Assert.assertEquals(
+				expected,
+				priorityQueue.poll());
+			Assert.assertTrue(priorityQueue.isEmpty());
+
+			keyedBackend.dispose();
+
+			// test that incompatible serializer is rejected ---------------------------
+
+			serializer = InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
+			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
 
 Review comment:
   This fails because the revision count for the original `TestElementSerializer` is now smaller
than the new revision of the modified one, correct?
   
   nit: I would add a comment here to make that a bit more explicit.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Ensure serializer upgrades work with timer service remake
> ---------------------------------------------------------
>
>                 Key: FLINK-9887
>                 URL: https://issues.apache.org/jira/browse/FLINK-9887
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message