helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: tempararily revert changes to HelixTaskExecutor.java, reset HelixTaskExecutor needs more refactoring including 1) handling InterruptedException carefully; 2) use of _lock and synchronized on manager instance in postMessageHandling
Date Mon, 24 Jun 2013 19:44:48 GMT
Updated Branches:
  refs/heads/master 3059f7b5b -> fbcd316d6


tempararily revert changes to HelixTaskExecutor.java, reset HelixTaskExecutor needs more refactoring
including 1) handling InterruptedException carefully; 2) use of _lock and synchronized on
manager instance in postMessageHandling


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/fbcd316d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/fbcd316d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/fbcd316d

Branch: refs/heads/master
Commit: fbcd316d658016bc7677eb048c413fba5986a292
Parents: 3059f7b
Author: zzhang <zzhang5@uci.edu>
Authored: Mon Jun 24 12:44:39 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Mon Jun 24 12:44:39 2013 -0700

----------------------------------------------------------------------
 .../messaging/handling/HelixTaskExecutor.java   | 154 +++++++------------
 1 file changed, 58 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fbcd316d/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 22ca9ba..cb07494 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -95,7 +95,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     _statusUpdateUtil = new StatusUpdateUtil();
     _monitor = new ParticipantMonitor();
     
-    _timer = new Timer(true);	// created as a daemon timer thread to handle task timeout
+    _timer = new Timer(true); // created as a daemon timer thread to handle task timeout
 
     startMonitorThread();
   }
@@ -129,7 +129,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     else
     {
       LOG.warn("Fail to register msg-handler-factory for type: " + type 
-    		  + ", pool-size: " + threadpoolSize + ", factory: " + factory);
+          + ", pool-size: " + threadpoolSize + ", factory: " + factory);
     }
   }
 
@@ -208,24 +208,24 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
   @Override
   public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask>
tasks, long timeout, TimeUnit unit) throws InterruptedException
   {
-	  if (tasks == null || tasks.size() == 0) {
-		  return null;
-	  }
-	  
-	  // check all tasks use the same executor-service
-	  ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
-	  for (int i = 1; i < tasks.size(); i++) {
-		  MessageTask task = tasks.get(i);
-		  ExecutorService curExeSvc = findExecutorServiceForMsg(task.getMessage());
-		  if (curExeSvc != exeSvc) {
-			  LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
-			  return null;
-		  }
-	  }
-	  
-	  // TODO: check if any of the task has already been scheduled
-	  	  
-	  // this is a blocking call
+    if (tasks == null || tasks.size() == 0) {
+      return null;
+    }
+    
+    // check all tasks use the same executor-service
+    ExecutorService exeSvc = findExecutorServiceForMsg(tasks.get(0).getMessage());
+    for (int i = 1; i < tasks.size(); i++) {
+      MessageTask task = tasks.get(i);
+      ExecutorService curExeSvc = findExecutorServiceForMsg(task.getMessage());
+      if (curExeSvc != exeSvc) {
+        LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
+        return null;
+      }
+    }
+    
+    // TODO: check if any of the task has already been scheduled
+        
+    // this is a blocking call
       List<Future<HelixTaskResult>> futures = exeSvc.invokeAll(tasks, timeout,
unit);
       
       return futures;
@@ -234,17 +234,17 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
   @Override
   public boolean cancelTimeoutTask(MessageTask task)
   {
-	  synchronized(_lock) {
-		  String taskId = task.getTaskId();
+    synchronized(_lock) {
+      String taskId = task.getTaskId();
           if (_taskMap.containsKey(taskId)) {
-        	  MessageTaskInfo info = _taskMap.get(taskId);
-        	  if (info._timerTask != null) {
-        		  info._timerTask.cancel();
-        	  }
-        	  return true;
+            MessageTaskInfo info = _taskMap.get(taskId);
+            if (info._timerTask != null) {
+              info._timerTask.cancel();
+            }
+            return true;
           }
           return false;
-	  }
+    }
   }
   
   @Override 
@@ -277,9 +277,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
         {
           if (!_taskMap.containsKey(taskId))
           {
-          	ExecutorService exeSvc = findExecutorServiceForMsg(message);
-        	Future<HelixTaskResult> future = exeSvc.submit(task);
-        	
+            ExecutorService exeSvc = findExecutorServiceForMsg(message);
+          Future<HelixTaskResult> future = exeSvc.submit(task);
+          
             TimerTask timerTask = null;
             if (message.getExecutionTimeout() > 0)
             {
@@ -293,7 +293,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
               LOG.debug("Message does not have timeout. MsgId: " + task.getTaskId());
             }
 
-        	_taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
+          _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
 
             LOG.info("Message: " + taskId + " handling task scheduled");
 
@@ -335,13 +335,13 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
       {
         if (_taskMap.containsKey(taskId))
         {
-      	  MessageTaskInfo taskInfo = _taskMap.get(taskId);
-    	  // cancel timeout task
+          MessageTaskInfo taskInfo = _taskMap.get(taskId);
+        // cancel timeout task
           if (taskInfo._timerTask != null) {
-        	  taskInfo._timerTask.cancel();
+            taskInfo._timerTask.cancel();
           }
 
-    	  // cancel task
+        // cancel task
           Future<HelixTaskResult> future = taskInfo.getFuture();
 
           _statusUpdateUtil.logInfo(message,
@@ -384,7 +384,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
   @Override
   public void finishTask(MessageTask task)
   {
-	Message message = task.getMessage();
+  Message message = task.getMessage();
     String taskId = task.getTaskId();
     LOG.info("message finished: " + taskId + ", took "
             + (new Date().getTime() - message.getExecuteStartTimeStamp()));
@@ -395,8 +395,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
       {
           MessageTaskInfo info = _taskMap.remove(taskId);
           if (info._timerTask != null) {
-        	  // ok to cancel multiple times
-        	  info._timerTask.cancel();
+            // ok to cancel multiple times
+            info._timerTask.cancel();
           }
       }
       else
@@ -419,58 +419,6 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     accessor.setChildren(readMsgKeys, readMsgs);
   }
 
-  /**
-   * remove message-handler factory from map, shutdown the associated executor
-   * 
-   * @param type
-   */
-  private void unregisterMessageHandlerFactory(String type) {
-    // shutdown executor-service. disconnect if fail
-    ExecutorService executorSvc = _executorMap.remove(type);
-    if (executorSvc != null) {
-      List<Runnable> tasksLeft = executorSvc.shutdownNow();
-      LOG.info(tasksLeft.size() + " tasks never executed for msgType: "
-          + type + ". tasks: " + tasksLeft);
-      try {
-        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
-          LOG.error("executor-service for msgType: " + type 
-              + " is not fully terminated in 200ms. will disconnect helix-participant");
-          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
-
-        }
-      } catch (InterruptedException e) {
-        LOG.error("interruped when waiting for executor-service shutdown for msgType: " +
type, e);
-      }
-    }
-
-    // reset state-model
-    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
-    if (handlerFty != null) {
-      handlerFty.reset();
-    }
-  }
-  
-  /**
-   * shutdown executor, wait for shutdown complete
-   * if shutdown fails/timeouts, disconnect HelixParticipant
-   * if shutdown completes successfully, reset all state models
-   */
-  private void reset() {
-    LOG.info("Get FINALIZE notification");
-    
-    // shutdown all executor-services
-    synchronized (_lock)
-    {
-      for (String msgType : _executorMap.keySet())
-      {
-        unregisterMessageHandlerFactory(msgType);
-      }
-      
-      // clear task-map, all tasks should be terminated by now
-      _taskMap.clear();
-    }
-  }
-  
   @Override
   public void onMessage(String instanceName,
                         List<Message> messages,
@@ -481,7 +429,21 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     // TODO: see if we should have a separate notification call for resetting
     if (changeContext.getType() == Type.FINALIZE)
     {
-      reset();
+      LOG.info("Get FINALIZE notification");
+      for (MessageHandlerFactory factory : _handlerFactoryMap.values())
+      {
+        factory.reset();
+      }
+      // Cancel all scheduled tasks
+      synchronized (_lock)
+      {
+          for (MessageTaskInfo info : _taskMap.values())
+          {
+            cancelTask(info._task);
+          }
+        _taskMap.clear();
+      }
+      return;
     }
 
     if (messages == null || messages.size() == 0)
@@ -547,9 +509,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
         // read. Instead we keep it until the current state is updated.
         // We will read the message again if there is a new message but we
         // check for the status and ignore if its already read
-    	if (LOG.isTraceEnabled()) {
-    		LOG.trace("Message already read. msgId: " + message.getMsgId());
-    	}
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Message already read. msgId: " + message.getMsgId());
+      }
         continue;
       }
 
@@ -710,4 +672,4 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     _monitor.shutDown();
     LOG.info("shutdown finished");
   }
-}
+}
\ No newline at end of file


Mime
View raw message