openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r748292 - in /openjpa/trunk/openjpa-slice/src: main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/ test/java/org/apache/openjpa/slice/
Date Thu, 26 Feb 2009 20:20:45 GMT
Author: ppoddar
Date: Thu Feb 26 20:20:44 2009
New Revision: 748292

URL: http://svn.apache.org/viewvc?rev=748292&view=rev
Log:
OPENJPA-825: A new threading policy for Slice via extension

Added:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
Modified:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
Thu Feb 26 20:20:44 2009
@@ -21,6 +21,8 @@
 import org.apache.openjpa.kernel.FinalizingBrokerImpl;
 import org.apache.openjpa.kernel.OpCallbacks;
 import org.apache.openjpa.kernel.OpenJPAStateManager;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreQuery;
 import org.apache.openjpa.lib.util.Localizer;
 
 /**
@@ -37,9 +39,16 @@
 public class DistributedBrokerImpl extends FinalizingBrokerImpl {
 	private transient String _rootSlice;
 	private transient DistributedConfiguration _conf;
+	private final ReentrantSliceLock _lock;
+	
 	private static final Localizer _loc =
 			Localizer.forPackage(DistributedBrokerImpl.class);
 
+	public DistributedBrokerImpl() {
+	    super();
+	    _lock = new ReentrantSliceLock();
+	}
+	
     public DistributedConfiguration getConfiguration() {
     	if (_conf == null) {
     		_conf = (DistributedConfiguration)super.getConfiguration();
@@ -89,6 +98,26 @@
 	    return true;
 	}
 	
+    /**
+     * Create a new query.
+     */
+    protected QueryImpl newQueryImpl(String lang, StoreQuery sq) {
+        return new DistributedQueryImpl(this, lang, sq);
+    }
+    
+	/**
+	 * Always uses lock irrespective of super's multi-threaded settings.
+	 */
+    @Override
+    public void lock() {
+        _lock.lock();
+    }
+    
+    @Override
+    public void unlock() {
+        _lock.unlock();
+    }
+	
 	/**
 	 * A virtual datastore need not be opened.
 	 */

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java?rev=748292&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
(added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
Thu Feb 26 20:20:44 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import org.apache.openjpa.kernel.Broker;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreQuery;
+
+/**
+ * Extension with slice locking policy.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class DistributedQueryImpl extends QueryImpl {
+    private final ReentrantSliceLock _lock;
+    public DistributedQueryImpl(Broker broker, String language, StoreQuery storeQuery) {
+        super(broker, language, storeQuery);
+        _lock = new ReentrantSliceLock();
+    }
+    
+    /**
+     * Always uses lock irrespective of super's multi-threaded settings.
+     */
+    @Override
+    public void lock() {
+        _lock.lock();
+    }
+    
+    @Override
+    public void unlock() {
+        _lock.unlock();
+    }
+}

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
Thu Feb 26 20:20:44 2009
@@ -62,6 +62,8 @@
 
     /**
      * Configures a cached or fixed thread pool.
+     * The factory always produces SliceThread which uses special locking.
+     * 
      */
     @Override
     public Object instantiate(Class type, Configuration conf, boolean fatal) {
@@ -73,21 +75,7 @@
 
         Options opts = Configurations.parseProperties(getProperties());
 
-        ThreadFactory factory = null;
-        if (opts.containsKey("ThreadFactory")) {
-            String fName = opts.getProperty("ThreadFactory");
-            try {
-                factory = (ThreadFactory) Class.forName(fName).newInstance();
-                Configurations.configureInstance(factory, conf, opts,
-                        getProperty());
-            } catch (Throwable t) {
-                throw new UserException(_loc.get("bad-thread-factory", fName), t);
-            } finally {
-                opts.removeProperty("ThreadFactory");
-            }
-        } else {
-            factory = Executors.defaultThreadFactory();
-        }
+        ThreadFactory factory = new SliceThreadFactory();
         if ("cached".equals(cls)) {
             obj = Executors.newCachedThreadPool(factory);
         } else if ("fixed".equals(cls)) {
@@ -105,4 +93,10 @@
         set(obj, true);
         return obj;
     }
+    
+    private static class SliceThreadFactory implements ThreadFactory {
+        public Thread newThread(Runnable r) {
+            return new SliceThread(Thread.currentThread(), r);
+        }
+    }
 }

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java?rev=748292&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
(added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
Thu Feb 26 20:20:44 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A reentrant lock that lets a child thread work with the parent's lock.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class ReentrantSliceLock extends ReentrantLock {
+
+    public ReentrantSliceLock() {
+    }
+
+    public ReentrantSliceLock(boolean fair) {
+        super(fair);
+    }
+    
+    /**
+     * Locks only for the parent thread and lets a child use the parent's lock.
+     */
+    @Override
+    public void lock() {
+        if (Thread.currentThread() instanceof SliceThread) 
+            return;
+        super.lock();
+    }
+
+    /**
+     * Unlocks only when the calling thread is not a child SliceThread.
+     */
+    @Override
+    public void unlock() {
+        if (Thread.currentThread() instanceof SliceThread) 
+            return;
+        super.unlock();
+    }
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java?rev=748292&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java Thu
Feb 26 20:20:44 2009
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A thread that executes an operation against a database slice.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class SliceThread extends Thread {
+    private final Thread _parent;
+
+    public SliceThread(String name, Thread parent, Runnable r) {
+        super(r, name);
+        _parent = parent;
+    }
+    
+    public SliceThread(Thread parent, Runnable r) {
+        super(r);
+        _parent = parent;
+    }
+    
+    /**
+     * Gets the parent thread of this receiver.
+     * 
+     */
+    public Thread getParent() {
+        return _parent;
+    }
+    
+    /** 
+     * Create a pool of given size.
+     * The thread factory is specialized to create SliceThread which gets
+     * preferential treatment for locking.
+     * 
+     */
+
+    public static ExecutorService newPool(int size) {
+        return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, 
+            new SynchronousQueue<Runnable>(), new SliceThreadFactory());
+    }
+    
+    static class SliceThreadFactory implements ThreadFactory {
+        int n = 0;
+        public Thread newThread(Runnable r) {
+            Thread parent = Thread.currentThread();
+            return new SliceThread(parent.getName()+"-slice-"+n++, parent, r);
+        }
+    }
+
+}

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
Thu Feb 26 20:20:44 2009
@@ -18,34 +18,22 @@
  */
 package org.apache.openjpa.slice.jdbc;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
 import org.apache.openjpa.slice.DistributedConfiguration;
 import org.apache.openjpa.slice.Slice;
 
 /**
- * A distributed configuration that is a ordered collection of 
+ * A distributed configuration that is an ordered collection of
  * JDBCConfigurations.
  * 
- * @author Pinaki Poddar 
- *
+ * @author Pinaki Poddar
+ * 
  */
-public interface DistributedJDBCConfiguration extends JDBCConfiguration, 
-	DistributedConfiguration {
+public interface DistributedJDBCConfiguration extends JDBCConfiguration,
+    DistributedConfiguration {
     /**
      * Gets the master slice.
      */
     Slice getMaster();
-    
-    /**
-     * Gets the alias for ExecutorService being used.
-     */
-    
-    String getExecutorService();
-    
-    /**
-     * Gets the ExecutorService being used.
-     */
-    ExecutorService getExecutorServiceInstance();
+
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
Thu Feb 26 20:20:44 2009
@@ -477,19 +477,4 @@
             _master = activeSlices.get(0);
         }
     }
-    
-    public String getExecutorService() {
-        return executorServicePlugin.getString();
-    }
-
-    public void setExecutorService(ExecutorService txnManager) {
-        executorServicePlugin.set(txnManager);
-    }
-
-    public ExecutorService getExecutorServiceInstance() {
-        if (executorServicePlugin.get() == null) {
-            executorServicePlugin.instantiate(ExecutorService.class, this);
-        }
-        return (ExecutorService) executorServicePlugin.get();
-    }    
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Thu Feb 26 20:20:44 2009
@@ -59,6 +59,7 @@
 import org.apache.openjpa.slice.ProductDerivation;
 import org.apache.openjpa.slice.SliceImplHelper;
 import org.apache.openjpa.slice.SliceInfo;
+import org.apache.openjpa.slice.SliceThread;
 import org.apache.openjpa.util.InternalException;
 import org.apache.openjpa.util.StoreException;
 
@@ -78,7 +79,6 @@
     private final DistributedJDBCConfiguration _conf;
     private static final Localizer _loc =
             Localizer.forPackage(DistributedStoreManager.class);
-    private static ExecutorService threadPool = Executors.newCachedThreadPool();
 
     /**
      * Constructs a set of child StoreManagers each connected to a physical
@@ -249,7 +249,7 @@
         Map<String, StateManagerSet> subsets = bin(sms, null);
         Collection<StateManagerSet> remaining = 
             new ArrayList<StateManagerSet>(subsets.values());
-        boolean parallel = !getConfiguration().getMultithreaded();
+        ExecutorService threadPool = SliceThread.newPool(_slices.size());
         for (int i = 0; i < _slices.size(); i++) {
             SliceStoreManager slice = _slices.get(i);
             StateManagerSet subset = subsets.get(slice.getName());
@@ -262,24 +262,19 @@
                 remaining.remove(subset);
             	rollbackVersion(subset.getReplicated(), oldVersions, remaining);
             } else {
-            	if (parallel) {
-            		futures.add(threadPool.submit(new Flusher(slice, subset)));
-            	} else {
-                	collectException(slice.flush(subset), exceptions);
-            	}
+            	futures.add(threadPool.submit(new Flusher(slice, subset)));
             }
         }
-        if (parallel) {
-	        for (Future<Collection> future : futures) {
-	            try {
-	            	collectException(future.get(), exceptions);
-	            } catch (InterruptedException e) {
-	                throw new StoreException(e);
-	            } catch (ExecutionException e) {
-	                throw new StoreException(e.getCause());
-	            }
-	        }
+        for (Future<Collection> future : futures) {
+            try {
+            	collectException(future.get(), exceptions);
+            } catch (InterruptedException e) {
+                throw new StoreException(e);
+            } catch (ExecutionException e) {
+                throw new StoreException(e.getCause());
+            }
         }
+        
 	    return exceptions;
     }
     
@@ -498,12 +493,7 @@
         }
 
         public Collection call() throws Exception {
-        	((BrokerImpl)store.getContext()).startLocking();
-        	try {
-        		return store.flush(toFlush);
-        	} finally {
-            	((BrokerImpl)store.getContext()).stopLocking();
-        	}
+        	return store.flush(toFlush);
         }
     }
     

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Thu Feb 26 20:20:44 2009
@@ -25,6 +25,10 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
@@ -41,6 +45,7 @@
 import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
 import org.apache.openjpa.lib.rop.ResultObjectProvider;
 import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.slice.SliceThread;
 import org.apache.openjpa.util.StoreException;
 
 /**
@@ -84,12 +89,6 @@
 			q.setContext(ctx);
 	}
 
-	public ExecutorService getExecutorServiceInstance() {
-		DistributedJDBCConfiguration conf = ((DistributedJDBCConfiguration) 
-			getStore().getConfiguration());
-		return conf.getExecutorServiceInstance();
-	}
-
 	/**
 	 * Executes queries on multiple databases.
 	 * 
@@ -100,16 +99,12 @@
 			ExpressionStoreQuery.DataStoreExecutor {
 		private List<Executor> executors = new ArrayList<Executor>();
 		private DistributedStoreQuery owner = null;
-		private ExecutorService threadPool = null;
-		private final boolean parallel;
 
 		public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
 				boolean subclasses, ExpressionParser parser, Object parsed, 
 				boolean parallel) {
 			super(dsq, meta, subclasses, parser, parsed);
 			owner = dsq;
-			threadPool = dsq.getExecutorServiceInstance();
-			this.parallel = parallel;
 		}
 
 		public void addExecutor(Executor ex) {
@@ -130,6 +125,7 @@
 			List<SliceStoreManager> targets = findTargets();
 			QueryContext ctx = q.getContext();
 			boolean isReplicated = containsReplicated(ctx);
+			ExecutorService threadPool = SliceThread.newPool(owner._queries.size());
 			for (int i = 0; i < owner._queries.size(); i++) {
 				// if replicated, then execute only on single slice
 				if (isReplicated && !usedExecutors.isEmpty()) {
@@ -143,28 +139,23 @@
 				if (!targets.contains(sm))
 					continue;
 				usedExecutors.add(executor);
-				if (!parallel) {
-					rops.add(executor.executeQuery(query, params, range));
-				} else {
 					QueryExecutor call = new QueryExecutor();
 					call.executor = executor;
 					call.query = query;
 					call.params = params;
 					call.range = range;
 					futures.add(threadPool.submit(call));
-				}
 			}
-			if (parallel) {
-				for (Future<ResultObjectProvider> future : futures) {
-					try {
-						rops.add(future.get());
-					} catch (InterruptedException e) {
-						throw new RuntimeException(e);
-					} catch (ExecutionException e) {
-						throw new StoreException(e.getCause());
-					}
+			for (Future<ResultObjectProvider> future : futures) {
+				try {
+					rops.add(future.get());
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
 				}
 			}
+			
 			ResultObjectProvider[] tmp = rops
 					.toArray(new ResultObjectProvider[rops.size()]);
 			ResultObjectProvider result = null;
@@ -214,6 +205,7 @@
 			Iterator<StoreQuery> qs = owner._queries.iterator();
 			List<Future<Number>> futures = null;
 			int result = 0;
+			ExecutorService threadPool = SliceThread.newPool(executors.size());
 			for (Executor ex : executors) {
 				if (futures == null)
 					futures = new ArrayList<Future<Number>>();
@@ -241,6 +233,7 @@
 			Iterator<StoreQuery> qs = owner._queries.iterator();
 			List<Future<Number>> futures = null;
 			int result = 0;
+            ExecutorService threadPool = SliceThread.newPool(executors.size());
 			for (Executor ex : executors) {
 				if (futures == null)
 					futures = new ArrayList<Future<Number>>();
@@ -268,6 +261,7 @@
 					.getFetchConfiguration();
 			return owner.getDistributedStore().getTargets(fetch);
 		}
+		
 	}
 
 	static class QueryExecutor implements Callable<ResultObjectProvider> {
@@ -277,18 +271,7 @@
 		Range range;
 
 		public ResultObjectProvider call() throws Exception {
-			((QueryImpl)query.getContext()).startLocking();
-			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
-			((QueryImpl)query.getContext()).lock();
-			((BrokerImpl)query.getContext().getStoreContext()).lock();
-			try { 
-				return executor.executeQuery(query, params, range);
-			} finally {
-				((QueryImpl)query.getContext()).unlock();
-				((BrokerImpl)query.getContext().getStoreContext()).unlock();
-				((QueryImpl)query.getContext()).stopLocking();
-				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
-			}
+			return executor.executeQuery(query, params, range);
 		}
 	}
 
@@ -298,18 +281,7 @@
 		Object[] params;
 
 		public Number call() throws Exception {
-			((QueryImpl)query.getContext()).startLocking();
-			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
-			((QueryImpl)query.getContext()).lock();
-			((BrokerImpl)query.getContext().getStoreContext()).lock();
-			try { 
-				return executor.executeDelete(query, params);
-			} finally {
-				((QueryImpl)query.getContext()).unlock();
-				((BrokerImpl)query.getContext().getStoreContext()).unlock();
-				((QueryImpl)query.getContext()).stopLocking();
-				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
-			}
+			return executor.executeDelete(query, params);
 		}
 	}
 
@@ -319,18 +291,7 @@
 		Object[] params;
 
 		public Number call() throws Exception {
-			((QueryImpl)query.getContext()).startLocking();
-			((BrokerImpl)query.getContext().getStoreContext()).startLocking();
-			((QueryImpl)query.getContext()).lock();
-			((BrokerImpl)query.getContext().getStoreContext()).lock();
-			try { 
-				return executor.executeUpdate(query, params);
-			} finally {
-				((QueryImpl)query.getContext()).unlock();
-				((BrokerImpl)query.getContext().getStoreContext()).unlock();
-				((QueryImpl)query.getContext()).stopLocking();
-				((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
-			}
+		    return executor.executeUpdate(query, params);
 		}
 	}
 }

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Thu
Feb 26 20:20:44 2009
@@ -128,18 +128,17 @@
 
         EntityManager em = emf.createEntityManager();
         em.getTransaction().begin();
-        PObject pc2 = em.getReference(PObject.class, pc.getId());
-        assertNotNull(pc2);
-        assertNotEquals(pc, pc2);
-        assertEquals(pc.getId(), pc2.getId());
-        assertEquals(value, pc2.getValue());
-        pc2.setValue(value+1);
-        em.merge(pc2);
+        PObject ref = em.getReference(PObject.class, pc.getId());
+        assertNotNull(ref);
+        assertNotEquals(pc, ref);
+        assertEquals(ref.getId(), pc.getId());
+        pc.setValue(value+1);
+        em.merge(pc);
         em.getTransaction().commit();
         em.clear();
         
         em.getTransaction().begin();
-        PObject pc3 = em.getReference(PObject.class, pc.getId());
+        PObject pc3 = em.find(PObject.class, pc.getId());
         assertEquals(value+1, pc3.getValue());
         em.getTransaction().commit();
         



Mime
View raw message