openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r619145 [2/3] - in /openjpa/trunk: ./ openjpa-project/src/doc/manual/ openjpa-slice/ openjpa-slice/src/ openjpa-slice/src/main/ openjpa-slice/src/main/java/ openjpa-slice/src/main/java/org/ openjpa-slice/src/main/java/org/apache/ openjpa-sl...
Date Wed, 06 Feb 2008 20:26:17 GMT
Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import javax.sql.DataSource;
+import javax.sql.XADataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.openjpa.conf.OpenJPAConfiguration;
+import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
+import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
+import org.apache.openjpa.jdbc.schema.DataSourceFactory;
+import org.apache.openjpa.lib.conf.BooleanValue;
+import org.apache.openjpa.lib.conf.ConfigurationProvider;
+import org.apache.openjpa.lib.conf.PluginValue;
+import org.apache.openjpa.lib.conf.StringListValue;
+import org.apache.openjpa.lib.conf.StringValue;
+import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
+import org.apache.openjpa.lib.jdbc.DelegatingDataSource;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.log.LogFactory;
+import org.apache.openjpa.lib.log.LogFactoryImpl;
+import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.slice.DistributedBrokerImpl;
+import org.apache.openjpa.slice.DistributionPolicy;
+import org.apache.openjpa.slice.ExecutorServiceValue;
+import org.apache.openjpa.slice.Slice;
+import org.apache.openjpa.slice.SliceVersion;
+import org.apache.openjpa.util.UserException;
+
+/**
+ * Implements a distributed configuration of JDBCStoreManagers.
+ * 
+ * @author Pinaki Poddar
+ * 
+ */
+public class DistributedJDBCConfigurationImpl extends JDBCConfigurationImpl
+        implements DistributedJDBCConfiguration {
+
+    private final List<Slice> _slices = new ArrayList<Slice>();
+    private List<String> _activeSliceNames = new ArrayList<String>();
+    private Slice _master;
+    
+    private DecoratingDataSource virtualDataSource;
+    protected BooleanValue lenientPlugin;
+    protected StringValue masterPlugin;
+    protected StringListValue namesPlugin;
+    protected PluginValue txnMgrPlugin;
+    protected ExecutorServiceValue executorServicePlugin;
+    protected PluginValue distributionPolicyPlugin;
+
+    private static Localizer _loc =
+            Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
+    public static final String PREFIX_SLICE = "slice.";
+    public static final String PREFIX_OPENJPA = "openjpa.";
+
+    /**
+     * Configures this distributed configuration as well as its underlying
+     * slices.
+     * <p>
+     * Registers the configuration values specific to a distributed setup
+     * (distribution policy, lenient flag, master slice name, slice names,
+     * transaction policy and executor service) and then creates one child
+     * configuration per slice from the properties of the given provider.
+     *
+     * @param cp provider whose properties describe this configuration and
+     * its slices
+     */
+    public DistributedJDBCConfigurationImpl(ConfigurationProvider cp) {
+        super(true, false);
+        Map p = cp.getProperties();
+        // the persistence unit name is used as the logging diagnostic context
+        String pUnit = getPersistenceUnitName(p);
+        setDiagnosticContext(pUnit);
+        Log log = getConfigurationLog();
+        log.info(_loc.get("config-init", SliceVersion.VERSION));
+        
+        // this configuration always uses the distributed broker
+        brokerPlugin.setString(DistributedBrokerImpl.class.getName());
+        
+        distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
+        distributionPolicyPlugin.setDynamic(true);
+        
+        lenientPlugin = addBoolean("Lenient");
+        
+        masterPlugin = addString("Master");
+        
+        namesPlugin = addStringList("Names");
+        
+        // transaction policy: three aliases, "default" being the initial one
+        txnMgrPlugin = addPlugin("TransactionPolicy", true);
+        txnMgrPlugin.setAlias("default", 
+                "org.apache.openjpa.slice.transaction.NaiveTransactionManager");
+        txnMgrPlugin.setAlias("xa", 
+                "org.apache.openjpa.slice.transaction.DistributedTransactionManager");
+        txnMgrPlugin.setAlias("jndi", 
+                "org.apache.openjpa.slice.transaction.LookUpTransactionManager");
+        txnMgrPlugin.setDefault("default");
+        txnMgrPlugin.setString("default");
+        
+        
+        executorServicePlugin = new ExecutorServiceValue();
+        addValue(executorServicePlugin);
+        
+        // the values above are registered before the slices are built
+        setSlices(p);
+    }
+    
+    /**
+     * Looks up the persistence unit name in the given properties.
+     *
+     * @param p configuration properties
+     * @return string form of the value stored under the
+     * <code>openjpa.</code>-prefixed id property, or <code>"?"</code> if
+     * there is no such value
+     */
+    private String getPersistenceUnitName(Map p) {
+        String idKey = PREFIX_OPENJPA + id.getProperty();
+        Object unitName = p.get(idKey);
+        if (unitName != null)
+            return unitName.toString();
+        return "?";
+    }
+    
+    /**
+     * Passes the given unit name to the log factory as its diagnostic
+     * context. Does nothing unless the log factory is a
+     * <code>LogFactoryImpl</code>.
+     *
+     * @param unit name to use as diagnostic context
+     */
+    private void setDiagnosticContext(String unit) {
+        LogFactory factory = getLogFactory();
+        if (!(factory instanceof LogFactoryImpl))
+            return;
+        LogFactoryImpl impl = (LogFactoryImpl) factory;
+        impl.setDiagnosticContext(unit);
+    }
+
+    /**
+     * Gets the names of the active slices.
+     * <p>
+     * The names are collected into an internal list the first time this
+     * method finds that list empty; afterwards the same list instance is
+     * returned without being recomputed.
+     *
+     * @return the internal list of active slice names
+     */
+    public List<String> getActiveSliceNames() {
+        if (!_activeSliceNames.isEmpty())
+            return _activeSliceNames;
+        for (Slice candidate : _slices) {
+            if (!candidate.isActive())
+                continue;
+            _activeSliceNames.add(candidate.getName());
+        }
+        return _activeSliceNames;
+    }
+    
+    /**
+     * Gets the names of all configured slices, whatever their status.
+     *
+     * @return a new list holding the slice names in configuration order
+     */
+    public List<String> getAvailableSliceNames() {
+        List<String> names = new ArrayList<String>(_slices.size());
+        for (int i = 0; i < _slices.size(); i++)
+            names.add(_slices.get(i).getName());
+        return names;
+    }
+    
+    /**
+     * Gets the slices of the given statuses.
+     *
+     * @param statuses statuses to select. A <code>null</code> array selects
+     * all slices irrespective of status. Calling without any argument passes
+     * an empty array and therefore selects nothing.
+     * @return an unmodifiable view of all slices if <code>statuses</code> is
+     * <code>null</code>; otherwise a new list of the slices whose status
+     * equals any of the given statuses. Each slice appears at most once.
+     */
+    public List<Slice> getSlices(Slice.Status...statuses) {
+        if (statuses == null)
+            return Collections.unmodifiableList(_slices);
+        List<Slice> result = new ArrayList<Slice>();
+        for (Slice slice : _slices) {
+            for (Slice.Status status : statuses) {
+                if (slice.getStatus().equals(status)) {
+                    result.add(slice);
+                    // stop at the first match so that a status listed twice
+                    // does not add the same slice twice
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Gets the master slice.
+     *
+     * @return the slice that was selected as master when the slices were
+     * configured
+     */
+    public Slice getMaster() {
+        return _master;
+    }
+
+    /**
+     * Get the configuration for given slice.
+     */
+    public Slice getSlice(String name) {
+        for (Slice slice:_slices)
+            if (slice.getName().equals(name))
+                return slice;
+        throw new UserException(_loc.get("slice-not-found", name,
+                    getActiveSliceNames()));
+    }
+
+    public DistributionPolicy getDistributionPolicyInstance() {
+        if (distributionPolicyPlugin.get() == null) {
+            distributionPolicyPlugin.instantiate(DistributionPolicy.class,
+                    this, true);
+        }
+        return (DistributionPolicy) distributionPolicyPlugin.get();
+    }
+
+    /**
+     * Sets the distribution policy from its plugin string.
+     * <p>
+     * The value is applied as the plugin's string form (as is done for the
+     * broker and transaction policy plugins), not stored as the plugin
+     * instance: {@link #getDistributionPolicyInstance()} casts the held
+     * instance to <code>DistributionPolicy</code>, which a
+     * <code>String</code> instance would fail.
+     *
+     * @param val plugin string, e.g. a class name, naming the policy
+     */
+    public void setDistributionPolicyInstance(String val) {
+        distributionPolicyPlugin.setString(val);
+    }
+
+    /**
+     * Gets the virtual data source spanning the slices. It is created on
+     * the first call and the same instance is returned afterwards.
+     *
+     * @return the decorated distributed data source
+     */
+    public Object getConnectionFactory() {
+        if (virtualDataSource != null)
+            return virtualDataSource;
+        DistributedDataSource composite = createDistributedDataStore();
+        virtualDataSource = DataSourceFactory.installDBDictionary(
+                getDBDictionaryInstance(), composite, this, false);
+        return virtualDataSource;
+    }
+
+    /**
+     * Creates a virtual DistributedDataSource as a composite of the data
+     * sources of the individual slices.
+     * <p>
+     * A slice contributes its data source only if a connection to it can be
+     * verified. A slice whose data source can not be created or verified is
+     * handed to <code>handleBadConnection</code>, which ignores it with a
+     * warning if the configuration is lenient and raises an error otherwise.
+     *
+     * @return the composite of the data sources of all verified slices
+     * @throws UserException if a slice can not be connected and the
+     * configuration is not lenient
+     */
+    private DistributedDataSource createDistributedDataStore() {
+        List<DataSource> dataSources = new ArrayList<DataSource>();
+        boolean isLenient = lenientPlugin.get();
+        for (Slice slice : _slices) {
+            JDBCConfiguration conf = (JDBCConfiguration)slice.getConfiguration();
+            Log log = conf.getConfigurationLog();
+            String url = getConnectionInfo(conf);
+            if (log.isInfoEnabled())
+                log.info(_loc.get("slice-connect", slice, url));
+            DataSource verified = null;
+            Throwable failure = null;
+            try {
+                DataSource ds = DataSourceFactory.newDataSource(conf, false);
+                DecoratingDataSource dds = new DecoratingDataSource(ds);
+                ds = DataSourceFactory.installDBDictionary(
+                        conf.getDBDictionaryInstance(), dds, conf, false);
+                if (verifyDataSource(slice, ds))
+                    verified = ds;
+            } catch (Throwable ex) {
+                failure = ex;
+            }
+            // The failure is reported outside the try block so that an error
+            // raised for a non-lenient configuration is not caught again.
+            // A null failure means the connection could not be verified.
+            if (verified != null)
+                dataSources.add(verified);
+            else
+                handleBadConnection(isLenient, slice, failure);
+        }
+        return new DistributedDataSource(dataSources);
+    }
+
+    /**
+     * Describes the connection of the given configuration for messages.
+     *
+     * @param conf configuration to describe
+     * @return the connection URL if there is one; otherwise the connection
+     * driver name, followed by the connection properties in parentheses if
+     * there are any
+     */
+    String getConnectionInfo(OpenJPAConfiguration conf) {
+        String url = conf.getConnectionURL();
+        if (url != null)
+            return url;
+        String info = conf.getConnectionDriverName();
+        String props = conf.getConnectionProperties();
+        return (props == null) ? info : info + "(" + props + ")";
+    }
+
+    /**
+     * Affirms if the given data source is an XADataSource. For a
+     * DelegatingDataSource the innermost delegate is examined instead.
+     *
+     * @param ds data source to examine
+     * @return true if the examined data source is an XADataSource
+     */
+    boolean isXACompliant(DataSource ds) {
+        Object target = ds;
+        if (ds instanceof DelegatingDataSource)
+            target = ((DelegatingDataSource) ds).getInnermostDelegate();
+        return target instanceof XADataSource;
+    }
+
+    /**
+     * Verifies that a connection can be obtained from the given data source
+     * and records the outcome as the status of the given slice: ACTIVE if a
+     * connection was obtained, INACTIVE if the data source returned
+     * <code>null</code> or raised a SQLException. The connection obtained
+     * for the check is closed before returning.
+     *
+     * @param slice slice whose status is updated
+     * @param ds data source of the slice
+     * @return true if a connection was obtained
+     */
+    private boolean verifyDataSource(Slice slice, DataSource ds) {
+        Connection probe = null;
+        try {
+            probe = ds.getConnection();
+            slice.setStatus(Slice.Status.ACTIVE);
+            boolean connected = (probe != null);
+            if (!connected)
+                slice.setStatus(Slice.Status.INACTIVE);
+            return connected;
+        } catch (SQLException ex) {
+            slice.setStatus(Slice.Status.INACTIVE);
+            return false;
+        } finally {
+            try {
+                if (probe != null)
+                    probe.close();
+            } catch (SQLException ignored) {
+                // failure to close the probing connection is not reported
+            }
+        }
+    }
+
+    /**
+     * Reports a slice that can not be connected: logs a warning if
+     * <code>isLenient</code> is set, throws a user exception otherwise.
+     *
+     * @param isLenient whether a slice that can not be connected is tolerated
+     * @param slice the slice that can not be connected
+     * @param ex the failure, or <code>null</code> if the reason is not known
+     * @throws UserException if <code>isLenient</code> is false
+     */
+    private void handleBadConnection(boolean isLenient, Slice slice,
+            Throwable ex) {
+        OpenJPAConfiguration conf = slice.getConfiguration();
+        // same description as in the connect message; it falls back to the
+        // driver name when no URL is configured
+        String url = getConnectionInfo(conf);
+        Log log = getLog(LOG_RUNTIME);
+        // report the underlying cause if there is one, else the failure
+        // itself, so that a failure without a cause is not lost
+        Throwable cause = null;
+        if (ex != null)
+            cause = (ex.getCause() == null) ? ex : ex.getCause();
+        if (isLenient) {
+            if (ex != null) {
+                log.warn(_loc.get("slice-connect-known-warn", slice, url,
+                        cause));
+            } else {
+                log.warn(_loc.get("slice-connect-warn", slice, url));
+            }
+        } else if (ex != null) {
+            throw new UserException(_loc.get("slice-connect-known-error",
+                    slice, url, ex), cause);
+        } else {
+            throw new UserException(_loc.get("slice-connect-error", slice, url));
+        }
+    }
+
+    /**
+     * Creates one slice per slice name found in the given properties, each
+     * with its own child configuration, and then determines the master
+     * slice.
+     *
+     * @param original properties of the whole persistence unit
+     * @throws UserException if the properties configure no slice at all
+     */
+    void setSlices(Map original) {
+        List<String> names = findSlices(original);
+        Log log = getConfigurationLog();
+        if (names.isEmpty())
+            throw new UserException(_loc.get("slice-none-configured"));
+        for (String name : names) {
+            JDBCConfiguration child = new JDBCConfigurationImpl();
+            Map sliceProps = createSliceProperties(original, name);
+            child.fromProperties(sliceProps);
+            child.setId(PREFIX_SLICE + name);
+            _slices.add(new Slice(name, child));
+            if (!log.isTraceEnabled())
+                continue;
+            log.trace(_loc.get("slice-configuration", name,
+                    child.toProperties(false)));
+        }
+        setMaster();
+    }
+
+    /**
+     * Finds the slices. If the <code>slice.Names</code> property has a value
+     * then the slices are ordered in the way they are listed; blank entries
+     * and repeated names in the list are skipped. Otherwise scans all
+     * available slices by looking for properties of the form
+     * <code>slice.XYZ.abc</code> where <code>XYZ</code> is the slice
+     * identifier and <code>abc</code> is an openjpa property name. The
+     * scanned slices are ordered alphabetically.
+     *
+     * @param p properties of the whole persistence unit
+     * @return the slice names, possibly empty
+     */
+    private List<String> findSlices(Map p) {
+        List<String> sliceNames = new ArrayList<String>();
+        
+        Log log = getConfigurationLog();
+        String key = PREFIX_SLICE+namesPlugin.getProperty();
+        // a key mapped to null is treated like a missing key
+        Object listed = p.get(key);
+        boolean explicit = listed != null;
+        if (explicit) {
+            String[] values = listed.toString().split("\\,");
+            for (String value : values) {
+                String name = value.trim();
+                // stray commas as in "One,,Two" yield blank entries, which
+                // must not become a slice with an empty name
+                if (name.length() > 0 && !sliceNames.contains(name))
+                    sliceNames.add(name);
+            }
+        } else {
+            if (log.isWarnEnabled())
+                log.warn(_loc.get("no-slice-names"));
+            sliceNames = scanForSliceNames(p);
+            Collections.sort(sliceNames);
+        }
+        if (log.isInfoEnabled()) {
+            log.info(_loc.get("slice-available", sliceNames));
+        }
+        return sliceNames;
+    }
+    
+    /**
+     * Collects the distinct slice names from the keys of the form
+     * <code>slice.XYZ.abc</code> in the given properties.
+     * <p>
+     * The slice name <code>XYZ</code> ends at the first dot after the
+     * <code>slice.</code> prefix, so that a property name that itself
+     * contains dots, as in <code>slice.One.jdbc.DBDictionary</code>, still
+     * yields the slice <code>One</code>. Keys with an empty slice name or
+     * without a property name are skipped.
+     *
+     * @param p properties of the whole persistence unit
+     * @return the slice names in the order they are first encountered
+     */
+    private List<String> scanForSliceNames(Map p) {
+        List<String> sliceNames = new ArrayList<String>();
+        for (Object o : p.keySet()) {
+            String key = o.toString();
+            if (!key.startsWith(PREFIX_SLICE))
+                continue;
+            String remainder = key.substring(PREFIX_SLICE.length());
+            int dot = remainder.indexOf('.');
+            // need a non-empty slice name followed by a property name
+            if (dot <= 0 || dot == remainder.length() - 1)
+                continue;
+            String sliceName = remainder.substring(0, dot);
+            if (!sliceNames.contains(sliceName))
+                sliceNames.add(sliceName);
+        }
+        return sliceNames;
+    }
+
+    /**
+     * Removes the given head from the start of the given string.
+     *
+     * @return the string without its head, or the string itself if it does
+     * not start with the head
+     */
+    static String chopHead(String s, String head) {
+        return s.startsWith(head) ? s.substring(head.length()) : s;
+    }
+
+    /**
+     * Removes everything from the last occurrence of the given tail onwards.
+     *
+     * @return the part of the string before the last occurrence of the tail,
+     * or the string itself if the tail does not occur
+     */
+    static String chopTail(String s, String tail) {
+        int cut = s.lastIndexOf(tail);
+        return (cut < 0) ? s : s.substring(0, cut);
+    }
+
+    /**
+     * Creates the configuration properties specific to the given
+     * <code>slice</code> from the given <code>original</code> key-value map.
+     * The rules are
+     * <UL>
+     * <LI>if a key begins with <code>"slice.XXX."</code> where
+     * <code>XXX</code> is the given slice name, then
+     * <code>"slice.XXX."</code> is replaced with <code>"openjpa."</code>.
+     * <LI>if a key begins with <code>"slice."</code> but not with
+     * <code>"slice.XXX."</code>, then it is ignored, i.e. any property of
+     * other slices or a global slice property such as
+     * <code>slice.DistributionPolicy</code>.
+     * <LI>if a key begins with <code>"openjpa."</code> and a corresponding
+     * <code>"slice.XXX."</code> property does not exist, then it is used as
+     * the default property.
+     * <LI>a property with any other prefix is simply copied.
+     * </UL>
+     *
+     * @param original properties of the whole persistence unit
+     * @param slice name of the slice
+     * @return new properties for the given slice
+     */
+    Map createSliceProperties(Map original, String slice) {
+        Map result = new Properties();
+        String slicePrefix = PREFIX_SLICE + slice + ".";
+        for (Object rawKey : original.keySet()) {
+            String key = rawKey.toString();
+            if (key.startsWith(slicePrefix)) {
+                // property of this slice: move it to the openjpa namespace
+                String sliceKey =
+                    PREFIX_OPENJPA + key.substring(slicePrefix.length());
+                result.put(sliceKey, original.get(rawKey));
+                continue;
+            }
+            if (key.startsWith(PREFIX_SLICE)) {
+                // other slice's or global slice property: not for this slice
+                continue;
+            }
+            if (key.startsWith(PREFIX_OPENJPA)) {
+                // a default is dropped if this slice overrides it
+                String override =
+                    slicePrefix + key.substring(PREFIX_OPENJPA.length());
+                if (original.containsKey(override))
+                    continue;
+            }
+            result.put(key, original.get(rawKey));
+        }
+        return result;
+    }
+
+    /**
+     * Determines the master slice. The master is the slice named by the
+     * master property. If the property is not set, the first slice becomes
+     * the master and a warning is logged. If no master is held after looking
+     * for the named slice, the first slice becomes the master as well.
+     */
+    private void setMaster() {
+        String configured = masterPlugin.get();
+        Log log = getConfigurationLog();
+        List<Slice> all = getSlices((Slice.Status[]) null);
+        boolean unspecified = configured == null || configured.length() == 0;
+        if (unspecified) {
+            _master = all.get(0);
+            if (log.isWarnEnabled())
+                log.warn(_loc.get("no-master-slice", _master));
+            return;
+        }
+        for (Slice candidate : all) {
+            if (candidate.getName().equals(configured))
+                _master = candidate;
+        }
+        if (_master != null)
+            return;
+        _master = all.get(0);
+    }
+    
+    /**
+     * Gets the string form of the transaction policy plugin.
+     */
+    public String getTransactionManager() {
+        return txnMgrPlugin.getString();
+    }
+
+    /**
+     * Sets the given transaction manager as the instance held by the
+     * transaction policy plugin.
+     */
+    public void setTransactionManager(TransactionManager txnManager) {
+        txnMgrPlugin.set(txnManager);
+    }
+
+    public TransactionManager getTransactionManagerInstance() {
+        if (txnMgrPlugin.get() == null) {
+            txnMgrPlugin.instantiate(TransactionManager.class, this);
+        }
+        return (TransactionManager) txnMgrPlugin.get();
+    }
+
+    /**
+     * Gets the string form of the executor service plugin.
+     */
+    public String getExecutorService() {
+        return executorServicePlugin.getString();
+    }
+
+    /**
+     * Sets the given executor service as the instance held by the executor
+     * service plugin.
+     *
+     * @param executorService the executor service to use
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        executorServicePlugin.set(executorService);
+    }
+
+    public ExecutorService getExecutorServiceInstance() {
+        if (executorServicePlugin.get() == null) {
+            executorServicePlugin.instantiate(ExecutorService.class, this);
+        }
+        return (ExecutorService) executorServicePlugin.get();
+    }
+    
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedPreparedStatement.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedPreparedStatement.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedPreparedStatement.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedPreparedStatement.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+/**
+ * A virtual PreparedStatement that delegates to a set of actual PreparedStatements.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+class DistributedPreparedStatement extends DistributedTemplate<PreparedStatement>
+		implements PreparedStatement {
+
+	/**
+	 * Creates a distributed statement for the given distributed connection,
+	 * which is passed on to the superclass.
+	 */
+	DistributedPreparedStatement(DistributedConnection c) {
+		super(c);
+	}
+
+	/**
+	 * Clears the parameters of every underlying statement.
+	 */
+	public void clearParameters() throws SQLException {
+		for (PreparedStatement stmt : this) {
+			stmt.clearParameters();
+		}
+	}
+
+	/**
+	 * Executes every underlying statement.
+	 * 
+	 * @return true only if every underlying statement returned true; also
+	 * true if there is no underlying statement
+	 */
+	public boolean execute() throws SQLException {
+		boolean allTrue = true;
+		for (PreparedStatement stmt : this) {
+			// each statement is executed, whatever the earlier outcomes
+			boolean outcome = stmt.execute();
+			allTrue = outcome && allTrue;
+		}
+		return allTrue;
+	}
+
+	/**
+	 * Executes the query on every underlying statement.
+	 * 
+	 * @return a DistributedResultSet to which the result of each underlying
+	 * statement has been offered
+	 */
+	public ResultSet executeQuery() throws SQLException {
+		DistributedResultSet combined = new DistributedResultSet();
+		for (PreparedStatement stmt : this) {
+			ResultSet partial = stmt.executeQuery();
+			combined.add(partial);
+		}
+		return combined;
+	}
+
+	/**
+	 * Executes the update on every underlying statement.
+	 * 
+	 * @return the sum of the counts returned by the underlying statements
+	 */
+	public int executeUpdate() throws SQLException {
+		int total = 0;
+		for (PreparedStatement stmt : this) {
+			int count = stmt.executeUpdate();
+			total = total + count;
+		}
+		return total;
+	}
+
+	/**
+	 * Gets the result set meta data of the master statement only.
+	 */
+	public ResultSetMetaData getMetaData() throws SQLException {
+		return master.getMetaData();
+	}
+
+	/**
+	 * Not supported.
+	 * 
+	 * @throws UnsupportedOperationException always
+	 */
+	// NOTE(review): getMetaData() delegates to the master statement; the
+	// same may be possible here -- confirm before changing.
+	public ParameterMetaData getParameterMetaData() throws SQLException {
+		throw new UnsupportedOperationException();
+	}
+
+	public void setArray(int i, Array x) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setArray(i, x);
+	}
+
+	public void setAsciiStream(int arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		for (PreparedStatement t : this)
+			t.setAsciiStream(arg0, arg1, arg2);
+	}
+
+	public void setBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setBigDecimal(arg0, arg1);
+	}
+
+	public void setBinaryStream(int arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		for (PreparedStatement t : this)
+			t.setBinaryStream(arg0, arg1, arg2);
+	}
+
+	public void setBlob(int arg0, Blob arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setBlob(arg0, arg1);
+	}
+
+	public void setBoolean(int arg0, boolean arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setBoolean(arg0, arg1);
+	}
+
+	public void setByte(int arg0, byte arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setByte(arg0, arg1);
+	}
+
+	public void setBytes(int arg0, byte[] arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setBytes(arg0, arg1);
+	}
+
+	public void setCharacterStream(int arg0, Reader arg1, int arg2)
+			throws SQLException {
+		for (PreparedStatement t : this)
+			t.setCharacterStream(arg0, arg1, arg2);
+	}
+
+	public void setClob(int arg0, Clob arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setClob(arg0, arg1);
+	}
+
+	public void setDate(int arg0, Date arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setDate(arg0, arg1);
+	}
+
+	public void setDate(int arg0, Date arg1, Calendar arg2) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setDate(arg0, arg1, arg2);
+	}
+
+	public void setDouble(int arg0, double arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setDouble(arg0, arg1);
+	}
+
+	public void setFloat(int arg0, float arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setFloat(arg0, arg1);
+	}
+
+	public void setInt(int arg0, int arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setInt(arg0, arg1);
+	}
+
+	public void setLong(int arg0, long arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setLong(arg0, arg1);
+	}
+
+	public void setNull(int arg0, int arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setNull(arg0, arg1);
+	}
+
+	public void setNull(int arg0, int arg1, String arg2) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setNull(arg0, arg1, arg2);
+	}
+
+	public void setObject(int arg0, Object arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setObject(arg0, arg1);
+	}
+
+	public void setObject(int arg0, Object arg1, int arg2) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setObject(arg0, arg1, arg2);
+	}
+
+	public void setObject(int arg0, Object arg1, int arg2, int arg3)
+			throws SQLException {
+		for (PreparedStatement t : this)
+			t.setObject(arg0, arg1, arg2, arg3);
+	}
+
+	public void setRef(int arg0, Ref arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setRef(arg0, arg1);
+	}
+
+	public void setShort(int arg0, short arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setShort(arg0, arg1);
+	}
+
+	public void setString(int arg0, String arg1) throws SQLException {
+		for (PreparedStatement t : this)
+			t.setString(arg0, arg1);
+	}
+
+	 public void setTime(int arg0, Time arg1) throws SQLException {
+			for (PreparedStatement t : this)
+				t.setTime(arg0, arg1);
+	 }
+	
+	 public void setTime(int arg0, Time arg1, Calendar arg2) throws
+	 SQLException {
+			for (PreparedStatement t : this)
+				t.setTime(arg0, arg1, arg2);
+	 }
+	
+	 public void setTimestamp(int arg0, Timestamp arg1) throws SQLException {
+			for (PreparedStatement t : this)
+				t.setTimestamp(arg0, arg1);
+	 }
+	
+	 public void setTimestamp(int arg0, Timestamp arg1, Calendar arg2)
+	 throws SQLException {
+			for (PreparedStatement t : this)
+				t.setTimestamp(arg0, arg1, arg2);
+	 }
+	
+	 public void setURL(int arg0, URL arg1) throws SQLException {
+			for (PreparedStatement t : this)
+				t.setURL(arg0, arg1);
+	 }
+	
+	 public void setUnicodeStream(int arg0, InputStream arg1, int arg2)
+	 throws SQLException {
+			for (PreparedStatement t : this)
+				t.setUnicodeStream(arg0, arg1, arg2);
+	 }
+	
+	 public void addBatch() throws SQLException {
+				for (PreparedStatement t:this)
+					t.addBatch();
+	 }
+	
+	// public void cancel() throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void clearBatch() throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void clearWarnings() throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void close() throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public boolean execute(String arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public boolean execute(String arg0, int arg1) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public boolean execute(String arg0, int[] arg1) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public boolean execute(String arg0, String[] arg1) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public int[] executeBatch() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public ResultSet executeQuery(String arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public int executeUpdate(String arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int executeUpdate(String arg0, int arg1) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int executeUpdate(String arg0, int[] arg1) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int executeUpdate(String arg0, String[] arg1) throws SQLException
+	// {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public Connection getConnection() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public int getFetchDirection() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int getFetchSize() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public ResultSet getGeneratedKeys() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public int getMaxFieldSize() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int getMaxRows() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public boolean getMoreResults() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public boolean getMoreResults(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	// return false;
+	// }
+	//
+	// public int getQueryTimeout() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public ResultSet getResultSet() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public int getResultSetConcurrency() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int getResultSetHoldability() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int getResultSetType() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public int getUpdateCount() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return 0;
+	// }
+	//
+	// public SQLWarning getWarnings() throws SQLException {
+	// // TODO Auto-generated method stub
+	// return null;
+	// }
+	//
+	// public void setCursorName(String arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setEscapeProcessing(boolean arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setFetchDirection(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setFetchSize(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setMaxFieldSize(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setMaxRows(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+	//
+	// public void setQueryTimeout(int arg0) throws SQLException {
+	// // TODO Auto-generated method stub
+	//
+	// }
+
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedResultSet.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedResultSet.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedResultSet.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedResultSet.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * A chain of ResultSet.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+class DistributedResultSet implements ResultSet {
+	LinkedList<ResultSet> comps = new LinkedList<ResultSet>();
+	ResultSet current;
+	int cursor = -1;
+	
+	/**
+	 * Adds the ResultSet only if it has rows.
+	 */
+	public void add(ResultSet rs) {
+		try {
+			if (rs.first())
+				comps.add(rs);
+		} catch (SQLException e) {
+			// ignore
+		}
+	}
+	
+	public boolean absolute(int arg0) throws SQLException {
+		throw new UnsupportedOperationException();
+	}
+
+	public void afterLast() throws SQLException {
+		current = null;
+		cursor  = comps.size();
+	}
+
+	public void beforeFirst() throws SQLException {
+		current = null;
+		cursor  = -1;
+	}
+
+	public void cancelRowUpdates() throws SQLException {
+		throw new UnsupportedOperationException();
+	}
+
+	public void clearWarnings() throws SQLException {
+		for (ResultSet rs:comps)
+			rs.clearWarnings();
+	}
+
+	public void close() throws SQLException {
+		for (ResultSet rs:comps)
+			rs.close();
+	}
+
+	public void deleteRow() throws SQLException {
+		current.deleteRow();
+	}
+
+	public int findColumn(String arg0) throws SQLException {
+		return 0;
+	}
+
+	public boolean first() throws SQLException {
+		if (comps.isEmpty()) return false;
+		cursor = 0;
+		current = comps.get(0);
+		return true;
+	}
+
+	public Array getArray(int arg0) throws SQLException {
+		return current.getArray(arg0);
+	}
+
+	public Array getArray(String arg0) throws SQLException {
+		return current.getArray(arg0);
+	}
+
+	public InputStream getAsciiStream(int arg0) throws SQLException {
+		return current.getAsciiStream(arg0);
+	}
+
+	public InputStream getAsciiStream(String arg0) throws SQLException {
+		return current.getAsciiStream(arg0);
+	}
+
+	public BigDecimal getBigDecimal(int arg0) throws SQLException {
+		return current.getBigDecimal(arg0);
+	}
+
+	public BigDecimal getBigDecimal(String arg0) throws SQLException {
+		return current.getBigDecimal(arg0);
+	}
+
+	public BigDecimal getBigDecimal(int arg0, int arg1) throws SQLException {
+		return current.getBigDecimal(arg0, arg1);
+	}
+
+	public BigDecimal getBigDecimal(String arg0, int arg1) throws SQLException {
+		return current.getBigDecimal(arg0, arg1);
+	}
+
+	public InputStream getBinaryStream(int arg0) throws SQLException {
+		return current.getBinaryStream(arg0);
+	}
+
+	public InputStream getBinaryStream(String arg0) throws SQLException {
+		return current.getBinaryStream(arg0);
+	}
+
+	public Blob getBlob(int arg0) throws SQLException {
+		return current.getBlob(arg0);
+	}
+
+	public Blob getBlob(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public boolean getBoolean(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public boolean getBoolean(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public byte getByte(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public byte getByte(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public byte[] getBytes(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public byte[] getBytes(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Reader getCharacterStream(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Reader getCharacterStream(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Clob getClob(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Clob getClob(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public int getConcurrency() throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public String getCursorName() throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Date getDate(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Date getDate(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Date getDate(int arg0, Calendar arg1) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Date getDate(String arg0, Calendar arg1) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public double getDouble(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public double getDouble(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public int getFetchDirection() throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public int getFetchSize() throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public float getFloat(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public float getFloat(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public int getInt(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public int getInt(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getLong(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public long getLong(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public ResultSetMetaData getMetaData() throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Object getObject(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Object getObject(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Object getObject(int arg0, Map<String, Class<?>> arg1)
+			throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Object getObject(String arg0, Map<String, Class<?>> arg1)
+			throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Ref getRef(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Ref getRef(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public int getRow() throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public short getShort(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public short getShort(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public Statement getStatement() throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public String getString(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public String getString(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Time getTime(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Time getTime(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Time getTime(int arg0, Calendar arg1) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Time getTime(String arg0, Calendar arg1) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Timestamp getTimestamp(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Timestamp getTimestamp(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Timestamp getTimestamp(int arg0, Calendar arg1) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public Timestamp getTimestamp(String arg0, Calendar arg1)
+			throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public int getType() throws SQLException {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	public URL getURL(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public URL getURL(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public InputStream getUnicodeStream(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public InputStream getUnicodeStream(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public SQLWarning getWarnings() throws SQLException {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	public void insertRow() throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public boolean isAfterLast() throws SQLException {
+		return current == null && cursor >= comps.size();
+	}
+
+	public boolean isBeforeFirst() throws SQLException {
+		return current == null && cursor<0;
+	}
+
+	public boolean isFirst() throws SQLException {
+		return current != null && current.isFirst() && cursor==0;
+	}
+
+	public boolean isLast() throws SQLException {
+		return current != null && current.isLast() && cursor==comps.size()-1;
+	}
+
+	public boolean last() throws SQLException {
+		if (comps.isEmpty()) return false;
+		cursor = comps.size()-1;
+		return false;
+	}
+
+	public void moveToCurrentRow() throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void moveToInsertRow() throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public boolean next() throws SQLException {
+		if (current == null) {
+			current = comps.get(0);
+			cursor = 0;
+		}
+		if (current.next())
+			return true;
+		cursor++;
+		if (cursor<comps.size())
+			current = comps.get(cursor);
+		return cursor<comps.size();
+	}
+
+	public boolean previous() throws SQLException {
+		return current.previous();
+	}
+
+	public void refreshRow() throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public boolean relative(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public boolean rowDeleted() throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public boolean rowInserted() throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public boolean rowUpdated() throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+	public void setFetchDirection(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void setFetchSize(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateArray(int arg0, Array arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateArray(String arg0, Array arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateAsciiStream(int arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateAsciiStream(String arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBigDecimal(int arg0, BigDecimal arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBigDecimal(String arg0, BigDecimal arg1)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBinaryStream(int arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBinaryStream(String arg0, InputStream arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBlob(int arg0, Blob arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBlob(String arg0, Blob arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBoolean(int arg0, boolean arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBoolean(String arg0, boolean arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateByte(int arg0, byte arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateByte(String arg0, byte arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBytes(int arg0, byte[] arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateBytes(String arg0, byte[] arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateCharacterStream(int arg0, Reader arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateCharacterStream(String arg0, Reader arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateClob(int arg0, Clob arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateClob(String arg0, Clob arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateDate(int arg0, Date arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateDate(String arg0, Date arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateDouble(int arg0, double arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateDouble(String arg0, double arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateFloat(int arg0, float arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateFloat(String arg0, float arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateInt(int arg0, int arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateInt(String arg0, int arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateLong(int arg0, long arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateLong(String arg0, long arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateNull(int arg0) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateNull(String arg0) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateObject(int arg0, Object arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateObject(String arg0, Object arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateObject(int arg0, Object arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateObject(String arg0, Object arg1, int arg2)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateRef(int arg0, Ref arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateRef(String arg0, Ref arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateRow() throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateShort(int arg0, short arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateShort(String arg0, short arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateString(int arg0, String arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateString(String arg0, String arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateTime(int arg0, Time arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateTime(String arg0, Time arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateTimestamp(int arg0, Timestamp arg1) throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public void updateTimestamp(String arg0, Timestamp arg1)
+			throws SQLException {
+		// TODO Auto-generated method stub
+
+	}
+
+	public boolean wasNull() throws SQLException {
+		// TODO Auto-generated method stub
+		return false;
+	}
+
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStatement.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStatement.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStatement.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStatement.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.sql.Statement;
+
+/**
+ * A virtual Statement that stands in for, and delegates to, many actual
+ * Statements.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+class DistributedStatement extends DistributedTemplate<Statement> {
+	/**
+	 * Creates a virtual statement for the given distributed connection, which
+	 * is passed on to the DistributedTemplate constructor.
+	 */
+	public DistributedStatement(DistributedConnection connection) {
+		super(connection);
+	}
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.naming.ConfigurationException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.openjpa.conf.OpenJPAConfiguration;
+import org.apache.openjpa.enhance.PersistenceCapable;
+import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
+import org.apache.openjpa.jdbc.kernel.ConnectionInfo;
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
+import org.apache.openjpa.jdbc.sql.Result;
+import org.apache.openjpa.jdbc.sql.ResultSetResult;
+import org.apache.openjpa.kernel.FetchConfiguration;
+import org.apache.openjpa.kernel.OpenJPAStateManager;
+import org.apache.openjpa.kernel.PCState;
+import org.apache.openjpa.kernel.QueryLanguages;
+import org.apache.openjpa.kernel.Seq;
+import org.apache.openjpa.kernel.StoreContext;
+import org.apache.openjpa.kernel.StoreManager;
+import org.apache.openjpa.kernel.StoreQuery;
+import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
+import org.apache.openjpa.lib.rop.ResultObjectProvider;
+import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.meta.FieldMetaData;
+import org.apache.openjpa.slice.DistributionPolicy;
+import org.apache.openjpa.slice.transaction.DistributedNaiveTransaction;
+import org.apache.openjpa.slice.transaction.DistributedTransactionManager;
+import org.apache.openjpa.slice.transaction.NaiveTransactionManager;
+import org.apache.openjpa.util.InternalException;
+import org.apache.openjpa.util.StoreException;
+import org.apache.openjpa.util.UserException;
+
+/**
+ * A Store manager for multiple physical databases referred to as <em>slices</em>.
+ * This receiver behaves like a Transaction Manager as it implements two-phase
+ * commit protocol if all the component slices are XA-compliant. The actions are
+ * delegated to the underlying slices. The actions are executed in parallel
+ * threads whenever possible such as flushing or query. <br>
+ * 
+ * @author Pinaki Poddar
+ * 
+ */
+class DistributedStoreManager extends JDBCStoreManager {
+    private final List<SliceStoreManager> _slices;
+    private JDBCStoreManager _master;
+    private boolean isXA;
+    private TransactionManager _tm;
+    private final DistributedJDBCConfiguration _conf;
+    private Log _log;
+    private static final Localizer _loc =
+            Localizer.forPackage(DistributedStoreManager.class);
+    private static ExecutorService threadPool = Executors.newCachedThreadPool();
+
+    /**
+     * Creates one child {@link SliceStoreManager} for every active slice
+     * named by the supplied configuration.
+     * 
+     * The child whose name equals the name of the configured master slice is
+     * remembered as the <em>master</em>. The runtime log channel of the
+     * configuration is kept for this manager.
+     */
+    public DistributedStoreManager(DistributedJDBCConfiguration conf) {
+        _conf = conf;
+        _log = conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
+        _slices = new ArrayList<SliceStoreManager>();
+        for (String sliceName : conf.getActiveSliceNames()) {
+            SliceStoreManager child =
+                new SliceStoreManager(conf.getSlice(sliceName));
+            _slices.add(child);
+            String childName = child.getName();
+            String masterName = conf.getMaster().getName();
+            if (childName.equals(masterName))
+                _master = child;
+        }
+    }
+
+    /**
+     * Gets the distributed configuration this manager was constructed with.
+     */
+    public DistributedJDBCConfiguration getConfiguration() {
+        return _conf;
+    }
+
+    /**
+     * Finds the name of the slice for the given instance.
+     * 
+     * The implementation data of the StateManager is used when present.
+     * Otherwise the slice is estimated from the supplied additional
+     * information and, when no estimate is possible, a slice is assigned
+     * via the {@link DistributionPolicy DistributionPolicy}.
+     */
+    protected String findSliceName(OpenJPAStateManager sm, Object info) {
+        if (hasSlice(sm))
+            return sm.getImplData().toString();
+        String estimated = estimateSlice(sm, info);
+        return estimated != null ? estimated : assignSlice(sm);
+    }
+
+    /**
+     * Affirms if the given StateManager carries implementation data, which
+     * is where this manager records the slice name.
+     */
+    private boolean hasSlice(OpenJPAStateManager sm) {
+        Object implData = sm.getImplData();
+        return implData != null;
+    }
+
+    /**
+     * Asks the configured distribution policy for the slice of the given
+     * instance and records the answer as implementation data of the
+     * StateManager.
+     * 
+     * @throws UserException if the policy names a slice that is not among
+     * the active slice names
+     */
+    private String assignSlice(OpenJPAStateManager sm) {
+        PersistenceCapable pc = sm.getPersistenceCapable();
+        String target = _conf.getDistributionPolicyInstance()
+                .distribute(pc, _conf.getActiveSliceNames(), getContext());
+        if (_conf.getActiveSliceNames().contains(target)) {
+            sm.setImplData(target, true);
+            return target;
+        }
+        Object[] args = new Object[] {
+                _conf.getDistributionPolicyInstance().getClass().getName(),
+                target, sm.getPersistenceCapable(),
+                _conf.getActiveSliceNames() };
+        throw new UserException(_loc.get("bad-policy-slice", args));
+    }
+
+    /**
+     * Uses the additional data, if possible, to find the slice managing the
+     * given StateManager.
+     * 
+     * A slice is found only when the data is a ConnectionInfo whose result
+     * is a ResultSetResult created by one of the child slices. The name of
+     * that slice is recorded as implementation data of the StateManager and
+     * returned. In every other case null is returned.
+     */
+    private String estimateSlice(OpenJPAStateManager sm, Object edata) {
+        // instanceof is false for null, so no separate null check is needed
+        if (!(edata instanceof ConnectionInfo))
+            return null;
+
+        Result result = ((ConnectionInfo) edata).result;
+        if (!(result instanceof ResultSetResult))
+            return null;
+
+        JDBCStore store = ((ResultSetResult) result).getStore();
+        for (SliceStoreManager candidate : _slices) {
+            if (candidate != store)
+                continue;
+            String sliceId = candidate.getName();
+            sm.setImplData(sliceId, true);
+            return sliceId;
+        }
+        return null;
+    }
+
+    /**
+     * Selects the child StoreManager for the slice of the given instance.
+     * 
+     * @throws InternalException if no child is found for the slice name
+     */
+    private StoreManager selectStore(OpenJPAStateManager sm, Object edata) {
+        String sliceName = findSliceName(sm, edata);
+        SliceStoreManager found = lookup(sliceName);
+        if (found != null)
+            return found;
+        throw new InternalException(_loc.get("wrong-slice", sliceName, sm));
+    }
+
+    /**
+     * Delegates to the child StoreManager selected for the given instance.
+     */
+    public boolean assignField(OpenJPAStateManager sm, int field,
+            boolean preFlush) {
+        StoreManager target = selectStore(sm, null);
+        return target.assignField(sm, field, preFlush);
+    }
+
+    public boolean assignObjectId(OpenJPAStateManager sm, boolean preFlush) {
+        return _master.assignObjectId(sm, preFlush);
+    }
+
+    /**
+     * Notifies the master slice about the state change of the given instance.
+     */
+    public void beforeStateChange(OpenJPAStateManager sm, PCState fromState,
+            PCState toState) {
+        _master.beforeStateChange(sm, fromState, toState);
+    }
+
+    /**
+     * Enlists every slice with the transaction obtained from the transaction
+     * manager and then begins the transaction on the manager.
+     * 
+     * For XA the XAResource of the slice's XA connection is enlisted,
+     * otherwise the slice itself is enlisted with the
+     * DistributedNaiveTransaction.
+     * 
+     * @throws InternalException if a slice can not be enlisted
+     * @throws StoreException if the transaction manager fails to begin
+     */
+    public void begin() {
+        TransactionManager tm = getTransactionManager();
+        // NOTE(review): the slices are enlisted before tm.begin() is called;
+        // confirm that getTransaction() returns a usable Transaction here
+        for (SliceStoreManager slice : _slices) {
+            try {
+                Transaction txn = tm.getTransaction();
+                if (isXA) {
+                    txn.enlistResource(slice.getXAConnection().getXAResource());
+                } else { // This is the only place where casting to our
+                         // internal implementation classes become necessary
+                    ((DistributedNaiveTransaction) txn).enlistResource(slice);
+                }
+            } catch (Exception e) {
+                throw new InternalException(e);
+            }
+        }
+
+        try {
+            tm.begin();
+        } catch (Exception e) {
+            throw new StoreException(e);
+        }
+    }
+
+    /**
+     * Gets the runtime log of the configuration of the given slice.
+     */
+    Log getLog(SliceStoreManager slice) {
+        String channel = OpenJPAConfiguration.LOG_RUNTIME;
+        return slice.getConfiguration().getLog(channel);
+    }
+
+    /**
+     * Begins an optimistic transaction on every slice.
+     */
+    public void beginOptimistic() {
+        Iterator<SliceStoreManager> slices = _slices.iterator();
+        while (slices.hasNext()) {
+            slices.next().beginOptimistic();
+        }
+    }
+
+    /**
+     * Cancels on every slice. Every slice is asked, even after one of them
+     * has answered false.
+     * 
+     * @return true only if every slice returned true
+     */
+    public boolean cancelAll() {
+        boolean allCancelled = true;
+        for (SliceStoreManager slice : _slices) {
+            if (!slice.cancelAll())
+                allCancelled = false;
+        }
+        return allCancelled;
+    }
+
+    /**
+     * Closes every slice.
+     */
+    public void close() {
+        Iterator<SliceStoreManager> slices = _slices.iterator();
+        while (slices.hasNext()) {
+            slices.next().close();
+        }
+    }
+
+    /**
+     * Commits via the transaction manager.
+     * 
+     * @throws StoreException wrapping any exception raised by the commit
+     */
+    public void commit() {
+        TransactionManager manager = getTransactionManager();
+        try {
+            manager.commit();
+        } catch (Exception cause) {
+            throw new StoreException(cause);
+        }
+    }
+
+    public int compareVersion(OpenJPAStateManager sm, Object v1, Object v2) {
+        return selectStore(sm, null).compareVersion(sm, v1, v2);
+    }
+
+    public Object copyDataStoreId(Object oid, ClassMetaData meta) {
+        return _master.copyDataStoreId(oid, meta);
+    }
+
+    /**
+     * Executes the extent on every slice, in slice order, and merges the
+     * results of all slices into a single provider.
+     */
+    public ResultObjectProvider executeExtent(ClassMetaData meta,
+            boolean subclasses, FetchConfiguration fetch) {
+        int count = _slices.size();
+        ResultObjectProvider[] providers = new ResultObjectProvider[count];
+        for (int i = 0; i < count; i++) {
+            SliceStoreManager slice = _slices.get(i);
+            providers[i] = slice.executeExtent(meta, subclasses, fetch);
+        }
+        return new MergedResultObjectProvider(providers);
+    }
+
+    /**
+     * Asks the slices, in order, whether the given instance exists. The name
+     * of the first slice that affirms is recorded as implementation data of
+     * the StateManager.
+     * 
+     * @return true if any slice affirms the existence
+     */
+    public boolean exists(OpenJPAStateManager sm, Object edata) {
+        for (SliceStoreManager candidate : _slices) {
+            if (!candidate.exists(sm, edata))
+                continue;
+            sm.setImplData(candidate.getName(), true);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Flush the given StateManagers after binning them to respective physical
+     * slices.
+     */
+    public Collection flush(Collection sms) {
+        Collection exceptions = new ArrayList();
+        List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
+        Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
+        for (SliceStoreManager slice : _slices) {
+            List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+            if (subset.isEmpty())
+                continue;
+            futures.add(threadPool.submit(new Flusher(slice, subset)));
+        }
+        for (Future<Collection> future : futures) {
+            Collection error;
+            try {
+                error = future.get();
+                if (!(error == null || error.isEmpty())) {
+                    exceptions.addAll(error);
+                }
+            } catch (InterruptedException e) {
+                throw new StoreException(e);
+            } catch (ExecutionException e) {
+                throw new StoreException(e.getCause());
+            }
+        }
+        return exceptions;
+    }
+
+    /**
+     * Partitions the given StateManagers into one list per slice, keyed by
+     * slice name. Every known slice gets an entry, possibly an empty list.
+     * <p>
+     * NOTE(review): if {@code findSliceName} yields null or a name that does
+     * not belong to any slice, the map lookup returns null and adding to it
+     * raises a NullPointerException.
+     *
+     * @param sms the StateManagers to partition
+     * @param edata execution data handed to {@code findSliceName}
+     * @return map of slice name to the StateManagers assigned to that slice
+     */
+    private Map<String, List<OpenJPAStateManager>> bin(
+            Collection/*<StateManager>*/ sms, Object edata) {
+        Map<String, List<OpenJPAStateManager>> bins =
+                new HashMap<String, List<OpenJPAStateManager>>();
+        for (SliceStoreManager store : _slices) {
+            List<OpenJPAStateManager> members =
+                    new ArrayList<OpenJPAStateManager>();
+            bins.put(store.getName(), members);
+        }
+        for (Object element : sms) {
+            OpenJPAStateManager sm = (OpenJPAStateManager) element;
+            String sliceName = findSliceName(sm, edata);
+            List<OpenJPAStateManager> members = bins.get(sliceName);
+            members.add(sm);
+        }
+        return bins;
+    }
+
+    /**
+     * Not supported by this store manager.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public Object getClientConnection() {
+        throw new UnsupportedOperationException();
+    }
+
+    public Seq getDataStoreIdSequence(ClassMetaData forClass) {
+        return _master.getDataStoreIdSequence(forClass);
+    }
+
+    public Class getDataStoreIdType(ClassMetaData meta) {
+        return _master.getDataStoreIdType(meta);
+    }
+
+    public Class getManagedType(Object oid) {
+        return _master.getManagedType(oid);
+    }
+
+    public Seq getValueSequence(FieldMetaData forField) {
+        return _master.getValueSequence(forField);
+    }
+
+    public boolean initialize(OpenJPAStateManager sm, PCState state,
+            FetchConfiguration fetch, Object edata) {
+        if (edata instanceof ConnectionInfo) {
+            String slice = findSliceName(sm, (ConnectionInfo) edata);
+            if (slice != null)
+                return lookup(slice).initialize(sm, state, fetch, edata);
+        }
+        // not a part of Query result load. Look into the slices till found
+        for (SliceStoreManager slice : _slices) {
+            if (slice.initialize(sm, state, fetch, edata)) {
+                sm.setImplData(slice.getName(), true);
+                return true;
+            }
+        }
+        return false;
+
+    }
+
+    /**
+     * Loads the requested fields from the store selected for the given
+     * StateManager and execution data.
+     */
+    public boolean load(OpenJPAStateManager sm, BitSet fields,
+            FetchConfiguration fetch, int lockLevel, Object edata) {
+        boolean loaded =
+            selectStore(sm, edata).load(sm, fields, fetch, lockLevel, edata);
+        return loaded;
+    }
+
+    /**
+     * Bins the given StateManagers by slice and asks every slice with a
+     * non-empty bin to load its share, one slice after another.
+     *
+     * @return the combined contents of the non-empty collections returned by
+     * the individual slices
+     */
+    public Collection loadAll(Collection sms, PCState state, int load,
+            FetchConfiguration fetch, Object edata) {
+        Map<String, List<OpenJPAStateManager>> bySlice = bin(sms, edata);
+        Collection merged = new ArrayList();
+        for (SliceStoreManager store : _slices) {
+            List<OpenJPAStateManager> share = bySlice.get(store.getName());
+            if (!share.isEmpty()) {
+                Collection partial =
+                    store.loadAll(share, state, load, fetch, edata);
+                if (!(partial == null || partial.isEmpty()))
+                    merged.addAll(partial);
+            }
+        }
+        return merged;
+    }
+
+    public Object newDataStoreId(Object oidVal, ClassMetaData meta) {
+        return _master.newDataStoreId(oidVal, meta);
+    }
+
+    public FetchConfiguration newFetchConfiguration() {
+        return _master.newFetchConfiguration();
+    }
+
+    /**
+     * Builds a distributed query for the given language and registers with it
+     * one child query created by each slice, in slice iteration order.
+     */
+    public StoreQuery newQuery(String language) {
+        DistributedStoreQuery distributed = new DistributedStoreQuery(this,
+                QueryLanguages.parserForLanguage(language));
+        for (SliceStoreManager store : _slices) {
+            StoreQuery child = store.newQuery(language);
+            distributed.add(child);
+        }
+        return distributed;
+    }
+
+    /**
+     * Invokes {@code releaseConnection} on every slice.
+     */
+    public void releaseConnection() {
+        for (SliceStoreManager store : _slices) {
+            store.releaseConnection();
+        }
+    }
+
+    /**
+     * Invokes {@code retainConnection} on every slice.
+     */
+    public void retainConnection() {
+        for (SliceStoreManager store : _slices) {
+            store.retainConnection();
+        }
+    }
+
+    /**
+     * Rolls back through the transaction manager, rethrowing any failure
+     * raised by the rollback itself as a {@link StoreException}.
+     */
+    public void rollback() {
+        // Resolved outside the try block so that a failure while obtaining
+        // the manager is propagated as-is rather than wrapped.
+        TransactionManager manager = getTransactionManager();
+        try {
+            manager.rollback();
+        } catch (Exception failure) {
+            throw new StoreException(failure);
+        }
+    }
+
+    /**
+     * Invokes {@code rollbackOptimistic} on every slice.
+     */
+    public void rollbackOptimistic() {
+        for (SliceStoreManager store : _slices) {
+            store.rollbackOptimistic();
+        }
+    }
+
+    /**
+     * Sets the context for this receiver and all its underlying slices.
+     */
+    public void setContext(StoreContext ctx) {
+        super.setContext(ctx);
+        isXA = true;
+        for (SliceStoreManager store : _slices) {
+            store.setContext(ctx, 
+                    (JDBCConfiguration)store.getSlice().getConfiguration());
+            isXA &= store.isXAEnabled();
+        }
+        _tm = getTransactionManager();
+    }
+
+    /**
+     * Finds the slice with the given name.
+     *
+     * @return the first slice whose name equals the given name, or null if
+     * there is none
+     */
+    private SliceStoreManager lookup(String name) {
+        SliceStoreManager match = null;
+        for (SliceStoreManager candidate : _slices) {
+            if (candidate.getName().equals(name)) {
+                match = candidate;
+                break;
+            }
+        }
+        return match;
+    }
+
+    public boolean syncVersion(OpenJPAStateManager sm, Object edata) {
+        return selectStore(sm, edata).syncVersion(sm, edata);
+    }
+
+    /**
+     * Returns the transaction manager, obtaining it from the configuration on
+     * first use. At that point the manager is checked against the
+     * {@code isXA} flag: an XA setup with a {@link NaiveTransactionManager}
+     * logs a warning and clears the flag, while a non-XA setup with any other
+     * manager is rejected.
+     * <p>
+     * NOTE(review): {@code _tm} is assigned before the check, so it remains
+     * set when the UserException is thrown and a later call returns it
+     * without repeating the check.
+     *
+     * @throws UserException if the slices are not XA but the configured
+     * manager is not a NaiveTransactionManager
+     */
+    protected TransactionManager getTransactionManager() {
+        if (_tm != null)
+            return _tm;
+
+        _tm = getConfiguration().getTransactionManagerInstance();
+        String alias = getConfiguration().getTransactionManager();
+        boolean naive = _tm instanceof NaiveTransactionManager;
+        if (isXA && naive) {
+            _log.warn(_loc.get("resource-xa-tm-not-2pc", alias));
+            isXA = false;
+        } else if (!isXA && !naive) {
+            throw new UserException(_loc.get("resource-not-xa-tm-2pc",
+                    alias));
+        }
+        return _tm;
+    }
+
+    private static class Flusher implements Callable<Collection> {
+        final SliceStoreManager store;
+        final Collection toFlush;
+
+        Flusher(SliceStoreManager store, Collection toFlush) {
+            this.store = store;
+            this.toFlush = toFlush;
+        }
+
+        public Collection call() throws Exception {
+            return store.flush(toFlush);
+        }
+    }
+
+}

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.ExpressionStoreQuery;
+import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
+import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.StoreQuery;
+import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
+import org.apache.openjpa.lib.rop.ResultObjectProvider;
+import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.util.StoreException;
+
+/**
+ * A query for distributed databases.
+ * <p>
+ * Holds a list of child queries, registered through {@link #add}, and creates
+ * an executor that runs them concurrently and combines their results.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+@SuppressWarnings("serial")
+class DistributedStoreQuery extends JDBCStoreQuery {
+    private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
+    private ExpressionParser _parser;
+
+    public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
+        super(store, parser);
+        _parser = parser;
+    }
+
+    /**
+     * Registers a child query. Child queries are paired by position with the
+     * executors created in {@link #newDataStoreExecutor}.
+     */
+    void add(StoreQuery q) {
+        _queries.add(q);
+    }
+
+    /**
+     * Creates a {@link ParallelExecutor} holding one executor per registered
+     * child query, in registration order.
+     */
+    public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+        ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser,
+                ctx.getCompilation());
+        for (StoreQuery q : _queries) {
+            ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+        }
+        return ex;
+    }
+
+    /**
+     * Sets the context on this query and on every registered child query.
+     */
+    public void setContext(QueryContext ctx) {
+        super.setContext(ctx);
+        for (StoreQuery q : _queries) {
+            q.setContext(ctx);
+        }
+    }
+
+    /**
+     * Returns the executor service supplied by the store's distributed
+     * configuration.
+     */
+    public ExecutorService getExecutorServiceInstance() {
+        DistributedJDBCConfiguration conf =
+            ((DistributedJDBCConfiguration) getStore().getConfiguration());
+        return conf.getExecutorServiceInstance();
+    }
+
+    /**
+     * Executes queries on multiple databases.
+     * 
+     * @author Pinaki Poddar 
+     *
+     */
+    public static class ParallelExecutor extends
+        ExpressionStoreQuery.DataStoreExecutor {
+        private List<Executor> executors = new ArrayList<Executor>();
+        private DistributedStoreQuery owner = null;
+        private ExecutorService threadPool = null;
+
+        public void addExecutor(Executor ex) {
+            executors.add(ex);
+        }
+
+        public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta,
+                boolean subclasses, ExpressionParser parser, Object parsed) {
+            super(dsq, meta, subclasses, parser, parsed);
+            owner = dsq;
+            threadPool = dsq.getExecutorServiceInstance();
+        }
+
+        /**
+         * Each child query must be executed with slice context and not the 
+         * given query context.
+         * <p>
+         * One task per executor is submitted to the thread pool, each paired
+         * with the child query at the same position. The results are then
+         * combined: a unique query yields a UniqueResultObjectProvider, a
+         * query with ordering an OrderingMergedResultObjectProvider, and
+         * anything else a plain MergedResultObjectProvider.
+         *
+         * @throws RuntimeException if waiting for a child query is
+         * interrupted; the interrupt status of the thread is restored first
+         * @throws StoreException if a child query fails
+         */
+        public ResultObjectProvider executeQuery(StoreQuery q,
+                final Object[] params, final Range range) {
+            final Iterator<StoreQuery> qs = owner._queries.iterator();
+            final List<Future<ResultObjectProvider>> futures =
+                new ArrayList<Future<ResultObjectProvider>>();
+            for (Executor ex : executors) {
+                QueryExecutor call = new QueryExecutor();
+                call.executor = ex;
+                call.query    = qs.next();
+                call.params   = params;
+                call.range    = range;
+                futures.add(threadPool.submit(call));
+            }
+
+            ResultObjectProvider[] tmp =
+                new ResultObjectProvider[executors.size()];
+            int i = 0;
+            for (Future<ResultObjectProvider> future : futures) {
+                try {
+                    tmp[i++] = future.get();
+                } catch (InterruptedException e) {
+                    // Catching clears the interrupt flag; set it again.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    throw new StoreException(e.getCause());
+                }
+            }
+
+            boolean[] ascending = getAscending(q);
+            // a non-empty array means the query declares some ordering
+            boolean isOrdered = ascending.length > 0;
+            boolean isUnique  = q.getContext().isUnique();
+            if (isUnique) {
+                return new UniqueResultObjectProvider(tmp, q,
+                        getQueryExpressions());
+            }
+            if (isOrdered) {
+                return new OrderingMergedResultObjectProvider(tmp, ascending,
+                    executors.toArray(new Executor[executors.size()]),
+                    q, params);
+            }
+            return new MergedResultObjectProvider(tmp);
+        }
+
+        /**
+         * Runs the delete on every child query concurrently.
+         *
+         * @return the sum of the counts reported by the child queries
+         */
+        public Number executeDelete(StoreQuery q, Object[] params) {
+            Iterator<StoreQuery> qs = owner._queries.iterator();
+            final List<Future<Number>> futures =
+                new ArrayList<Future<Number>>();
+            for (Executor ex : executors) {
+                DeleteExecutor call = new DeleteExecutor();
+                call.executor = ex;
+                call.query    = qs.next();
+                call.params   = params;
+                futures.add(threadPool.submit(call));
+            }
+            return sumCounts(futures);
+        }
+
+        /**
+         * Runs the update on every child query concurrently.
+         *
+         * @return the sum of the counts reported by the child queries
+         */
+        public Number executeUpdate(StoreQuery q, Object[] params) {
+            Iterator<StoreQuery> qs = owner._queries.iterator();
+            final List<Future<Number>> futures =
+                new ArrayList<Future<Number>>();
+            for (Executor ex : executors) {
+                UpdateExecutor call = new UpdateExecutor();
+                call.executor = ex;
+                call.query    = qs.next();
+                call.params   = params;
+                futures.add(threadPool.submit(call));
+            }
+            return sumCounts(futures);
+        }
+
+        /**
+         * Waits for every task and adds up the non-null counts as ints.
+         *
+         * @throws RuntimeException if the wait is interrupted; the interrupt
+         * status of the thread is restored first
+         * @throws StoreException if a task fails
+         */
+        private static Number sumCounts(List<Future<Number>> futures) {
+            int total = 0;
+            for (Future<Number> future : futures) {
+                try {
+                    Number n = future.get();
+                    if (n != null)
+                        total += n.intValue();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    throw new StoreException(e.getCause());
+                }
+            }
+            return Integer.valueOf(total);
+        }
+
+    }
+
+    /** Task running a select through one executor and its child query. */
+    static class QueryExecutor implements Callable<ResultObjectProvider> {
+        StoreQuery query;
+        Executor executor;
+        Object[] params;
+        Range range;
+        public ResultObjectProvider call() throws Exception {
+            return executor.executeQuery(query, params, range);
+        }
+    }
+
+    /** Task running a delete through one executor and its child query. */
+    static class DeleteExecutor implements Callable<Number> {
+        StoreQuery query;
+        Executor executor;
+        Object[] params;
+        public Number call() throws Exception {
+            return executor.executeDelete(query, params);
+        }
+    }
+
+    /** Task running an update through one executor and its child query. */
+    static class UpdateExecutor implements Callable<Number> {
+        StoreQuery query;
+        Executor executor;
+        Object[] params;
+        public Number call() throws Exception {
+            // must be executeUpdate: this task backs executeUpdate above
+            return executor.executeUpdate(query, params);
+        }
+    }
+}
+

Added: openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java?rev=619145&view=auto
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java (added)
+++ openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java Wed Feb  6 12:26:14 2008
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.slice.jdbc;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A template for multiple Statements being executed by multiple connections.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+class DistributedTemplate<T extends Statement> 
+	implements Statement, Iterable<T> {
+	protected List<T> stmts = new ArrayList<T>();
+	protected final DistributedConnection con;
+	protected T master;
+	
+	public DistributedTemplate(DistributedConnection c) {
+		con = c;
+	}
+	
+	public Iterator<T> iterator() {
+		return stmts.iterator();
+	}
+	
+	public void add(T s) {
+		if (stmts.isEmpty())
+			master = s;
+		try {
+			if (!con.contains(s.getConnection()))
+				throw new IllegalArgumentException(s + " has different connection");
+			stmts.add(s);
+		} catch (SQLException e) {
+			e.printStackTrace();
+		}
+	}
+	
+	public void addBatch(String sql) throws SQLException {
+		for (T s:this)
+			s.addBatch(sql);
+	}
+
+	public void cancel() throws SQLException {
+		for (T s:this)
+			s.cancel();
+	}
+
+	public void clearBatch() throws SQLException {
+		for (T s:this)
+			s.clearBatch();
+	}
+
+	public void clearWarnings() throws SQLException {
+		for (T s:this)
+			s.clearWarnings();
+	}
+
+	public void close() throws SQLException {
+		for (T s:this)
+			s.close();
+	}
+
+	public boolean execute(String arg0) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, int arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, int[] arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, String[] arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public int[] executeBatch() throws SQLException {
+		int[] ret = new int[0];
+		for (Statement s:this) {
+			int[] tmp = s.executeBatch();
+			ret = new int[ret.length + tmp.length];
+			System.arraycopy(tmp, 0, ret, ret.length-tmp.length, tmp.length);
+		}
+		return ret;
+	}
+
+	public ResultSet executeQuery() throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.executeQuery(null));
+		return rs;
+	}
+
+	public ResultSet executeQuery(String arg0) throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.executeQuery(arg0));
+		return rs;
+	}
+
+	public int executeUpdate(String arg0) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, int arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, int[] arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, String[] arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public Connection getConnection() throws SQLException {
+		return con;
+	}
+
+	public int getFetchDirection() throws SQLException {
+		return master.getFetchDirection();
+	}
+
+	public int getFetchSize() throws SQLException {
+		return master.getFetchSize();
+	}
+
+	public ResultSet getGeneratedKeys() throws SQLException {
+		DistributedResultSet mrs = new DistributedResultSet();
+		for (T s:this)
+			mrs.add(s.getGeneratedKeys());
+		return mrs;
+	}
+
+	public int getMaxFieldSize() throws SQLException {
+		return master.getMaxFieldSize();
+	}
+
+	public int getMaxRows() throws SQLException {
+		return master.getMaxRows();
+	}
+
+	public boolean getMoreResults() throws SQLException {
+		for (T s:this)
+			if (s.getMoreResults())
+				return true;
+		return false;
+	}
+
+	public boolean getMoreResults(int arg0) throws SQLException {
+		for (T s:this)
+			if (s.getMoreResults(arg0))
+				return true;
+		return false;
+	}
+
+	public int getQueryTimeout() throws SQLException {
+		return master.getQueryTimeout();
+	}
+
+	public ResultSet getResultSet() throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.getResultSet());
+		return rs;
+	}
+
+	public int getResultSetConcurrency() throws SQLException {
+		return master.getResultSetConcurrency();
+	}
+
+	public int getResultSetHoldability() throws SQLException {
+		return master.getResultSetHoldability();
+	}
+
+	public int getResultSetType() throws SQLException {
+		return master.getResultSetType();
+	}
+
+	public int getUpdateCount() throws SQLException {
+		return master.getUpdateCount();
+	}
+
+	public SQLWarning getWarnings() throws SQLException {
+		return master.getWarnings();
+	}
+
+	public void setCursorName(String name) throws SQLException {
+		for (T s:this)
+			s.setCursorName(name);
+	}
+
+	public void setEscapeProcessing(boolean flag) throws SQLException {
+		for (T s:this)
+			s.setEscapeProcessing(flag);
+	}
+
+	public void setFetchDirection(int dir) throws SQLException {
+		for (T s:this)
+			s.setFetchDirection(dir);
+	}
+
+	public void setFetchSize(int size) throws SQLException {
+		for (T s:this)
+			s.setFetchSize(size);
+	}
+
+	public void setMaxFieldSize(int size) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(size);
+	}
+
+	public void setMaxRows(int n) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(n);
+	}
+	
+	public void setQueryTimeout(int n) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(n);
+	}
+}



Mime
View raw message