flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2732: [FLINK-4272] Create a JobClient for job control an...
Date Wed, 09 Nov 2016 13:50:00 GMT
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2732#discussion_r87102297
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java ---
    @@ -142,4 +209,66 @@ private ActorGateway getJobManager() throws JobRetrievalException {
     			throw new JobRetrievalException(jobID, "Couldn't retrieve leading JobManager.", e);
     		}
     	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	private static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID),
    +					AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
    +				AkkaUtils.getDefaultTimeoutAsFiniteDuration());
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loading properties from JobManager.", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey);
    --- End diff --
    
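    Exception `e` is swallowed: it is not passed to the `JobRetrievalException` as its cause, unlike in the other catch blocks in this diff.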
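
To illustrate the point, here is a minimal, self-contained sketch of the difference between dropping and chaining the caught exception. It does not use Flink classes: `RetrievalException`, `download`, `fetchSwallowing` and `fetchChaining` are hypothetical stand-ins invented for this example. In the actual catch block the change would presumably amount to passing `e` as the last constructor argument, as the other `JobRetrievalException` call sites in the diff already do.

```java
import java.io.IOException;

public class ExceptionChainingSketch {

    /** Hypothetical stand-in for JobRetrievalException; not the real Flink class. */
    static class RetrievalException extends Exception {
        RetrievalException(String message) {
            super(message);
        }

        RetrievalException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical download step that always fails, to trigger the catch blocks. */
    static void download(String blobKey) throws IOException {
        throw new IOException("connection refused while fetching " + blobKey);
    }

    /** Mirrors the reviewed catch block: the original exception is dropped. */
    static void fetchSwallowing(String blobKey) throws RetrievalException {
        try {
            download(blobKey);
        } catch (Exception e) {
            throw new RetrievalException("Failed to download BlobKey " + blobKey);
        }
    }

    /** Keeps the original exception as the cause so the root failure stays visible. */
    static void fetchChaining(String blobKey) throws RetrievalException {
        try {
            download(blobKey);
        } catch (Exception e) {
            throw new RetrievalException("Failed to download BlobKey " + blobKey, e);
        }
    }

    public static void main(String[] args) {
        try {
            fetchSwallowing("blob-1");
        } catch (RetrievalException e) {
            // No cause attached: the stack trace ends at the wrapper exception.
            System.out.println("swallowed cause: " + e.getCause());
        }
        try {
            fetchChaining("blob-1");
        } catch (RetrievalException e) {
            // The IOException is preserved and appears as "Caused by:" in a stack trace.
            System.out.println("chained cause: " + e.getCause());
        }
    }
}
```

With the cause attached, whoever reads the resulting stack trace can tell whether the download failed because of a network error, a missing blob, or something else.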


