stratos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From im...@apache.org
Subject [3/3] stratos git commit: Moving distributed locks handled by distributed object provider and to functional methods
Date Wed, 03 Dec 2014 18:44:52 GMT
Moving distributed locks handled by distributed object provider and to functional methods


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/e8914f3d
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/e8914f3d
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/e8914f3d

Branch: refs/heads/master
Commit: e8914f3d048866ab8913801aa702c6f234b41ce5
Parents: 08de729
Author: Imesh Gunaratne <imesh@apache.org>
Authored: Wed Dec 3 18:19:36 2014 +0530
Committer: Imesh Gunaratne <imesh@apache.org>
Committed: Thu Dec 4 00:14:40 2014 +0530

----------------------------------------------------------------------
 .../context/CloudControllerContext.java         | 148 +++++++---
 .../impl/CloudControllerServiceImpl.java        |  19 +-
 .../clustering/DistributedObjectProvider.java   |   9 +-
 .../impl/HazelcastDistributedListProvider.java  | 268 +++++++++++++++++++
 .../HazelcastDistributedObjectProvider.java     | 219 ++++++++-------
 .../clustering/impl/ListEntryListener.java      |  37 +++
 .../test/DistributedObjectProviderTest.java     |  78 ++----
 .../load/balancer/context/AlgorithmContext.java |  22 +-
 .../context/map/AlgorithmContextMap.java        |  16 +-
 9 files changed, 598 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
index af8741c..2d4e195 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/context/CloudControllerContext.java
@@ -36,12 +36,8 @@ import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
 import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 import org.wso2.carbon.registry.core.exceptions.RegistryException;
 
-import com.google.common.net.InetAddresses;
-
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
 
 /**
  * This object holds all runtime data and provides faster access. This is a Singleton class.
@@ -58,15 +55,23 @@ public class CloudControllerContext implements Serializable {
     private static final long serialVersionUID = -2662307358852779897L;
     private static final Log log = LogFactory.getLog(CloudControllerContext.class);
 
-    public static final String KUB_GROUP_ID_TO_GROUP_MAP = "KUB_GROUP_ID_TO_GROUP_MAP";
-    public static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
-    public static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
-    public static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
-    public static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
-    public static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
-    public static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
-    public static final String CC_CARTRIDGES = "CC_CARTRIDGES";
-    public static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+    private static final String CC_CLUSTER_ID_TO_MEMBER_CTX = "CC_CLUSTER_ID_TO_MEMBER_CTX";
+    private static final String CC_CLUSTER_ID_TO_CLUSTER_CTX = "CC_CLUSTER_ID_TO_CLUSTER_CTX";
+    private static final String CC_MEMBER_ID_TO_MEMBER_CTX = "CC_MEMBER_ID_TO_MEMBER_CTX";
+    private static final String CC_MEMBER_ID_TO_SCH_TASK = "CC_MEMBER_ID_TO_SCH_TASK";
+    private static final String CC_KUB_GROUP_ID_TO_GROUP_MAP = "CC_KUB_GROUP_ID_TO_GROUP_MAP";
+    private static final String CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX";
+    private static final String CC_CARTRIDGE_TYPE_TO_PARTITION_IDS = "CC_CARTRIDGE_TYPE_TO_PARTITION_IDS";
+    private static final String CC_CARTRIDGES = "CC_CARTRIDGES";
+    private static final String CC_SERVICE_GROUPS = "CC_SERVICE_GROUPS";
+
+    private static final String CC_CLUSTER_CTX_LOCK = "CC_CLUSTER_ID_TO_MEMBER_CTX_LOCK";
+    private static final String CC_MEMBER_CTX_LOCK = "CC_MEMBER_ID_TO_MEMBER_CTX_LOCK";
+    private static final String CC_SCH_TASK_LOCK = "CC_MEMBER_ID_TO_SCH_TASK_LOCK";
+    private static final String CC_KUB_GROUP_LOCK = "CC_KUB_GROUP_ID_TO_GROUP_MAP_LOCK";
+    private static final String CC_KUB_CLUSTER_CTX_LOCK = "CC_KUB_CLUSTER_ID_TO_KUB_CLUSTER_CTX_LOCK";
+    private static final String CC_CARTRIDGES_LOCK = "CC_CARTRIDGES_LOCK";
+    private static final String CC_SERVICE_GROUPS_LOCK = "CC_SERVICE_GROUPS_LOCK";
 
     private static volatile CloudControllerContext instance;
 
@@ -151,7 +156,7 @@ public class CloudControllerContext implements Serializable {
         distributedObjectProvider = ServiceReferenceHolder.getInstance().getDistributedObjectProvider();
 
         // Initialize objects
-        kubernetesGroupsMap = distributedObjectProvider.getMap(KUB_GROUP_ID_TO_GROUP_MAP);
+        kubernetesGroupsMap = distributedObjectProvider.getMap(CC_KUB_GROUP_ID_TO_GROUP_MAP);
         clusterIdToMemberContextListMap = distributedObjectProvider.getMap(CC_CLUSTER_ID_TO_MEMBER_CTX);
         memberIdToMemberContextMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_MEMBER_CTX);
         memberIdToScheduledTaskMap = distributedObjectProvider.getMap(CC_MEMBER_ID_TO_SCH_TASK);
@@ -201,8 +206,72 @@ public class CloudControllerContext implements Serializable {
         return null;
     }
 
+    private Lock acquireWriteLock(String object) {
+        return distributedObjectProvider.acquireLock(object);
+    }
+
+    private void releaseWriteLock(Lock lock) {
+        distributedObjectProvider.releaseLock(lock);
+    }
+
+    public Lock acquireKubernetesWriteLock() {
+        return acquireWriteLock(CC_CLUSTER_CTX_LOCK);
+    }
+
+    public Lock acquireMemberContextWriteLock() {
+        return acquireWriteLock(CC_MEMBER_CTX_LOCK);
+    }
+
+    public Lock acquireScheduleTaskWriteLock() {
+        return acquireWriteLock(CC_SCH_TASK_LOCK);
+    }
+
+    public Lock acquireKubernetesGroupWriteLock() {
+        return acquireWriteLock(CC_KUB_GROUP_LOCK);
+    }
+
+    public Lock acquireKubernetesClusterContextWriteLock() {
+        return acquireWriteLock(CC_KUB_CLUSTER_CTX_LOCK);
+    }
+
+    public Lock acquireCartridgesWriteLock() {
+        return acquireWriteLock(CC_CARTRIDGES_LOCK);
+    }
+
+    public Lock acquireServiceGroupsWriteLock() {
+        return acquireWriteLock(CC_SERVICE_GROUPS_LOCK);
+    }
+
+    public void releaseKubernetesWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseMemberContextWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseScheduleTaskWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseKubernetesGroupWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseKubernetesClusterContextWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseCartridgesWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
+    public void releaseServiceGroupsWriteLock(Lock lock) {
+        releaseWriteLock(lock);
+    }
+
     public void addCartridge(Cartridge newCartridges) {
-        distributedObjectProvider.addToList(cartridges, newCartridges);
+        cartridges.add(newCartridges);
     }
 
     public ServiceGroup getServiceGroup(String name) {
@@ -215,7 +284,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addServiceGroup(ServiceGroup newServiceGroup) {
-        distributedObjectProvider.addToList(serviceGroups, newServiceGroup);
+        serviceGroups.add(newServiceGroup);
     }
 
     public void removeServiceGroup(List<ServiceGroup> serviceGroup) {
@@ -257,38 +326,37 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addMemberContext(MemberContext memberContext) {
-        distributedObjectProvider.putToMap(memberIdToMemberContextMap, memberContext.getMemberId(), memberContext);
+        memberIdToMemberContextMap.put(memberContext.getMemberId(), memberContext);
 
         List<MemberContext> memberContextList;
         if ((memberContextList = clusterIdToMemberContextListMap.get(memberContext.getClusterId())) == null) {
             memberContextList = new ArrayList<MemberContext>();
         }
         if (memberContextList.contains(memberContext)) {
-            distributedObjectProvider.removeFromList(memberContextList,memberContext);
+            memberContextList.remove(memberContext);
         }
-        distributedObjectProvider.addToList(memberContextList, memberContext);
-        distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, memberContext.getClusterId(),
-                memberContextList);
+        memberContextList.add(memberContext);
+        clusterIdToMemberContextListMap.put(memberContext.getClusterId(), memberContextList);
         if (log.isDebugEnabled()) {
             log.debug("Added member context to the cloud controller context: " + memberContext);
         }
     }
 
     public void addScheduledFutureJob(String memberId, ScheduledFuture<?> job) {
-        distributedObjectProvider.putToMap(memberIdToScheduledTaskMap, memberId, job);
+        memberIdToScheduledTaskMap.put(memberId, job);
     }
 
     public List<MemberContext> removeMemberContextsOfCluster(String clusterId) {
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
-        distributedObjectProvider.removeFromMap(clusterIdToMemberContextListMap, clusterId);
+        clusterIdToMemberContextListMap.remove(clusterId);
         if (memberContextList == null) {
             return new ArrayList<MemberContext>();
         }
         for (MemberContext memberContext : memberContextList) {
             String memberId = memberContext.getMemberId();
-            distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+            memberIdToMemberContextMap.remove(memberId);
             ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-            distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+            memberIdToScheduledTaskMap.remove(memberId);
             stopTask(task);
 
             if (log.isDebugEnabled()) {
@@ -301,7 +369,7 @@ public class CloudControllerContext implements Serializable {
 
     public MemberContext removeMemberContext(String memberId, String clusterId) {
         MemberContext removedMemberContext = memberIdToMemberContextMap.get(memberId);
-        distributedObjectProvider.removeFromMap(memberIdToMemberContextMap, memberId);
+        memberIdToMemberContextMap.remove(memberId);
 
         List<MemberContext> memberContextList = clusterIdToMemberContextListMap.get(clusterId);
         if (memberContextList != null) {
@@ -315,10 +383,10 @@ public class CloudControllerContext implements Serializable {
                     iterator.remove();
                 }
             }
-            distributedObjectProvider.putToMap(clusterIdToMemberContextListMap, clusterId, newCtxts);
+            clusterIdToMemberContextListMap.put(clusterId, newCtxts);
         }
         ScheduledFuture<?> task = memberIdToScheduledTaskMap.get(memberId);
-        distributedObjectProvider.removeFromMap(memberIdToScheduledTaskMap, memberId);
+        memberIdToScheduledTaskMap.remove(memberId);
         stopTask(task);
         return removedMemberContext;
     }
@@ -339,7 +407,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addClusterContext(ClusterContext ctxt) {
-        distributedObjectProvider.putToMap(clusterIdToContextMap, ctxt.getClusterId(), ctxt);
+        clusterIdToContextMap.put(ctxt.getClusterId(), ctxt);
     }
 
     public ClusterContext getClusterContext(String clusterId) {
@@ -348,7 +416,7 @@ public class CloudControllerContext implements Serializable {
 
     public ClusterContext removeClusterContext(String clusterId) {
         ClusterContext removed = clusterIdToContextMap.get(clusterId);
-        distributedObjectProvider.removeFromMap(clusterIdToContextMap, clusterId);
+        clusterIdToContextMap.remove(clusterId);
         return removed;
     }
 
@@ -366,11 +434,11 @@ public class CloudControllerContext implements Serializable {
             list = new ArrayList<String>();
         }
         list.add(partitionId);
-        distributedObjectProvider.putToMap(cartridgeTypeToPartitionIdsMap, cartridgeType, list);
+        cartridgeTypeToPartitionIdsMap.put(cartridgeType, list);
     }
 
     public void removeFromCartridgeTypeToPartitionIds(String cartridgeType) {
-        distributedObjectProvider.removeFromMap(cartridgeTypeToPartitionIdsMap, cartridgeType);
+        cartridgeTypeToPartitionIdsMap.remove(cartridgeType);
     }
 
     public KubernetesClusterContext getKubernetesClusterContext(String kubClusterId) {
@@ -378,7 +446,7 @@ public class CloudControllerContext implements Serializable {
     }
 
     public void addKubernetesClusterContext(KubernetesClusterContext kubernetesClusterContext) {
-        distributedObjectProvider.putToMap(kubClusterIdToKubClusterContextMap,
+        kubClusterIdToKubClusterContextMap.put(
                 kubernetesClusterContext.getKubernetesClusterId(),
                 kubernetesClusterContext);
     }
@@ -388,7 +456,7 @@ public class CloudControllerContext implements Serializable {
      */
     public synchronized void removeKubernetesGroup(String kubernetesGroupId) {
         // Remove entry from information model
-        distributedObjectProvider.removeFromMap(kubernetesGroupsMap, kubernetesGroupId);
+        kubernetesGroupsMap.remove(kubernetesGroupId);
     }
 
     /**
@@ -435,8 +503,12 @@ public class CloudControllerContext implements Serializable {
         }
     }
 
-    public void addKubernetesGroupToInformationModel(KubernetesGroup kubernetesGroup) {
-        distributedObjectProvider.putToMap(kubernetesGroupsMap, kubernetesGroup.getGroupId(), kubernetesGroup);
+    public void addKubernetesGroup(KubernetesGroup kubernetesGroup) {
+        kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
+    }
+
+    public void updateKubernetesGroup(KubernetesGroup kubernetesGroup) {
+        kubernetesGroupsMap.put(kubernetesGroup.getGroupId(), kubernetesGroup);
     }
     
     public boolean kubernetesGroupExists(KubernetesGroup kubernetesGroup) {
@@ -574,13 +646,13 @@ public class CloudControllerContext implements Serializable {
 
     private void copyMap(Map sourceMap, Map destinationMap) {
         for(Object key : sourceMap.keySet()) {
-            distributedObjectProvider.putToMap(destinationMap, key, sourceMap.get(key));
+            destinationMap.put(key, sourceMap.get(key));
         }
     }
 
     private void copyList(List sourceList, List destinationList) {
         for(Object item : sourceList) {
-            distributedObjectProvider.addToList(destinationList, item);
+            destinationList.add(item);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 4843565..548b743 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -73,6 +73,7 @@ import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Cloud Controller Service is responsible for starting up new server instances,
@@ -2087,20 +2088,24 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             LOG.info("Deploying new Kubernetes group: " + kubernetesGroup);
         }
         CloudControllerUtil.validateKubernetesGroup(kubernetesGroup);
+        Lock lock = null;
         try {
+            lock = cloudControllerContext.acquireKubernetesGroupWriteLock();
             // Add to information model
-            cloudControllerContext.addKubernetesGroupToInformationModel(kubernetesGroup);
-
+            cloudControllerContext.addKubernetesGroup(kubernetesGroup);
             persist();
             
             if (LOG.isInfoEnabled()) {
                 LOG.info(String.format("Kubernetes group deployed successfully: [id] %s, [description] %s",
                         kubernetesGroup.getGroupId(), kubernetesGroup.getDescription()));
             }
-            
             return true;
         } catch (Exception e) {
             throw new InvalidKubernetesGroupException(e.getMessage(), e);
+        } finally {
+            if(lock != null) {
+                cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+            }
         }
     }
     
@@ -2117,7 +2122,9 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             LOG.info("Deploying new Kubernetes Host: " + kubernetesHost + " for Kubernetes group id: " + kubernetesGroupId);
         }
         CloudControllerUtil.validateKubernetesHost(kubernetesHost);
+        Lock lock = null;
         try {
+            lock = CloudControllerContext.getInstance().acquireKubernetesGroupWriteLock();
             KubernetesGroup kubernetesGroupStored = getKubernetesGroup(kubernetesGroupId);
             ArrayList<KubernetesHost> kubernetesHostArrayList;
 
@@ -2134,7 +2141,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 
             // Update information model
             kubernetesGroupStored.setKubernetesHosts(kubernetesHostArrayList.toArray(new KubernetesHost[kubernetesHostArrayList.size()]));
-
+            CloudControllerContext.getInstance().updateKubernetesGroup(kubernetesGroupStored);
             persist();
             
             if (LOG.isInfoEnabled()) {
@@ -2144,6 +2151,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
             return true;
         } catch (Exception e) {
             throw new InvalidKubernetesHostException(e.getMessage(), e);
+        } finally {
+            if(lock != null) {
+                cloudControllerContext.releaseKubernetesGroupWriteLock(lock);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
index 2fd471b..7e7d130 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/DistributedObjectProvider.java
@@ -22,6 +22,7 @@ package org.apache.stratos.common.clustering;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Distributed object provider service interface.
@@ -31,11 +32,7 @@ public interface DistributedObjectProvider extends Serializable {
 
     List getList(String name);
 
-    void putToMap(Map map, Object key, Object value);
+    Lock acquireLock(Object object);
 
-    void removeFromMap(Map map, Object key);
-
-    void addToList(List list, Object value);
-
-    void removeFromList(List list, Object value);
+    void releaseLock(Lock lock);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
new file mode 100644
index 0000000..537cea9
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedListProvider.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IList;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+
+/**
+ * Hazelcast distributed list provider.
+ */
+public class HazelcastDistributedListProvider {
+    private static final Log log = LogFactory.getLog(HazelcastDistributedListProvider.class);
+
+    private HazelcastInstance hazelcastInstance;
+    private Map<String, DistList> listMap;
+
+    public HazelcastDistributedListProvider(HazelcastInstance hazelcastInstance) {
+        this.hazelcastInstance = hazelcastInstance;
+    }
+
+    public List getList(String name, ListEntryListener listEntryListener) {
+        synchronized (HazelcastDistributedListProvider.class) {
+            if(listMap == null) {
+                listMap = new HashMap<String, DistList>(); // lazily created; nothing else initialises this field
+            }
+            if(!listMap.containsKey(name)) {
+                listMap.put(name, new DistList(name, listEntryListener)); // cache it so the item listener is added only once per name
+            }
+            return listMap.get(name);
+        }
+    }
+
+    public void removeList(String name) {
+        DistList list = (listMap != null) ? listMap.get(name) : null;
+        if(list != null) {
+            IList ilist = list.list; // DistList only wraps the IList; casting the wrapper itself would fail
+            ilist.removeItemListener(list.getListenerId());
+            listMap.remove(name); // entries are keyed by list name, not by the list object
+            ilist.destroy();
+        }
+    }
+
+    private class DistList implements List {
+        private IList list;
+        private String listenerId;
+
+        public DistList(String name, final ListEntryListener listEntryListener) {
+            this.list = hazelcastInstance.getList(name);
+            listenerId = list.addItemListener(new ItemListener() {
+                @Override
+                public void itemAdded(ItemEvent itemEvent) {
+                    listEntryListener.itemAdded(itemEvent.getItem());
+                }
+
+                @Override
+                public void itemRemoved(ItemEvent itemEvent) {
+                    listEntryListener.itemRemoved(itemEvent.getItem());
+                }
+            }, false);
+        }
+
+        @Override
+        public int size() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.size();
+            }
+            return 0;
+        }
+
+        @Override
+        public boolean isEmpty() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.isEmpty();
+            }
+            return true;
+        }
+
+        @Override
+        public boolean contains(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.contains(object);
+            }
+            return false;
+        }
+
+        @Override
+        public Iterator iterator() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.iterator();
+            }
+            return Collections.emptyList().iterator(); // never null: a for-each over this list must not throw
+        }
+
+        @Override
+        public Object[] toArray() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.toArray();
+            }
+            return new Object[0];
+        }
+
+        @Override
+        public boolean add(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.add(object);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean remove(Object object) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.remove(object);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean addAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.addAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean addAll(int i, Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.addAll(i, collection);
+            }
+            return false;
+        }
+
+        @Override
+        public void clear() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                list.clear();
+            }
+        }
+
+        @Override
+        public Object get(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.get(i);
+            }
+            return null;
+        }
+
+        @Override
+        public Object set(int i, Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.set(i, o);
+            }
+            return null;
+        }
+
+        @Override
+        public void add(int i, Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                list.add(i, o);
+            }
+        }
+
+        @Override
+        public Object remove(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.remove(i);
+            }
+            return null;
+        }
+
+        @Override
+        public int indexOf(Object o) {
+            return hazelcastInstance.getLifecycleService().isRunning() ? list.indexOf(o) : -1; // same guard and fallback as lastIndexOf
+        }
+
+        @Override
+        public int lastIndexOf(Object o) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.lastIndexOf(o);
+            }
+            return -1;
+        }
+
+        @Override
+        public ListIterator listIterator() {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.listIterator();
+            }
+            return null;
+        }
+
+        @Override
+        public ListIterator listIterator(int i) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.listIterator(i);
+            }
+            return null;
+        }
+
+        @Override
+        public List subList(int i, int i2) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.subList(i, i2);
+            }
+            return null;
+        }
+
+        @Override
+        public boolean retainAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.retainAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean removeAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.removeAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public boolean containsAll(Collection collection) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.containsAll(collection);
+            }
+            return false;
+        }
+
+        @Override
+        public Object[] toArray(Object[] objects) {
+            if (hazelcastInstance.getLifecycleService().isRunning()) {
+                return list.toArray(objects);
+            }
+            return null;
+        }
+
+        public String getListenerId() {
+            return listenerId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
index 55d765e..e5dab09 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/HazelcastDistributedObjectProvider.java
@@ -20,19 +20,22 @@
 package org.apache.stratos.common.clustering.impl;
 
 import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IList;
 import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.apache.stratos.common.internal.ServiceReferenceHolder;
+import org.wso2.carbon.caching.impl.MapEntryListener;
+import org.wso2.carbon.core.clustering.hazelcast.HazelcastDistributedMapProvider;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Provides objects to be managed in distributed and non-distributed environments.
@@ -40,46 +43,15 @@ import java.util.concurrent.ConcurrentHashMap;
 public class HazelcastDistributedObjectProvider implements DistributedObjectProvider {
     private static final Log log = LogFactory.getLog(HazelcastDistributedObjectProvider.class);
 
-    public HazelcastDistributedObjectProvider() {
-    }
-
-    private boolean isClustered() {
-        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
-        return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
-                && (getHazelcastInstance() != null));
-    }
-
-    private HazelcastInstance getHazelcastInstance() {
-        return ServiceReferenceHolder.getInstance().getHazelcastInstance();
-    }
+    private HazelcastDistributedMapProvider mapProvider;
+    private HazelcastDistributedListProvider listProvider;
+    private Map<Object, Lock> locksMap;
 
-    private com.hazelcast.core.ILock acquireDistributedLock(Object object) {
-        if((!isClustered()) || (object == null)) {
-            return null;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
-        }
-        ILock lock = getHazelcastInstance().getLock(object);
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
-        }
-        return lock;
-    }
-
-    private void releaseDistributedLock(ILock lock) {
-        if(lock == null) {
-            return;
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
-        }
-        lock.forceUnlock();
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
-        }
+    public HazelcastDistributedObjectProvider() {
+        HazelcastInstance hazelcastInstance = ServiceReferenceHolder.getInstance().getHazelcastInstance();
+        mapProvider = new HazelcastDistributedMapProvider(hazelcastInstance);
+        listProvider = new HazelcastDistributedListProvider(hazelcastInstance);
+        locksMap = new HashMap<Object, Lock>();
     }
 
     /**
@@ -91,7 +63,28 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
     @Override
     public Map getMap(String key) {
         if(isClustered()) {
-            return getHazelcastInstance().getMap(key);
+            return mapProvider.getMap(key, new MapEntryListener() {
+                @Override
+                public <X> void entryAdded(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry added to distributed map: [key] %s", key));
+                    }
+                }
+
+                @Override
+                public <X> void entryRemoved(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry removed from distributed map: [key] %s", key));
+                    }
+                }
+
+                @Override
+                public <X> void entryUpdated(X key) {
+                    if(log.isDebugEnabled()) {
+                        log.debug(String.format("Entry updated in distributed map: [key] %s", key));
+                    }
+                }
+            });
         } else {
             return new ConcurrentHashMap<Object, Object>();
         }
@@ -106,94 +99,96 @@ public class HazelcastDistributedObjectProvider implements DistributedObjectProv
     @Override
     public List getList(String name) {
         if(isClustered()) {
-            return getHazelcastInstance().getList(name);
+            return listProvider.getList(name, new ListEntryListener() {
+                @Override
+                public void itemAdded(Object item) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Item added to distributed list: " + item);
+                    }
+                }
+
+                @Override
+                public void itemRemoved(Object item) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Item removed from distributed list: " + item);
+                    }
+                }
+            });
         } else {
             return new ArrayList();
         }
     }
 
-    /**
-     * Put a key value pair to a map, if clustered use a distributed lock.
-     * @param map
-     * @param key
-     * @param value
-     */
     @Override
-    public void putToMap(Map map, Object key, Object value) {
+    public Lock acquireLock(Object object) {
+        // Returns a lock for the given key that is already held by the calling thread;
+        // callers are expected to hand it back through releaseLock().
+        if(isClustered()) {
+            // acquireDistributedLock() only looks the ILock up, so take it here to keep the
+            // clustered path consistent with the local path below, which returns a held lock.
+            Lock distributedLock = acquireDistributedLock(object);
+            if(distributedLock != null) {
+                distributedLock.lock();
+            }
+            return distributedLock;
+        } else {
+            Lock lock;
+            // Guard the plain HashMap and make the look-up-or-create step atomic. The
+            // previous code never re-read the map inside its synchronized block (and
+            // synchronized on the caller's key object), so two threads could each create
+            // and hold a different lock for the same key.
+            synchronized (locksMap) {
+                lock = locksMap.get(object);
+                if(lock == null) {
+                    lock = new ReentrantLock();
+                    locksMap.put(object, lock);
+                }
+            }
+            // Block outside the map monitor so waiting on one key does not stall other keys.
+            lock.lock();
+            return lock;
+        }
+    }
+
+    @Override
+    public void releaseLock(Lock lock) {
          if(isClustered()) {
-             ILock lock = null;
-             try {
-                 IMap imap = (IMap) map;
-                 lock = acquireDistributedLock(imap.getName());
-                 imap.set(key, value);
-             } finally {
-                 releaseDistributedLock(lock);
-             }
+             releaseDistributedLock((ILock)lock);
          } else {
-            map.put(key, value);
+             lock.unlock();
          }
     }
 
-    /**
-     * Remove an object from a map, if clustered use a distributed lock.
-     * @param map
-     * @param key
-     */
-    @Override
-    public void removeFromMap(Map map, Object key) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IMap imap = (IMap) map;
-                lock = acquireDistributedLock(imap.getName());
-                imap.delete(key);
-            } finally {
-                releaseDistributedLock(lock);
-            }
-        } else {
-            map.remove(key);
-        }
+    private boolean isClustered() {
+        AxisConfiguration axisConfiguration = ServiceReferenceHolder.getInstance().getAxisConfiguration();
+        return ((axisConfiguration != null) && (axisConfiguration.getClusteringAgent() != null)
+                && (getHazelcastInstance() != null));
     }
 
-    /**
-     * Add an object to a list, if clustered use a distributed lock.
-     * @param list
-     * @param value
-     */
-    @Override
-    public void addToList(List list, Object value) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IList ilist = (IList) list;
-                lock = acquireDistributedLock(ilist.getName());
-                ilist.add(value);
-            } finally {
-                releaseDistributedLock(lock);
+    private HazelcastInstance getHazelcastInstance() {
+        // Resolved from the service reference holder on every call.
+        ServiceReferenceHolder referenceHolder = ServiceReferenceHolder.getInstance();
+        return referenceHolder.getHazelcastInstance();
+    }
+
+    protected com.hazelcast.core.ILock acquireDistributedLock(Object object) {
+        if(object == null) {
+            if(log.isWarnEnabled()) {
+                log.warn("Could not acquire distributed lock, object is null");
             }
-        } else {
-            list.add(value);
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Acquiring distributed lock for %s...", object.getClass().getSimpleName()));
+        }
+        ILock lock = getHazelcastInstance().getLock(object);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock acquired for %s", object.getClass().getSimpleName()));
         }
+        return lock;
     }
 
-    /**
-     * Remove an object from a list, if clustered use a distributed lock.
-     * @param list
-     * @param value
-     */
-    @Override
-    public void removeFromList(List list, Object value) {
-        if(isClustered()) {
-            ILock lock = null;
-            try {
-                IList ilist = (IList) list;
-                lock = acquireDistributedLock(ilist.getName());
-                ilist.remove(value);
-            } finally {
-                releaseDistributedLock(lock);
+    protected void releaseDistributedLock(ILock lock) {
+        if(lock == null) {
+            if(log.isWarnEnabled()) {
+                log.warn("Could not release distributed lock, lock is null");
             }
-        } else {
-            list.remove(value);
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Releasing distributed lock for %s...", lock.getKey()));
+        }
+        lock.forceUnlock();
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Distributed lock released for %s", lock.getKey()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
new file mode 100644
index 0000000..96f72dc
--- /dev/null
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/clustering/impl/ListEntryListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.common.clustering.impl;
+
+/**
+ * Callback interface notified when items are added to or removed from a distributed list.
+ */
+public interface ListEntryListener {
+    /**
+     * Invoked when an item is added to the distributed list.
+     * @param item the item that was added
+     */
+    void itemAdded(Object item);
+
+    /**
+     * Invoked when an item is removed from the distributed list.
+     * @param item the item that was removed
+     */
+    void itemRemoved(Object item);
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
index afb0c83..60ebc99 100644
--- a/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
+++ b/components/org.apache.stratos.common/src/test/java/org/apache/stratos/common/test/DistributedObjectProviderTest.java
@@ -28,10 +28,9 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
-import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -51,6 +50,7 @@ public class DistributedObjectProviderTest {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testPutToMap(provider);
+        testPutToMap(provider);
     }
 
     @Test
@@ -58,77 +58,47 @@ public class DistributedObjectProviderTest {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testPutToMap(provider);
+        testPutToMap(provider);
     }
 
     private void testPutToMap(HazelcastDistributedObjectProvider provider) {
         Map<String, String> map = provider.getMap("MAP1");
-        provider.putToMap(map, "key1", "value1");
-        assertEquals(map.get("key1"), "value1");
-    }
-
-    @Test
-    public void testRemoveFromMapLocal() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemoveFromMap(provider);
-    }
-
-    @Test
-    public void testRemoveFromMapDistributed() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemoveFromMap(provider);
-    }
-
-    private void testRemoveFromMap(HazelcastDistributedObjectProvider provider) {
-        Map<String, String> map = provider.getMap("MAP1");
-        provider.putToMap(map, "key1", "value1");
-        assertEquals(map.get("key1"), "value1");
-        provider.removeFromMap(map, "key1");
-        assertNull(map.get("key1"));
+        Lock lock = null;
+        try {
+            lock = provider.acquireLock("MAP1_WRITE_LOCK");
+            map.put("key1", "value1");
+            assertEquals(map.get("key1"), "value1");
+        } finally {
+            provider.releaseLock(lock);
+        }
     }
 
     @Test
-    public void testAddToListLocal() {
+    public void testGetListLocal() {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testAddToList(provider);
+        testAddToList(provider);
     }
 
     @Test
-    public void testAddToListDistributed() {
+    public void testGetListDistributed() {
         ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
         HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
         testAddToList(provider);
+        testAddToList(provider);
     }
 
     private void testAddToList(HazelcastDistributedObjectProvider provider) {
         List list = provider.getList("LIST1");
-        String value1 = "value1";
-        provider.addToList(list, value1);
-        assertTrue(list.contains(value1));
-    }
-
-    @Test
-    public void testRemoveFromListLocal() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(null);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemovalFromList(provider);
-    }
-
-    @Test
-    public void testRemoveFromListDistributed() {
-        ServiceReferenceHolder.getInstance().setHazelcastInstance(hazelcastInstance);
-        HazelcastDistributedObjectProvider provider = new HazelcastDistributedObjectProvider();
-        testRemovalFromList(provider);
-    }
-
-    private void testRemovalFromList(HazelcastDistributedObjectProvider provider) {
-        List list = provider.getList("LIST1");
-        String value1 = "value1";
-        provider.addToList(list, value1);
-        assertTrue(list.contains(value1));
-        provider.removeFromList(list, value1);
-        assertFalse(list.contains(value1));
+        Lock lock = null;
+        try {
+            lock = provider.acquireLock("LIST1_WRITE_LOCK");
+            String value1 = "value1";
+            list.add(value1);
+            assertTrue(list.contains(value1));
+        } finally {
+            provider.releaseLock(lock);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
index 3ff824d..240ebba 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/AlgorithmContext.java
@@ -21,6 +21,8 @@ package org.apache.stratos.load.balancer.context;
 
 import org.apache.stratos.load.balancer.context.map.AlgorithmContextMap;
 
+import java.util.concurrent.locks.Lock;
+
 /**
  * Algorithm context is used for identifying the cluster and its current member for executing load balancing algorithms.
  * Key: service name, cluster id
@@ -32,7 +34,15 @@ public class AlgorithmContext {
     public AlgorithmContext(String serviceName, String clusterId) {
         this.serviceName = serviceName;
         this.clusterId = clusterId;
-        AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+        Lock lock = null;
+        try {
+            lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+            AlgorithmContextMap.getInstance().putCurrentMemberIndex(serviceName, clusterId, 0);
+        } finally {
+            if(lock != null) {
+                AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+            }
+        }
     }
 
     public String getServiceName() {
@@ -48,6 +58,14 @@ public class AlgorithmContext {
     }
 
     public void setCurrentMemberIndex(int currentMemberIndex) {
-        AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+        Lock lock = null;
+        try {
+            lock = AlgorithmContextMap.getInstance().acquireCurrentMemberIndexLock();
+            AlgorithmContextMap.getInstance().putCurrentMemberIndex(getServiceName(), getClusterId(), currentMemberIndex);
+        } finally {
+            if(lock != null) {
+                AlgorithmContextMap.getInstance().releaseCurrentMemberIndexLock(lock);
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/e8914f3d/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
index 2c03ccb..35acdb0 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/map/AlgorithmContextMap.java
@@ -25,6 +25,7 @@ import org.apache.stratos.common.clustering.DistributedObjectProvider;
 import org.apache.stratos.load.balancer.internal.ServiceReferenceHolder;
 
 import java.util.Map;
+import java.util.concurrent.locks.Lock;
 
 /**
  * Algorithm context map is a singleton class for managing load balancing algorithm context
@@ -34,6 +35,7 @@ public class AlgorithmContextMap {
     @SuppressWarnings("unused")
     private static final Log log = LogFactory.getLog(AlgorithmContextMap.class);
     private static final String LOAD_BALANCER_ALGORITHM_CONTEXT_MAP = "LOAD_BALANCER_ALGORITHM_CONTEXT_MAP";
+    private static final String CURRENT_MEMBER_INDEX_MAP_LOCK = "CURRENT_MEMBER_INDEX_MAP_LOCK";
     private static AlgorithmContextMap instance;
 
     private final Map<String, Integer> clusterMemberIndexMap;
@@ -61,14 +63,24 @@ public class AlgorithmContextMap {
         return String.format("%s-%s", serviceName, clusterId);
     }
 
+    /**
+     * Obtains the lock registered under {@code CURRENT_MEMBER_INDEX_MAP_LOCK} from the
+     * distributed object provider.
+     *
+     * @return the lock returned by the provider; pass it to
+     *         {@code releaseCurrentMemberIndexLock} when done
+     */
+    public Lock acquireCurrentMemberIndexLock() {
+        Lock memberIndexLock = distributedObjectProvider.acquireLock(CURRENT_MEMBER_INDEX_MAP_LOCK);
+        return memberIndexLock;
+    }
+
+    /**
+     * Hands the given lock back to the distributed object provider.
+     *
+     * @param lock the lock to release; a {@code null} value is ignored
+     */
+    public void releaseCurrentMemberIndexLock(Lock lock) {
+        if(lock == null) {
+            return;
+        }
+        distributedObjectProvider.releaseLock(lock);
+    }
+
     public void putCurrentMemberIndex(String serviceName, String clusterId, int currentMemberIndex) {
         String key = constructKey(serviceName, clusterId);
-        distributedObjectProvider.putToMap(clusterMemberIndexMap, key, currentMemberIndex);
+        clusterMemberIndexMap.put(key, currentMemberIndex);
     }
 
     public void removeCluster(String serviceName, String clusterId) {
         String key = constructKey(serviceName, clusterId);
-        distributedObjectProvider.removeFromMap(clusterMemberIndexMap, key);
+        clusterMemberIndexMap.remove(key);
     }
 
     public int getCurrentMemberIndex(String serviceName, String clusterId) {


Mime
View raw message