stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From im...@apache.org
Subject stratos git commit: Introducing StratosThreadPool.getScheduledExecutorService() method
Date Tue, 09 Dec 2014 10:02:32 GMT
Repository: stratos
Updated Branches:
  refs/heads/4.1.0-test ca93b9da3 -> 8ff932782


Introducing StratosThreadPool.getScheduledExecutorService() method


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8ff93278
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8ff93278
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8ff93278

Branch: refs/heads/4.1.0-test
Commit: 8ff932782fc20cba20a59a16095d0f34eeff798b
Parents: ca93b9d
Author: Imesh Gunaratne <imesh@apache.org>
Authored: Tue Dec 9 15:32:19 2014 +0530
Committer: Imesh Gunaratne <imesh@apache.org>
Committed: Tue Dec 9 15:32:19 2014 +0530

----------------------------------------------------------------------
 .../controller/iaases/mock/MockMember.java      | 16 +++++----
 .../common/threading/StratosThreadPool.java     | 38 +++++++++++++++-----
 2 files changed, 39 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/8ff93278/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/MockMember.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/MockMember.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/MockMember.java
index 65a5cb2..f97ccaf 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/MockMember.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/mock/MockMember.java
@@ -22,6 +22,7 @@ package org.apache.stratos.cloud.controller.iaases.mock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.iaases.mock.statistics.MockHealthStatisticsNotifier;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupClusterEvent;
 import org.apache.stratos.messaging.event.instance.notifier.InstanceCleanupMemberEvent;
@@ -31,7 +32,6 @@ import org.apache.stratos.messaging.message.receiver.instance.notifier.InstanceN
 
 import java.io.Serializable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -41,10 +41,12 @@ import java.util.concurrent.TimeUnit;
 public class MockMember implements Runnable, Serializable {
 
     private static final Log log = LogFactory.getLog(MockMember.class);
-    private static final ExecutorService executorService =
-            Executors.newFixedThreadPool(MockConstants.MAX_MOCK_MEMBER_COUNT);
-    private static final ScheduledExecutorService scheduler =
-            Executors.newScheduledThreadPool(MockConstants.MAX_MOCK_MEMBER_COUNT);
+    private static final ExecutorService instanceNotifierExecutorService =
+            StratosThreadPool.getExecutorService("MOCK_MEMBER_INSTANCE_NOTIFIER",
+                    MockConstants.MAX_MOCK_MEMBER_COUNT);
+    private static final ScheduledExecutorService healthStatPublisherExecutorService =
+            StratosThreadPool.getScheduledExecutorService("MOCK_MEMBER_HEALTH_STAT_PUBLISHER",
+                    MockConstants.MAX_MOCK_MEMBER_COUNT);
     private static final int HEALTH_STAT_INTERVAL = 15; // 15 seconds
 
     private final MockMemberContext mockMemberContext;
@@ -105,7 +107,7 @@ public class MockMember implements Runnable, Serializable {
             }
         });
 
-        executorService.submit(new Runnable() {
+        instanceNotifierExecutorService.submit(new Runnable() {
             @Override
             public void run() {
                 instanceNotifierEventReceiver.execute();
@@ -128,7 +130,7 @@ public class MockMember implements Runnable, Serializable {
         if (log.isDebugEnabled()) {
             log.debug(String.format("Starting health statistics notifier: [member-id] %s", mockMemberContext.getMemberId()));
         }
-        scheduler.scheduleAtFixedRate(new MockHealthStatisticsNotifier(mockMemberContext),
+        healthStatPublisherExecutorService.scheduleAtFixedRate(new MockHealthStatisticsNotifier(mockMemberContext),
                 HEALTH_STAT_INTERVAL, HEALTH_STAT_INTERVAL, TimeUnit.SECONDS);
 
         if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/8ff93278/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 3e1ebbe..c85e146 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -20,38 +20,60 @@
  */
 package org.apache.stratos.common.threading;
 
-import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Utility class for Stratos thread pool
  */
 public class StratosThreadPool {
 
-	private static HashMap<String, ExecutorService> mapExecutorService = new HashMap<String, ExecutorService>();
-	private static Object mutex = new Object();
+	private static Map<String, ExecutorService> executorServiceMap = new ConcurrentHashMap<String, ExecutorService>();
+    private static Map<String, ScheduledExecutorService> scheduledServiceMap = new ConcurrentHashMap<String, ScheduledExecutorService>();
+	private static Object executorServiceMapLock = new Object();
+    private static Object scheduledServiceMapLock = new Object();
 
 	/**
 	 * Return the executor service based on the identifier and thread pool size
 	 *
-	 * @param identifier     Component identifier name
+	 * @param identifier     Thread pool identifier name
 	 * @param threadPoolSize Thread pool size
 	 * @return ExecutorService
 	 */
 	public static ExecutorService getExecutorService(String identifier, int threadPoolSize) {
-		ExecutorService executorService = mapExecutorService.get(identifier);
+		ExecutorService executorService = executorServiceMap.get(identifier);
 		if (executorService == null) {
-			synchronized (mutex) {
+			synchronized (executorServiceMapLock) {
 				if (executorService == null) {
 					executorService = Executors.newFixedThreadPool(threadPoolSize);
-					mapExecutorService.put(identifier, executorService);
+					executorServiceMap.put(identifier, executorService);
 				}
 			}
 
 		}
 		return executorService;
-
 	}
 
+    /**
+     * Returns a scheduled executor for given thread pool size.
+     * @param identifier     Thread pool identifier name
+     * @param threadPoolSize Thread pool size
+     * @return
+     */
+    public static ScheduledExecutorService getScheduledExecutorService(String identifier, int threadPoolSize) {
+        ScheduledExecutorService scheduledExecutorService = scheduledServiceMap.get(identifier);
+        if (scheduledExecutorService == null) {
+            synchronized (scheduledServiceMapLock) {
+                if (scheduledExecutorService == null) {
+                    scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize);
+                    scheduledServiceMap.put(identifier, scheduledExecutorService);
+                }
+            }
+
+        }
+        return scheduledExecutorService;
+    }
 }


Mime
View raw message