openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r728807 - in /openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice: SlicePersistence.java jdbc/DistributedStoreManager.java
Date Mon, 22 Dec 2008 22:11:49 GMT
Author: ppoddar
Date: Mon Dec 22 14:11:48 2008
New Revision: 728807

URL: http://svn.apache.org/viewvc?rev=728807&view=rev
Log:
Handle version adjustment when replicated instances are flushed to multiple slices. Refactor
untyped OpenJPAStateManager collections to save some redundant iteration.

Modified:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java?rev=728807&r1=728806&r2=728807&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
Mon Dec 22 14:11:48 2008
@@ -31,6 +31,13 @@
  *
  */
 public class SlicePersistence {
+    /**
+     * The key for setting the Query hints. The value is comma-separated Slice
+     * names. If the hint is specified then the query is executed only on the
+     * listed slices. 
+     */
+    public static final String HINT_TARGET = ProductDerivation.HINT_TARGET;
+    
 	/**
 	 * Get the slice identifier for the given instance if it is a managed
 	 * instance and has been assigned to a slice.
@@ -69,5 +76,4 @@
             return false;
         return info.isReplicated();
     }
-
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=728807&r1=728806&r2=728807&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Mon Dec 22 14:11:48 2008
@@ -25,6 +25,7 @@
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -245,19 +246,21 @@
     public Collection flush(Collection sms) {
         Collection exceptions = new ArrayList();
         List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
-        Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
-        
+        Map<String, StateManagerSet> subsets = bin(sms, null);
+        Collection<StateManagerSet> remaining = 
+            new ArrayList<StateManagerSet>(subsets.values());
         boolean parallel = !getConfiguration().getMultithreaded();
         for (int i = 0; i < _slices.size(); i++) {
             SliceStoreManager slice = _slices.get(i);
-            List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+            StateManagerSet subset = subsets.get(slice.getName());
             if (subset.isEmpty())
                 continue;
-            if (containsReplicated(subset)) {
-                Map<OpenJPAStateManager, Object> versions = cacheVersion(subset);
+            if (subset.containsReplicated()) {
+                Map<OpenJPAStateManager, Object> oldVersions = cacheVersion(
+                    subset.getReplicated());
             	collectException(slice.flush(subset), exceptions);
-            	if (i != _slices.size()-1)
-            	    rollbackVersion(subset, versions);
+                remaining.remove(subset);
+            	rollbackVersion(subset.getReplicated(), oldVersions, remaining);
             } else {
             	if (parallel) {
             		futures.add(threadPool.submit(new Flusher(slice, subset)));
@@ -286,40 +289,51 @@
         }
     }
     
-    boolean containsReplicated(List<OpenJPAStateManager> sms) {
-    	for (OpenJPAStateManager sm : sms)
-    		if (sm.getMetaData().isReplicated())
-    			return true;
-    	return false;
-    }
-    
-    private Map<OpenJPAStateManager, Object> cacheVersion
-        (List<OpenJPAStateManager> sms) {
+    /**
+     * Collect the current versions of the given StateManagers.
+     */
+    private Map<OpenJPAStateManager, Object> cacheVersion(
+        List<OpenJPAStateManager> sms) {
         Map<OpenJPAStateManager, Object> result = 
-               new HashMap<OpenJPAStateManager, Object>();
+            new HashMap<OpenJPAStateManager, Object>();
         for (OpenJPAStateManager sm : sms)
-            if (sm.getMetaData().isReplicated())
-                result.put(sm, sm.getVersion());
+            result.put(sm, sm.getVersion());
         return result;
     }
     
+    /**
+     * Sets the version of the given StateManagers from the cached versions,
+     * but only for a StateManager that also appears in one of the remaining
+     * StateManagerSets, i.e. the sets that are yet to be flushed.
+     */
     private void rollbackVersion(List<OpenJPAStateManager> sms, 
-        Map<OpenJPAStateManager, Object> result) {
-        for (OpenJPAStateManager sm : sms)
-            if (sm.getMetaData().isReplicated())
-                sm.setVersion(result.get(sm));
+        Map<OpenJPAStateManager, Object> oldVersions, 
+        Collection<StateManagerSet> reminder) {
+        if (reminder.isEmpty())
+            return;
+        for (OpenJPAStateManager sm : sms) {
+            if (occurs(sm, reminder))
+              sm.setVersion(oldVersions.get(sm));
+        }
+    }
+    
+    boolean occurs(OpenJPAStateManager sm, 
+        Collection<StateManagerSet> reminder) {
+        for (StateManagerSet set : reminder)
+            if (set.contains(sm))
+                return true;
+        return false;
     }
     
     /**
      * Separate the given list of StateManagers in separate lists for each slice 
      * by the associated slice identifier of each StateManager.
      */
-    private Map<String, List<OpenJPAStateManager>> bin(
-            Collection/*<StateManage>*/ sms, Object edata) {
-        Map<String, List<OpenJPAStateManager>> subsets =
-                new HashMap<String, List<OpenJPAStateManager>>();
+    private Map<String, StateManagerSet> bin(Collection sms, Object edata) {
+        Map<String, StateManagerSet> subsets =  
+            new HashMap<String, StateManagerSet>();
         for (SliceStoreManager slice : _slices)
-            subsets.put(slice.getName(), new ArrayList<OpenJPAStateManager>());
+            subsets.put(slice.getName(), new StateManagerSet());
         for (Object x : sms) {
             OpenJPAStateManager sm = (OpenJPAStateManager) x;
             String[] targets = findSliceNames(sm, edata).getSlices();
@@ -379,10 +393,10 @@
 
     public Collection loadAll(Collection sms, PCState state, int load,
             FetchConfiguration fetch, Object edata) {
-        Map<String, List<OpenJPAStateManager>> subsets = bin(sms, edata);
+        Map<String, StateManagerSet> subsets = bin(sms, edata);
         Collection result = new ArrayList();
         for (SliceStoreManager slice : _slices) {
-            List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+            StateManagerSet subset = subsets.get(slice.getName());
             if (subset.isEmpty())
                 continue;
             Collection tmp = slice.loadAll(subset, state, load, fetch, edata);
@@ -474,15 +488,11 @@
         return targets;
     }
     
-    void log(String s) {
-        System.out.println("["+Thread.currentThread().getName()+"] " + this + s);
-    }
-
     private static class Flusher implements Callable<Collection> {
         final SliceStoreManager store;
-        final Collection toFlush;
+        final StateManagerSet toFlush;
 
-        Flusher(SliceStoreManager store, Collection toFlush) {
+        Flusher(SliceStoreManager store, StateManagerSet toFlush) {
             this.store = store;
             this.toFlush = toFlush;
         }
@@ -496,5 +506,37 @@
         	}
         }
     }
-
+    
+    /**
+     * A specialized, insert-only collection of StateManagers that notes
+     * whether any of its members is replicated.
+     *  
+     */
+    private static class StateManagerSet extends HashSet<OpenJPAStateManager> {
+        List<OpenJPAStateManager> replicated;
+        
+        @Override
+        public boolean add(OpenJPAStateManager sm) {
+            boolean isReplicated = sm.getMetaData().isReplicated();
+            if (isReplicated) {
+                if (replicated == null)
+                    replicated = new ArrayList<OpenJPAStateManager>();
+                replicated.add(sm);
+            }
+            return super.add(sm);
+        }
+        
+        @Override
+        public boolean remove(Object sm) {
+            throw new UnsupportedOperationException();
+        }
+        
+        boolean containsReplicated() {
+            return replicated != null && !replicated.isEmpty();
+        }
+        
+        List<OpenJPAStateManager> getReplicated() {
+            return replicated;
+        }
+    }
 }



Mime
View raw message