flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints
Date Fri, 07 Dec 2018 10:47:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712644#comment-16712644
] 

ASF GitHub Bot commented on FLINK-10482:
----------------------------------------

tillrohrmann closed pull request #7118: [FLINK-10482] Fix double counting of checkpoint stat
URL: https://github.com/apache/flink/pull/7118
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
index dad45eb669c..9e15aebd048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCounts.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -26,6 +29,7 @@
  * Counts of checkpoints.
  */
 public class CheckpointStatsCounts implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsCounts.class);
 
 	private static final long serialVersionUID = -5229425063269482528L;
 
@@ -147,9 +151,8 @@ void incrementInProgressCheckpoints() {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementCompletedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numCompletedCheckpoints++;
 	}
@@ -161,9 +164,8 @@ void incrementCompletedCheckpoints() {
 	 * {@link #incrementInProgressCheckpoints()}.
 	 */
 	void incrementFailedCheckpoints() {
-		if (--numInProgressCheckpoints < 0) {
-			throw new IllegalStateException("Incremented the completed number of checkpoints " +
-				"without incrementing the in progress checkpoints before.");
+		if (canDecrementOfInProgressCheckpointsNumber()) {
+			numInProgressCheckpoints--;
 		}
 		numFailedCheckpoints++;
 	}
@@ -181,4 +183,14 @@ CheckpointStatsCounts createSnapshot() {
 			numCompletedCheckpoints,
 			numFailedCheckpoints);
 	}
+
+	private boolean canDecrementOfInProgressCheckpointsNumber() {
+		boolean decrementLeadsToNegativeNumber = numInProgressCheckpoints - 1 < 0;
+		if (decrementLeadsToNegativeNumber) {
+			String errorMessage = "Incremented the completed number of checkpoints " +
+				"without incrementing the in progress checkpoints before.";
+			LOG.warn(errorMessage);
+		}
+		return !decrementLeadsToNegativeNumber;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5d2d363cf71..54c14af8983 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -962,19 +962,18 @@ public void heartbeatFromResourceManager(final ResourceID resourceID)
{
 		return checkpointCoordinator
 			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
 			.thenApply(CompletedCheckpoint::getExternalPointer)
-			.thenApplyAsync(path -> {
-				if (cancelJob) {
+			.handleAsync((path, throwable) -> {
+				if (throwable != null) {
+					if (cancelJob) {
+						startCheckpointScheduler(checkpointCoordinator);
+					}
+					throw new CompletionException(throwable);
+				} else if (cancelJob) {
 					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
 					cancel(timeout);
 				}
 				return path;
-			}, getMainThreadExecutor())
-			.exceptionally(throwable -> {
-				if (cancelJob) {
-					startCheckpointScheduler(checkpointCoordinator);
-				}
-				throw new CompletionException(throwable);
-			});
+			}, getMainThreadExecutor());
 	}
 
 	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator)
{
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
index cf1e7f7f82d..2d09b46464f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsCountsTest.java
@@ -21,15 +21,16 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 
+/** Test checkpoint statistics counters. */
 public class CheckpointStatsCountsTest {
 
 	/**
 	 * Tests that counts are reported correctly.
 	 */
 	@Test
-	public void testCounts() throws Exception {
+	public void testCounts() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		assertEquals(0, counts.getNumberOfRestoredCheckpoints());
 		assertEquals(0, counts.getTotalNumberOfCheckpoints());
@@ -80,19 +81,15 @@ public void testCounts() throws Exception {
 	 * incrementing the in progress checkpoints before throws an Exception.
 	 */
 	@Test
-	public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception {
+	public void testCompleteOrFailWithoutInProgressCheckpoint() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
-		try {
-			counts.incrementCompletedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
-
-		try {
-			counts.incrementFailedCheckpoints();
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
+		counts.incrementCompletedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
+
+		counts.incrementFailedCheckpoints();
+		assertTrue("Number of checkpoints in progress should never be negative",
+			counts.getNumberOfInProgressCheckpoints() >= 0);
 	}
 
 	/**
@@ -100,7 +97,7 @@ public void testCompleteOrFailWithoutInProgressCheckpoint() throws Exception
{
 	 * parent.
 	 */
 	@Test
-	public void testCreateSnapshot() throws Exception {
+	public void testCreateSnapshot() {
 		CheckpointStatsCounts counts = new CheckpointStatsCounts();
 		counts.incrementRestoredCheckpoints();
 		counts.incrementRestoredCheckpoints();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-10482
>                 URL: https://issues.apache.org/jira/browse/FLINK-10482
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.1
>            Reporter: Julio Biason
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> Recently I found the following log on my JobManager log:
> {noformat}
> 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation error: Unhandled exception.
>  java.lang.IllegalArgumentException: Negative number of in progress checkpoints
>          at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>          at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72)
>          at org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>          at org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>          at org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>          at org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>          at org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>          at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
>          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:498)
>          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>          at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>          at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>          at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>          at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>          at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>          at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> Related: The job details don't appear; the screen shows only the skeleton, with no information (like the pipeline, subtasks, etc.).
> One thing that may have caused this is that the job was failing – an uncaught exception in our code – and, during one of its restarts, I issued a "flink cancel <jobid>". The job was cancelled, but the JobManager interface took a very long time to mark the slots as available again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message