flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8993) Add a test operator with keyed state that uses Kryo serializer (registered/unregistered/custom)
Date Mon, 30 Jul 2018 09:45:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561708#comment-16561708 ]

ASF GitHub Bot commented on FLINK-8993:
---------------------------------------

asfgit closed pull request #6413: [FLINK-8993] [tests] Let general purpose DataStream job
uses KryoSerializer via type extraction
URL: https://github.com/apache/flink/pull/6413
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the complete diff is reproduced below:

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 123807297ef..fb92960bb86 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -22,6 +22,8 @@
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigOption;
@@ -382,13 +384,38 @@ static boolean isSimulateFailures(ParameterTool pt) {
 	static <IN, OUT, STATE> ArtificialKeyedStateMapper<IN, OUT> createArtificialKeyedStateMapper(
 		MapFunction<IN, OUT> mapFunction,
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		List<TypeSerializer<STATE>> stateSerializers) {
+		List<TypeSerializer<STATE>> stateSerializers,
+		List<Class<STATE>> stateClasses) {
 
 		List<ArtificialStateBuilder<IN>> artificialStateBuilders = new ArrayList<>(stateSerializers.size());
 		for (TypeSerializer<STATE> typeSerializer : stateSerializers) {
-			artificialStateBuilders.add(createValueStateBuilder(inputAndOldStateToNewState, typeSerializer));
-			artificialStateBuilders.add(createListStateBuilder(inputAndOldStateToNewState, typeSerializer));
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + typeSerializer.getClass().getSimpleName(),
+					typeSerializer)));
 		}
+
+		for (Class<STATE> stateClass : stateClasses) {
+			artificialStateBuilders.add(createValueStateBuilder(
+				inputAndOldStateToNewState,
+				new ValueStateDescriptor<>(
+					"valueState-" + stateClass.getSimpleName(),
+					stateClass)));
+
+			artificialStateBuilders.add(createListStateBuilder(
+				inputAndOldStateToNewState,
+				new ListStateDescriptor<>(
+					"listState-" + stateClass.getSimpleName(),
+					stateClass)));
+		}
+
 		return new ArtificialKeyedStateMapper<>(mapFunction, artificialStateBuilders);
 	}
 
@@ -400,17 +427,17 @@ static boolean isSimulateFailures(ParameterTool pt) {
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 
 		return new ArtificialValueStateBuilder<>(
-			"valueState-" + typeSerializer.getClass().getSimpleName(),
+			valueStateDescriptor.getName(),
 			inputAndOldStateToNewState,
-			typeSerializer);
+			valueStateDescriptor);
 	}
 
 	static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder(
 		JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> listStateGenerator = (first, second) -> {
 			List<STATE> newState = new ArrayList<>();
@@ -421,9 +448,9 @@ static boolean isSimulateFailures(ParameterTool pt) {
 		};
 
 		return new ArtificialListStateBuilder<>(
-			"listState-" + typeSerializer.getClass().getSimpleName(),
+			listStateDescriptor.getName(),
 			listStateGenerator,
 			listStateGenerator,
-			typeSerializer);
+			listStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index ea90e655103..30c1c24ee3f 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -86,7 +86,8 @@ public static void main(String[] args) throws Exception {
 							return new ComplexPayload(first, KEYED_STATE_OPER_NAME);
 						},
 					Collections.singletonList(
-						new KryoSerializer<>(ComplexPayload.class, env.getConfig()))
+						new KryoSerializer<>(ComplexPayload.class, env.getConfig())), // custom KryoSerializer
+					Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction
 				)
 			)
 			.name(KEYED_STATE_OPER_NAME)
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
index a2c63877dd3..b29e535cfb2 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialListStateBuilder.java
@@ -21,8 +21,8 @@
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 
@@ -35,7 +35,7 @@
 
 	private transient ListState<STATE> listOperatorState;
 	private transient ListState<STATE> listKeyedState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ListStateDescriptor<STATE> listStateDescriptor;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator;
 	private final JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator;
 
@@ -43,11 +43,11 @@ public ArtificialListStateBuilder(
 		String stateName,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> keyedStateGenerator,
 		JoinFunction<IN, Iterable<STATE>, List<STATE>> operatorStateGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ListStateDescriptor<STATE> listStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.keyedStateGenerator = keyedStateGenerator;
-		this.operatorStateGenerator = operatorStateGenerator;
+		this.listStateDescriptor = Preconditions.checkNotNull(listStateDescriptor);
+		this.keyedStateGenerator = Preconditions.checkNotNull(keyedStateGenerator);
+		this.operatorStateGenerator = Preconditions.checkNotNull(operatorStateGenerator);
 	}
 
 	@Override
@@ -58,7 +58,6 @@ public void artificialStateForElement(IN event) throws Exception {
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) throws Exception {
-		ListStateDescriptor<STATE> listStateDescriptor = new ListStateDescriptor<>(stateName, typeSerializer);
 		listOperatorState = initializationContext.getOperatorStateStore().getListState(listStateDescriptor);
 		listKeyedState = initializationContext.getKeyedStateStore().getListState(listStateDescriptor);
 	}
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
index 6d74e0964f5..421a682351d 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialValueStateBuilder.java
@@ -21,8 +21,8 @@
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.util.Preconditions;
 
 /**
  * An {@link ArtificialStateBuilder} for user {@link ValueState}s.
@@ -32,16 +32,16 @@
 	private static final long serialVersionUID = -1205814329756790916L;
 
 	private transient ValueState<STATE> valueState;
-	private final TypeSerializer<STATE> typeSerializer;
+	private final ValueStateDescriptor<STATE> valueStateDescriptor;
 	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
 
 	public ArtificialValueStateBuilder(
 		String stateName,
 		JoinFunction<IN, STATE, STATE> stateValueGenerator,
-		TypeSerializer<STATE> typeSerializer) {
+		ValueStateDescriptor<STATE> valueStateDescriptor) {
 		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.stateValueGenerator = stateValueGenerator;
+		this.valueStateDescriptor = Preconditions.checkNotNull(valueStateDescriptor);
+		this.stateValueGenerator = Preconditions.checkNotNull(stateValueGenerator);
 	}
 
 	@Override
@@ -51,8 +51,6 @@ public void artificialStateForElement(IN event) throws Exception {
 
 	@Override
 	public void initialize(FunctionInitializationContext initializationContext) {
-		ValueStateDescriptor<STATE> valueStateDescriptor =
-			new ValueStateDescriptor<>(stateName, typeSerializer);
 		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
 	}
 }
diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 0b3b5ed4b89..4f77f954d3e 100644
--- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -84,8 +84,8 @@ public static void main(String[] args) throws Exception {
 			Collections.singletonList(new KryoSerializer<>(ComplexPayload.class, env.getConfig()));
 
 		KeyedStream<Event, Integer> afterStatefulOperations = isOriginalJobVariant(pt) ?
-			applyOriginalStatefulOperations(source, stateSer) :
-			applyUpgradedStatefulOperations(source, stateSer);
+			applyOriginalStatefulOperations(source, stateSer, Collections.emptyList()) :
+			applyUpgradedStatefulOperations(source, stateSer, Collections.emptyList());
 
 		afterStatefulOperations
 			.flatMap(createSemanticsCheckMapper(pt))
@@ -109,26 +109,29 @@ private static boolean isOriginalJobVariant(final ParameterTool pt) {
 
 	private static KeyedStream<Event, Integer> applyOriginalStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap1", simpleStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap2", lastStateUpdate("stateMap2"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyUpgradedStatefulOperations(
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
-		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer);
-		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer);
-		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer);
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
+		source = applyTestStatefulOperator("stateMap2", simpleStateUpdate("stateMap2"), source, stateSer, stateClass);
+		source = applyTestStatefulOperator("stateMap1", lastStateUpdate("stateMap1"), source, stateSer, stateClass);
+		return applyTestStatefulOperator("stateMap3", simpleStateUpdate("stateMap3"), source, stateSer, stateClass);
 	}
 
 	private static KeyedStream<Event, Integer> applyTestStatefulOperator(
 		String name,
 		JoinFunction<Event, ComplexPayload, ComplexPayload> stateFunc,
 		KeyedStream<Event, Integer> source,
-		List<TypeSerializer<ComplexPayload>> stateSer) {
+		List<TypeSerializer<ComplexPayload>> stateSer,
+		List<Class<ComplexPayload>> stateClass) {
 		return source
-			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer))
+			.map(createArtificialKeyedStateMapper(e -> e, stateFunc, stateSer, stateClass))
 			.name(name)
 			.uid(name)
 			.returns(Event.class)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add a test operator with keyed state that uses Kryo serializer (registered/unregistered/custom)
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8993
>                 URL: https://issues.apache.org/jira/browse/FLINK-8993
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>
> Add an operator with keyed state that uses Kryo serializer (registered/unregistered/custom).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message