openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r1135806 - in /openjpa/trunk: openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/
Date Tue, 14 Jun 2011 21:33:29 GMT
Author: ppoddar
Date: Tue Jun 14 21:33:28 2011
New Revision: 1135806

URL: http://svn.apache.org/viewvc?rev=1135806&view=rev
Log:
OPENJPA-2008: Draft support for native SQL in Slice

Added:
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
Modified:
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java
    openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java?rev=1135806&r1=1135805&r2=1135806&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/SQLStoreQuery.java
Tue Jun 14 21:33:28 2011
@@ -98,7 +98,7 @@ public class SQLStoreQuery
     /**
      * Executes the filter as a SQL query.
      */
-    protected static class SQLExecutor
+    public static class SQLExecutor
         extends AbstractExecutor {
 
         private final ClassMetaData _meta;

Modified: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java?rev=1135806&r1=1135805&r2=1135806&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
(original)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCStoreManager.java
Tue Jun 14 21:33:28 2011
@@ -41,6 +41,7 @@ import org.apache.openjpa.jdbc.kernel.Co
 import org.apache.openjpa.jdbc.kernel.JDBCFetchConfigurationImpl;
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
+import org.apache.openjpa.jdbc.kernel.SQLStoreQuery;
 import org.apache.openjpa.jdbc.sql.Result;
 import org.apache.openjpa.jdbc.sql.ResultSetResult;
 import org.apache.openjpa.kernel.FetchConfiguration;
@@ -437,7 +438,18 @@ class DistributedJDBCStoreManager extend
      * Construct a distributed query to be executed against all the slices.
      */
     public StoreQuery newQuery(String language) {
+    	if (QueryLanguages.LANG_SQL.equals(language)) {
+    		DistributedSQLStoreQuery ret = new DistributedSQLStoreQuery(this);
+            for (SliceStoreManager slice : _slices) {
+                ret.add(slice.newQuery(language));
+            }
+            return ret;
+    	}
         ExpressionParser parser = QueryLanguages.parserForLanguage(language);
+        if (parser == null) {
+    		throw new UnsupportedOperationException("Language [" + language + "] not supported");
+        } 
+
         DistributedStoreQuery ret = new DistributedStoreQuery(this, parser);
         for (SliceStoreManager slice : _slices) {
             ret.add(slice.newQuery(language));

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java?rev=1135806&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
(added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedSQLStoreQuery.java
Tue Jun 14 21:33:28 2011
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.jdbc.kernel.SQLStoreQuery;
+import org.apache.openjpa.kernel.AbstractStoreQuery;
+import org.apache.openjpa.kernel.BrokerImpl;
+import org.apache.openjpa.kernel.ExpressionStoreQuery;
+import org.apache.openjpa.kernel.FetchConfiguration;
+import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
+import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreManager;
+import org.apache.openjpa.kernel.StoreQuery;
+import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
+import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
+import org.apache.openjpa.lib.rop.ResultObjectProvider;
+import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.slice.DistributedConfiguration;
+import org.apache.openjpa.slice.SliceThread;
+import org.apache.openjpa.util.StoreException;
+
+/**
+ * A query for distributed databases.
+ * 
+ * @author Pinaki Poddar
+ * 
+ */
+@SuppressWarnings("serial")
+class DistributedSQLStoreQuery extends SQLStoreQuery {
+	private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
+
+	public DistributedSQLStoreQuery(JDBCStore store) {
+		super(store);
+	}
+
+	void add(StoreQuery q) {
+		_queries.add(q);
+	}
+
+	public DistributedJDBCStoreManager getDistributedStore() {
+		return (DistributedJDBCStoreManager) getStore();
+	}
+
+	public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+		boolean parallel = !getContext().getStoreContext().getBroker()
+			.getMultithreaded();
+        ParallelExecutor ex = new ParallelExecutor(this, meta, parallel);
+		for (StoreQuery q : _queries) {
+			ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+		}
+		return ex;
+	}
+
+	public void setContext(QueryContext ctx) {
+		super.setContext(ctx);
+		for (StoreQuery q : _queries)
+			q.setContext(ctx);
+	}
+
+	/**
+	 * Executes queries on multiple databases.
+	 * 
+	 * @author Pinaki Poddar
+	 * 
+	 */
+	public static class ParallelExecutor extends
+			SQLStoreQuery.SQLExecutor {
+		private List<Executor> executors = new ArrayList<Executor>();
+		private DistributedSQLStoreQuery owner = null;
+
+        public ParallelExecutor(DistributedSQLStoreQuery dsq, ClassMetaData meta, boolean
p) {
+			super(dsq, meta);
+			owner = dsq;
+		}
+
+		public void addExecutor(Executor ex) {
+			executors.add(ex);
+		}
+
+		/**
+         * Each child query must be executed with slice context and not the
+		 * given query context.
+		 */
+		public ResultObjectProvider executeQuery(StoreQuery q,
+				final Object[] params, final Range range) {
+			List<Future<ResultObjectProvider>> futures = 
+				new ArrayList<Future<ResultObjectProvider>>();
+            final List<Executor> usedExecutors = new ArrayList<Executor>();
+			final List<ResultObjectProvider> rops = 
+				new ArrayList<ResultObjectProvider>();
+			List<SliceStoreManager> targets = findTargets();
+			QueryContext ctx = q.getContext();
+			boolean isReplicated = containsReplicated(ctx);
+            ExecutorService threadPool = SliceThread.getPool();
+			for (int i = 0; i < owner._queries.size(); i++) {
+                // if replicated, then execute only on single slice
+				if (isReplicated && !usedExecutors.isEmpty()) {
+					break;
+				}
+                StoreManager sm = owner.getDistributedStore().getSlice(i);
+				if (!targets.contains(sm))
+					continue;
+				StoreQuery query = owner._queries.get(i);
+				Executor executor = executors.get(i);
+				if (!targets.contains(sm))
+					continue;
+				usedExecutors.add(executor);
+                QueryExecutor call = new QueryExecutor();
+                call.executor = executor;
+                call.query = query;
+                call.params = params;
+                call.range = range;
+                futures.add(threadPool.submit(call));
+			}
+			for (Future<ResultObjectProvider> future : futures) {
+				try {
+					rops.add(future.get());
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
+				}
+			}
+			
+			ResultObjectProvider[] tmp = rops
+                    .toArray(new ResultObjectProvider[rops.size()]);
+			ResultObjectProvider result = null;
+			boolean[] ascending = getAscending(q);
+			boolean isAscending = ascending.length > 0;
+			boolean isAggregate = ctx.isAggregate();
+			boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
+			if (isAggregate) {
+				result = new UniqueResultObjectProvider(tmp, q,
+						getQueryExpressions());
+			} else if (isAscending) {
+                result = new OrderingMergedResultObjectProvider(tmp, ascending,
+                    usedExecutors.toArray(new Executor[usedExecutors.size()]),
+					q, params);
+			} else {
+				result = new MergedResultObjectProvider(tmp);
+			}
+			if (hasRange) {
+                result = new RangeResultObjectProvider(result,
+                        ctx.getStartRange(), ctx.getEndRange());
+			}
+			return result;
+		}
+
+		/**
+         * Scans metadata to find out if a replicated class is the candidate.
+		 */
+		boolean containsReplicated(QueryContext query) {
+			Class<?> candidate = query.getCandidateType();
+			DistributedConfiguration conf = (DistributedConfiguration)query.getStoreContext()
+			    .getConfiguration();
+			if (candidate != null) {
+			    return conf.isReplicated(candidate);
+			}
+			ClassMetaData[] metas = query.getAccessPathMetaDatas();
+			if (metas == null || metas.length < 1)
+				return false;
+			for (ClassMetaData meta : metas)
+				if (conf.isReplicated(meta.getDescribedType()))
+					return true;
+			return false;
+		}
+
+		public Number executeDelete(StoreQuery q, Object[] params) {
+			Iterator<StoreQuery> qs = owner._queries.iterator();
+			List<Future<Number>> futures = null;
+			int result = 0;
+            ExecutorService threadPool = SliceThread.getPool();
+			for (Executor ex : executors) {
+				if (futures == null)
+                    futures = new ArrayList<Future<Number>>();
+				DeleteExecutor call = new DeleteExecutor();
+				call.executor = ex;
+				call.query = qs.next();
+				call.params = params;
+				futures.add(threadPool.submit(call));
+			}
+			for (Future<Number> future : futures) {
+				try {
+					Number n = future.get();
+					if (n != null)
+						result += n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
+				}
+			}
+			return result;
+		}
+
+		public Number executeUpdate(StoreQuery q, Object[] params) {
+			Iterator<StoreQuery> qs = owner._queries.iterator();
+			List<Future<Number>> futures = null;
+			int result = 0;
+            ExecutorService threadPool = SliceThread.getPool();
+			for (Executor ex : executors) {
+				if (futures == null)
+                    futures = new ArrayList<Future<Number>>();
+				UpdateExecutor call = new UpdateExecutor();
+				call.executor = ex;
+				call.query = qs.next();
+				call.params = params;
+				futures.add(threadPool.submit(call));
+			}
+			for (Future<Number> future : futures) {
+				try {
+					Number n = future.get();
+                    result += (n == null) ? 0 : n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e.getCause());
+				}
+			}
+			return result;
+		}
+
+		List<SliceStoreManager> findTargets() {
+			FetchConfiguration fetch = owner.getContext()
+					.getFetchConfiguration();
+			return owner.getDistributedStore().getTargets(fetch);
+		}
+		
+	}
+
+	static class QueryExecutor implements Callable<ResultObjectProvider> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+		Range range;
+
+		public ResultObjectProvider call() throws Exception {
+			return executor.executeQuery(query, params, range);
+		}
+	}
+
+	static class DeleteExecutor implements Callable<Number> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+
+		public Number call() throws Exception {
+			return executor.executeDelete(query, params);
+		}
+	}
+
+	static class UpdateExecutor implements Callable<Number> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+
+		public Number call() throws Exception {
+		    return executor.executeUpdate(query, params);
+		}
+	}
+}



Mime
View raw message