Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Mar 31 22:23:22 2011
@@ -27,12 +27,13 @@ import java.util.Map;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import static org.apache.hadoop.yarn.util.StringHelper.*;
@@ -44,60 +45,60 @@ public class MRApps extends Apps {
public static final String TASK = "task";
public static final String ATTEMPT = "attempt";
- public static String toString(JobID jid) {
- return _join(JOB, jid.appID.clusterTimeStamp, jid.appID.id, jid.id);
+ public static String toString(JobId jid) {
+ return _join(JOB, jid.getAppId().getClusterTimestamp(), jid.getAppId().getId(), jid.getId());
}
- public static JobID toJobID(String jid) {
+ public static JobId toJobID(String jid) {
Iterator<String> it = _split(jid).iterator();
return toJobID(JOB, jid, it);
}
// mostly useful for parsing task/attempt id like strings
- public static JobID toJobID(String prefix, String s, Iterator<String> it) {
- ApplicationID appID = toAppID(prefix, s, it);
+ public static JobId toJobID(String prefix, String s, Iterator<String> it) {
+ ApplicationId appId = toAppID(prefix, s, it);
shouldHaveNext(prefix, s, it);
- JobID jobID = new JobID();
- jobID.appID = appID;
- jobID.id = Integer.parseInt(it.next());
- return jobID;
+ JobId jobId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
+ jobId.setAppId(appId);
+ jobId.setId(Integer.parseInt(it.next()));
+ return jobId;
}
- public static String toString(TaskID tid) {
- return _join("task", tid.jobID.appID.clusterTimeStamp, tid.jobID.appID.id,
- tid.jobID.id, taskSymbol(tid.taskType), tid.id);
+ public static String toString(TaskId tid) {
+ return _join("task", tid.getJobId().getAppId().getClusterTimestamp(), tid.getJobId().getAppId().getId(),
+ tid.getJobId().getId(), taskSymbol(tid.getTaskType()), tid.getId());
}
- public static TaskID toTaskID(String tid) {
+ public static TaskId toTaskID(String tid) {
Iterator<String> it = _split(tid).iterator();
return toTaskID(TASK, tid, it);
}
- public static TaskID toTaskID(String prefix, String s, Iterator<String> it) {
- JobID jid = toJobID(prefix, s, it);
+ public static TaskId toTaskID(String prefix, String s, Iterator<String> it) {
+ JobId jid = toJobID(prefix, s, it);
shouldHaveNext(prefix, s, it);
- TaskID tid = new TaskID();
- tid.jobID = jid;
- tid.taskType = taskType(it.next());
+ TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
+ tid.setJobId(jid);
+ tid.setTaskType(taskType(it.next()));
shouldHaveNext(prefix, s, it);
- tid.id = Integer.parseInt(it.next());
+ tid.setId(Integer.parseInt(it.next()));
return tid;
}
- public static String toString(TaskAttemptID taid) {
- return _join("attempt", taid.taskID.jobID.appID.clusterTimeStamp,
- taid.taskID.jobID.appID.id, taid.taskID.jobID.id,
- taskSymbol(taid.taskID.taskType), taid.taskID.id, taid.id);
+ public static String toString(TaskAttemptId taid) {
+ return _join("attempt", taid.getTaskId().getJobId().getAppId().getClusterTimestamp(),
+ taid.getTaskId().getJobId().getAppId().getId(), taid.getTaskId().getJobId().getId(),
+ taskSymbol(taid.getTaskId().getTaskType()), taid.getTaskId().getId(), taid.getId());
}
- public static TaskAttemptID toTaskAttemptID(String taid) {
+ public static TaskAttemptId toTaskAttemptID(String taid) {
Iterator<String> it = _split(taid).iterator();
- TaskID tid = toTaskID(ATTEMPT, taid, it);
+ TaskId tid = toTaskID(ATTEMPT, taid, it);
shouldHaveNext(ATTEMPT, taid, it);
- TaskAttemptID taID = new TaskAttemptID();
- taID.taskID = tid;
- taID.id = Integer.parseInt(it.next());
- return taID;
+ TaskAttemptId taId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
+ taId.setTaskId(tid);
+ taId.setId(Integer.parseInt(it.next()));
+ return taId;
}
public static String taskSymbol(TaskType type) {
@@ -116,7 +117,7 @@ public class MRApps extends Apps {
}
public static void setInitialClasspath(
- Map<CharSequence, CharSequence> environment) throws IOException {
+ Map<String, String> environment) throws IOException {
// Get yarn mapreduce-app classpath from generated classpath
// Works if compile time env is same as runtime. For e.g. tests.
@@ -151,8 +152,8 @@ public class MRApps extends Apps {
}
public static void addToClassPath(
- Map<CharSequence, CharSequence> environment, String fileName) {
- CharSequence classpath = environment.get(CLASSPATH);
+ Map<String, String> environment, String fileName) {
+ String classpath = environment.get(CLASSPATH);
if (classpath == null) {
classpath = fileName;
} else {
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,82 @@
+package org.apache.hadoop.mapreduce.v2.util;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.PhaseProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptCompletionEventStatusProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskTypeProto;
+
+public class MRProtoUtils {
+  // Converts MR record enums to/from protobuf enums, whose value names carry a per-enum prefix.
+
+  /* JobState: proto value names are the Java constant names prefixed with "J_";
+   * converting back strips only that leading prefix. */
+  private static final String JOB_STATE_PREFIX = "J_";
+  public static JobStateProto convertToProtoFormat(JobState e) {
+    return JobStateProto.valueOf(JOB_STATE_PREFIX + e.name());
+  }
+  public static JobState convertFromProtoFormat(JobStateProto e) {
+    return JobState.valueOf(stripPrefix(e.name(), JOB_STATE_PREFIX));
+  }
+
+  /* Phase: proto value names are the Java constant names prefixed with "P_";
+   * converting back strips only that leading prefix (names such as MAP contain "P" too). */
+  private static final String PHASE_PREFIX = "P_";
+  public static PhaseProto convertToProtoFormat(Phase e) {
+    return PhaseProto.valueOf(PHASE_PREFIX + e.name());
+  }
+  public static Phase convertFromProtoFormat(PhaseProto e) {
+    return Phase.valueOf(stripPrefix(e.name(), PHASE_PREFIX));
+  }
+
+  /* TaskAttemptCompletionEventStatus: proto value names are the Java constant names
+   * prefixed with "TACE_"; converting back strips only that leading prefix. */
+  private static final String TACE_PREFIX = "TACE_";
+  public static TaskAttemptCompletionEventStatusProto convertToProtoFormat(TaskAttemptCompletionEventStatus e) {
+    return TaskAttemptCompletionEventStatusProto.valueOf(TACE_PREFIX + e.name());
+  }
+  public static TaskAttemptCompletionEventStatus convertFromProtoFormat(TaskAttemptCompletionEventStatusProto e) {
+    return TaskAttemptCompletionEventStatus.valueOf(stripPrefix(e.name(), TACE_PREFIX));
+  }
+
+  /* TaskAttemptState: proto value names are the Java constant names prefixed with "TA_";
+   * converting back strips only that leading prefix. */
+  private static final String TASK_ATTEMPT_STATE_PREFIX = "TA_";
+  public static TaskAttemptStateProto convertToProtoFormat(TaskAttemptState e) {
+    return TaskAttemptStateProto.valueOf(TASK_ATTEMPT_STATE_PREFIX + e.name());
+  }
+  public static TaskAttemptState convertFromProtoFormat(TaskAttemptStateProto e) {
+    return TaskAttemptState.valueOf(stripPrefix(e.name(), TASK_ATTEMPT_STATE_PREFIX));
+  }
+
+  /* TaskState: proto value names are the Java constant names prefixed with "TS_";
+   * converting back strips only that leading prefix. */
+  private static final String TASK_STATE_PREFIX = "TS_";
+  public static TaskStateProto convertToProtoFormat(TaskState e) {
+    return TaskStateProto.valueOf(TASK_STATE_PREFIX + e.name());
+  }
+  public static TaskState convertFromProtoFormat(TaskStateProto e) {
+    return TaskState.valueOf(stripPrefix(e.name(), TASK_STATE_PREFIX));
+  }
+
+  /* TaskType: proto value names are identical to the Java constant names,
+   * so no prefix is added or removed. */
+  public static TaskTypeProto convertToProtoFormat(TaskType e) {
+    return TaskTypeProto.valueOf(e.name());
+  }
+  public static TaskType convertFromProtoFormat(TaskTypeProto e) {
+    return TaskType.valueOf(e.name());
+  }
+
+  /* Removes prefix from the start of name only; later occurrences of the same text are kept. */
+  private static String stripPrefix(String name, String prefix) {
+    return name.startsWith(prefix) ? name.substring(prefix.length()) : name;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,20 @@
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "MRClientProtocol";
+option java_generic_services = true;
+
+import "mr_service_protos.proto";
+
+service MRClientProtocolService { // Each rpc takes a <Name>RequestProto and returns the matching <Name>ResponseProto; the file imports mr_service_protos.proto for them.
+ rpc getJobReport (GetJobReportRequestProto) returns (GetJobReportResponseProto);
+ rpc getTaskReport (GetTaskReportRequestProto) returns (GetTaskReportResponseProto);
+ rpc getTaskAttemptReport (GetTaskAttemptReportRequestProto) returns (GetTaskAttemptReportResponseProto);
+ rpc getCounters (GetCountersRequestProto) returns (GetCountersResponseProto);
+ rpc getTaskAttemptCompletionEvents (GetTaskAttemptCompletionEventsRequestProto) returns (GetTaskAttemptCompletionEventsResponseProto);
+ rpc getTaskReports (GetTaskReportsRequestProto) returns (GetTaskReportsResponseProto);
+ rpc getDiagnostics (GetDiagnosticsRequestProto) returns (GetDiagnosticsResponseProto);
+
+ rpc killJob (KillJobRequestProto) returns (KillJobResponseProto);
+ rpc killTask (KillTaskRequestProto) returns (KillTaskResponseProto);
+ rpc killTaskAttempt (KillTaskAttemptRequestProto) returns (KillTaskAttemptResponseProto);
+ rpc failTaskAttempt (FailTaskAttemptRequestProto) returns (FailTaskAttemptResponseProto);
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,149 @@
+option java_package = "org.apache.hadoop.mapreduce.v2.proto";
+option java_outer_classname = "MRProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+
+enum TaskTypeProto { // Value names are used unprefixed: MRProtoUtils converts to/from the Java TaskType enum by identical name.
+ MAP = 1;
+ REDUCE = 2;
+}
+
+message JobIdProto {
+ optional ApplicationIdProto app_id = 1;
+ optional int32 id = 2;
+}
+
+message TaskIdProto {
+ optional JobIdProto job_id = 1;
+ optional TaskTypeProto task_type = 2;
+ optional int32 id = 3;
+}
+
+message TaskAttemptIdProto {
+ optional TaskIdProto task_id = 1;
+ optional int32 id = 2;
+}
+
+enum TaskStateProto { // Value names are "TS_" + the Java TaskState constant name; MRProtoUtils adds/removes the prefix when converting.
+ TS_NEW = 1;
+ TS_SCHEDULED = 2;
+ TS_RUNNING = 3;
+ TS_SUCCEEDED = 4;
+ TS_FAILED = 5;
+ TS_KILL_WAIT = 6;
+ TS_KILLED = 7;
+}
+
+enum PhaseProto { // Value names are "P_" + the Java Phase constant name; MRProtoUtils adds/removes the prefix when converting.
+ P_STARTING = 1;
+ P_MAP = 2;
+ P_SHUFFLE = 3;
+ P_SORT = 4;
+ P_REDUCE = 5;
+ P_CLEANUP = 6;
+}
+
+message CounterProto {
+ optional string name = 1;
+ optional string display_name = 2;
+ optional int64 value = 3;
+}
+
+message CounterGroupProto {
+ optional string name = 1;
+ optional string display_name = 2;
+ repeated StringCounterMapProto counters = 3;
+}
+
+message CountersProto {
+ repeated StringCounterGroupMapProto counter_groups = 1;
+}
+
+message TaskReportProto {
+ optional TaskIdProto task_id = 1;
+ optional TaskStateProto task_state = 2;
+ optional float progress = 3;
+ optional int64 start_time = 4;
+ optional int64 finish_time = 5;
+ optional CountersProto counters = 6;
+ repeated TaskAttemptIdProto running_attempts = 7;
+ optional TaskAttemptIdProto successful_attempt = 8;
+ repeated string diagnostics = 9;
+}
+
+enum TaskAttemptStateProto { // Value names are "TA_" + the Java TaskAttemptState constant name; MRProtoUtils adds/removes the prefix when converting.
+ TA_NEW = 1;
+ TA_UNASSIGNED = 2;
+ TA_ASSIGNED = 3;
+ TA_RUNNING = 4;
+ TA_COMMIT_PENDING = 5;
+ TA_SUCCESS_CONTAINER_CLEANUP = 6;
+ TA_SUCCEEDED = 7;
+ TA_FAIL_CONTAINER_CLEANUP = 8;
+ TA_FAIL_TASK_CLEANUP = 9;
+ TA_FAILED = 10;
+ TA_KILL_CONTAINER_CLEANUP = 11;
+ TA_KILL_TASK_CLEANUP = 12;
+ TA_KILLED = 13;
+}
+
+message TaskAttemptReportProto {
+ optional TaskAttemptIdProto task_attempt_id = 1;
+ optional TaskAttemptStateProto task_attempt_state = 2;
+ optional float progress = 3;
+ optional int64 start_time = 4;
+ optional int64 finish_time = 5;
+ optional CountersProto counters = 6;
+ optional string diagnostic_info = 7;
+ optional string state_string = 8;
+ optional PhaseProto phase = 9;
+}
+
+enum JobStateProto { // Value names are "J_" + the Java JobState constant name; MRProtoUtils adds/removes the prefix when converting.
+ J_NEW = 1;
+ J_RUNNING = 2;
+ J_SUCCEEDED = 3;
+ J_FAILED = 4;
+ J_KILL_WAIT = 5;
+ J_KILLED = 6;
+ J_ERROR = 7;
+}
+
+message JobReportProto {
+ optional JobIdProto job_id = 1;
+ optional JobStateProto job_state = 2;
+ optional float map_progress = 3;
+ optional float reduce_progress = 4;
+ optional float cleanup_progress = 5;
+ optional float setup_progress = 6;
+ optional int64 start_time = 7;
+ optional int64 finish_time = 8;
+}
+
+enum TaskAttemptCompletionEventStatusProto { // Value names are "TACE_" + the Java TaskAttemptCompletionEventStatus constant name; MRProtoUtils adds/removes the prefix.
+ TACE_FAILED = 1;
+ TACE_KILLED = 2;
+ TACE_SUCCEEDED = 3;
+ TACE_OBSOLETE = 4;
+ TACE_TIPFAILED = 5;
+}
+
+message TaskAttemptCompletionEventProto {
+ optional TaskAttemptIdProto attempt_id = 1;
+ optional TaskAttemptCompletionEventStatusProto status = 2;
+ optional string map_output_server_address = 3;
+ optional int32 attempt_run_time = 4;
+ optional int32 event_id = 5;
+}
+
+message StringCounterMapProto { // One string-key / CounterProto-value entry; CounterGroupProto.counters is a repeated list of these.
+ optional string key = 1;
+ optional CounterProto value = 2;
+}
+
+message StringCounterGroupMapProto { // One string-key / CounterGroupProto-value entry; CountersProto.counter_groups is a repeated list of these.
+ optional string key = 1;
+ optional CounterGroupProto value = 2;
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,83 @@
+option java_package = "org.apache.hadoop.mapreduce.v2.proto";
+option java_outer_classname = "MRServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "mr_protos.proto";
+
+message GetJobReportRequestProto {
+ optional JobIdProto job_id = 1;
+}
+message GetJobReportResponseProto {
+ optional JobReportProto job_report = 1;
+}
+
+message GetTaskReportRequestProto {
+ optional TaskIdProto task_id = 1;
+}
+message GetTaskReportResponseProto {
+ optional TaskReportProto task_report = 1;
+}
+
+message GetTaskAttemptReportRequestProto {
+ optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message GetTaskAttemptReportResponseProto {
+ optional TaskAttemptReportProto task_attempt_report = 1;
+}
+
+message GetCountersRequestProto {
+ optional JobIdProto job_id = 1;
+}
+message GetCountersResponseProto {
+ optional CountersProto counters = 1;
+}
+
+message GetTaskAttemptCompletionEventsRequestProto {
+ optional JobIdProto job_id = 1;
+ optional int32 from_event_id = 2;
+ optional int32 max_events = 3;
+}
+message GetTaskAttemptCompletionEventsResponseProto {
+ repeated TaskAttemptCompletionEventProto completion_events = 1;
+}
+
+message GetTaskReportsRequestProto {
+ optional JobIdProto job_id = 1;
+ optional TaskTypeProto task_type = 2;
+}
+message GetTaskReportsResponseProto {
+ repeated TaskReportProto task_reports = 1;
+}
+
+message GetDiagnosticsRequestProto {
+ optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message GetDiagnosticsResponseProto {
+ repeated string diagnostics = 1;
+}
+
+
+message KillJobRequestProto {
+ optional JobIdProto job_id = 1;
+}
+message KillJobResponseProto {
+}
+
+message KillTaskRequestProto {
+ optional TaskIdProto task_id = 1;
+}
+message KillTaskResponseProto {
+}
+
+message KillTaskAttemptRequestProto {
+ optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message KillTaskAttemptResponseProto {
+}
+
+message FailTaskAttemptRequestProto {
+ optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message FailTaskAttemptResponseProto {
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,180 @@
+package org.apache.hadoop.mapreduce.v2;
+
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.junit.Test;
+
+public class TestRPCFactories {
+
+  /** Exercises the protobuf RPC server factory, then the client factory. */
+  @Test
+  public void test() {
+    testPbServerFactory();
+    testPbClientFactory();
+  }
+
+  /**
+   * Creates and starts an RPC server for MRClientProtocol through RpcServerFactoryPBImpl,
+   * failing the test if the factory throws YarnException.
+   */
+  private void testPbServerFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    Configuration conf = new Configuration();
+    MRClientProtocol instance = new MRClientProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(MRClientProtocol.class, instance, addr, conf, null);
+      server.start();
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      // server is still null when getServer threw; an NPE here would replace the assertion failure.
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Starts a server the same way, then creates a client for the server's connect address
+   * through RpcClientFactoryPBImpl. No RPC method is invoked on the client.
+   */
+  private void testPbClientFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    System.err.println(addr.getHostName() + addr.getPort());
+    Configuration conf = new Configuration();
+    MRClientProtocol instance = new MRClientProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(MRClientProtocol.class, instance, addr, conf, null);
+      server.start();
+      System.err.println(server.getListenerAddress());
+      System.err.println(NetUtils.getConnectAddress(server));
+
+      MRClientProtocol client = null;
+      try {
+        client = (MRClientProtocol) RpcClientFactoryPBImpl.get().getClient(MRClientProtocol.class, 1, NetUtils.getConnectAddress(server), conf);
+      } catch (YarnException e) {
+        e.printStackTrace();
+        Assert.fail("Failed to create client");
+      }
+
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      // Guard against getServer having thrown before server was assigned.
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Placeholder protocol implementation handed to the server factory. Every method returns
+   * null; the tests in this class never invoke an RPC on it.
+   */
+  public class MRClientProtocolTestImpl implements MRClientProtocol {
+
+    @Override
+    public GetJobReportResponse getJobReport(GetJobReportRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetTaskAttemptReportResponse getTaskAttemptReport(
+        GetTaskAttemptReportRequest request) throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetCountersResponse getCounters(GetCountersRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+        GetTaskAttemptCompletionEventsRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public KillJobResponse killJob(KillJobRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public KillTaskResponse killTask(KillTaskRequest request)
+        throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public KillTaskAttemptResponse killTaskAttempt(
+        KillTaskAttemptRequest request) throws YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public FailTaskAttemptResponse failTaskAttempt(
+        FailTaskAttemptRequest request) throws YarnRemoteException {
+      return null;
+    }
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,37 @@
+package org.apache.hadoop.mapreduce.v2;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.CounterGroupPBImpl;
+import org.junit.Test;
+
+public class TestRecordFactory {
+
+  /** Checks that the PB record factory returns the PBImpl class for a record and for a protocol record. */
+  @Test
+  public void testPbRecordFactory() {
+    RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
+
+    try {
+      CounterGroup counterGroup = pbRecordFactory.newRecordInstance(CounterGroup.class);
+      Assert.assertEquals(CounterGroupPBImpl.class, counterGroup.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create record");
+    }
+
+    try {
+      GetCountersRequest request = pbRecordFactory.newRecordInstance(GetCountersRequest.class);
+      Assert.assertEquals(GetCountersRequestPBImpl.class, request.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create record");
+    }
+  }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar 31 22:23:22 2011
@@ -18,13 +18,14 @@
package org.apache.hadoop.mapreduce.v2.util;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -32,42 +33,48 @@ import static org.junit.Assert.*;
public class TestMRApps {
@Test public void testJobIDtoString() {
- JobID jid = new JobID();
- jid.appID = new ApplicationID();
+ JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
+ jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
assertEquals("job_0_0_0", MRApps.toString(jid));
}
@Test public void testToJobID() {
- JobID jid = MRApps.toJobID("job_1_1_1");
- assertEquals(1, jid.appID.clusterTimeStamp);
- assertEquals(1, jid.appID.id);
- assertEquals(1, jid.id);
+ JobId jid = MRApps.toJobID("job_1_1_1");
+ assertEquals(1, jid.getAppId().getClusterTimestamp());
+ assertEquals(1, jid.getAppId().getId());
+ assertEquals(1, jid.getId());
}
@Test(expected=YarnException.class) public void testJobIDShort() {
MRApps.toJobID("job_0_0");
}
+ //TODO_get.set
@Test public void testTaskIDtoString() {
- TaskID tid = new TaskID();
- tid.jobID = new JobID();
- tid.jobID.appID = new ApplicationID();
- tid.taskType = TaskType.MAP;
+ TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
+ tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
+ tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+ tid.setTaskType(TaskType.MAP);
+ TaskType type = tid.getTaskType();
+ System.err.println(type);
+ type = TaskType.REDUCE;
+ System.err.println(type);
+ System.err.println(tid.getTaskType());
assertEquals("task_0_0_0_m_0", MRApps.toString(tid));
- tid.taskType = TaskType.REDUCE;
+ tid.setTaskType(TaskType.REDUCE);
assertEquals("task_0_0_0_r_0", MRApps.toString(tid));
}
@Test public void testToTaskID() {
- TaskID tid = MRApps.toTaskID("task_1_2_3_r_4");
- assertEquals(1, tid.jobID.appID.clusterTimeStamp);
- assertEquals(2, tid.jobID.appID.id);
- assertEquals(3, tid.jobID.id);
- assertEquals(TaskType.REDUCE, tid.taskType);
- assertEquals(4, tid.id);
+ TaskId tid = MRApps.toTaskID("task_1_2_3_r_4");
+ assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
+ assertEquals(2, tid.getJobId().getAppId().getId());
+ assertEquals(3, tid.getJobId().getId());
+ assertEquals(TaskType.REDUCE, tid.getTaskType());
+ assertEquals(4, tid.getId());
tid = MRApps.toTaskID("task_1_2_3_m_4");
- assertEquals(TaskType.MAP, tid.taskType);
+ assertEquals(TaskType.MAP, tid.getTaskType());
}
@Test(expected=YarnException.class) public void testTaskIDShort() {
@@ -78,22 +85,23 @@ public class TestMRApps {
MRApps.toTaskID("task_0_0_0_x_0");
}
+ //TODO_get.set
@Test public void testTaskAttemptIDtoString() {
- TaskAttemptID taid = new TaskAttemptID();
- taid.taskID = new TaskID();
- taid.taskID.taskType = TaskType.MAP;
- taid.taskID.jobID = new JobID();
- taid.taskID.jobID.appID = new ApplicationID();
+ TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
+ taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
+ taid.getTaskId().setTaskType(TaskType.MAP);
+ taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
+ taid.getTaskId().getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
assertEquals("attempt_0_0_0_m_0_0", MRApps.toString(taid));
}
@Test public void testToTaskAttemptID() {
- TaskAttemptID taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
- assertEquals(0, taid.taskID.jobID.appID.clusterTimeStamp);
- assertEquals(1, taid.taskID.jobID.appID.id);
- assertEquals(2, taid.taskID.jobID.id);
- assertEquals(3, taid.taskID.id);
- assertEquals(4, taid.id);
+ TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
+ assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
+ assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
+ assertEquals(2, taid.getTaskId().getJobId().getId());
+ assertEquals(3, taid.getTaskId().getId());
+ assertEquals(4, taid.getId());
}
@Test(expected=YarnException.class) public void testTaskAttemptIDShort() {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Mar 31 22:23:22 2011
@@ -36,16 +36,18 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
/**
* Loads the basic job level data upfront.
@@ -56,20 +58,20 @@ public class CompletedJob implements org
static final Log LOG = LogFactory.getLog(CompletedJob.class);
private final Counters counters;
private final Configuration conf;
- private final JobID jobID;
+ private final JobId jobId;
private final List<String> diagnostics = new ArrayList<String>();
private final JobReport report;
- private final Map<TaskID, Task> tasks = new HashMap<TaskID, Task>();
- private final Map<TaskID, Task> mapTasks = new HashMap<TaskID, Task>();
- private final Map<TaskID, Task> reduceTasks = new HashMap<TaskID, Task>();
+ private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+ private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+ private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
private TaskAttemptCompletionEvent[] completionEvents;
private JobInfo jobInfo;
- public CompletedJob(Configuration conf, JobID jobID) throws IOException {
+ public CompletedJob(Configuration conf, JobId jobId) throws IOException {
this.conf = conf;
- this.jobID = jobID;
+ this.jobId = jobId;
//TODO fix
/*
String doneLocation =
@@ -97,11 +99,11 @@ public class CompletedJob implements org
counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
diagnostics.add(jobInfo.getErrorInfo());
- report = new JobReport();
- report.id = jobID;
- report.state = JobState.valueOf(jobInfo.getJobStatus());
- report.startTime = jobInfo.getLaunchTime();
- report.finishTime = jobInfo.getFinishTime();
+ report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
+ report.setJobId(jobId);
+ report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
+ report.setStartTime(jobInfo.getLaunchTime());
+ report.setFinishTime(jobInfo.getFinishTime());
}
@Override
@@ -120,8 +122,8 @@ public class CompletedJob implements org
}
@Override
- public JobID getID() {
- return jobID;
+ public JobId getID() {
+ return jobId;
}
@Override
@@ -131,12 +133,12 @@ public class CompletedJob implements org
@Override
public JobState getState() {
- return report.state;
+ return report.getJobState();
}
@Override
- public Task getTask(TaskID taskID) {
- return tasks.get(taskID);
+ public Task getTask(TaskId taskId) {
+ return tasks.get(taskId);
}
@Override
@@ -146,7 +148,7 @@ public class CompletedJob implements org
}
@Override
- public Map<TaskID, Task> getTasks() {
+ public Map<TaskId, Task> getTasks() {
return tasks;
}
@@ -159,7 +161,7 @@ public class CompletedJob implements org
if (user == null) {
LOG.error("user null is not allowed");
}
- String jobName = TypeConverter.fromYarn(jobID).toString();
+ String jobName = TypeConverter.fromYarn(jobId).toString();
String defaultDoneDir = conf.get(
YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
String jobhistoryDir =
@@ -186,7 +188,7 @@ public class CompletedJob implements org
// populate the tasks
for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
.getAllTasks().entrySet()) {
- TaskID yarnTaskID = TypeConverter.toYarn(entry.getKey());
+ TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
TaskInfo taskInfo = entry.getValue();
Task task = new CompletedTask(yarnTaskID, taskInfo);
tasks.put(yarnTaskID, task);
@@ -207,7 +209,7 @@ public class CompletedJob implements org
}
@Override
- public CharSequence getName() {
+ public String getName() {
return jobInfo.getJobname();
}
@@ -222,7 +224,7 @@ public class CompletedJob implements org
}
@Override
- public Map<TaskID, Task> getTasks(TaskType taskType) {
+ public Map<TaskId, Task> getTasks(TaskType taskType) {
if (TaskType.MAP.equals(taskType)) {
return mapTasks;
} else {//we have only two type of tasks
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Thu Mar 31 22:23:22 2011
@@ -27,14 +27,15 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class CompletedTask implements Task {
@@ -44,15 +45,15 @@ public class CompletedTask implements Ta
private final long startTime;
private final long finishTime;
private final TaskState state;
- private final TaskID taskID;
+ private final TaskId taskId;
private final TaskReport report;
- private final Map<TaskAttemptID, TaskAttempt> attempts =
- new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+ private final Map<TaskAttemptId, TaskAttempt> attempts =
+ new LinkedHashMap<TaskAttemptId, TaskAttempt>();
private static final Log LOG = LogFactory.getLog(CompletedTask.class);
- CompletedTask(TaskID taskID, TaskInfo taskinfo) {
- this.taskID = taskID;
+ CompletedTask(TaskId taskId, TaskInfo taskinfo) {
+ this.taskId = taskId;
this.startTime = taskinfo.getStartTime();
this.finishTime = taskinfo.getFinishTime();
this.type = TypeConverter.toYarn(taskinfo.getTaskType());
@@ -61,35 +62,34 @@ public class CompletedTask implements Ta
this.state = TaskState.valueOf(taskinfo.getTaskStatus());
for (TaskAttemptInfo attemptHistory :
taskinfo.getAllTaskAttempts().values()) {
- CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskID,
+ CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskId,
attemptHistory);
attempts.put(attempt.getID(), attempt);
}
- report = new TaskReport();
- report.id = taskID;
- report.startTime = startTime;
- report.finishTime = finishTime;
- report.state = state;
- report.progress = getProgress();
- report.counters = getCounters();
- report.runningAttempts = new ArrayList<TaskAttemptID>();
- report.runningAttempts.addAll(attempts.keySet());
+ report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskReport.class);
+ report.setTaskId(taskId);
+ report.setStartTime(startTime);
+ report.setFinishTime(finishTime);
+ report.setTaskState(state);
+ report.setProgress(getProgress());
+ report.setCounters(getCounters());
+ report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
//report.successfulAttempt = taskHistory.; //TODO
}
@Override
- public boolean canCommit(TaskAttemptID taskAttemptID) {
+ public boolean canCommit(TaskAttemptId taskAttemptID) {
return false;
}
@Override
- public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+ public TaskAttempt getAttempt(TaskAttemptId attemptID) {
return attempts.get(attemptID);
}
@Override
- public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+ public Map<TaskAttemptId, TaskAttempt> getAttempts() {
return attempts;
}
@@ -99,8 +99,8 @@ public class CompletedTask implements Ta
}
@Override
- public TaskID getID() {
- return taskID;
+ public TaskId getID() {
+ return taskId;
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Mar 31 22:23:22 2011
@@ -23,24 +23,25 @@ import java.util.List;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class CompletedTaskAttempt implements TaskAttempt {
private final TaskAttemptInfo attemptInfo;
- private final TaskAttemptID attemptId;
+ private final TaskAttemptId attemptId;
private final Counters counters;
private final TaskAttemptState state;
private final TaskAttemptReport report;
- private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+ private final List<String> diagnostics = new ArrayList<String>();
- CompletedTaskAttempt(TaskID taskID, TaskAttemptInfo attemptInfo) {
+ CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
this.attemptInfo = attemptInfo;
this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
this.counters = TypeConverter.toYarn(
@@ -51,20 +52,21 @@ public class CompletedTaskAttempt implem
diagnostics.add(attemptInfo.getError());
}
- report = new TaskAttemptReport();
- report.id = attemptId;
- report.state = state;
- report.progress = getProgress();
- report.startTime = attemptInfo.getStartTime();
- report.finishTime = attemptInfo.getFinishTime();
- report.diagnosticInfo = attemptInfo.getError();
+ report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
+ report.setTaskAttemptId(attemptId);
+ report.setTaskAttemptState(state);
+ report.setProgress(getProgress());
+ report.setStartTime(attemptInfo.getStartTime());
+
+ report.setFinishTime(attemptInfo.getFinishTime());
+ report.setDiagnosticInfo(attemptInfo.getError());
//result.phase = attemptInfo.get;//TODO
- report.stateString = state.toString();
- report.counters = getCounters();
+ report.setStateString(state.toString());
+ report.setCounters(getCounters());
}
@Override
- public ContainerID getAssignedContainerID() {
+ public ContainerId getAssignedContainerID() {
// TODO Auto-generated method stub
return null;
}
@@ -80,7 +82,7 @@ public class CompletedTaskAttempt implem
}
@Override
- public TaskAttemptID getID() {
+ public TaskAttemptId getID() {
return attemptId;
}
@@ -105,17 +107,17 @@ public class CompletedTaskAttempt implem
}
@Override
- public List<CharSequence> getDiagnostics() {
+ public List<String> getDiagnostics() {
return diagnostics;
}
@Override
public long getLaunchTime() {
- return report.startTime;
+ return report.getStartTime();
}
@Override
public long getFinishTime() {
- return report.finishTime;
+ return report.getFinishTime();
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Mar 31 22:23:22 2011
@@ -18,38 +18,54 @@
package org.apache.hadoop.mapreduce.v2.hs;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.webapp.WebApp;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
/**
@@ -110,92 +126,115 @@ public class HistoryClientService extend
private class MRClientProtocolHandler implements MRClientProtocol {
- private Job getJob(JobID jobID) throws AvroRemoteException {
- Job job = history.getJob(jobID);
+ private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ private Job getJob(JobId jobId) throws YarnRemoteException {
+ Job job = history.getJob(jobId);
if (job == null) {
- throw RPCUtil.getRemoteException("Unknown job " + jobID);
+ throw RPCUtil.getRemoteException("Unknown job " + jobId);
}
return job;
}
@Override
- public Counters getCounters(JobID jobID) throws AvroRemoteException {
- Job job = getJob(jobID);
- return job.getCounters();
+ public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ Job job = getJob(jobId);
+ GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
+ response.setCounters(job.getCounters());
+ return response;
}
-
+
@Override
- public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
- Job job = getJob(jobID);
- return job.getReport();
+ public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ Job job = getJob(jobId);
+ GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
+ response.setJobReport(job.getReport());
+ return response;
}
@Override
- public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- Job job = getJob(taskAttemptID.taskID.jobID);
- return job.getTask(taskAttemptID.taskID).
- getAttempt(taskAttemptID).getReport();
+ public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ Job job = getJob(taskAttemptId.getTaskId().getJobId());
+ GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
+ response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
+ return response;
}
@Override
- public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
- Job job = getJob(taskID.jobID);
- return job.getTask(taskID).getReport();
+ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
+ TaskId taskId = request.getTaskId();
+ Job job = getJob(taskId.getJobId());
+ GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
+ response.setTaskReport(job.getTask(taskId).getReport());
+ return response;
}
@Override
- public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(
- JobID jobID,
- int fromEventId, int maxEvents) throws AvroRemoteException {
- Job job = getJob(jobID);
- return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId,
- maxEvents));
+ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ int fromEventId = request.getFromEventId();
+ int maxEvents = request.getMaxEvents();
+
+ Job job = getJob(jobId);
+
+ GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+ response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
+ return response;
}
-
+
@Override
- public Void killJob(JobID jobID) throws AvroRemoteException {
+ public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
-
+
@Override
- public Void killTask(TaskID taskID) throws AvroRemoteException {
- getJob(taskID.jobID);
+ public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
+ TaskId taskId = request.getTaskId();
+ getJob(taskId.getJobId());
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
-
+
@Override
- public Void killTaskAttempt(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- getJob(taskAttemptID.taskID.jobID);
+ public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ getJob(taskAttemptId.getTaskId().getJobId());
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@Override
- public Void failTaskAttempt(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- getJob(taskAttemptID.taskID.jobID);
- throw RPCUtil.getRemoteException("Invalid operation on completed job");
+ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+
+ Job job = getJob(taskAttemptId.getTaskId().getJobId());
+
+ GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+ response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
+ return response;
}
- @Override
- public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- Job job = getJob(taskAttemptID.taskID.jobID);
- return job.getTask(taskAttemptID.taskID).
- getAttempt(taskAttemptID).getDiagnostics();
+ @Override
+ public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ getJob(taskAttemptId.getTaskId().getJobId());
+ throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@Override
- public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
- throws AvroRemoteException {
- Job job = getJob(jobID);
- List<TaskReport> reports = new ArrayList<TaskReport>();
+ public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ TaskType taskType = request.getTaskType();
+
+ GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+ Job job = getJob(jobId);
Collection<Task> tasks = job.getTasks(taskType).values();
for (Task task : tasks) {
- reports.add(task.getReport());
+ response.addTaskReport(task.getReport());
}
- return reports;
+ return response;
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java Thu Mar 31 22:23:22 2011
@@ -21,13 +21,13 @@ package org.apache.hadoop.mapreduce.v2.h
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
public interface HistoryContext {
- Job getJob(JobID id);
+ Job getJob(JobId id);
- Map<JobID, Job> getAllJobs(ApplicationID appID);
+ Map<JobId, Job> getAllJobs(ApplicationId appID);
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Mar 31 22:23:22 2011
@@ -27,20 +27,20 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
/*
* Loads and manages the Job history cache.
*/
public class JobHistory implements HistoryContext {
- private Map<JobID, Job> completedJobCache =
- new ConcurrentHashMap<JobID, Job>();
+ private Map<JobId, Job> completedJobCache =
+ new ConcurrentHashMap<JobId, Job>();
private Configuration conf;
private final LinkedList<Job> jobQ = new LinkedList<Job>();
private static final Log LOG = LogFactory.getLog(JobHistory.class);
@@ -51,16 +51,16 @@ public class JobHistory implements Histo
this.conf = conf;
}
@Override
- public synchronized Job getJob(JobID jobID) {
- Job job = completedJobCache.get(jobID);
+ public synchronized Job getJob(JobId jobId) {
+ Job job = completedJobCache.get(jobId);
if (job == null) {
try {
- job = new CompletedJob(conf, jobID);
+ job = new CompletedJob(conf, jobId);
} catch (IOException e) {
LOG.warn("HistoryContext getJob failed " + e);
throw new YarnException(e);
}
- completedJobCache.put(jobID, job);
+ completedJobCache.put(jobId, job);
jobQ.add(job);
if (jobQ.size() > retiredJobsCacheSize) {
Job removed = jobQ.remove();
@@ -71,11 +71,11 @@ public class JobHistory implements Histo
}
@Override
- public Map<JobID, Job> getAllJobs(ApplicationID appID) {
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
//currently there is 1 to 1 mapping between app and job id
org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
- Map<JobID, Job> jobs = new HashMap<JobID, Job>();
- JobID jobID = TypeConverter.toYarn(oldJobID);
+ Map<JobId, Job> jobs = new HashMap<JobId, Job>();
+ JobId jobID = TypeConverter.toYarn(oldJobID);
jobs.put(jobID, getJob(jobID));
return jobs;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Thu Mar 31 22:23:22 2011
@@ -25,17 +25,19 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -53,7 +55,7 @@ public class TestJobHistoryEvents {
MRApp app = new MRApp(2, 1, true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
- JobID jobId = job.getID();
+ JobId jobId = job.getID();
LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
app.waitForState(job, JobState.SUCCEEDED);
/*
@@ -66,16 +68,16 @@ public class TestJobHistoryEvents {
parsedJob.getCompletedMaps());
- Map<TaskID, Task> tasks = parsedJob.getTasks();
+ Map<TaskId, Task> tasks = parsedJob.getTasks();
Assert.assertEquals("No of tasks not correct", 3, tasks.size());
for (Task task : tasks.values()) {
verifyTask(task);
}
- Map<TaskID, Task> maps = parsedJob.getTasks(TaskType.MAP);
+ Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
Assert.assertEquals("No of maps not correct", 2, maps.size());
- Map<TaskID, Task> reduces = parsedJob.getTasks(TaskType.REDUCE);
+ Map<TaskId, Task> reduces = parsedJob.getTasks(TaskType.REDUCE);
Assert.assertEquals("No of reduces not correct", 1, reduces.size());
@@ -89,7 +91,7 @@ public class TestJobHistoryEvents {
private void verifyTask(Task task) {
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
task.getState());
- Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+ Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
Assert.assertEquals("No of attempts not correct", 1, attempts.size());
for (TaskAttempt attempt : attempts.values()) {
verifyAttempt(attempt);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 31 22:23:22 2011
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.List;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -34,18 +33,27 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
@@ -55,27 +63,28 @@ public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
private Configuration conf;
- private ApplicationID currentAppId;
+ private ApplicationId currentAppId;
private final ResourceMgrDelegate rm;
private MRClientProtocol realProxy = null;
private String serviceAddr = "";
private String serviceHttpAddr = "";
+ private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm) {
this.conf = conf;
this.rm = rm;
}
- private MRClientProtocol getProxy(JobID jobId) throws AvroRemoteException {
- return getProxy(TypeConverter.toYarn(jobId).appID, false);
+ private MRClientProtocol getProxy(JobID jobId) throws YarnRemoteException {
+ return getProxy(TypeConverter.toYarn(jobId).getAppId(), false);
}
- private MRClientProtocol getRefreshedProxy(JobID jobId) throws AvroRemoteException {
- return getProxy(TypeConverter.toYarn(jobId).appID, true);
+ private MRClientProtocol getRefreshedProxy(JobID jobId) throws YarnRemoteException {
+ return getProxy(TypeConverter.toYarn(jobId).getAppId(), true);
}
- private MRClientProtocol getProxy(ApplicationID appId,
- boolean forceRefresh) throws AvroRemoteException {
+ private MRClientProtocol getProxy(ApplicationId appId,
+ boolean forceRefresh) throws YarnRemoteException {
if (!appId.equals(currentAppId) || forceRefresh) {
currentAppId = appId;
refreshProxy();
@@ -83,35 +92,35 @@ public class ClientServiceDelegate {
return realProxy;
}
- private void refreshProxy() throws AvroRemoteException {
+ private void refreshProxy() throws YarnRemoteException {
ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
- if (ApplicationState.COMPLETED.equals(appMaster.state)) {
+ if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
LOG.info("Application state is completed. " +
"Redirecting to job history server " + serviceAddr);
//TODO:
serviceHttpAddr = "";
- } else if (ApplicationState.RUNNING.equals(appMaster.state)){
- serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
- serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+ } else if (ApplicationState.RUNNING.equals(appMaster.getState())){
+ serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
+ serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
if (UserGroupInformation.isSecurityEnabled()) {
- String clientTokenEncoded = appMaster.clientToken.toString();
+ String clientTokenEncoded = appMaster.getClientToken();
Token<ApplicationTokenIdentifier> clientToken =
new Token<ApplicationTokenIdentifier>();
try {
clientToken.decodeFromUrlString(clientTokenEncoded);
- clientToken.setService(new Text(appMaster.host.toString() + ":"
- + appMaster.rpcPort));
+ clientToken.setService(new Text(appMaster.getHost() + ":"
+ + appMaster.getRpcPort()));
UserGroupInformation.getCurrentUser().addToken(clientToken);
} catch (IOException e) {
throw new YarnException(e);
}
}
} else {
- LOG.warn("Cannot connect to Application with state " + appMaster.state);
+ LOG.warn("Cannot connect to Application with state " + appMaster.getState());
throw new YarnException(
- "Cannot connect to Application with state " + appMaster.state);
+ "Cannot connect to Application with state " + appMaster.getState());
}
try {
instantiateProxy(serviceAddr);
@@ -138,16 +147,20 @@ public class ClientServiceDelegate {
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
InterruptedException {
- org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
try {
- return TypeConverter.fromYarn(getProxy(arg0).getCounters(jobID));
+ GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+ request.setJobId(jobID);
+ return TypeConverter.fromYarn(getProxy(arg0).getCounters(request).getCounters());
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch(Exception e) {
LOG.debug("Failing to contact application master", e);
try {
- return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(jobID));
+ GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+ request.setJobId(jobID);
+ return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(request).getCounters());
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
@@ -162,26 +175,31 @@ public class ClientServiceDelegate {
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
int arg2) throws IOException, InterruptedException {
- org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
- List<org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent> list = null;
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
+ List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = null;
+ GetTaskAttemptCompletionEventsRequest request = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
try {
- list = getProxy(arg0).getTaskAttemptCompletionEvents(jobID,
- arg1, arg2);
+ request.setJobId(jobID);
+ request.setFromEventId(arg1);
+ request.setMaxEvents(arg2);
+ list = getProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch(Exception e) {
LOG.debug("Failed to contact application master ", e);
try {
- list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(jobID,
- arg1, arg2);
+ request.setJobId(jobID);
+ request.setFromEventId(arg1);
+ request.setMaxEvents(arg2);
+ list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
}
}
return TypeConverter.fromYarn(
- list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0]));
+ list.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
}
public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
@@ -189,17 +207,20 @@ public class ClientServiceDelegate {
throws IOException,
InterruptedException {
- List<CharSequence> list = null;
- org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0);
+ List<String> list = null;
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
+ GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
try {
- list = getProxy(arg0.getJobID()).getDiagnostics(attemptID);
+ request.setTaskAttemptId(attemptID);
+ list = getProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch(Exception e) {
LOG.debug("Failed to contact application master ", e);
try {
- list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(attemptID);
+ request.setTaskAttemptId(attemptID);
+ list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
@@ -207,28 +228,31 @@ public class ClientServiceDelegate {
}
String[] result = new String[list.size()];
int i = 0;
- for (CharSequence c : list) {
+ for (String c : list) {
result[i++] = c.toString();
}
return result;
}
public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
- AvroRemoteException {
- org.apache.hadoop.mapreduce.v2.api.JobID jobId =
+ YarnRemoteException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
LOG.debug("Getting Job status");
String stagingDir = conf.get("yarn.apps.stagingDir");
String jobFile = stagingDir + "/" + jobId.toString();
JobReport report = null;
+ GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
try {
- report = getProxy(oldJobID).getJobReport(jobId);
+ request.setJobId(jobId);
+ report = getProxy(oldJobID).getJobReport(request).getJobReport();
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch (Exception e) {
try {
- report = getRefreshedProxy(oldJobID).getJobReport(jobId);
+ request.setJobId(jobId);
+ report = getRefreshedProxy(oldJobID).getJobReport(request).getJobReport();
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
@@ -238,20 +262,23 @@ public class ClientServiceDelegate {
}
public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
- throws YarnRemoteException, AvroRemoteException {
- List<TaskReport> taskReports = null;
- org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+ throws YarnRemoteException, YarnRemoteException {
+ List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
+ org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
+ GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
try {
- taskReports = getProxy(jobID).getTaskReports(nJobID,
- TypeConverter.toYarn(taskType));
+ request.setJobId(nJobID);
+ request.setTaskType(TypeConverter.toYarn(taskType));
+ taskReports = getProxy(jobID).getTaskReports(request).getTaskReportList();
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch(Exception e) {
LOG.debug("Failed to contact application master ", e);
try {
- taskReports = getRefreshedProxy(jobID).getTaskReports(nJobID,
- TypeConverter.toYarn(taskType));
+ request.setJobId(nJobID);
+ request.setTaskType(TypeConverter.toYarn(taskType));
+ taskReports = getRefreshedProxy(jobID).getTaskReports(request).getTaskReportList();
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
@@ -262,17 +289,20 @@ public class ClientServiceDelegate {
}
public Void killJob(JobID jobID) throws YarnRemoteException,
- AvroRemoteException {
- org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+ YarnRemoteException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
+ KillJobRequest request = recordFactory.newRecordInstance(KillJobRequest.class);
try {
- getProxy(jobID).killJob(nJobID);
+ request.setJobId(nJobID);
+ getProxy(jobID).killJob(request);
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
throw yre;
} catch(Exception e) {
LOG.debug("Failed to contact application master ", e);
try {
- getRefreshedProxy(jobID).killJob(nJobID);
+ request.setJobId(nJobID);
+ getRefreshedProxy(jobID).killJob(request);
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
throw yre;
@@ -282,14 +312,18 @@ public class ClientServiceDelegate {
}
public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
- throws YarnRemoteException, AvroRemoteException {
- org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID
+ throws YarnRemoteException {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
= TypeConverter.toYarn(taskAttemptID);
+ KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+ FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
try {
if (fail) {
- getProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+ failRequest.setTaskAttemptId(attemptID);
+ getProxy(taskAttemptID.getJobID()).failTaskAttempt(failRequest);
} else {
- getProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+ killRequest.setTaskAttemptId(attemptID);
+ getProxy(taskAttemptID.getJobID()).killTaskAttempt(killRequest);
}
} catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
LOG.warn(RPCUtil.toString(yre));
@@ -298,9 +332,11 @@ public class ClientServiceDelegate {
LOG.debug("Failed to contact application master ", e);
try {
if (fail) {
- getRefreshedProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+ failRequest.setTaskAttemptId(attemptID);
+ getRefreshedProxy(taskAttemptID.getJobID()).failTaskAttempt(failRequest);
} else {
- getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+ killRequest.setTaskAttemptId(attemptID);
+ getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(killRequest);
}
} catch(YarnRemoteException yre) {
LOG.warn(RPCUtil.toString(yre));
|