flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10522) Check if RecoverableWriter supportsResume and act accordingly.
Date Tue, 04 Dec 2018 11:02:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708556#comment-16708556
] 

ASF GitHub Bot commented on FLINK-10522:
----------------------------------------

azagrebin commented on a change in pull request #7047: [FLINK-10522] Check if RecoverableWriter
supportsResume() and act accordingly.
URL: https://github.com/apache/flink/pull/7047#discussion_r238593584
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
 ##########
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Tests for the {@link Bucket} class.
+ */
+public class BucketTest {
+
+	private final PartFileWriter.PartFileFactory<String, Integer> factory =
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+	private final RollingPolicy<String, Integer> rollingPolicy = DefaultRollingPolicy.create().build();
+
+	// --------------------------- Checking Restore ---------------------------
+
+	@Test
+	public void inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws IOException {
+		final StubNonResumableWriter nonResumableWriter = new StubNonResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);
+
+		Assert.assertThat(nonResumableWriter, hasMethodCallCountersEqualTo(1, 0, 1));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
+	@Test
+	public void inProgressFileShouldBeRestoredIfWriterSupportsResume() throws IOException {
+		final StubResumableWriter resumableWriter = new StubResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyInProgressPart(resumableWriter);
+
+		Assert.assertThat(resumableWriter, hasMethodCallCountersEqualTo(1, 1, 0));
+		Assert.assertThat(bucket, hasNullInProgressFile(false));
+	}
+
+	@Test
+	public void pendingFilesShouldBeRestored() throws IOException {
+		final int expectedRecoverForCommitCounter = 10;
+
+		final StubNonResumableWriter writer = new StubNonResumableWriter();
+		final Bucket<String, Integer> bucket = getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);
+
+		Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0, expectedRecoverForCommitCounter));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
+	// ---------------------------------- Matchers ----------------------------------
+
+	private static TypeSafeMatcher<Bucket<String, Integer>> hasNullInProgressFile(final boolean isNull) {
+
+		return new TypeSafeMatcher<Bucket<String, Integer>>() {
+			@Override
+			protected boolean matchesSafely(Bucket<String, Integer> bucket) {
+				final PartFileWriter<String, Integer> inProgressPart = bucket.getInProgressPart();
+				return isNull == (inProgressPart == null);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Bucket with its inProgressPart being ")
+						.appendText(isNull ? " null." : " not null.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BaseStubWriter> hasMethodCallCountersEqualTo(
+			final int supportsResumeCalls,
+			final int recoverCalls,
+			final int recoverForCommitCalls) {
+
+		return new TypeSafeMatcher<BaseStubWriter>() {
+			@Override
+			protected boolean matchesSafely(BaseStubWriter writer) {
+				return writer.getSupportsResumeCallCounter() == supportsResumeCalls &&
+						writer.getRecoverCallCounter() == recoverCalls &&
+						writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Writer where:")
+						.appendText(" supportsResume was called ").appendValue(supportsResumeCalls).appendText(" times,")
+						.appendText(" recover was called ").appendValue(recoverCalls).appendText(" times,")
+						.appendText(" and recoverForCommit was called ").appendValue(recoverForCommitCalls).appendText(" times.")
+						.appendText("'");
+			}
+		};
+	}
+
+	// ---------------------------------- Utility Methods ----------------------------------
+
+	private Bucket<String, Integer> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
+		final BucketState<Integer> stateWithOnlyInProgressFile =
+				new BucketState<>(5, new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
+		return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, stateWithOnlyInProgressFile);
+	}
+
+	private Bucket<String, Integer> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+				createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+		final BucketState<Integer> initStateWithOnlyInProgressFile =
+				new BucketState<>(5, new Path(), 12345L, null, completePartsPerCheckpoint);
+		return Bucket.restore(writer, 0, 1L, factory, rollingPolicy, initStateWithOnlyInProgressFile);
+	}
+
+	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+		for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
+			final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
+			pending.add(new NoOpRecoverable());
+			pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
+		}
+		return pendingCommittablesPerCheckpoint;
+	}
+
+	// ---------------------------------- Test Classes ----------------------------------
+
+	/**
+	 * A test implementation of a {@link RecoverableWriter} that does not support
+	 * resuming, i.e. keep on writing to the in-progress file at the point we were
+	 * before the failure.
 
 Review comment:
   Comments for 3 stub classes look like they need to be updated accordingly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Check if RecoverableWriter supportsResume and act accordingly.
> --------------------------------------------------------------
>
>                 Key: FLINK-10522
>                 URL: https://issues.apache.org/jira/browse/FLINK-10522
>             Project: Flink
>          Issue Type: Sub-task
>          Components: filesystem-connector
>    Affects Versions: 1.6.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> So far we assumed that all `RecoverableWriters` support "resuming", i.e. after recovering
from a failure or from a savepoint they could keep writing to the previously "in-progress"
file. This assumption holds for all current writers, but in order to be able to accommodate
also filesystems that may not support this operation, we should check upon initialization
if the writer supports resuming and if yes, we go as before, if not, we recover for commit
and commit the previously in-progress file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message