flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] asfgit closed pull request #6466: [FLINK-10005][DataStream API] StreamingFileSink: sets initialPartCounter=maxUsed in new Buckets
Date Wed, 01 Aug 2018 08:17:38 GMT
asfgit closed pull request #6466: [FLINK-10005][DataStream API] StreamingFileSink: sets initialPartCounter=maxUsed in new Buckets
URL: https://github.com/apache/flink/pull/6466
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (because GitHub would otherwise not display it):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index ec59233c0e5..a350096e38b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -115,6 +115,18 @@ public Bucket(
 		this.pending = new ArrayList<>();
 	}
 
+	/**
+	 * Gets the information available for the currently
+	 * open part file, i.e. the one we are currently writing to.
+	 *
+	 * <p>This will be null if there is no currently open part file. This
+	 * is the case when we have a new, just created bucket or a bucket
+	 * that has not received any data after the closing of its previously
+	 * open in-progress file due to the specified rolling policy.
+	 *
+	 * @return The information about the currently in-progress part file
+	 * or {@code null} if there is no open part file.
+	 */
 	public PartFileInfo<BucketID> getInProgressPartInfo() {
 		return currentPart;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 7e9dd61e035..e62c425fc2f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -70,8 +70,6 @@
 
 	private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
 
-	private long initMaxPartCounter;
-
 	private long maxPartCounterUsed;
 
 	private final RecoverableWriter fileSystemWriter;
@@ -114,7 +112,6 @@
 				bucketer.getSerializer()
 		);
 
-		this.initMaxPartCounter = 0L;
 		this.maxPartCounterUsed = 0L;
 	}
 
@@ -137,7 +134,7 @@ void initializeState(final ListState<byte[]> bucketStates, final ListState<Long>
 		for (long partCounter: partCounterState.get()) {
 			maxCounter = Math.max(partCounter, maxCounter);
 		}
-		initMaxPartCounter = maxCounter;
+		maxPartCounterUsed = maxCounter;
 
 		// get the restored buckets
 		for (byte[] recoveredState : bucketStates.get()) {
@@ -151,7 +148,7 @@ void initializeState(final ListState<byte[]> bucketStates, final ListState<Long>
 			final Bucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
 					fileSystemWriter,
 					subtaskIndex,
-					initMaxPartCounter,
+					maxPartCounterUsed,
 					partFileWriterFactory,
 					bucketState
 			);
@@ -200,8 +197,6 @@ void snapshotState(
 			final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
 
 			if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
-				// we also check here so that we do not have to always
-				// wait for the "next" element to arrive.
 				bucket.closePartFile();
 			}
 
@@ -237,13 +232,19 @@ void onElement(IN value, SinkFunction.Context context) throws Exception {
 					subtaskIndex,
 					bucketId,
 					bucketPath,
-					initMaxPartCounter,
+					maxPartCounterUsed,
 					partFileWriterFactory);
 			activeBuckets.put(bucketId, bucket);
 		}
 
 		final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
 		if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
+
+			// info will be null if there is no currently open part file. This
+			// is the case when we have a new, just created bucket or a bucket
+			// that has not received any data after the closing of its previously
+			// open in-progress file due to the specified rolling policy.
+
 			bucket.rollPartFile(currentProcessingTime);
 		}
 		bucket.write(value, currentProcessingTime);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 4b1e7436772..a0c438e1847 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -323,7 +323,7 @@ public void testInactivityPeriodWithLateNotify() throws Exception {
 					Assert.assertEquals("test1@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test2")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-1", fileContents.getKey().getName());
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
 					bucketCounter++;
@@ -346,11 +346,11 @@ public void testInactivityPeriodWithLateNotify() throws Exception {
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test3")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-2", fileContents.getKey().getName());
 					Assert.assertEquals("test3@1\n", fileContents.getValue());
 				} else if (fileContents.getKey().getParentFile().getName().equals("test4")) {
 					bucketCounter++;
-					Assert.assertEquals("part-0-0", fileContents.getKey().getName());
+					Assert.assertEquals("part-0-3", fileContents.getKey().getName());
 					Assert.assertEquals("test4@1\n", fileContents.getValue());
 				}
 			}
@@ -437,8 +437,8 @@ public void testScalingDownAndMergingOfStates() throws Exception {
 							inProgressFilename.contains(".part-1-0.inprogress")
 						)
 				) {
-						counter++;
-				} else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-0.inprogress")) {
+					counter++;
+				} else if (parentFilename.equals("test2") && inProgressFilename.contains(".part-1-1.inprogress")) {
 					counter++;
 				}
 			}
@@ -476,7 +476,7 @@ public void testScalingDownAndMergingOfStates() throws Exception {
 						counter++;
 						Assert.assertTrue(fileContents.getValue().equals("test1@1\n") || fileContents.getValue().equals("test1@0\n"));
 					}
-				} else if (parentFilename.equals("test2") && filename.contains(".part-1-0.inprogress")) {
+				} else if (parentFilename.equals("test2") && filename.contains(".part-1-1.inprogress")) {
 					counter++;
 					Assert.assertEquals("test2@1\n", fileContents.getValue());
 				}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index db54de941b9..f16a9085d9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -25,12 +25,19 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 /**
  * Tests for different {@link RollingPolicy rolling policies}.
@@ -134,24 +141,74 @@ public void testRollOnCheckpointPolicy() throws Exception {
 			// we take a checkpoint so we roll.
 			testHarness.snapshot(1L, 1L);
 
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
+				}
+			}
+
 			// this will create a new part file
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
 			TestUtils.checkLocalFs(outDir, 3, 0);
 
+			testHarness.notifyOfCompletedCheckpoint(1L);
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertTrue(
+							file.getName().contains(".part-0-2.inprogress.") || file.getName().equals("part-0-1")
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertEquals("part-0-0", file.getName());
+				}
+			}
+
 			// and open and fill .part-0-2.inprogress
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
-			TestUtils.checkLocalFs(outDir, 3, 0);                    // nothing committed yet
+			TestUtils.checkLocalFs(outDir, 1, 2);
 
 			// we take a checkpoint so we roll.
 			testHarness.snapshot(2L, 2L);
 
 			testHarness.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
-			TestUtils.checkLocalFs(outDir, 4, 0);
+			TestUtils.checkLocalFs(outDir, 2, 2);
+
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-2.inprogress."))
+									.or(equalTo("part-0-1"))
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-3.inprogress."))
+									.or(equalTo("part-0-0"))
+					);
+				}
+			}
 
 			// we acknowledge the last checkpoint so we should publish all but the latest in-progress file
 			testHarness.notifyOfCompletedCheckpoint(2L);
+
 			TestUtils.checkLocalFs(outDir, 1, 3);
+			for (File file: FileUtils.listFiles(outDir, null, true)) {
+				if (Objects.equals(file.getParentFile().getName(), "test1")) {
+					Assert.assertThat(
+							file.getName(),
+							either(equalTo("part-0-2")).or(equalTo("part-0-1"))
+					);
+				} else if (Objects.equals(file.getParentFile().getName(), "test2")) {
+					Assert.assertThat(
+							file.getName(),
+							either(containsString(".part-0-3.inprogress."))
+									.or(equalTo("part-0-0"))
+					);
+				}
+			}
 		}
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message