openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From allee8...@apache.org
Subject svn commit: r633317 - in /openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel: BatchingConstraintUpdateManager.java BatchingOperationOrderUpdateManager.java BatchingPreparedStatementManagerImpl.java PreparedStatementManagerImpl.java
Date Mon, 03 Mar 2008 22:59:10 GMT
Author: allee8285
Date: Mon Mar  3 14:59:07 2008
New Revision: 633317

URL: http://svn.apache.org/viewvc?rev=633317&view=rev
Log:
OPENJPA-530 - Change BatchingPreparedStatementManagerImpl to correctly batch dispatched statements
in the same order requested by the update managers.

Modified:
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
    openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java?rev=633317&r1=633316&r2=633317&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingConstraintUpdateManager.java
Mon Mar  3 14:59:07 2008
@@ -19,19 +19,9 @@
 package org.apache.openjpa.jdbc.kernel;
 
 import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 
-import org.apache.openjpa.jdbc.schema.ForeignKey;
-import org.apache.openjpa.jdbc.sql.PrimaryRow;
-import org.apache.openjpa.jdbc.sql.Row;
-import org.apache.openjpa.jdbc.sql.RowImpl;
 import org.apache.openjpa.jdbc.sql.RowManager;
-import org.apache.openjpa.jdbc.sql.RowManagerImpl;
-import org.apache.openjpa.jdbc.sql.SQLExceptions;
-import org.apache.openjpa.kernel.OpenJPAStateManager;
 
 /**
  * <P>Batch update manager that writes the SQL in object-level operation order. 
@@ -51,8 +41,22 @@
 public class BatchingConstraintUpdateManager extends ConstraintUpdateManager {
 
     protected PreparedStatementManager newPreparedStatementManager(
-            JDBCStore store, Connection conn) {
+        JDBCStore store, Connection conn) {
         int batchLimit = dict.getBatchLimit();
-        return new BatchingPreparedStatementManagerImpl(store, conn, batchLimit);
+        return new BatchingPreparedStatementManagerImpl(store, conn,
+            batchLimit);
+    }
+
+    /*
+     * Override this method to flush any remaining batched row in the
+     * PreparedStatementManager.
+     */
+    protected Collection flush(RowManager rowMgr,
+        PreparedStatementManager psMgr, Collection exceps) {
+        Collection rtnCol = super.flush(rowMgr, psMgr, exceps);
+        BatchingPreparedStatementManagerImpl bPsMgr =
+            (BatchingPreparedStatementManagerImpl) psMgr;
+        bPsMgr.flushBatch();
+        return rtnCol;
     }
 }

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java?rev=633317&r1=633316&r2=633317&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingOperationOrderUpdateManager.java
Mon Mar  3 14:59:07 2008
@@ -19,6 +19,9 @@
 package org.apache.openjpa.jdbc.kernel;
 
 import java.sql.Connection;
+import java.util.Collection;
+
+import org.apache.openjpa.jdbc.sql.RowManager;
 
 /**
  * <P>Batch update manager that writes the SQL in object-level operation order. 
@@ -37,12 +40,25 @@
  */
 
 public class BatchingOperationOrderUpdateManager extends
-        OperationOrderUpdateManager {
+    OperationOrderUpdateManager {
 
     protected PreparedStatementManager newPreparedStatementManager(
-            JDBCStore store, Connection conn) {
+        JDBCStore store, Connection conn) {
         int batchLimit = dict.getBatchLimit();
-        return new BatchingPreparedStatementManagerImpl(
-                store, conn, batchLimit);
+        return new BatchingPreparedStatementManagerImpl(store, conn,
+            batchLimit);
+    }
+    
+    /*
+     * Override this method to flush any remaining batched row in the
+     * PreparedStatementManager.
+     */
+    protected Collection flush(RowManager rowMgr,
+        PreparedStatementManager psMgr, Collection exceps) {
+        Collection rtnCol = super.flush(rowMgr, psMgr, exceps);
+        BatchingPreparedStatementManagerImpl bPsMgr = 
+            (BatchingPreparedStatementManagerImpl) psMgr;
+        bPsMgr.flushBatch();
+        return rtnCol;
     }
 }

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java?rev=633317&r1=633316&r2=633317&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
Mon Mar  3 14:59:07 2008
@@ -18,20 +18,13 @@
  */
 package org.apache.openjpa.jdbc.kernel;
 
-import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.Statement;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
+import java.util.List;
 
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
 import org.apache.openjpa.jdbc.meta.ClassMapping;
@@ -42,7 +35,6 @@
 import org.apache.openjpa.kernel.OpenJPAStateManager;
 import org.apache.openjpa.lib.log.Log;
 import org.apache.openjpa.lib.util.Localizer;
-import org.apache.openjpa.util.ApplicationIds;
 import org.apache.openjpa.util.OptimisticException;
 
 /**
@@ -59,7 +51,8 @@
     private final static Localizer _loc = Localizer
             .forPackage(BatchingPreparedStatementManagerImpl.class);
 
-    private Map _cacheSql = null;
+    private String _batchedSql = null;
+    private List _batchedRows = new ArrayList();
     private int _batchLimit;
     private boolean _disableBatch = false;
     private transient Log _log = null;
@@ -68,8 +61,7 @@
      * Constructor. Supply connection.
      */
     public BatchingPreparedStatementManagerImpl(JDBCStore store,
-            Connection conn, int batchLimit) {
-
+        Connection conn, int batchLimit) {
         super(store, conn);
         _batchLimit = batchLimit;
         _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC);
@@ -78,202 +70,158 @@
     }
 
     /**
-     * Flush the given row. This method will cache the statement in a cache. The
-     * statement will be executed in the flush() method.
+     * Flush the given row immediately or deferred the flush in batch.
      */
-    protected void flushInternal(RowImpl row) throws SQLException {
-        if (_batchLimit == 0 || _disableBatch) {
-            super.flushInternal(row);
-            return;
-        }
-        Column[] autoAssign = null;
-        if (row.getAction() == Row.ACTION_INSERT)
-            autoAssign = row.getTable().getAutoAssignedColumns();
-
-        // prepare statement
-        String sql = row.getSQL(_dict);
-        OpenJPAStateManager sm = row.getPrimaryKey();
-        ClassMapping cmd = null;
-        if (sm != null)
-            cmd = (ClassMapping) sm.getMetaData();
-        // validate batch capability
-        _disableBatch = _dict.validateBatchProcess(row, autoAssign, sm, cmd);
-
-        // process the sql statement, either execute it immediately or
-        // cache them.
-        processSql(sql, row);
-
-        // set auto assign values
-        if (autoAssign != null && autoAssign.length > 0 && sm != null)
{
-            Object val;
-            for (int i = 0; i < autoAssign.length; i++) {
-                val = _dict.getGeneratedKey(autoAssign[i], _conn);
-                cmd.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
-                        _store, autoAssign[i], val);
-            }
-            sm.setObjectId(ApplicationIds.create(sm.getPersistenceCapable(),
-                    cmd));
-        }
-    }
-
-    private void processSql(String sql, RowImpl row) throws SQLException {
-        ArrayList temprow;
-
-        if (_cacheSql == null)
-            _cacheSql = Collections.synchronizedMap(new LinkedHashMap());
-        if (_disableBatch) {
+    protected void flushAndUpdate(RowImpl row) throws SQLException {
+        if (isBatchDisabled(row)) {
             // if there were some statements batched before, then
             // we need to flush them out first before processing the
             // current non batch process.
-            if (!_cacheSql.isEmpty())
-                flush();
-            execute(sql, row);
+            flushBatch();
 
+            super.flushAndUpdate(row);
         } else {
-            // else start batch support. If the sql string is in the cache,
-            // just adds the row to the cache
-            if (_cacheSql.containsKey(sql)) {
-                temprow = (ArrayList) _cacheSql.get(sql);
-                temprow.add(row);
-                _cacheSql.put(sql, temprow);
-            } else {
-                // no sql exists in the cache, cache the sql string and its rows
-                ArrayList inputrow = new ArrayList();
-                inputrow.add(row);
-                _cacheSql.put(sql, inputrow);
-            }
-        } // end of batch support
-    }
-
-    private void execute(String sql, RowImpl row) throws SQLException {
-        PreparedStatement stmnt = null;
-        try {
-            ResultSet rs = null;
-            stmnt = _conn.prepareStatement(sql);
-            row.flush(stmnt, _dict, _store);
-            int count = stmnt.executeUpdate();
-            if (count != 1) {
-                Object failed = row.getFailedObject();
-                if (failed != null)
-                    _exceptions.add(new OptimisticException(failed));
-                else if (row.getAction() == Row.ACTION_INSERT)
-                    throw new SQLException(_loc.get(
-                            "update-failed-no-failed-obj",
-                            String.valueOf(count), sql).getMessage());
-            }
-        } catch (SQLException se) {
-            throw SQLExceptions.getStore(se, row.getFailedObject(), _dict);
-        } finally {
-            try {
-                if (stmnt != null)
-                    stmnt.close();
-            } catch (SQLException se) {
-                // ignore the exception for this case.
+            // process the SQL statement, either execute it immediately or
+            // batch it for later execution.
+            String sql = row.getSQL(_dict);
+            if (_batchedSql == null) {
+                // brand new SQL
+                _batchedSql = sql;
+            } else if (!sql.equals(_batchedSql)) {
+                // SQL statements changed.
+                switch (_batchedRows.size()) {
+                case 0:
+                    break;
+                case 1:
+                    // single entry in cache, direct SQL execution. 
+                    super.flushAndUpdate((RowImpl) _batchedRows.get(0));
+                    _batchedRows.clear();
+                    break;
+                default:
+                    // flush all entries in cache in batch.
+                    flushBatch();
+                }
+                _batchedSql = sql;
             }
+            _batchedRows.add(row);
         }
     }
 
-    public void flush() {
-        PreparedStatement ps = null;
-        ArrayList list;
-        RowImpl onerow = null;
-
-        // go thru the cache to process all the sql stmt.
-        if (_cacheSql == null || _cacheSql.isEmpty()) {
-            super.flush();
-            return;
+    /*
+     * Compute if batching is disabled, based on values of batch limit
+     * and database characteristics.
+     */
+    private boolean isBatchDisabled(RowImpl row) {
+        boolean rtnVal = true;
+        if (_batchLimit != 0 && !_disableBatch) {
+            String sql = row.getSQL(_dict);
+            OpenJPAStateManager sm = row.getPrimaryKey();
+            ClassMapping cmd = null;
+            if (sm != null)
+                cmd = (ClassMapping) sm.getMetaData();
+            Column[] autoAssign = null;
+            if (row.getAction() == Row.ACTION_INSERT)
+                autoAssign = row.getTable().getAutoAssignedColumns();
+            // validate batch capability
+            _disableBatch = _dict
+                .validateBatchProcess(row, autoAssign, sm, cmd);
+            rtnVal = _disableBatch;
         }
-        Set e = _cacheSql.keySet();
-
-        for (Iterator itr = e.iterator(); itr.hasNext();) {
-            String key = (String) itr.next();
-            try {
-                ps = _conn.prepareStatement(key);
-            } catch (SQLException se) {
-                throw SQLExceptions.getStore(se, ps, _dict);
-            }
-            list = (ArrayList) _cacheSql.get(key);
-            if (list == null) {
-                return;
-            }
-
-            // if only 1 row for this statement, then execute it right away
-            int rowsize = list.size();
-
+        return rtnVal;
+    }
+    
+    /**
+     * flush all cached up statements to be executed as a single or batched
+     * prepared statements.
+     */
+    protected void flushBatch() {
+        if (_batchedSql != null && _batchedRows.size() > 0) {
+            PreparedStatement ps = null;
             try {
-                if (rowsize == 1) {
-                    onerow = (RowImpl) list.get(0);
-                    onerow.flush(ps, _dict, _store);
-                    int count = ps.executeUpdate();
-                    if (count != 1) {
-                        Object failed = onerow.getFailedObject();
-                        if (failed != null)
-                            _exceptions.add(new OptimisticException(failed));
-                        else if (onerow.getAction() == Row.ACTION_INSERT)
-                            throw new SQLException(_loc.get(
-                                    "update-failed-no-failed-obj",
-                                    String.valueOf(count), key).getMessage());
-                    }
+                RowImpl onerow = null;
+                ps = _conn.prepareStatement(_batchedSql);
+                if (_batchedRows.size() == 1) {
+                    // execute a single row.
+                    onerow = (RowImpl) _batchedRows.get(0);
+                    flushSingleRow(onerow, ps);
                 } else {
-                    // has more than one rows for this statement, use addBatch
+                    // cache has more than one rows, execute as batch.
                     int count = 0;
-                    for (int i = 0; i < list.size(); i++) {
-                        onerow = (RowImpl) list.get(i);
-                        if (count < _batchLimit || _batchLimit == -1) {
-                            onerow.flush(ps, _dict, _store);
-                            ps.addBatch();
-                            count++;
-
+                    int batchedRowsBaseIndex = 0;
+                    Iterator itr = _batchedRows.iterator();
+                    while (itr.hasNext()) {
+                        onerow = (RowImpl) itr.next();
+                        if (_batchLimit == 1) {
+                            flushSingleRow(onerow, ps);
                         } else {
-                            // reach the batchLimit , execute it
-                            try {
+                            if (count < _batchLimit || _batchLimit == -1) {
+                                onerow.flush(ps, _dict, _store);
+                                ps.addBatch();
+                                count++;
+                            } else {
+                                // reach the batchLimit, execute the batch
                                 int[] rtn = ps.executeBatch();
-                                checkUpdateCount(rtn, onerow, key);
-                            } catch (BatchUpdateException bex) {
-                                SQLException sqex = bex.getNextException();
-                                if (sqex == null)
-                                    sqex = bex;
-                                throw SQLExceptions.getStore(sqex, ps, _dict);
+                                checkUpdateCount(rtn, batchedRowsBaseIndex);
+
+                                batchedRowsBaseIndex += _batchLimit;
+
+                                onerow.flush(ps, _dict, _store);
+                                ps.addBatch();
+                                // reset the count to 1 for new batch
+                                count = 1;
                             }
-                            onerow.flush(ps, _dict, _store);
-                            ps.addBatch();
-                            count = 1; // reset the count to 1 for new batch
                         }
                     }
                     // end of the loop, execute the batch
-                    try {
-                        int[] rtn = ps.executeBatch();
-                        checkUpdateCount(rtn, onerow, key);
-                    } catch (BatchUpdateException bex) {
-                        SQLException sqex = bex.getNextException();
-                        if (sqex == null)
-                            sqex = bex;
-                        throw SQLExceptions.getStore(sqex, ps, _dict);
-                    }
+                    int[] rtn = ps.executeBatch();
+                    checkUpdateCount(rtn, batchedRowsBaseIndex);
                 }
             } catch (SQLException se) {
                 SQLException sqex = se.getNextException();
                 if (sqex == null)
                     sqex = se;
                 throw SQLExceptions.getStore(sqex, ps, _dict);
-            }
-            try {
-                ps.close();
-            } catch (SQLException sqex) {
-                throw SQLExceptions.getStore(sqex, ps, _dict);
+            } finally {
+                _batchedSql = null;
+                _batchedRows.clear();
+                if (ps != null) {
+                    try {
+                        ps.close();
+                    } catch (SQLException sqex) {
+                        throw SQLExceptions.getStore(sqex, ps, _dict);
+                    }
+                }
             }
         }
-        // instead of calling _cacheSql.clear, null it out to improve the
-        // performance.
-        _cacheSql = null;
     }
 
-    private void checkUpdateCount(int[] count, RowImpl row, String sql)
-            throws SQLException {
+    /*
+     * Execute an update of a single row.
+     */
+    private void flushSingleRow(RowImpl row, PreparedStatement ps)
+        throws SQLException {
+        row.flush(ps, _dict, _store);
+        int count = ps.executeUpdate();
+        if (count != 1) {
+            Object failed = row.getFailedObject();
+            if (failed != null)
+                _exceptions.add(new OptimisticException(failed));
+            else if (row.getAction() == Row.ACTION_INSERT)
+                throw new SQLException(_loc.get("update-failed-no-failed-obj",
+                    String.valueOf(count), row.getSQL(_dict)).getMessage());
+        }
+    }
+
+    /*
+     * Process executeBatch function array of return counts.
+     */
+    private void checkUpdateCount(int[] count, int batchedRowsBaseIndex)
+        throws SQLException {
         int cnt = 0;
         Object failed = null;
         for (int i = 0; i < count.length; i++) {
             cnt = count[i];
+            RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
             switch (cnt) {
             case Statement.EXECUTE_FAILED: // -3
                 failed = row.getFailedObject();
@@ -281,21 +229,21 @@
                     _exceptions.add(new OptimisticException(failed));
                 else if (row.getAction() == Row.ACTION_INSERT)
                     throw new SQLException(_loc.get(
-                            "update-failed-no-failed-obj",
-                            String.valueOf(count[i]), sql).getMessage());
+                        "update-failed-no-failed-obj",
+                        String.valueOf(count[i]), _batchedSql).getMessage());
                 break;
             case Statement.SUCCESS_NO_INFO: // -2
                 if (_log.isTraceEnabled())
                     _log.trace(_loc.get("batch_update_info",
-                            String.valueOf(cnt), sql).getMessage());
+                        String.valueOf(cnt), _batchedSql).getMessage());
                 break;
             case 0: // no row is inserted, treats it as failed
                 // case
                 failed = row.getFailedObject();
                 if ((failed != null || row.getAction() == Row.ACTION_INSERT))
                     throw new SQLException(_loc.get(
-                            "update-failed-no-failed-obj",
-                            String.valueOf(count[i]), sql).getMessage());
+                        "update-failed-no-failed-obj",
+                        String.valueOf(count[i]), _batchedSql).getMessage());
             }
         }
     }

Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java?rev=633317&r1=633316&r2=633317&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedStatementManagerImpl.java
Mon Mar  3 14:59:07 2008
@@ -86,10 +86,33 @@
         if (row.getAction() == Row.ACTION_INSERT)
             autoAssign = row.getTable().getAutoAssignedColumns();
 
+        flushAndUpdate(row);
+
+        // set auto assign values
+        if (autoAssign != null && autoAssign.length > 0
+            && row.getPrimaryKey() != null) {
+            OpenJPAStateManager sm = row.getPrimaryKey();
+            ClassMapping mapping = (ClassMapping) sm.getMetaData();
+            Object val;
+            for (int i = 0; i < autoAssign.length; i++) {
+                val = _dict.getGeneratedKey(autoAssign[i], _conn);
+                mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
+                    _store, autoAssign[i], val);
+            }
+            sm.setObjectId(
+                ApplicationIds.create(sm.getPersistenceCapable(), mapping));
+        }
+    }
+
+    /**
+     * Flush the given row immediately. 
+     */
+    protected void flushAndUpdate(RowImpl row)
+        throws SQLException {
         // prepare statement
         String sql = row.getSQL(_dict);
         PreparedStatement stmnt = prepareStatement(sql);
-        
+
         // setup parameters and execute statement
         if (stmnt != null)
             row.flush(stmnt, _dict, _store);
@@ -107,23 +130,12 @@
         } catch (SQLException se) {
             throw SQLExceptions.getStore(se, row.getFailedObject(), _dict);
         } finally {
-            if (stmnt != null)
-               try { stmnt.close(); } catch (SQLException se) {}
-        }
-
-        // set auto assign values
-        if (autoAssign != null && autoAssign.length > 0
-            && row.getPrimaryKey() != null) {
-            OpenJPAStateManager sm = row.getPrimaryKey();
-            ClassMapping mapping = (ClassMapping) sm.getMetaData();
-            Object val;
-            for (int i = 0; i < autoAssign.length; i++) {
-                val = _dict.getGeneratedKey(autoAssign[i], _conn);
-                mapping.assertJoinable(autoAssign[i]).setAutoAssignedValue(sm,
-                    _store, autoAssign[i], val);
+            if (stmnt != null) {
+                try {
+                    stmnt.close();
+                } catch (SQLException se) {
+                }
             }
-            sm.setObjectId(
-                ApplicationIds.create(sm.getPersistenceCapable(), mapping));
         }
     }
 
@@ -146,5 +158,5 @@
     protected PreparedStatement prepareStatement(String sql)
         throws SQLException {
         return _conn.prepareStatement(sql);
-    }    
+    }
 }



Mime
View raw message