flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Date Fri, 19 Apr 2019 07:13:41 GMT
becketqin commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r276915833
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
 ##########
 @@ -172,9 +178,44 @@ public int getNumIds() {
 				ids.add(serializer.deserialize(deser));
 			}
 
-			deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
+			map.put(checkpoint.checkpointId, ids);
 		}
 
+		return map;
+	}
+
+	/**
+	 * Combines multiple ArrayDeques with checkpoint data by checkpointId.
+	 * This could happen when a job rescales to a lower parallelism and states are multiple tasks are combined.
+	 *
+	 * @param data The data to be combined.
+	 * @param <T> The type of the elements.
+	 * @return An ArrayDeque of combined element checkpoints.
+	 */
+	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> combine(List<Map<Long, Set<T>>> data) {
+		Map<Long, Set<T>> accumulator = new TreeMap<>();
+		for (Map<Long, Set<T>> element : data) {
+			accumulator = combine(accumulator, element);
+		}
+
+		//Convert map to deque sorted by checkpointId
+		ArrayDeque<Tuple2<Long, Set<T>>> deque = new ArrayDeque<>(accumulator.size());
+		accumulator.entrySet()
+				.stream()
+				.sequential()
+				.sorted(Comparator.comparing(Map.Entry::getKey))
 
 Review comment:
   Any reason that sorting is needed here?
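
   One way to see why the question arises: the accumulator in the diff starts out as a TreeMap, and a TreeMap's entrySet() already iterates in ascending key order. The sketch below is only an illustration under stated assumptions, not the PR's code. It assumes the accumulator stays a TreeMap throughout (the two-argument combine in the PR is not shown here and may return a different map type, in which case the sort would matter), it inlines a hypothetical merge step, and it uses Map.Entry in place of Flink's Tuple2 so that it compiles without Flink on the classpath. The class name CombineSketch is made up for this example.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the combine logic under review; not Flink code.
class CombineSketch {

    // Merges per-task maps by checkpointId and returns a deque ordered by checkpointId.
    static <T> ArrayDeque<Map.Entry<Long, Set<T>>> combine(List<Map<Long, Set<T>>> data) {
        // Assumption: the accumulator is and remains a TreeMap, so keys stay sorted.
        Map<Long, Set<T>> accumulator = new TreeMap<>();
        for (Map<Long, Set<T>> element : data) {
            for (Map.Entry<Long, Set<T>> e : element.entrySet()) {
                // Hypothetical merge step: union the id sets that share a checkpointId.
                accumulator.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
            }
        }

        // No explicit sort: TreeMap.entrySet() iterates in ascending key order.
        ArrayDeque<Map.Entry<Long, Set<T>>> deque = new ArrayDeque<>(accumulator.size());
        for (Map.Entry<Long, Set<T>> e : accumulator.entrySet()) {
            deque.addLast(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return deque;
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> taskA = new HashMap<>();
        taskA.put(3L, new HashSet<>(Arrays.asList("c")));
        taskA.put(1L, new HashSet<>(Arrays.asList("a")));
        Map<Long, Set<String>> taskB = new HashMap<>();
        taskB.put(2L, new HashSet<>(Arrays.asList("b")));
        taskB.put(1L, new HashSet<>(Arrays.asList("a2")));

        // Print each checkpointId with its merged id set, in deque order.
        for (Map.Entry<Long, Set<String>> e : combine(Arrays.asList(taskA, taskB))) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

   If the two-argument combine can return an unsorted map (for example a HashMap), either the explicit sort or a final copy into a TreeMap would still be needed to keep the deque ordered by checkpointId.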

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
