Author: fancy
Date: Fri May 9 14:34:29 2008
New Revision: 654942
URL: http://svn.apache.org/viewvc?rev=654942&view=rev
Log:
OPENJPA-598 Make BatchingPreparedStatementManagerImpl more flexible and extensible, Sub-task
of OPENJPA-477
Committing patch provided by Fay Wang
Modified:
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
Modified: openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java?rev=654942&r1=654941&r2=654942&view=diff
==============================================================================
--- openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
(original)
+++ openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/BatchingPreparedStatementManagerImpl.java
Fri May 9 14:34:29 2008
@@ -83,28 +83,32 @@
} else {
// process the SQL statement, either execute it immediately or
// batch it for later execution.
- String sql = row.getSQL(_dict);
- if (_batchedSql == null) {
- // brand new SQL
- _batchedSql = sql;
- } else if (!sql.equals(_batchedSql)) {
- // SQL statements changed.
- switch (_batchedRows.size()) {
- case 0:
- break;
- case 1:
- // single entry in cache, direct SQL execution.
- super.flushAndUpdate((RowImpl) _batchedRows.get(0));
- _batchedRows.clear();
- break;
- default:
- // flush all entries in cache in batch.
- flushBatch();
- }
- _batchedSql = sql;
+ batchOrExecuteRow(row);
+ }
+ }
+
+ protected void batchOrExecuteRow(RowImpl row) throws SQLException {
+ String sql = row.getSQL(_dict);
+ if (_batchedSql == null) {
+ // brand new SQL
+ _batchedSql = sql;
+ } else if (!sql.equals(_batchedSql)) {
+ // SQL statements changed.
+ switch (_batchedRows.size()) {
+ case 0:
+ break;
+ case 1:
+ // single entry in cache, direct SQL execution.
+ super.flushAndUpdate((RowImpl) _batchedRows.get(0));
+ _batchedRows.clear();
+ break;
+ default:
+ // flush all entries in cache in batch.
+ flushBatch();
}
- _batchedRows.add(row);
+ _batchedSql = sql;
}
+ _batchedRows.add(row);
}
/*
@@ -113,8 +117,7 @@
*/
private boolean isBatchDisabled(RowImpl row) {
boolean rtnVal = true;
- if (_batchLimit != 0 && !_disableBatch) {
- String sql = row.getSQL(_dict);
+ if (getBatchLimit() != 0 && !isBatchDisabled()) {
OpenJPAStateManager sm = row.getPrimaryKey();
ClassMapping cmd = null;
if (sm != null)
@@ -123,9 +126,9 @@
if (row.getAction() == Row.ACTION_INSERT)
autoAssign = row.getTable().getAutoAssignedColumns();
// validate batch capability
- _disableBatch = _dict
+ rtnVal = _dict
.validateBatchProcess(row, autoAssign, sm, cmd);
- rtnVal = _disableBatch;
+ setBatchDisabled(rtnVal);
}
return rtnVal;
}
@@ -135,45 +138,53 @@
* prepared statements.
*/
protected void flushBatch() {
- if (_batchedSql != null && _batchedRows.size() > 0) {
+ List batchedRows = getBatchedRows();
+ String batchedSql = getBatchedSql();
+ if (batchedRows == null)
+ return;
+
+ int batchSize = batchedRows.size();
+ if (batchedSql != null && batchSize > 0) {
PreparedStatement ps = null;
try {
RowImpl onerow = null;
- ps = _conn.prepareStatement(_batchedSql);
- if (_batchedRows.size() == 1) {
+ ps = prepareStatement(batchedSql);
+ if (batchSize == 1) {
// execute a single row.
- onerow = (RowImpl) _batchedRows.get(0);
+ onerow = (RowImpl) batchedRows.get(0);
flushSingleRow(onerow, ps);
} else {
// cache has more than one rows, execute as batch.
int count = 0;
int batchedRowsBaseIndex = 0;
- Iterator itr = _batchedRows.iterator();
+ Iterator itr = batchedRows.iterator();
while (itr.hasNext()) {
onerow = (RowImpl) itr.next();
if (_batchLimit == 1) {
flushSingleRow(onerow, ps);
} else {
if (count < _batchLimit || _batchLimit == -1) {
- onerow.flush(ps, _dict, _store);
- ps.addBatch();
+ if (ps != null)
+ onerow.flush(ps, _dict, _store);
+ addBatch(ps, onerow, count);
count++;
} else {
// reach the batchLimit, execute the batch
- int[] rtn = ps.executeBatch();
+ int[] rtn = executeBatch(ps);
checkUpdateCount(rtn, batchedRowsBaseIndex);
batchedRowsBaseIndex += _batchLimit;
- onerow.flush(ps, _dict, _store);
- ps.addBatch();
+ if (ps != null)
+ onerow.flush(ps, _dict, _store);
+ addBatch(ps, onerow, count);
// reset the count to 1 for new batch
count = 1;
}
}
}
// end of the loop, execute the batch
- int[] rtn = ps.executeBatch();
+ int[] rtn = executeBatch(ps);
checkUpdateCount(rtn, batchedRowsBaseIndex);
}
} catch (SQLException se) {
@@ -183,7 +194,7 @@
throw SQLExceptions.getStore(sqex, ps, _dict);
} finally {
_batchedSql = null;
- _batchedRows.clear();
+ batchedRows.clear();
if (ps != null) {
try {
ps.close();
@@ -200,8 +211,9 @@
*/
private void flushSingleRow(RowImpl row, PreparedStatement ps)
throws SQLException {
- row.flush(ps, _dict, _store);
- int count = ps.executeUpdate();
+ if (ps != null)
+ row.flush(ps, _dict, _store);
+ int count = executeUpdate(ps, row.getSQL(_dict), row);
if (count != 1) {
Object failed = row.getFailedObject();
if (failed != null)
@@ -219,9 +231,10 @@
throws SQLException {
int cnt = 0;
Object failed = null;
+ List batchedRows = getBatchedRows();
for (int i = 0; i < count.length; i++) {
cnt = count[i];
- RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
+ RowImpl row = (RowImpl) batchedRows.get(batchedRowsBaseIndex + i);
failed = row.getFailedObject();
switch (cnt) {
case Statement.EXECUTE_FAILED: // -3
@@ -230,14 +243,16 @@
else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
- String.valueOf(count[i]), _batchedSql).getMessage());
+ String.valueOf(count[i]),
+ row.getSQL(_dict)).getMessage());
break;
case Statement.SUCCESS_NO_INFO: // -2
if (failed != null || row.getAction() == Row.ACTION_UPDATE)
_exceptions.add(new OptimisticException(failed));
else if (_log.isTraceEnabled())
_log.trace(_loc.get("batch_update_info",
- String.valueOf(cnt), _batchedSql).getMessage());
+ String.valueOf(cnt),
+ row.getSQL(_dict)).getMessage());
break;
case 0: // no row is inserted, treats it as failed
// case
@@ -246,8 +261,43 @@
else if (row.getAction() == Row.ACTION_INSERT)
throw new SQLException(_loc.get(
"update-failed-no-failed-obj",
- String.valueOf(count[i]), _batchedSql).getMessage());
+ String.valueOf(count[i]),
+ row.getSQL(_dict)).getMessage());
}
}
}
+
+ public boolean isBatchDisabled() {
+ return _disableBatch;
+ }
+
+ public void setBatchDisabled(boolean disableBatch) {
+ _disableBatch = disableBatch;
+ }
+
+ public int getBatchLimit() {
+ return _batchLimit;
+ }
+
+ public void setBatchLimit(int batchLimit) {
+ _batchLimit = batchLimit;
+ }
+
+ public List getBatchedRows() {
+ return _batchedRows;
+ }
+
+ public String getBatchedSql() {
+ return _batchedSql;
+ }
+
+ protected void addBatch(PreparedStatement ps, RowImpl row,
+ int count) throws SQLException {
+ ps.addBatch();
+ }
+
+ protected int[] executeBatch(PreparedStatement ps)
+ throws SQLException {
+ return ps.executeBatch();
+ }
}
|