Author: omalley
Date: Tue Mar 8 05:52:53 2011
New Revision: 1079182
URL: http://svn.apache.org/viewvc?rev=1079182&view=rev
Log:
commit 131eca2afd98133ba90d5495e3875afad4d36914
Author: Mahadev Konar <mahadev@apache.org>
Date: Mon Nov 1 18:43:56 2010 +0000
Changes to Hadoop trunk for making client protocol pluggable. (mahadev) from
+++ b/YAHOO-CHANGES.txt
+ Please make it clear why the job failed (Krishna Ramachandran via mahadev)
+ from
+
+ Changes to Hadoop trunk for making client protocol pluggable. (mahadev)
+ from
Added:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/ClientFactory.java
Modified:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/ClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/ClientFactory.java?rev=1079182&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/ClientFactory.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/ClientFactory.java
Tue Mar 8 05:52:53 2011
@@ -0,0 +1,125 @@
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Creates the {@link ClientProtocol} handle used to talk to a cluster.
+ *
+ * <p>The concrete factory is read from the
+ * {@code mapreduce.clientfactory.class.name} configuration key, with
+ * {@link DefaultClientFactory} supplied as the default value.
+ */
+public abstract class ClientFactory {
+
+  /**
+   * Instantiates the configured factory and asks it for a client.
+   *
+   * @param conf configuration naming the factory class and passed on to it
+   * @return the client built by the configured factory
+   * @throws IOException as thrown by the factory, or wrapping any other
+   *         failure to instantiate or run it
+   */
+  @SuppressWarnings("unchecked")
+  public static ClientProtocol create(Configuration conf) throws IOException {
+    Class<ClientFactory> factory = (Class<ClientFactory>) conf.getClass(
+        "mapreduce.clientfactory.class.name",
+        DefaultClientFactory.class);
+    try {
+      return factory.newInstance().createClient(conf);
+    } catch (IOException e) {
+      // Rethrow untouched so callers still see the concrete I/O failure
+      // instead of a generic wrapper.
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("could not create ClientProtocol", e);
+    }
+  }
+
+  /**
+   * Builds the client for the given configuration.
+   *
+   * @param conf configuration describing the cluster to connect to
+   * @return the client handle
+   * @throws IOException if the client cannot be created
+   */
+  protected abstract ClientProtocol createClient(Configuration conf)
+      throws IOException;
+
+  /**
+   * Default factory, kept backward compatible with the selection logic that
+   * previously lived in {@code Cluster}.
+   */
+  public static class DefaultClientFactory extends ClientFactory {
+
+    /**
+     * Returns a local client when {@code mapreduce.jobtracker.address} is
+     * {@code "local"} or unset, and a JobTracker RPC client otherwise.
+     */
+    @Override
+    protected ClientProtocol createClient(Configuration conf)
+        throws IOException {
+      // Default to "local" so an unset address still selects the local
+      // runner rather than falling through to an RPC connection.
+      String tracker = conf.get("mapreduce.jobtracker.address", "local");
+      if ("local".equals(tracker)) {
+        return createLocalClient(conf);
+      } else {
+        return createJTClient(conf);
+      }
+    }
+  }
+
+  /**
+   * Creates a client backed by a new {@link LocalJobRunner}.
+   *
+   * <p>Sets {@code mapreduce.job.maps} to 1 on the supplied configuration.
+   *
+   * @param conf configuration handed to the local runner; modified in place
+   * @return a new local runner
+   * @throws IOException if the local runner cannot be created
+   */
+  public ClientProtocol createLocalClient(Configuration conf)
+      throws IOException {
+    conf.setInt("mapreduce.job.maps", 1);
+    return new LocalJobRunner(conf);
+  }
+
+  /**
+   * Creates an RPC client for the address {@code JobTracker.getAddress(conf)}.
+   *
+   * @param conf configuration supplying the address and RPC settings
+   * @return an RPC proxy implementing {@link ClientProtocol}
+   * @throws IOException if the proxy cannot be created
+   */
+  public ClientProtocol createJTClient(Configuration conf) throws IOException {
+    return createJTClient(JobTracker.getAddress(conf), conf);
+  }
+
+  /**
+   * Creates an RPC client for {@code addr}, passing the current user's
+   * {@link UserGroupInformation} to the proxy.
+   *
+   * @param addr address to connect to
+   * @param conf configuration supplying RPC and socket-factory settings
+   * @return an RPC proxy implementing {@link ClientProtocol}
+   * @throws IOException if the proxy cannot be created
+   */
+  public ClientProtocol createJTClient(InetSocketAddress addr,
+      Configuration conf) throws IOException {
+    return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+        ClientProtocol.versionID, addr,
+        UserGroupInformation.getCurrentUser(), conf,
+        NetUtils.getSocketFactory(conf, ClientProtocol.class));
+  }
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1079182&r1=1079181&r2=1079182&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
(original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/Cluster.java
Tue Mar 8 05:52:53 2011
@@ -24,6 +24,8 @@ import java.security.PrivilegedException
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -35,10 +37,12 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.ClientFactory.DefaultClientFactory;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -52,6 +56,7 @@ import org.apache.hadoop.security.token.
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {
+ private static final Log LOG = LogFactory.getLog(Cluster.class);
private ClientProtocol client;
private UserGroupInformation ugi;
private Configuration conf;
@@ -67,34 +72,17 @@ public class Cluster {
public Cluster(Configuration conf) throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- client = createClient(conf);
+ client = ClientFactory.create(conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
- client = createRPCProxy(jobTrackAddr, conf);
+ client = (new DefaultClientFactory()).createJTClient(jobTrackAddr,
+ conf);
}
- private ClientProtocol createRPCProxy(InetSocketAddress addr,
- Configuration conf) throws IOException {
- return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, addr, ugi, conf,
- NetUtils.getSocketFactory(conf, ClientProtocol.class));
- }
-
- private ClientProtocol createClient(Configuration conf) throws IOException {
- ClientProtocol client;
- String tracker = conf.get("mapreduce.jobtracker.address", "local");
- if ("local".equals(tracker)) {
- conf.setInt("mapreduce.job.maps", 1);
- client = new LocalJobRunner(conf);
- } else {
- client = createRPCProxy(JobTracker.getAddress(conf), conf);
- }
- return client;
- }
ClientProtocol getClient() {
return client;
|