Repository: helix
Updated Branches:
refs/heads/master 6d30c9c58 -> 63f084bd2
[HELIX-443] Race condition in Helix register/unregister MessageHandlerFactory, rb=21248
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fdee2dd7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fdee2dd7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fdee2dd7
Branch: refs/heads/master
Commit: fdee2dd73dcb074a1e689e2431134edee9104768
Parents: 6d30c9c
Author: zzhang <zzhang@apache.org>
Authored: Wed May 21 15:09:14 2014 -0700
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Jul 11 10:30:10 2014 -0700
----------------------------------------------------------------------
.../messaging/handling/HelixTaskExecutor.java | 115 +++++++++++--------
1 file changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/fdee2dd7/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 8da53ea..e2af382 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -49,6 +49,9 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
@@ -103,19 +106,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
+  /**
+   * Registers {@code factory} for messages of {@code type} and creates a fixed thread pool of
+   * {@code threadpoolSize} threads for it. If a factory is already registered for the type, the
+   * existing registration is left untouched and only a warning is logged.
+   * @throws HelixException if {@code type} does not equal {@code factory.getMessageType()}
+   *           (compared case-insensitively)
+   */
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory,
       int threadpoolSize) {
-    if (!_handlerFactoryMap.containsKey(type)) {
-      if (!type.equalsIgnoreCase(factory.getMessageType())) {
-        throw new HelixException("Message factory type mismatch. Type: " + type + " factory : "
-            + factory.getMessageType());
+    if (!type.equalsIgnoreCase(factory.getMessageType())) {
+      throw new HelixException("Message factory type mismatch. Type: " + type + ", factory: "
+          + factory.getMessageType());
+    }
+
+    // putIfAbsent makes the registration atomic: only the first caller for a type wins, so a
+    // concurrent duplicate registration cannot replace an already-published factory.
+    MessageHandlerFactory prevFactory = _handlerFactoryMap.putIfAbsent(type, factory);
+    if (prevFactory == null) {
+      // Publish the pool with putIfAbsent as well instead of contains()/put(): on
+      // ConcurrentHashMap (as on Hashtable) contains(Object) is the legacy *value* lookup
+      // (same as containsValue), so it never matches a type key, and a separate
+      // check-then-put is not atomic.
+      ExecutorService newPool = Executors.newFixedThreadPool(threadpoolSize);
+      ExecutorService prevPool = _executorMap.putIfAbsent(type, newPool);
+      if (prevPool != null) {
+        // A pool is already mapped to this type; keep it and release the spare one.
+        LOG.error("Skip to create new thread pool for type: " + type + ", existing pool: "
+            + prevPool);
+        newPool.shutdown();
       }
-      _handlerFactoryMap.put(type, factory);
-      ExecutorService executorSvc = Executors.newFixedThreadPool(threadpoolSize);
-      _executorMap.put(type, executorSvc);
-
-      LOG.info("Added msg-factory for type: " + type + ", threadpool size " + threadpoolSize);
+      LOG.info("Registered message handler factory for type: " + type + ", poolSize: "
+          + threadpoolSize + ", factory: " + factory + ", pool: " + _executorMap.get(type));
     } else {
-      LOG.warn("Fail to register msg-handler-factory for type: " + type + ", pool-size: "
+      LOG.warn("Fail to register message handler factory for type: " + type + ", poolSize: "
           + threadpoolSize + ", factory: " + factory);
     }
   }
@@ -133,9 +139,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
int threadpoolSize = -1;
ConfigAccessor configAccessor = manager.getConfigAccessor();
if (configAccessor != null) {
- ConfigScope scope =
- new ConfigScopeBuilder().forCluster(manager.getClusterName()).forResource(resourceName)
- .build();
+ HelixConfigScope scope =
+ new HelixConfigScopeBuilder(ConfigScopeProperty.RESOURCE)
+ .forCluster(manager.getClusterName()).forResource(resourceName).build();
String threadpoolSizeStr = configAccessor.get(scope, MAX_THREADS);
try {
@@ -242,6 +248,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
synchronized (_lock) {
if (!_taskMap.containsKey(taskId)) {
ExecutorService exeSvc = findExecutorServiceForMsg(message);
+
+ LOG.info("Submit task: " + taskId + " to pool: " + exeSvc);
Future<HelixTaskResult> future = exeSvc.submit(task);
TimerTask timerTask = null;
@@ -296,8 +304,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
// If the thread is still running it will be interrupted if cancel(true)
// is called. So state transition callbacks should implement logic to
- // return
- // if it is interrupted.
+ // return if it is interrupted.
if (future.cancel(true)) {
_statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " +
taskId,
notificationContext.getManager().getHelixDataAccessor());
@@ -346,42 +353,63 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
accessor.setChildren(readMsgKeys, readMsgs);
}
+  /**
+   * Shuts {@code pool} down in two phases. First it stops accepting new tasks and waits up to
+   * 200 ms for running tasks to finish. If they have not finished, running tasks are cancelled
+   * via {@link ExecutorService#shutdownNow()}, the never-started tasks are logged, and it waits
+   * up to another 200 ms. InterruptedException is not propagated: the pool is forced down and the
+   * caller's interrupt status is restored.
+   * @param pool executor service to shut down
+   */
+  private void shutdownAndAwaitTermination(ExecutorService pool) {
+    LOG.info("Shutting down pool: " + pool);
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+        // Cancel currently executing tasks; returns the tasks that never started
+        List<Runnable> waitingTasks = pool.shutdownNow();
+        LOG.info("Tasks that never commenced execution: " + waitingTasks);
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(200, TimeUnit.MILLISECONDS)) {
+          LOG.error("Pool did not fully terminate in 200ms after shutdownNow(). pool: " + pool);
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      LOG.error("Interrupted when waiting for shutdown pool: " + pool, ie);
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
-  /**
-   * remove message-handler factory from map, shutdown the associated executor
-   * @param type
-   */
+  /**
+   * Unregisters the message handler factory for {@code type}. Both the factory and its thread
+   * pool are removed from their maps. The pool, if any, is then shut down (waiting briefly for
+   * running tasks), and afterwards the factory, if any, is reset. The operation is logged before
+   * and after.
+   * @param type message type whose factory and thread pool should be unregistered
+   */
   void unregisterMessageHandlerFactory(String type) {
-    // shutdown executor-service. disconnect if fail
-    ExecutorService executorSvc = _executorMap.remove(type);
-    if (executorSvc != null) {
-      List<Runnable> tasksLeft = executorSvc.shutdownNow();
-      LOG.info(tasksLeft.size() + " tasks never executed for msgType: " + type + ". tasks: "
-          + tasksLeft);
-      try {
-        if (!executorSvc.awaitTermination(200, TimeUnit.MILLISECONDS)) {
-          LOG.error("executor-service for msgType: " + type
-              + " is not fully terminated in 200ms. will disconnect helix-participant");
-          throw new HelixException("fail to unregister msg-handler for msgType: " + type);
-        }
-      } catch (InterruptedException e) {
-        LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
-      }
+    // Take both registry entries out up front; either may already be absent.
+    ExecutorService executor = _executorMap.remove(type);
+    MessageHandlerFactory factory = _handlerFactoryMap.remove(type);
+    String details = ", factory: " + factory + ", pool: " + executor;
+
+    LOG.info("Unregistering message handler factory for type: " + type + details);
+
+    if (executor != null) {
+      shutdownAndAwaitTermination(executor);
     }
-    // reset state-model
-    MessageHandlerFactory handlerFty = _handlerFactoryMap.remove(type);
-    if (handlerFty != null) {
-      handlerFty.reset();
-    }
+    // Reset the factory's state models only after its pool has been shut down.
+    if (factory != null) {
+      factory.reset();
+    }
+
+    LOG.info("Unregistered message handler factory for type: " + type + details);
   }
+  /**
+   * Unregisters every message type that currently has a thread pool (shutting the pool down and
+   * resetting its handler factory), logs each task still left in the task map, then clears the
+   * task map.
+   */
   void reset() {
-    LOG.info("Get FINALIZE notification");
+    LOG.info("Reset HelixTaskExecutor");
     for (String msgType : _executorMap.keySet()) {
       unregisterMessageHandlerFactory(msgType);
     }
-    // clear task-map, all tasks should be terminated by now
+    // Log all tasks that fail to terminate. get() can return null if an entry is removed between
+    // the keySet() iteration and the lookup (presumably by a task finishing concurrently, since
+    // reset() does not hold _lock), so skip such entries instead of throwing an NPE.
+    for (String taskId : _taskMap.keySet()) {
+      MessageTaskInfo info = _taskMap.get(taskId);
+      if (info != null) {
+        LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
+      }
+    }
     _taskMap.clear();
   }
@@ -559,26 +587,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
{
+  /**
+   * Shuts this executor down: cancels the timer, then calls {@link #reset()} to unregister all
+   * message handler factories (shutting down their thread pools and clearing the task map), and
+   * finally shuts down the monitor, even if {@code reset()} throws.
+   */
   @Override
   public void shutdown() {
-    LOG.info("shutting down TaskExecutor");
+    LOG.info("Shutting down HelixTaskExecutor");
     _timer.cancel();
-    synchronized (_lock) {
-      for (String msgType : _executorMap.keySet()) {
-        List<Runnable> tasksLeft = _executorMap.get(msgType).shutdownNow();
-        LOG.info(tasksLeft.size() + " tasks are still in the threadpool for msgType " + msgType);
-      }
-      for (String msgType : _executorMap.keySet()) {
-        try {
-          if (!_executorMap.get(msgType).awaitTermination(200, TimeUnit.MILLISECONDS)) {
-            LOG.warn(msgType + " is not fully termimated in 200 MS");
-            System.out.println(msgType + " is not fully termimated in 200 MS");
-          }
-        } catch (InterruptedException e) {
-          LOG.error("Interrupted", e);
-        }
-      }
-    }
-    _monitor.shutDown();
+    try {
+      reset();
+    } finally {
+      // reset() ends up invoking MessageHandlerFactory.reset() (external handler code); make
+      // sure the monitor is still shut down if that throws.
+      _monitor.shutDown();
+    }
-    LOG.info("shutdown finished");
+    LOG.info("Shutdown HelixTaskExecutor finished");
   }
}
|