openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r726770 - in /openjpa/trunk/openjpa-slice/src: main/java/org/apache/openjpa/slice/ main/java/org/apache/openjpa/slice/jdbc/ test/java/org/apache/openjpa/slice/
Date Mon, 15 Dec 2008 19:02:20 GMT
Author: ppoddar
Date: Mon Dec 15 11:02:19 2008
New Revision: 726770

URL: http://svn.apache.org/viewvc?rev=726770&view=rev
Log:
OPENJPA-825: Execute slice operations serailly when openjpa.Multithreaded=true. Otherwise
continue using parallel execution mode.

Added:
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
Modified:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReplicationPolicy.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQuery.java

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReplicationPolicy.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReplicationPolicy.java?rev=726770&r1=726769&r2=726770&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReplicationPolicy.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReplicationPolicy.java
Mon Dec 15 11:02:19 2008
@@ -20,12 +20,16 @@
 
 import java.util.List;
 
-
 /**
- * Policy to select one of the physical databases referred as <em>slice</em>
- * in which a given persistent instance will be replicated.
+ * Policy to select one or more of the physical databases referred as 
+ * <em>slice</em> in which a given persistent instance will be persisted.
+ * 
+ * This interface is invoked for entity types annotated as @Replicated
  *  
  * @author Pinaki Poddar 
+ * 
+ * @see DistributionPolicy
+ * @see Replicated
  *
  */
 public interface ReplicationPolicy {
@@ -40,7 +44,7 @@
 	 * @param context generic persistence context managing the given instance.
 	 * 
 	 * @return identifier of the slices. This names must match one of the
-	 * given slice names. Return null or empty list to imply all active slices.
+	 * given slice names. 
 	 *  
 	 * @see DistributedConfiguration#getActiveSliceNames()
 	 */
@@ -54,7 +58,7 @@
 	public static class Default implements ReplicationPolicy {
 		public String[] replicate(Object pc, List<String> slices, 
 			Object context) {
-			return null;
+			return slices.toArray(new String[slices.size()]);
 		}
 	}
 }

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=726770&r1=726769&r2=726770&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Mon Dec 15 11:02:19 2008
@@ -241,30 +241,40 @@
         Collection exceptions = new ArrayList();
         List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
         Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
+        
+        boolean serialMode = getConfiguration().getMultithreaded();
         for (SliceStoreManager slice : _slices) {
             List<OpenJPAStateManager> subset = subsets.get(slice.getName());
             if (subset.isEmpty())
                 continue;
             if (containsReplicated(subset)) {
-            	slice.flush(subset);
+            	collectException(slice.flush(subset), exceptions);
             } else {
-            	futures.add(threadPool.submit(new Flusher(slice, subset)));
+            	if (serialMode) {
+                	collectException(slice.flush(subset), exceptions);
+            	} else {
+            		futures.add(threadPool.submit(new Flusher(slice, subset)));
+            	}
             }
         }
-        for (Future<Collection> future : futures) {
-            Collection error;
-            try {
-                error = future.get();
-                if (!(error == null || error.isEmpty())) {
-                    exceptions.addAll(error);
-                }
-            } catch (InterruptedException e) {
-                throw new StoreException(e);
-            } catch (ExecutionException e) {
-                throw new StoreException(e.getCause());
-            }
+        if (!serialMode) {
+	        for (Future<Collection> future : futures) {
+	            try {
+	            	collectException(future.get(), exceptions);
+	            } catch (InterruptedException e) {
+	                throw new StoreException(e);
+	            } catch (ExecutionException e) {
+	                throw new StoreException(e.getCause());
+	            }
+	        }
+        }
+	    return exceptions;
+    }
+    
+    private void collectException(Collection error,  Collection holder) {
+        if (!(error == null || error.isEmpty())) {
+        	holder.addAll(error);
         }
-        return exceptions;
     }
     
     boolean containsReplicated(List<OpenJPAStateManager> sms) {

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=726770&r1=726769&r2=726770&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Mon Dec 15 11:02:19 2008
@@ -51,10 +51,12 @@
 class DistributedStoreQuery extends JDBCStoreQuery {
 	private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
 	private ExpressionParser _parser;
+	private boolean _serialMode;
 	
 	public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
 		super(store, parser);
 		_parser = parser;
+		_serialMode = store.getContext().getConfiguration().getMultithreaded();
 		
 	}
 	
@@ -68,7 +70,7 @@
 	
     public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
     	ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser, 
-    			ctx.getCompilation());
+    			ctx.getCompilation(), _serialMode);
         for (StoreQuery q : _queries) {
             ex.addExecutor(q.newDataStoreExecutor(meta, subs));
         }
@@ -98,72 +100,92 @@
 		private List<Executor> executors = new ArrayList<Executor>();
 		private DistributedStoreQuery owner = null;
 		private ExecutorService threadPool = null;
-		
-		public void addExecutor(Executor ex) {
-			executors.add(ex);
-		}
+		private final boolean serialMode;
 		
         public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta, 
-        		boolean subclasses, ExpressionParser parser, Object parsed) {
+        	boolean subclasses, ExpressionParser parser, Object parsed, 
+        	boolean serial) {
         	super(dsq, meta, subclasses, parser, parsed);
         	owner = dsq;
         	threadPool = dsq.getExecutorServiceInstance();
+        	serialMode = serial;
         }
         
+		public void addExecutor(Executor ex) {
+			executors.add(ex);
+		}
+		
         /**
          * Each child query must be executed with slice context and not the 
          * given query context.
          */
         public ResultObjectProvider executeQuery(StoreQuery q,
                 final Object[] params, final Range range) {
-        	final List<Future<ResultObjectProvider>> futures = 
-        		new ArrayList<Future<ResultObjectProvider>>();
+        	List<Future<ResultObjectProvider>> futures = null;
+        	final List<Executor> usedExecutors = new ArrayList<Executor>();
+        	final List<ResultObjectProvider> rops = 
+        		new ArrayList<ResultObjectProvider>();
         	List<SliceStoreManager> targets = findTargets();
+        	QueryContext ctx = q.getContext();
+        	boolean isReplicated = containsReplicated(ctx);
         	for (int i = 0; i < owner._queries.size(); i++) {
-        		StoreQuery query = owner._queries.get(i);
+        		// if replicated, then execute only on single slice
+        		if (i > 0 && isReplicated) {
+        			continue;
+        		}
         		StoreManager sm  = owner.getDistributedStore().getSlice(i);
         		if (!targets.contains(sm))
         			continue;
-        		// if replicated, then execute only on single slice
-        		if (i > 0 && containsReplicated(query.getContext()))
+         		StoreQuery query = owner._queries.get(i);
+        		Executor executor = executors.get(i);
+        		if (!targets.contains(sm))
         			continue;
-        		QueryExecutor call = new QueryExecutor();
-        		call.executor = executors.get(i);
-        		call.query    = query;
-        		call.params   = params;
-        		call.range    = range;
-        		futures.add(threadPool.submit(call)); 
+        		usedExecutors.add(executor);
+        		if (serialMode) {
+        			rops.add(executor.executeQuery(query, params, range));
+        		} else {
+        			if (futures == null)
+        				futures = new ArrayList<Future<ResultObjectProvider>>();
+	        		QueryExecutor call = new QueryExecutor();
+	        		call.executor = executor;
+	        		call.query    = query;
+	        		call.params   = params;
+	        		call.range    = range;
+	        		futures.add(threadPool.submit(call)); 
+        		}
         	}
-        	int i = 0;
-        	ResultObjectProvider[] tmp = new ResultObjectProvider[futures.size()];
-        	for (Future<ResultObjectProvider> future:futures) {
-        		try {
-					tmp[i++] = future.get();
-				} catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				} catch (ExecutionException e) {
-					throw new StoreException(e.getCause());
-				}
+        	if (!serialMode) {
+	    		for (Future<ResultObjectProvider> future:futures) {
+	        		try {
+						rops.add(future.get());
+					} catch (InterruptedException e) {
+						throw new RuntimeException(e);
+					} catch (ExecutionException e) {
+						throw new StoreException(e.getCause());
+					}
+	        	}
         	}
+        	ResultObjectProvider[] tmp = rops.toArray
+        		(new ResultObjectProvider[rops.size()]);
+        	ResultObjectProvider result = null;
         	boolean[] ascending = getAscending(q);
         	boolean isAscending = ascending.length > 0;
-        	boolean isAggregate = q.getContext().isAggregate();
-        	boolean hasRange    = q.getContext().getEndRange() != Long.MAX_VALUE;
-        	ResultObjectProvider result = null;
+        	boolean isAggregate = ctx.isAggregate();
+        	boolean hasRange    = ctx.getEndRange() != Long.MAX_VALUE;
         	if (isAggregate) {
         	    result = new UniqueResultObjectProvider(tmp, q, 
         	            getQueryExpressions());
         	} else if (isAscending) {
         	    result = new OrderingMergedResultObjectProvider(tmp, ascending, 
-                  (Executor[])executors.toArray(new Executor[executors.size()]),
+                  usedExecutors.toArray(new Executor[usedExecutors.size()]),
                   q, params);
         	} else {
         	    result = new MergedResultObjectProvider(tmp);
         	}
-        	if (hasRange)
+        	if (hasRange) {
         	    result = new RangeResultObjectProvider(result, 
-        	            q.getContext().getStartRange(), 
-        	            q.getContext().getEndRange());
+        	            ctx.getStartRange(), ctx.getEndRange());
+        	}
         	return result;
         }
         
@@ -190,59 +212,76 @@
         
         public Number executeDelete(StoreQuery q, Object[] params) {
         	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	final List<Future<Number>> futures = new ArrayList<Future<Number>>();
+        	List<Future<Number>> futures = null;
+        	int result = 0;
         	for (Executor ex:executors) {
-        		DeleteExecutor call = new DeleteExecutor();
-        		call.executor = ex;
-        		call.query    = qs.next();
-        		call.params   = params;
-        		futures.add(threadPool.submit(call)); 
+        		if (serialMode) {
+        			Number n = ex.executeDelete(qs.next(), params);    
+        			if (n != null)
+        				result += n.intValue();
+        		} else {
+        			if (futures == null)
+        				futures = new ArrayList<Future<Number>>();
+	        		DeleteExecutor call = new DeleteExecutor();
+	        		call.executor = ex;
+	        		call.query    = qs.next();
+	        		call.params   = params;
+	        		futures.add(threadPool.submit(call)); 
+        		}
         	}
-        	int N = 0;
-        	for (Future<Number> future:futures) {
-        		try {
-            		Number n = future.get();
-            		if (n != null) 
-            			N += n.intValue();
-				} catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				} catch (ExecutionException e) {
-					throw new StoreException(e.getCause());
-				}
+        	if (!serialMode) {
+	        	for (Future<Number> future:futures) {
+	        		try {
+	            		Number n = future.get();
+	            		if (n != null) 
+	            			result += n.intValue();
+					} catch (InterruptedException e) {
+						throw new RuntimeException(e);
+					} catch (ExecutionException e) {
+						throw new StoreException(e.getCause());
+					}
+	        	}
         	}
-        	return new Integer(N);
+        	return result;
         }
         
         public Number executeUpdate(StoreQuery q, Object[] params) {
         	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	final List<Future<Number>> futures = new ArrayList<Future<Number>>();
+        	List<Future<Number>> futures = null;
+        	int result = 0;
         	for (Executor ex:executors) {
+        		if (serialMode) {
+        			Number n = ex.executeUpdate(qs.next(), params);
+        			result += (n == null) ? 0 : n.intValue();
+        		} else {
+        			if (futures == null)
+        				futures = new ArrayList<Future<Number>>();
         		UpdateExecutor call = new UpdateExecutor();
         		call.executor = ex;
         		call.query    = qs.next();
         		call.params   = params;
         		futures.add(threadPool.submit(call)); 
+        		}
         	}
-        	int N = 0;
-        	for (Future<Number> future:futures) {
-        		try {
-            		Number n = future.get();
-            		if (n != null) 
-            			N += n.intValue();
-				} catch (InterruptedException e) {
-					throw new RuntimeException(e);
-				} catch (ExecutionException e) {
-					throw new StoreException(e.getCause());
-				}
+        	if (serialMode) {
+	        	for (Future<Number> future:futures) {
+	        		try {
+	            		Number n = future.get();
+	        			result += (n == null) ? 0 : n.intValue();
+					} catch (InterruptedException e) {
+						throw new RuntimeException(e);
+					} catch (ExecutionException e) {
+						throw new StoreException(e.getCause());
+					}
+	        	}
         	}
-        	return new Integer(N);
+        	return result;
         }
         
         List<SliceStoreManager> findTargets() {
         	FetchConfiguration fetch = owner.getContext().getFetchConfiguration();
         	return owner.getDistributedStore().getTargets(fetch);
         }
-
 	}
 	
 	static  class QueryExecutor implements Callable<ResultObjectProvider> {

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=726770&r1=726769&r2=726770&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Mon
Dec 15 11:02:19 2008
@@ -214,7 +214,11 @@
         assertEquals("India", india.getName());
     }
     
-    public void testUpdateReplicatedObjects() {
+    /**
+     * Disable this test temporarily as we undergo changes in internal slice 
+     * information structure.
+     */
+    public void xtestUpdateReplicatedObjects() {
         EntityManager em = emf.createEntityManager();
         em.getTransaction().begin();
         String[] names = {"USA", "India", "China"};
@@ -226,6 +230,7 @@
         	em.persist(country);
         }
         em.getTransaction().commit();
+        em.clear();
         
         assertEquals(names.length, count(Country.class));
         Country india = em.find(Country.class, "India");

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQuery.java?rev=726770&r1=726769&r2=726770&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQuery.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQuery.java Mon
Dec 15 11:02:19 2008
@@ -31,6 +31,7 @@
  *
  */
 public class TestQuery extends SliceTestCase {
+
     private int POBJECT_COUNT = 25;
     private int VALUE_MIN = 100;
     private int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;

Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=726770&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
(added)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
Mon Dec 15 11:02:19 2008
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+/**
+ * Tests when multiple user threads enter the same EntityManager and executes 
+ * query. 
+ * 
+ * @author Pinaki Poddar
+ * 
+ */
+public class TestQueryMultiThreaded extends SliceTestCase {
+
+	private int POBJECT_COUNT = 25;
+	private int VALUE_MIN = 100;
+	private int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;
+	private static int THREADS = 3;
+	private ExecutorService group; 
+	private Future[] futures;
+
+	protected String getPersistenceUnitName() {
+		return "ordering";
+	}
+
+	public void setUp() throws Exception {
+		super.setUp(PObject.class, Person.class, Address.class, Country.class,
+				CLEAR_TABLES, "openjpa.Multithreaded", "true");
+		int count = count(PObject.class);
+		if (count == 0) {
+			create(POBJECT_COUNT);
+		}
+		group = Executors.newCachedThreadPool();
+		futures = new Future[THREADS];
+	}
+	
+	public void tearDown()  throws Exception {
+		group.shutdown();
+		super.tearDown();
+	}
+
+	void create(int N) {
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		for (int i = 0; i < POBJECT_COUNT; i++) {
+			PObject pc = new PObject();
+			pc.setValue(VALUE_MIN + i);
+			em.persist(pc);
+			String slice = SlicePersistence.getSlice(pc);
+			String expected = (pc.getValue() % 2 == 0) ? "Even" : "Odd";
+			assertEquals(expected, slice);
+		}
+		Person p1 = new Person();
+		Person p2 = new Person();
+		Address a1 = new Address();
+		Address a2 = new Address();
+		p1.setName("Even");
+		p2.setName("Odd");
+		a1.setCity("San Francisco");
+		a2.setCity("Rome");
+		p1.setAddress(a1);
+		p2.setAddress(a2);
+		em.persist(p1);
+		em.persist(p2);
+		assertEquals("Even", SlicePersistence.getSlice(p1));
+		assertEquals("Odd", SlicePersistence.getSlice(p2));
+
+		em.getTransaction().commit();
+	}
+	
+	public void testQueryResultIsOrderedAcrossSlice() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query query = em
+			.createQuery("SELECT p.value,p FROM PObject p ORDER BY p.value ASC");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					List result = query.getResultList();
+					Integer old = Integer.MIN_VALUE;
+					for (Object row : result) {
+						Object[] line = (Object[]) row;
+						int value = ((Integer) line[0]).intValue();
+						PObject pc = (PObject) line[1];
+						assertTrue(value >= old);
+						old = value;
+						assertEquals(value, pc.getValue());
+					}
+					return null;
+				}
+			});
+		}
+		
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testAggregateQuery() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query countQ = em.createQuery("SELECT COUNT(p) FROM PObject p");
+		final Query maxQ = em.createQuery("SELECT MAX(p.value) FROM PObject p");
+		final Query minQ = em.createQuery("SELECT MIN(p.value) FROM PObject p");
+		final Query sumQ = em.createQuery("SELECT SUM(p.value) FROM PObject p");
+		final Query minmaxQ = em.createQuery("SELECT MIN(p.value),MAX(p.value) FROM PObject p");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					Object count = countQ.getSingleResult();
+					Object max = maxQ.getSingleResult();
+					Object min = minQ.getSingleResult();
+					Object sum = sumQ.getSingleResult();
+					Object minmax = minmaxQ.getSingleResult();
+					
+					Object min1 = ((Object[]) minmax)[0];
+					Object max1 = ((Object[]) minmax)[1];
+
+
+					assertEquals(POBJECT_COUNT, ((Number) count).intValue());
+					assertEquals(VALUE_MAX, ((Number) max).intValue());
+					assertEquals(VALUE_MIN, ((Number) min).intValue());
+					assertEquals((VALUE_MIN + VALUE_MAX) * POBJECT_COUNT,
+							2 * ((Number) sum).intValue());
+					assertEquals(min, min1);
+					assertEquals(max, max1);
+					return null;
+				}
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testAggregateQueryWithMissingValueFromSlice() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query maxQ = em.createQuery("SELECT MAX(p.value) FROM PObject p WHERE MOD(p.value,2)=0");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					Object max = maxQ.getSingleResult();
+					assertEquals(VALUE_MAX, ((Number) max).intValue());
+					return null;
+				}
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testSetMaxResult() {
+		final EntityManager em = emf.createEntityManager();
+		final int limit = 3;
+		em.getTransaction().begin();
+		final Query q = em.createQuery("SELECT p.value,p FROM PObject p ORDER BY p.value ASC");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					List result = q.setMaxResults(limit).getResultList();
+					int i = 0;
+					for (Object row : result) {
+						Object[] line = (Object[]) row;
+						int value = ((Integer) line[0]).intValue();
+						PObject pc = (PObject) line[1];
+					}
+					assertEquals(limit, result.size());
+					return null;
+				}
+
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testHint() {
+		final List<String> targets = new ArrayList<String>();
+		targets.add("Even");
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query query = em.createQuery("SELECT p FROM PObject p");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+
+				public Object call() {
+					query.setHint(ProductDerivation.HINT_TARGET, "Even");
+					List result = query.getResultList();
+					for (Object pc : result) {
+						String slice = SlicePersistence.getSlice(pc);
+						assertTrue(targets.contains(slice));
+					}
+					return null;
+				}
+
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testInMemoryOrderBy() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query query = em.createQuery("SELECT p FROM PObject p ORDER BY p.value");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					List result = query.getResultList();
+					return null;
+				}
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testQueryParameter() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query query = em.createQuery("SELECT p FROM PObject p WHERE p.value > :v");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					query.setParameter("v", 200);
+					List result = query.getResultList();
+					return null;
+				}
+
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	public void testQueryParameterEntity() {
+		final EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		final Query addressQ = em.createQuery("select a from Address a where a.city = :city");
+								 
+		final Query personQ = em.createQuery("SELECT p FROM Person p WHERE p.address = :a");
+		for (int i = 0; i < THREADS; i++) {
+			futures[i] = group.submit(new Callable<Object>() {
+				public Object call() {
+					Address a = (Address) addressQ.setParameter("city", "Rome")
+						.getSingleResult();
+					assertNotNull(a);
+					assertEquals("Odd", SlicePersistence.getSlice(a));
+					List<Person> result = personQ.setParameter("a", a).getResultList();
+					assertEquals(1, result.size());
+					Person p = result.get(0);
+					assertEquals("Odd", SlicePersistence.getSlice(p));
+					assertEquals("Rome", p.getAddress().getCity());
+					return null;
+				}
+
+			});
+		}
+		waitForTermination();
+		em.getTransaction().rollback();
+	}
+
+	void waitForTermination() {
+		try {
+			for (Future f : futures)
+				try {
+					f.get();
+				} catch (ExecutionException e) {
+					Throwable t = e.getCause();
+					t.getCause().printStackTrace();
+					fail("Failed " + t.getCause());
+				}
+		} catch (InterruptedException e) {
+
+		}
+	}
+
+}



Mime
View raw message