flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10312) Wrong / missing exception when submitting job
Date Mon, 01 Oct 2018 11:59:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633901#comment-16633901
] 

ASF GitHub Bot commented on FLINK-10312:
----------------------------------------

zentol closed pull request #6731: [FLINK-10312] Propagate exception from server to client
in REST API
URL: https://github.com/apache/flink/pull/6731
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..9eb8cc72c8f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -376,7 +376,7 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 				(JobSubmitResponseBody jobSubmitResponseBody) -> new JobSubmissionResult(jobGraph.getJobID()))
 			.exceptionally(
 				(Throwable throwable) -> {
-					throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed
to submit JobGraph.", throwable));
+					throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed
to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
 				});
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index d4a65dec8f6..5a19a3f628d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -198,7 +198,10 @@
 						if (throwable instanceof CancellationException) {
 							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.",
throwable));
 						} else {
-							if (retries > 0 && retryPredicate.test(throwable)) {
+							throwable = ExceptionUtils.stripExecutionException(throwable);
+							if (!retryPredicate.test(throwable)) {
+								resultFuture.completeExceptionally(throwable);
+							} else if (retries > 0) {
 								final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
 									() -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay,
retryPredicate, scheduledExecutor),
 									retryDelay.toMilliseconds(),
@@ -207,12 +210,10 @@
 								resultFuture.whenComplete(
 									(innerT, innerThrowable) -> scheduledFuture.cancel(false));
 							} else {
-								final String errorMsg = retries == 0 ?
-									"Number of retries has been exhausted." :
-									"Exception is not retryable.";
-								resultFuture.completeExceptionally(new RetryException(
-									"Could not complete the operation. " + errorMsg,
-									throwable));
+								RetryException retryException = new RetryException(
+									"Could not complete the operation. Number of retries has been exhausted.",
+									throwable);
+								resultFuture.completeExceptionally(retryException);
 							}
 						}
 					} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index e4cec086017..9cfb58e98a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
@@ -39,6 +40,7 @@
 
 import javax.annotation.Nonnull;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -80,43 +82,26 @@ protected AbstractRestHandler(
 		}
 
 		return response.whenComplete((P resp, Throwable throwable) -> {
-			if (throwable != null) {
-
-				Throwable error = ExceptionUtils.stripCompletionException(throwable);
-
-				if (error instanceof RestHandlerException) {
-					final RestHandlerException rhe = (RestHandlerException) error;
-
-					processRestHandlerException(ctx, httpRequest, rhe);
-				} else {
-					log.error("Implementation error: Unhandled exception.", error);
-					HandlerUtils.sendErrorResponse(
-						ctx,
-						httpRequest,
-						new ErrorResponseBody("Internal server error."),
-						HttpResponseStatus.INTERNAL_SERVER_ERROR,
-						responseHeaders);
-				}
-			} else {
-				HandlerUtils.sendResponse(
-					ctx,
-					httpRequest,
-					resp,
-					messageHeaders.getResponseStatusCode(),
-					responseHeaders);
-			}
+			Tuple2<ResponseBody, HttpResponseStatus> r = throwable != null ?
+				errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode());
+			HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders);
 		}).thenApply(ignored -> null);
 	}
 
-	private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest,
RestHandlerException rhe) {
-		log.error("Exception occurred in REST handler.", rhe);
-
-		HandlerUtils.sendErrorResponse(
-			ctx,
-			httpRequest,
-			new ErrorResponseBody(rhe.getMessage()),
-			rhe.getHttpResponseStatus(),
-			responseHeaders);
+	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable)
{
+		Throwable error = ExceptionUtils.stripCompletionException(throwable);
+		if (error instanceof RestHandlerException) {
+			final RestHandlerException rhe = (RestHandlerException) error;
+			log.error("Exception occurred in REST handler.", rhe);
+			return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
+		} else {
+			log.error("Implementation error: Unhandled exception.", error);
+			String stackTrace = String.format("<Exception on server side:%n%s%nEnd of exception
on server side>",
+				ExceptionUtils.stringifyException(throwable));
+			return Tuple2.of(
+				new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)),
+				HttpResponseStatus.INTERNAL_SERVER_ERROR);
+		}
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index 491ba094f4c..e3101bef5ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -114,10 +114,7 @@ public JobSubmitHandler(
 		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph
-> gateway.submitJob(jobGraph, timeout));
 
 		return jobSubmissionFuture.thenCombine(jobGraphFuture,
-			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
-			.exceptionally(exception -> {
-				throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR,
exception));
-			});
+			(ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
 	}
 
 	private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody,
Map<String, Path> nameToFile) throws MissingFileException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index c386952c056..99962a6848e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -82,7 +82,7 @@ public void testRetrySuccess() throws Exception {
 			TestingUtils.defaultExecutor());
 
 		assertTrue(retryFuture.get());
-		assertTrue(retries == atomicInteger.get());
+		assertEquals(retries, atomicInteger.get());
 	}
 
 	/**
@@ -274,7 +274,7 @@ public void testRetryWithDelayAndPredicate() throws Exception {
 					throwable instanceof RuntimeException && throwable.getMessage().contains(retryableExceptionMessage),
 				new ScheduledExecutorServiceAdapter(retryExecutor)).get();
 		} catch (final ExecutionException e) {
-			assertThat(e.getMessage(), containsString("Could not complete the operation"));
+			assertThat(e.getMessage(), containsString("should propagate"));
 		} finally {
 			retryExecutor.shutdownNow();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index be1cb797748..78ace967bec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -269,11 +269,7 @@ public void testFailedJobSubmission() throws Exception {
 				.get();
 		} catch (Exception e) {
 			Throwable t = ExceptionUtils.stripExecutionException(e);
-			if (t instanceof RestHandlerException){
-				Assert.assertTrue(t.getMessage().equals("Job submission failed."));
-			} else {
-				throw e;
-			}
+			Assert.assertEquals(errorMessage, t.getMessage());
 		}
 	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Wrong / missing exception when submitting job
> ---------------------------------------------
>
>                 Key: FLINK-10312
>                 URL: https://issues.apache.org/jira/browse/FLINK-10312
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.5.2, 1.6.0
>            Reporter: Stephan Ewen
>            Assignee: Andrey Zagrebin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>         Attachments: lmerge-TR.pdf
>
>
> h3. Problem
> When submitting a job that cannot be created / initialized on the JobManager, there is
> no proper error message. The exception says *"Could not retrieve the execution result. (JobID:
> 5a7165e1260c6316fa11d2760bd3d49f)"*
> h3. Steps to Reproduce
> Create a streaming job and set a state backend with a non-existent file system scheme.
> h3. Full Stack Trace
> {code}
> Submitting a job where instantiation on the JM fails yields this, which seems like a
> major regression from seeing the actual exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)
> 	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
> 	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
> 	at com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
> 	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
> 	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
> 	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> 	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
> 	at org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1120)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> 	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> 	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 	at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$25(RestClusterClient.java:379)
> 	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> 	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> 	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$32(FutureUtils.java:213)
> 	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> 	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> 	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
> 	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> 	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> 	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> 	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> 	... 12 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
> 	... 10 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
> 	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> 	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> 	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> 	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
> 	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> 	... 4 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
> 	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:310)
> 	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$364(RestClient.java:294)
> 	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> 	... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message