flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4273) Refactor JobClientActor to watch already submitted jobs
Date Thu, 18 Aug 2016 13:46:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426486#comment-15426486 ]

ASF GitHub Bot commented on FLINK-4273:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2313#discussion_r75310003
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
    @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
     			sysoutLogUpdates);
     
     		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    -		
    +
    +		Future<Object> submissionFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.SubmitJobAndWait(jobGraph),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobGraph.getJobID(),
    +				submissionFuture,
    +				jobClientActor,
    +				classLoader);
    +	}
    +
    +
    +	/**
    +	 * Attaches to a running Job using the JobID.
    +	 * Reconstructs the user class loader by downloading the jars from the JobManager.
    +	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
    +	 */
    +	public static JobListeningContext attachToRunningJob(
    +			JobID jobID,
    +			ActorGateway jobManagerGateWay,
    +			Configuration configuration,
    +			ActorSystem actorSystem,
    +			LeaderRetrievalService leaderRetrievalService,
    +			FiniteDuration timeout,
    +			boolean sysoutLogUpdates) throws JobRetrievalException {
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
    +		checkNotNull(configuration, "The configuration must not be null.");
    +		checkNotNull(actorSystem, "The actorSystem must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null.");
    +
    +		// retrieve classloader first before doing anything
    +		ClassLoader classloader;
    +		try {
    +			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
    +			LOG.info("Reconstructed class loader for Job {}", jobID);
    +		} catch (Exception e) {
    +			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
    +			classloader = JobClient.class.getClassLoader();
    +		}
    +
    +		// we create a proxy JobClientActor that deals with all communication with
    +		// the JobManager. It attaches to the running job, checks the success/failure responses, logs
    +		// update messages, watches for disconnect between client and JobManager, ...
    +		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
    +			leaderRetrievalService,
    +			timeout,
    +			sysoutLogUpdates);
    +
    +		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
    +
    +		Future<Object> attachmentFuture = Patterns.ask(
    +				jobClientActor,
    +				new JobClientMessages.AttachToJobAndWait(jobID),
    +				new Timeout(AkkaUtils.INF_TIMEOUT()));
    +
    +		return new JobListeningContext(
    +				jobID,
    +				attachmentFuture,
    +				jobClientActor,
    +				classloader);
    +	}
    +
    +	/**
    +	 * Reconstructs the class loader by first requesting information about it at the JobManager
    +	 * and then downloading missing jar files.
    +	 * @param jobID id of job
    +	 * @param jobManager gateway to the JobManager
    +	 * @param config the flink configuration
    +	 * @param timeout timeout for querying the jobmanager
    +	 * @return A classloader that should behave like the original classloader
    +	 * @throws JobRetrievalException if anything goes wrong
    +	 */
    +	public static ClassLoader retrieveClassLoader(
    +		JobID jobID,
    +		ActorGateway jobManager,
    +		Configuration config,
    +		FiniteDuration timeout)
    +		throws JobRetrievalException {
    +
    +		final Object jmAnswer;
    +		try {
    +			jmAnswer = Await.result(
    +				jobManager.ask(
    +					new JobManagerMessages.RequestClassloadingProps(jobID), timeout), timeout);
    +		} catch (Exception e) {
    +			throw new JobRetrievalException(jobID, "JobManager didn't respond", e);
    +		}
    +
    +		if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
    +			JobManagerMessages.ClassloadingProps props = ((JobManagerMessages.ClassloadingProps) jmAnswer);
    +
    +			Option<String> jmHost = jobManager.actor().path().address().host();
    +			String jmHostname = jmHost.isDefined() ? jmHost.get() : "localhost";
    +			InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, props.blobManagerPort());
    +			final BlobCache blobClient = new BlobCache(serverAddress, config);
    --- End diff --
    
    Does it make sense to clean up this `BlobCache` once the job execution result has been delivered?
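The review question can be illustrated with a small sketch that ties a per-job cache's lifetime to the delivery of the job result. It is hypothetical throughout: `JarCache` and `releaseWhenDone` are invented names rather than Flink APIs, the sketch assumes the cache can be released through `AutoCloseable`, and it uses `java.util.concurrent.CompletableFuture` instead of the Scala `Future` that `Patterns.ask` returns in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleaseOnResult {

    /** Hypothetical stand-in for the per-job BlobCache; not a Flink class. */
    static final class JarCache implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            // Idempotent: only the first call releases the (imaginary) local jar files.
            if (closed.compareAndSet(false, true)) {
                System.out.println("jar cache released");
            }
        }
    }

    /**
     * Returns a future with the same outcome as {@code result} that completes only
     * after {@code resource} has been closed, whether the job succeeded or failed.
     */
    static <T> CompletableFuture<T> releaseWhenDone(
            CompletableFuture<T> result, AutoCloseable resource) {
        return result.whenComplete((value, failure) -> {
            try {
                resource.close();
            } catch (Exception e) {
                // A failing close must not replace the job's own result or error.
                System.err.println("Failed to release resource: " + e);
            }
        });
    }

    public static void main(String[] args) {
        JarCache cache = new JarCache();
        CompletableFuture<String> jobResult = new CompletableFuture<>();
        CompletableFuture<String> delivered = releaseWhenDone(jobResult, cache);

        System.out.println("closed before result: " + cache.closed.get());
        jobResult.complete("FINISHED");
        System.out.println("delivered: " + delivered.join());
        System.out.println("closed after result: " + cache.closed.get());
    }
}
```

Because `whenComplete` runs on both success and failure, the cache is released however the job ends, and a failing `close()` is logged rather than allowed to replace the job's outcome.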


> Refactor JobClientActor to watch already submitted jobs 
> --------------------------------------------------------
>
>                 Key: FLINK-4273
>                 URL: https://issues.apache.org/jira/browse/FLINK-4273
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> The JobClientActor assumes that it receives a job, submits it, and waits for the result.
> This process should be broken up into a submission process and a waiting process which can
> both be entered independently. This leads to two different entry points:
> 1) submit(job) -> wait
> 2) retrieve(jobID) -> wait
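The split described in the issue can be sketched as follows. This is a minimal, hypothetical model, not the JobClientActor implementation: `FakeJobManager`, `Client`, `submit`, and `waitFor` are invented names, `UUID` stands in for Flink's `JobID`, and a map of `CompletableFuture`s stands in for the actor messages exchanged with the JobManager.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TwoEntryPoints {

    /** Hypothetical in-memory stand-in for the JobManager: one result future per job ID. */
    static final class FakeJobManager {
        private final Map<UUID, CompletableFuture<String>> jobs = new ConcurrentHashMap<>();

        UUID accept(String jobName) {
            UUID id = UUID.randomUUID();
            jobs.put(id, new CompletableFuture<>());
            return id;
        }

        CompletableFuture<String> resultOf(UUID id) {
            CompletableFuture<String> result = jobs.get(id);
            if (result == null) {
                throw new IllegalArgumentException("Unknown job " + id);
            }
            return result;
        }

        void finish(UUID id, String status) {
            resultOf(id).complete(status);
        }
    }

    /** Hypothetical client whose submission and waiting phases can be entered independently. */
    static final class Client {
        private final FakeJobManager jobManager;

        Client(FakeJobManager jobManager) {
            this.jobManager = jobManager;
        }

        /** Submission phase: hands the job over and returns its ID without blocking. */
        UUID submit(String jobName) {
            return jobManager.accept(jobName);
        }

        /** Waiting phase: works for any known job ID, whoever submitted it. */
        CompletableFuture<String> waitFor(UUID id) {
            return jobManager.resultOf(id);
        }
    }

    public static void main(String[] args) {
        FakeJobManager jm = new FakeJobManager();

        // 1) submit(job) -> wait
        Client submitter = new Client(jm);
        UUID id = submitter.submit("wordcount");
        CompletableFuture<String> viaSubmit = submitter.waitFor(id);

        // 2) retrieve(jobID) -> wait, e.g. from a second client that only knows the ID
        CompletableFuture<String> viaRetrieve = new Client(jm).waitFor(id);

        jm.finish(id, "FINISHED");
        System.out.println(viaSubmit.join() + " / " + viaRetrieve.join());
    }
}
```

Both futures resolve to the same result; keeping the waiting phase free of any submission state is what lets the second entry point work for a job the client did not submit itself.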



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
