helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [19/33] helix git commit: Add WorkflowType and JobType in WorkflowConfig and JobConfig
Date Wed, 17 Aug 2016 04:27:15 GMT
Add WorkflowType and JobType in WorkflowConfig and JobConfig

Add WorkflowType and JobType in WorkflowConfig and JobConfig

RB=710119
G=nuage-reviewers
R=lxia,cji
A=lxia


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/be78a4f1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/be78a4f1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/be78a4f1

Branch: refs/heads/helix-0.6.x
Commit: be78a4f14076736cca511b1d3116546162e026fc
Parents: 99a4008
Author: Junkai Xue <jxue@linkedin.com>
Authored: Thu Apr 21 13:04:22 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Tue Jul 5 15:03:23 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/JobConfig.java   | 34 +++++++++--
 .../org/apache/helix/task/WorkflowConfig.java   | 27 ++++++++-
 .../org/apache/helix/task/beans/JobBean.java    |  1 +
 .../apache/helix/task/beans/WorkflowBean.java   |  1 +
 .../task/TestJobAndWorkflowType.java            | 60 ++++++++++++++++++++
 5 files changed, 116 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/be78a4f1/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index ac086d3..1eeca60 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -113,7 +113,12 @@ public class JobConfig {
     /**
      * Disable external view (not showing) for this job resource
      */
-    DisableExternalView
+    DisableExternalView,
+
+    /**
+     * The type of the job
+     */
+    JobType
   }
 
   //Default property values
@@ -128,6 +133,7 @@ public class JobConfig {
 
   private final String _workflow;
   private final String _targetResource;
+  private final String _jobType;
   private final List<String> _targetPartitions;
   private final Set<String> _targetPartitionStates;
   private final String _command;
@@ -146,7 +152,8 @@ public class JobConfig {
      Set<String> targetPartitionStates, String command, Map<String, String> jobCommandConfigMap,
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
-      boolean disableExternalView, boolean ignoreDependentJobFailure, Map<String, TaskConfig> taskConfigMap) {
+      boolean disableExternalView, boolean ignoreDependentJobFailure,
+      Map<String, TaskConfig> taskConfigMap, String jobType) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -166,6 +173,7 @@ public class JobConfig {
     } else {
       _taskConfigMap = Collections.emptyMap();
     }
+    _jobType = jobType;
   }
 
   public String getWorkflow() {
@@ -267,9 +275,16 @@ public class JobConfig {
         "" + _numConcurrentTasksPerInstance);
     cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
         Boolean.toString(_ignoreDependentJobFailure));
+   if (_jobType != null) {
+     cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
+   }
     return cfgMap;
   }
 
+  public String getJobType() {
+    return _jobType;
+  }
+
   public static JobConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -282,6 +297,7 @@ public class JobConfig {
   public static class Builder {
     private String _workflow;
     private String _targetResource;
+    private String _jobType;
     private List<String> _targetPartitions;
     private Set<String> _targetPartitionStates;
     private String _command;
@@ -302,7 +318,7 @@ public class JobConfig {
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
-          _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap);
+          _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType);
     }
 
     /**
@@ -364,6 +380,9 @@ public class JobConfig {
         b.setIgnoreDependentJobFailure(
             Boolean.valueOf(cfg.get(JobConfigProperty.IgnoreDependentJobFailure.name())));
       }
+      if (cfg.containsKey(JobConfigProperty.JobType.name())) {
+        b.setJobType(cfg.get(JobConfigProperty.JobType.name()));
+      }
       return b;
     }
 
@@ -451,6 +470,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setJobType(String jobType) {
+      _jobType = jobType;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -529,7 +553,9 @@ public class JobConfig {
         }
         b.addTaskConfigs(taskConfigs);
       }
-
+      if (jobBean.jobType != null) {
+        b.setJobType(jobBean.jobType);
+      }
       return b;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/be78a4f1/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
index cbd22cd..ddd37d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java
@@ -56,7 +56,8 @@ public class WorkflowConfig {
     Terminable,
     FailureThreshold,
     /* this is for non-terminable workflow. */
-    capacity
+    capacity,
+    WorkflowType
   }
 
   /* Default values */
@@ -77,9 +78,11 @@ public class WorkflowConfig {
   private final ScheduleConfig _scheduleConfig;
   private final int _failureThreshold;
   private final int _capacity;
+  private final String _workflowType;
 
   protected WorkflowConfig(JobDag jobDag, int parallelJobs, TargetState targetState, long expiry,
-      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity) {
+      int failureThreshold, boolean terminable, ScheduleConfig scheduleConfig, int capacity,
+      String workflowType) {
     _jobDag = jobDag;
     _parallelJobs = parallelJobs;
     _targetState = targetState;
@@ -88,6 +91,7 @@ public class WorkflowConfig {
     _terminable = terminable;
     _scheduleConfig = scheduleConfig;
     _capacity = capacity;
+    _workflowType = workflowType;
   }
 
   public JobDag getJobDag() {
@@ -117,6 +121,10 @@ public class WorkflowConfig {
    */
   public int getCapacity() { return _capacity; }
 
+  public String getWorkflowType() {
+    return _workflowType;
+  }
+
   public boolean isTerminable() {
     return _terminable;
   }
@@ -184,6 +192,9 @@ public class WorkflowConfig {
             scheduleConfig.getRecurrenceInterval().toString());
       }
     }
+    if (_workflowType != null) {
+      cfgMap.put(WorkflowConfigProperty.WorkflowType.name(), _workflowType);
+    }
     return cfgMap;
   }
 
@@ -238,12 +249,13 @@ public class WorkflowConfig {
     private boolean _isTerminable = true;
     private int _capacity = Integer.MAX_VALUE;
     private ScheduleConfig _scheduleConfig;
+    private String _workflowType;
 
     public WorkflowConfig build() {
       validate();
 
       return new WorkflowConfig(_taskDag, _parallelJobs, _targetState, _expiry, _failureThreshold,
-          _isTerminable, _scheduleConfig, _capacity);
+          _isTerminable, _scheduleConfig, _capacity, _workflowType);
     }
 
     public Builder() {}
@@ -289,6 +301,11 @@ public class WorkflowConfig {
       return this;
     }
 
+    public Builder setWorkFlowType(String workflowType) {
+      _workflowType = workflowType;
+      return this;
+    }
+
     public Builder setTerminable(boolean isTerminable) {
       _isTerminable = isTerminable;
       return this;
@@ -355,6 +372,10 @@ public class WorkflowConfig {
       if (scheduleConfig != null) {
         setScheduleConfig(scheduleConfig);
       }
+
+      if (cfg.containsKey(WorkflowConfigProperty.WorkflowType.name())) {
+        setWorkFlowType(cfg.get(WorkflowConfigProperty.WorkflowType.name()));
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/be78a4f1/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index a570026..0080cc6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -31,6 +31,7 @@ public class JobBean {
   public String name;
   public List<String> parents;
   public String targetResource;
+  public String jobType;
   public List<String> targetPartitionStates;
   public List<String> targetPartitions;
   public String command;

http://git-wip-us.apache.org/repos/asf/helix/blob/be78a4f1/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
index a59e818..2a9563e 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java
@@ -31,4 +31,5 @@ public class WorkflowBean {
   public List<JobBean> jobs;
   public ScheduleBean schedule;
   public long expiry = WorkflowConfig.DEFAULT_EXPIRY;
+  public String workflowType;
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/be78a4f1/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
new file mode 100644
index 0000000..0b02085
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestJobAndWorkflowType.java
@@ -0,0 +1,60 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJobAndWorkflowType extends TaskTestBase {
+  private static final Logger LOG = Logger.getLogger(TestJobAndWorkflowType.class);
+
+  @Test
+  public void testJobAndWorkflowType() throws InterruptedException {
+    LOG.info("Start testing job and workflow type");
+    String jobName = TestHelper.getTestMethodName();
+    JobConfig.Builder jobConfig = JobConfig.Builder.fromMap(WorkflowGenerator.DEFAULT_JOB_CONFIG)
+        .setJobCommandConfigMap(WorkflowGenerator.DEFAULT_COMMAND_CONFIG).setJobType("JobTestType");
+
+    Map<String, String> tmp = new HashMap<String, String>();
+    tmp.put("WorkflowType", "WorkflowTestType");
+    Workflow.Builder builder =
+        WorkflowGenerator.generateSingleJobWorkflowBuilder(jobName, jobConfig).fromMap(tmp);
+
+    // Start workflow
+    _driver.start(builder.build());
+
+    TaskTestUtil.pollForWorkflowState(_driver, jobName, TaskState.COMPLETED);
+    String fetchedJobType =
+        _driver.getJobConfig(String.format("%s_%s", jobName, jobName)).getJobType();
+    String fetchedWorkflowType =
+        _driver.getWorkflowConfig(jobName).getWorkflowType();
+
+    Assert.assertEquals(fetchedJobType, "JobTestType");
+    Assert.assertEquals(fetchedWorkflowType, "WorkflowTestType");
+  }
+}


Mime
View raw message