flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs
Date Thu, 18 Aug 2016 12:58:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426385#comment-15426385 ]

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75302372
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}" , jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    +
    +			final List<BlobKey> requiredJarFiles = props.requiredJarFiles();
    +			final List<URL> requiredClasspaths = props.requiredClasspaths();
    +
    +			final URL[] allURLs = new URL[requiredJarFiles.size() + requiredClasspaths.size()];
    +
    +			int pos = 0;
    +			for (BlobKey blobKey : props.requiredJarFiles()) {
    +				try {
    +					allURLs[pos++] = blobClient.getURL(blobKey);
    +				} catch (Exception e) {
    +					blobClient.shutdown();
    +					throw new JobRetrievalException(jobID, "Failed to download BlobKey " + blobKey, e);
    +				}
    +			}
    +
    +			for (URL url : requiredClasspaths) {
    +				allURLs[pos++] = url;
    +			}
    +
    +			return new URLClassLoader(allURLs, JobClient.class.getClassLoader());
    +		} else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
    +			throw new JobRetrievalException(jobID, "Couldn't retrieve class loader. Job " + jobID + " not found");
    +		} else {
    +			throw new JobRetrievalException(jobID, "Unknown response from JobManager: " + jmAnswer);
    +		}
    +	}
    +
    +	/**
    +	 * Given a JobListeningContext, awaits the result of the job execution that this context is bound to.
    +	 * @param listeningContext The listening context of the job execution
    +	 * @return The result of the execution
    +	 * @throws JobExecutionException if anything goes wrong while monitoring the job
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
    +
    +		final JobID jobID = listeningContext.jobID;
    +		final Future<Object> jobSubmissionFuture = listeningContext.jobResultFuture;
    +		final ClassLoader classLoader = listeningContext.classLoader;
    +
     		// first block handles errors while waiting for the result
    -		Object answer;
    +		final Object answer;
     		try {
    -			Future<Object> future = Patterns.ask(jobClientActor,
    -					new JobClientMessages.SubmitJobAndWait(jobGraph),
    -					new Timeout(AkkaUtils.INF_TIMEOUT()));
    -			
    -			answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
    +			answer = Await.result(jobSubmissionFuture, AkkaUtils.INF_TIMEOUT());
    --- End diff --
    
    What if the `JobClientActor` dies for some reason? Then it won't be able to complete or fail the future, and we would be stuck. Maybe we could periodically check whether the `JobClientActor` is still alive to avoid this scenario.
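
A minimal sketch of the periodic liveness check suggested above. It is written in plain Java rather than Akka: `CompletableFuture` stands in for the Scala `Future` returned by `Patterns.ask`, and the `isAlive` supplier stands in for however the client would probe the `JobClientActor`. That probe is an assumption, since the PR does not define one. `LivenessGuard` and `ListenerDiedException` are illustrative names, not Flink classes. The guarded future completes with the job result, or it fails once the probe reports the listener as gone, so the caller never blocks forever on an `INF_TIMEOUT` wait.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: fail a result future if whoever should complete it has died. */
final class LivenessGuard {

    /** Signals that the listener terminated before delivering a result. */
    static final class ListenerDiedException extends RuntimeException {
        ListenerDiedException(String message) {
            super(message);
        }
    }

    static <T> CompletableFuture<T> guard(
            CompletableFuture<T> result,
            BooleanSupplier isAlive,
            Duration checkInterval,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> guarded = new CompletableFuture<>();

        // Forward the normal outcome (value or failure) of the original future.
        result.whenComplete((value, error) -> {
            if (error != null) {
                guarded.completeExceptionally(error);
            } else {
                guarded.complete(value);
            }
        });

        // Periodically probe the listener and fail fast instead of waiting forever.
        long intervalMillis = checkInterval.toMillis();
        ScheduledFuture<?> probe = scheduler.scheduleAtFixedRate(() -> {
            if (!guarded.isDone() && !isAlive.getAsBoolean()) {
                guarded.completeExceptionally(
                        new ListenerDiedException("listener terminated before the job result arrived"));
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);

        // Stop probing as soon as the guarded future is done, whichever way it finished.
        guarded.whenComplete((value, error) -> probe.cancel(false));
        return guarded;
    }

    /** Single-threaded scheduler on a daemon thread so it never blocks JVM shutdown. */
    static ScheduledExecutorService newDaemonScheduler() {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "liveness-probe");
            thread.setDaemon(true);
            return thread;
        });
    }
}
```

Polling trades a detection delay of up to one `checkInterval` for simplicity. Within Akka itself, an actor can instead watch another actor with `context.watch(...)` and receive a `Terminated` message when that actor stops, which avoids polling altogether.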


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for the result. This process should be broken up into a submission process and a waiting process which can both be entered independently. This leads to two different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
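
The two entry points described in the issue can be sketched as one shared waiting step behind two ways of obtaining a listening context. The diff gives `JobClient` this shape with `JobListeningContext` and `awaitJobResult`. This toy version is plain Java with no actors or class loading. `ToyCluster`, `ListeningContext`, and `ToyJobClient` are hypothetical names, and job IDs are plain strings instead of `JobID`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for a cluster that runs jobs and reports their results. */
final class ToyCluster {
    private final Map<String, CompletableFuture<String>> results = new ConcurrentHashMap<>();

    /** Accepts a job and returns its ID; the job itself is not executed in this toy. */
    String submit(String jobName) {
        String jobId = jobName + "-" + UUID.randomUUID();
        results.put(jobId, new CompletableFuture<>());
        return jobId;
    }

    /** Looks up the pending or finished result of a known job. */
    CompletableFuture<String> resultOf(String jobId) {
        CompletableFuture<String> result = results.get(jobId);
        if (result == null) {
            throw new IllegalArgumentException("Job " + jobId + " not found");
        }
        return result;
    }

    /** Simulates the job finishing with the given result. */
    void finish(String jobId, String value) {
        resultOf(jobId).complete(value);
    }
}

/** Everything needed to wait for one job, however the client obtained it. */
final class ListeningContext {
    final String jobId;
    final CompletableFuture<String> resultFuture;

    ListeningContext(String jobId, CompletableFuture<String> resultFuture) {
        this.jobId = Objects.requireNonNull(jobId, "jobId");
        this.resultFuture = Objects.requireNonNull(resultFuture, "resultFuture");
    }
}

final class ToyJobClient {
    /** Entry point 1: submit(job), then wait via the returned context. */
    static ListeningContext submit(ToyCluster cluster, String jobName) {
        String jobId = cluster.submit(jobName);
        return new ListeningContext(jobId, cluster.resultOf(jobId));
    }

    /** Entry point 2: retrieve(jobID) for a job that is already running. */
    static ListeningContext attach(ToyCluster cluster, String jobId) {
        return new ListeningContext(jobId, cluster.resultOf(jobId));
    }

    /** Shared waiting step used by both entry points; blocks until the job finishes. */
    static String awaitResult(ListeningContext context) {
        return context.resultFuture.join();
    }
}
```

Both `submit` and `attach` return the same context type, so the blocking logic lives in exactly one place. A client that restarts can also reattach by job ID instead of resubmitting.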



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
