flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7113) Make ClusterDescriptor independent of Flink cluster size
Date Thu, 06 Jul 2017 16:11:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076813#comment-16076813 ]

ASF GitHub Bot commented on FLINK-7113:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125944216
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
     
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
     
     	}
     
     	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    --- End diff --
    
    I would remove this method as it is only used once and isn't really more readable than the inline alternative.
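The diff calls `allocateResource(nmFree, ...)` on a copy of the per-NodeManager free memory but does not show that helper's body. The following is a minimal sketch assuming a first-fit strategy (take memory from the first NodeManager that has enough free MB); `allocateResource` here is a hypothetical re-implementation and `countPlaceableTaskManagers` is an invented helper, not Flink code. Unlike the diff, which only logs a warning and keeps going, this sketch returns -1 early when the JobManager itself does not fit.

```java
import java.util.Arrays;

final class FirstFitPlacementSketch {

    // Hypothetical first-fit helper: subtract the request from the first NodeManager
    // with enough free memory (in MB) and report whether a place was found.
    static boolean allocateResource(int[] nodeManagersFree, int toAllocateMb) {
        for (int i = 0; i < nodeManagersFree.length; i++) {
            if (nodeManagersFree[i] >= toAllocateMb) {
                nodeManagersFree[i] -= toAllocateMb;
                return true;
            }
        }
        return false;
    }

    // Places the JobManager first, then the TaskManagers, on a copy of the free-memory snapshot.
    // Returns how many TaskManagers fit, or -1 if the JobManager itself does not fit.
    static int countPlaceableTaskManagers(int[] nodeManagersFree, int jobManagerMb,
                                          int taskManagerMb, int taskManagerCount) {
        // Copy so the caller's snapshot stays untouched, as the diff does with Arrays.copyOf.
        int[] nmFree = Arrays.copyOf(nodeManagersFree, nodeManagersFree.length);
        if (!allocateResource(nmFree, jobManagerMb)) {
            return -1;
        }
        int placed = 0;
        for (int i = 0; i < taskManagerCount; i++) {
            // Keep trying after a miss; in the diff a missing TaskManager is only a warning.
            if (allocateResource(nmFree, taskManagerMb)) {
                placed++;
            }
        }
        return placed;
    }

    public static void main(String[] args) {
        int[] free = {4096, 2048, 1024};
        int placed = countPlaceableTaskManagers(free, 1024, 2048, 3);
        // Prints: 2 of 3 TaskManagers fit; snapshot unchanged: [4096, 2048, 1024]
        System.out.println(placed + " of 3 TaskManagers fit; snapshot unchanged: " + Arrays.toString(free));
    }
}
```

In this example the cluster has 7168 MB free in total and the session needs 1024 + 3 × 2048 = 7168 MB, so the aggregate `totalFreeMemory < totalMemoryRequired` check would not warn. The per-NodeManager pass still finds room for only two TaskManagers because the free memory is fragmented across nodes, which is why the diff performs both checks.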


> Make ClusterDescriptor independent of Flink cluster size
> --------------------------------------------------------
>
>                 Key: FLINK-7113
>                 URL: https://issues.apache.org/jira/browse/FLINK-7113
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{ClusterDescriptor}} needs to know the size of the Flink cluster it is supposed to deploy.
> As a consequence, we have the {{AbstractYarnClusterDescriptor}}, which is configured with this
> information via setters. I think it would be better to give the cluster size to the
> {{ClusterDescriptor}} via the {{deploySession(ClusterSpecification)}} call. That way, we
> better decouple the deployment from the cluster configuration.
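The design proposed in the issue can be sketched as follows: the descriptor holds no size-related state, and each `deploySession` call carries the cluster size. The types below are hypothetical, simplified stand-ins rather than Flink's actual classes; only the constructor argument order (JobManager memory, TaskManager memory, number of TaskManagers, slots per TaskManager) mirrors the `new ClusterSpecification(...)` call in the diff, and `DescribingClusterDescriptor` is invented for illustration.

```java
// Hypothetical, simplified stand-in for a cluster size description (not Flink's class).
final class ClusterSpecification {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB,
                         int numberTaskManagers, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.numberTaskManagers = numberTaskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    int getNumberTaskManagers() { return numberTaskManagers; }
    int getSlotsPerTaskManager() { return slotsPerTaskManager; }
}

// Hypothetical descriptor interface: the size travels with the call instead of being set via setters.
interface ClusterDescriptor<T> {
    T deploySession(ClusterSpecification clusterSpecification) throws Exception;
}

// Toy descriptor that only describes what it would deploy; it keeps no cluster-size fields.
final class DescribingClusterDescriptor implements ClusterDescriptor<String> {
    @Override
    public String deploySession(ClusterSpecification spec) {
        return "JobManager: " + spec.getMasterMemoryMB() + " MB, "
            + "TaskManagers: " + spec.getNumberTaskManagers() + " x " + spec.getTaskManagerMemoryMB() + " MB, "
            + "slots per TaskManager: " + spec.getSlotsPerTaskManager();
    }
}

final class DeploySessionSketch {
    public static void main(String[] args) {
        DescribingClusterDescriptor descriptor = new DescribingClusterDescriptor();
        // The same descriptor instance deploys two differently sized sessions.
        System.out.println(descriptor.deploySession(new ClusterSpecification(1024, 2048, 2, 1)));
        System.out.println(descriptor.deploySession(new ClusterSpecification(2048, 4096, 8, 4)));
    }
}
```

Because the specification is an immutable value passed per call, one configured descriptor can be reused for sessions of different sizes without mutating shared state between deployments.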



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
