openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r1464082 - in /openjpa/trunk: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ openjpa-kernel/src/main/java/org/apache/openjpa/kernel/ openjpa-persistence/src/main/java/org/apache/openjpa/persistence/ openjpa-slice/src/main/java/o...
Date Wed, 03 Apr 2013 15:51:54 GMT
Author: ppoddar
Date: Wed Apr  3 15:51:52 2013
New Revision: 1464082

URL: http://svn.apache.org/r1464082
Log:
OPENJPA-2365: Support for delete-by-query. Correction for hint processing

Added:
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
  (with props)
    openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
  (with props)
Modified:
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
    openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
    openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    openjpa/trunk/scripts/test.bat

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
Wed Apr  3 15:51:52 2013
@@ -584,7 +584,12 @@ public class JDBCStoreQuery 
                 }
             }
         } finally {
-            try { conn.close(); } catch (SQLException se) {}
+            try { 
+            	if (conn.getAutoCommit())
+            		conn.close(); 
+            } catch (SQLException se) {
+            	
+            }
         }
 
         localContext.remove();

Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java Wed
Apr  3 15:51:52 2013
@@ -1829,6 +1829,25 @@ public class BrokerImpl implements Broke
             endOperation();
         }
     }
+    
+    /**
+     * Sets the given flag in the status.
+     * 
+     * @since 2.3.0
+     */
+    protected void setStatusFlag(int flag) {
+    	_flags |= flag;
+    }
+    
+    /**
+     * Clears the given flag from the status.
+     * 
+     * @since 2.3.0
+     */
+    protected void clearStatusFlag(int flag) {
+    	_flags &= ~flag;
+    }
+
 
     public void flush() {
         beginOperation(true);

Modified: openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
(original)
+++ openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
Wed Apr  3 15:51:52 2013
@@ -347,6 +347,11 @@ public class QueryImpl<X> extends Abstra
 
 	private boolean pushQueryFetchPlan() {
 		boolean fcPushed = false;
+		if (_hintHandler != null) {
+			FetchConfiguration fc = _fetch == null ? null : ((FetchPlanImpl)_fetch).getDelegate();
+			_em.pushFetchPlan(fc);
+			return true;
+		}
 		if (_fetch != null && _hintHandler != null) {
 			switch (_fetch.getReadLockMode()) {
 			case PESSIMISTIC_READ:
@@ -528,6 +533,7 @@ public class QueryImpl<X> extends Abstra
      * cache. 
      */
     private boolean preExecute(Map params) {
+    	
         PreparedQueryCache cache = _em.getPreparedQueryCache();
         if (cache == null) {
             return false;

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
Wed Apr  3 15:51:52 2013
@@ -152,6 +152,12 @@ public class DistributedBrokerImpl exten
     @Override
     public void beginStore() {
     }
+    
+    @Override
+    protected void flush(int reason) {
+    	setStatusFlag(2 << 8);
+    	super.flush(reason);
+    }
 
     /**
      * Overrides to target specific slices for find() calls.

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
Wed Apr  3 15:51:52 2013
@@ -293,6 +293,20 @@ class DistributedJDBCStoreManager extend
         }
     }
     
+    @Override
+    public void commit() {
+    	for (SliceStoreManager slice : _slices) {
+    		slice.commit();
+    	}
+    }
+    
+    @Override
+    public void rollback() {
+    	for (SliceStoreManager slice : _slices) {
+    		slice.rollback();
+    	}
+    }
+    
     /**
      * Collect the current versions of the given StateManagers.
      */

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Wed Apr  3 15:51:52 2013
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.Broker;
 import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.ExpressionStoreQuery;
 import org.apache.openjpa.kernel.FetchConfiguration;
@@ -118,15 +119,14 @@ class DistributedStoreQuery extends JDBC
 		 */
 		public ResultObjectProvider executeQuery(StoreQuery q,
 				final Object[] params, final Range range) {
-			List<Future<ResultObjectProvider>> futures = 
-				new ArrayList<Future<ResultObjectProvider>>();
+			List<Future<ResultObjectProvider>> futures = new ArrayList<Future<ResultObjectProvider>>();
             final List<Executor> usedExecutors = new ArrayList<Executor>();
-			final List<ResultObjectProvider> rops = 
-				new ArrayList<ResultObjectProvider>();
+			final List<ResultObjectProvider> rops = new ArrayList<ResultObjectProvider>();
 			List<SliceStoreManager> targets = findTargets();
 			QueryContext ctx = q.getContext();
 			boolean isReplicated = containsReplicated(ctx);
             ExecutorService threadPool = SliceThread.getPool();
+           
 			for (int i = 0; i < owner._queries.size(); i++) {
                 // if replicated, then execute only on single slice
 				if (isReplicated && !usedExecutors.isEmpty()) {
@@ -135,16 +135,12 @@ class DistributedStoreQuery extends JDBC
                 StoreManager sm = owner.getDistributedStore().getSlice(i);
 				if (!targets.contains(sm))
 					continue;
-				StoreQuery query = owner._queries.get(i);
-				Executor executor = executors.get(i);
-				if (!targets.contains(sm))
-					continue;
-				usedExecutors.add(executor);
                 QueryExecutor call = new QueryExecutor();
-                call.executor = executor;
-                call.query = query;
+                call.executor = executors.get(i);
+                call.query = owner._queries.get(i);
                 call.params = params;
                 call.range = range;
+				usedExecutors.add(call.executor);
                 futures.add(threadPool.submit(call));
 			}
 			for (Future<ResultObjectProvider> future : futures) {
@@ -157,16 +153,14 @@ class DistributedStoreQuery extends JDBC
 				}
 			}
 			
-			ResultObjectProvider[] tmp = rops
-                    .toArray(new ResultObjectProvider[rops.size()]);
+			ResultObjectProvider[] tmp = rops.toArray(new ResultObjectProvider[rops.size()]);
 			ResultObjectProvider result = null;
 			boolean[] ascending = getAscending(q);
 			boolean isAscending = ascending.length > 0;
 			boolean isAggregate = ctx.isAggregate();
 			boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
 			if (isAggregate) {
-				result = new UniqueResultObjectProvider(tmp, q,
-						getQueryExpressions());
+				result = new UniqueResultObjectProvider(tmp, q,	getQueryExpressions());
 			} else if (isAscending) {
                 result = new OrderingMergedResultObjectProvider(tmp, ascending,
                     usedExecutors.toArray(new Executor[usedExecutors.size()]),
@@ -175,8 +169,7 @@ class DistributedStoreQuery extends JDBC
 				result = new MergedResultObjectProvider(tmp);
 			}
 			if (hasRange) {
-                result = new RangeResultObjectProvider(result,
-                        ctx.getStartRange(), ctx.getEndRange());
+                result = new RangeResultObjectProvider(result, ctx.getStartRange(), ctx.getEndRange());
 			}
 			return result;
 		}
@@ -201,16 +194,18 @@ class DistributedStoreQuery extends JDBC
 		}
 
 		public Number executeDelete(StoreQuery q, Object[] params) {
-			Iterator<StoreQuery> qs = owner._queries.iterator();
-			List<Future<Number>> futures = null;
+			List<Future<Number>> futures = new ArrayList<Future<Number>>();
 			int result = 0;
             ExecutorService threadPool = SliceThread.getPool();
-			for (Executor ex : executors) {
-				if (futures == null)
-                    futures = new ArrayList<Future<Number>>();
+			List<SliceStoreManager> targets = findTargets();
+			for (int i = 0; i < owner._queries.size(); i++) {
+                StoreManager sm = owner.getDistributedStore().getSlice(i);
+				if (!targets.contains(sm))
+					continue;
+				
 				DeleteExecutor call = new DeleteExecutor();
-				call.executor = ex;
-				call.query = qs.next();
+				call.executor = executors.get(i);
+				call.query = owner._queries.get(i);
 				call.params = params;
 				futures.add(threadPool.submit(call));
 			}
@@ -256,8 +251,7 @@ class DistributedStoreQuery extends JDBC
 		}
 
 		List<SliceStoreManager> findTargets() {
-			FetchConfiguration fetch = owner.getContext()
-					.getFetchConfiguration();
+  		    FetchConfiguration fetch = owner.getContext().getFetchConfiguration();
 			return owner.getDistributedStore().getTargets(fetch);
 		}
 		

Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java?rev=1464082&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
(added)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
Wed Apr  3 15:51:52 2013
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+
+
+import org.apache.openjpa.kernel.BrokerFactory;
+import org.apache.openjpa.persistence.JPAFacadeHelper;
+import org.apache.openjpa.slice.DistributedBrokerFactory;
+import org.apache.openjpa.slice.SlicePersistence;
+import org.apache.openjpa.slice.policy.UniformDistributionPolicy;
+
+/**
+ * Tests delete-by-query.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class TestBulkDelete extends SliceTestCase {
+	private static int SLICES = 3;
+	private static List<String> SLICE_NAMES;
+	
+	@Override
+	protected String getPersistenceUnitName() {
+		return "slice";
+	}
+	public void setUp() throws Exception {
+		super.setUp(PObject.class, CLEAR_TABLES,
+				"openjpa.slice.DistributionPolicy", UniformDistributionPolicy.class.getName());
+		
+	}
+
+	public void tearDown() throws Exception {
+		System.err.println("Delete all instances from all slices");
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		String delete = "delete from PObject p";
+		int m = em.createQuery(delete).executeUpdate();
+		em.getTransaction().commit();
+		super.tearDown();
+	}
+	
+	/**
+	 * Creates N instances that are distributed in 3 slices.
+	 * Deletes all instances from only one slice.
+	 */
+	public void testBulkDelete() {
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
+		SLICE_NAMES = conf.getActiveSliceNames();
+		SLICES = SLICE_NAMES.size();
+		assertTrue(SLICES > 1);
+		int M = 4; // no of instances in each slice
+		int N = SLICES*M; // total number of instances in all 3 slices
+		
+		for (int i = 0; i < N; i++) {
+			PObject pc = new PObject();
+			em.persist(pc);
+		}
+		em.getTransaction().commit();
+		String jpql = "select count(p) from PObject p";
+		long total = em.createQuery(jpql, Long.class).getSingleResult();
+		assertEquals(N, total);
+		
+		for (int i = 0; i < SLICES; i++) {
+			System.err.println("Query only on slice [" + SLICE_NAMES.get(i) + "]");
+			long count = em.createQuery(jpql,Long.class)
+			               .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(i))
+			               .getSingleResult();
+			assertEquals(M, count);
+		}
+		
+		em.getTransaction().begin();
+		System.err.println("Delete only from slice [" + SLICE_NAMES.get(0) + "]");
+		String delete = "delete from PObject p";
+		int m = em.createQuery(delete)
+		  .setHint(SlicePersistence.HINT_TARGET, SLICE_NAMES.get(0))
+		  .executeUpdate();
+		assertEquals(M, m);
+		em.getTransaction().commit();
+	}
+}

Propchange: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBulkDelete.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java?rev=1464082&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
(added)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
Wed Apr  3 15:51:52 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.policy;
+
+import java.util.List;
+
+import org.apache.openjpa.slice.DistributionPolicy;
+import org.apache.openjpa.slice.PObject;
+
+
+/**
+ * Distributes the instances uniformly among the available slices
+ * based on the integral value of the persistence identifier.
+ * <br>
+ * Given {@code M} slices and {@code N} instances whose identity
+ * value is uniformly distributed, this policy will persist these
+ * instances such that
+ * <LI>each slice will have N/M instances
+ * <LI>the identity of each instance in the {@code i}-th slice 
+ * will be divisible by {@code i} but by no larger slice index.
+ * 
+ * @author Pinaki Poddar
+ *
+ */
+public class UniformDistributionPolicy implements DistributionPolicy {
+
+	@Override
+	public String distribute(Object pc, List<String> slices, Object context) {
+		int N = slices.size();
+		for (int i = N; i > 0; i--) {
+			PObject p = (PObject)pc;
+			if (p.getId()%i == 0) return slices.get(i-1);
+		}
+		return slices.get(0);
+	}
+
+}

Propchange: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/policy/UniformDistributionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: openjpa/trunk/scripts/test.bat
URL: http://svn.apache.org/viewvc/openjpa/trunk/scripts/test.bat?rev=1464082&r1=1464081&r2=1464082&view=diff
==============================================================================
--- openjpa/trunk/scripts/test.bat (original)
+++ openjpa/trunk/scripts/test.bat Wed Apr  3 15:51:52 2013
@@ -18,7 +18,6 @@
 @rem
 
 @setlocal
-pushd openjpa-persistence-jdbc
-mvn test -Dtest=%1 %2 %3 %4
+mvn test -DfailIfNoTests=false -Dbuild.enhance=false -Dtest=%1 %2 %3 %4
 popd
 @endlocal



Mime
View raw message