flink-issues mailing list archives

From "godfrey he (Jira)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-15669) SQL client can't cancel flink job
Date Sun, 19 Jan 2020 11:40:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-15669:
-------------------------------
    Description: 
In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String
resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the
job id, so the CLI client can't cancel a running job.


Related code in {{LocalExecutor}}:
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
	 ......

	// store the result with a unique id
	final String resultId = UUID.randomUUID().toString();
	resultStore.storeResult(resultId, result);

	......

	// create execution
	final ProgramDeployer deployer = new ProgramDeployer(
		configuration, jobName, pipeline);

	// start result retrieval
	result.startRetrieval(deployer);

	return new ResultDescriptor(
			resultId,
			removeTimeAttributes(table.getSchema()),
			result.isMaterialized());
}



private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
	......

	// stop Flink job
	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
		ClusterClient<T> clusterClient = null;
		try {
			// retrieve existing cluster
			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
			try {
				// ======== cancel job through resultId =======
				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
			} catch (Throwable t) {
				// the job might have finished earlier
			}
		} catch (Exception e) {
			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
		} finally {
			try {
				if (clusterClient != null) {
					clusterClient.close();
				}
			} catch (Exception e) {
				// ignore
			}
		}
	} catch (SqlExecutionException e) {
		throw e;
	} catch (Exception e) {
		throw new SqlExecutionException("Could not locate a cluster.", e);
	}
}
{code}
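
To illustrate the mismatch, here is a minimal sketch of one possible direction. It is not the fix adopted for this issue and it does not use any Flink API. It assumes the executor could remember which job id belongs to each {{resultId}} when the query is submitted, so that cancellation can look the job id up instead of parsing the UUID. The names {{ResultJobRegistry}}, {{register}} and {{jobIdFor}} are hypothetical, and the job id below is a made-up 32-character hex string.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper, not part of Flink: remembers which job belongs to which result. */
public class ResultJobRegistry {

    // resultId (random UUID string) -> job id (hex string, assumed to come from the cluster)
    private final Map<String, String> jobIdByResultId = new ConcurrentHashMap<>();

    /** Creates a random result id, as executeQueryInternal does, and records its job id. */
    public String register(String jobIdHex) {
        final String resultId = UUID.randomUUID().toString();
        jobIdByResultId.put(resultId, jobIdHex);
        return resultId;
    }

    /** Returns the job id to cancel for a result id, or empty if the result id is unknown. */
    public Optional<String> jobIdFor(String resultId) {
        return Optional.ofNullable(jobIdByResultId.get(resultId));
    }

    public static void main(String[] args) {
        final ResultJobRegistry registry = new ResultJobRegistry();
        final String jobIdHex = "0123456789abcdef0123456789abcdef"; // made-up job id
        final String resultId = registry.register(jobIdHex);

        // A UUID string is 36 characters long and contains hyphens,
        // so it cannot be read back as a 32-character hex job id.
        System.out.println("resultId length: " + resultId.length());
        System.out.println("job id found: " + registry.jobIdFor(resultId).isPresent());
    }
}
```

With such a mapping, the cancel path would pass the looked-up job id to the cluster client rather than the {{resultId}} itself.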





  was:
In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId, String
resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID, not the
job id, so the CLI client can't cancel a running job.


related code in {{LocalExecutor}}:
{code:java}
private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
	 ......

	// store the result with a unique id
	final String resultId = UUID.randomUUID().toString();
	resultStore.storeResult(resultId, result);

	......

	// create execution
	final ProgramDeployer deployer = new ProgramDeployer(
		configuration, jobName, pipeline);

	// start result retrieval
	result.startRetrieval(deployer);

	return new ResultDescriptor(
			resultId,
			removeTimeAttributes(table.getSchema()),
			result.isMaterialized());
}



private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
	......

	// stop Flink job
	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
		ClusterClient<T> clusterClient = null;
		try {
			// retrieve existing cluster
			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
			try {
				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
			} catch (Throwable t) {
				// the job might have finished earlier
			}
		} catch (Exception e) {
			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
		} finally {
			try {
				if (clusterClient != null) {
					clusterClient.close();
				}
			} catch (Exception e) {
				// ignore
			}
		}
	} catch (SqlExecutionException e) {
		throw e;
	} catch (Exception e) {
		throw new SqlExecutionException("Could not locate a cluster.", e);
	}
}
{code}






> SQL client can't cancel flink job
> ---------------------------------
>
>                 Key: FLINK-15669
>                 URL: https://issues.apache.org/jira/browse/FLINK-15669
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: godfrey he
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In the SQL client, the CLI client cancels a query through the {{void cancelQuery(String sessionId,
> String resultId)}} method in {{Executor}}. However, the {{resultId}} is a random UUID,
> not the job id, so the CLI client can't cancel a running job.
> Related code in {{LocalExecutor}}:
> {code:java}
> private <C> ResultDescriptor executeQueryInternal(String sessionId, ExecutionContext<C> context, String query) {
> 	 ......
> 	// store the result with a unique id
> 	final String resultId = UUID.randomUUID().toString();
> 	resultStore.storeResult(resultId, result);
> 	......
> 	// create execution
> 	final ProgramDeployer deployer = new ProgramDeployer(
> 		configuration, jobName, pipeline);
> 	// start result retrieval
> 	result.startRetrieval(deployer);
> 	return new ResultDescriptor(
> 			resultId,
> 			removeTimeAttributes(table.getSchema()),
> 			result.isMaterialized());
> }
> private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
> 	......
> 	// stop Flink job
> 	try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
> 		ClusterClient<T> clusterClient = null;
> 		try {
> 			// retrieve existing cluster
> 			clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
> 			try {
> 				// ======== cancel job through resultId =======
> 				clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId))).get();
> 			} catch (Throwable t) {
> 				// the job might have finished earlier
> 			}
> 		} catch (Exception e) {
> 			throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
> 		} finally {
> 			try {
> 				if (clusterClient != null) {
> 					clusterClient.close();
> 				}
> 			} catch (Exception e) {
> 				// ignore
> 			}
> 		}
> 	} catch (SqlExecutionException e) {
> 		throw e;
> 	} catch (Exception e) {
> 		throw new SqlExecutionException("Could not locate a cluster.", e);
> 	}
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
