Author: reschke
Date: Mon Jun 25 12:43:55 2012
New Revision: 1353495
URL: http://svn.apache.org/viewvc?rev=1353495&view=rev
Log:
JCR-2950: avoid parallel updates for the same node id (note this patch implements three different strategies for testing purposes)
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java?rev=1353495&r1=1353494&r2=1353495&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java Mon Jun 25 12:43:55 2012
@@ -16,6 +16,12 @@
*/
package org.apache.jackrabbit.core.security.authorization.acl;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.jcr.RepositoryException;
+
import org.apache.jackrabbit.core.NodeImpl;
import org.apache.jackrabbit.core.SessionImpl;
import org.apache.jackrabbit.core.cache.GrowingLRUMap;
@@ -24,9 +30,6 @@ import org.apache.jackrabbit.core.securi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jcr.RepositoryException;
-import java.util.Map;
-
/**
* CachingEntryCollector
extends EntryCollector
by
* keeping a cache of ACEs per access controlled nodeId.
@@ -45,9 +48,12 @@ class CachingEntryCollector extends Entr
*/
private final EntryCache cache;
+ private ConcurrentMap futures = new ConcurrentHashMap();
+ private final String strategy;
+
/**
* Create a new instance.
- *
+ *
* @param systemSession A system session.
* @param rootID The id of the root node.
* @throws RepositoryException If an error occurs.
@@ -55,6 +61,13 @@ class CachingEntryCollector extends Entr
CachingEntryCollector(SessionImpl systemSession, NodeId rootID) throws RepositoryException {
super(systemSession, rootID);
cache = new EntryCache();
+
+ // for testing purposes, see JCR-2950
+ String propname = "org.apache.jackrabbit.core.security.authorization.acl.CachingEntryCollector.strategy";
+ strategy = System.getProperty(propname, "T");
+ if (!("S".equals(strategy) || "T".equals(strategy) || "P".equals(strategy))) {
+ throw new RepositoryException("Invalid value " + strategy + " specified for system property " + propname);
+ }
}
@Override
@@ -95,12 +108,12 @@ class CachingEntryCollector extends Entr
/**
* Read the entries defined for the specified node and update the cache
* accordingly.
- *
+ *
* @param node The target node
* @return The list of entries present on the specified node or an empty list.
* @throws RepositoryException If an error occurs.
*/
- private Entries updateCache(NodeImpl node) throws RepositoryException {
+ private Entries internalUpdateCache(NodeImpl node) throws RepositoryException {
Entries entries = super.getEntries(node);
if (!entries.isEmpty()) {
// adjust the 'nextId' to point to the next access controlled
@@ -112,6 +125,77 @@ class CachingEntryCollector extends Entr
}
/**
+ * Update cache for the given node id
+ * @param node The target node
+ * @return The list of entries present on the specified node or an empty list.
+ * @throws RepositoryException
+ */
+ private Entries updateCache(NodeImpl node) throws RepositoryException {
+ if ("T".equals(strategy)) {
+ return throttledUpdateCache(node);
+ } else if ("S".equals(strategy)) {
+ return synchronizedUpdateCache(node);
+ } else if ("P".equals(strategy)) {
+ return parallelUpdateCache(node);
+ } else {
+ // panic
+ throw new RuntimeException("invalid value for updateCacheStrategy: " + strategy);
+ }
+ }
+
+ /**
+ * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant runs fully synchronized
+ */
+ synchronized private Entries synchronizedUpdateCache(NodeImpl node) throws RepositoryException {
+ return internalUpdateCache(node);
+ }
+
+ /**
+ * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant runs fully parallel
+ */
+ private Entries parallelUpdateCache(NodeImpl node) throws RepositoryException {
+ return internalUpdateCache(node);
+ }
+
+ /**
+ * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant blocks the current
+ * thread if a concurrent update for the same node id takes place
+ */
+ private Entries throttledUpdateCache(NodeImpl node) throws RepositoryException {
+ NodeId id = node.getNodeId();
+ FutureEntries fe = null;
+ FutureEntries nfe = new FutureEntries();
+ boolean found = true;
+
+ fe = futures.putIfAbsent(id, nfe);
+ if (fe == null) {
+ found = false;
+ fe = nfe;
+ }
+
+ if (found) {
+ // we have found a previous FutureEntries object, so use it
+ return fe.get();
+ } else {
+ // otherwise obtain result and when done notify waiting FutureEntries
+ try {
+ Entries e = internalUpdateCache(node);
+ futures.remove(id);
+ fe.setResult(e);
+ return e;
+ } catch (Throwable problem) {
+ futures.remove(id);
+ fe.setProblem(problem);
+ if (problem instanceof RepositoryException) {
+ throw (RepositoryException)problem;
+ } else {
+ throw new RuntimeException(problem);
+ }
+ }
+ }
+ }
+
+ /**
* Find the next access control ancestor in the hierarchy 'null' indicates
* that there is no ac-controlled ancestor.
*
@@ -153,7 +237,7 @@ class CachingEntryCollector extends Entr
/**
* Evaluates if the given node is access controlled and holds a non-empty
* rep:policy child node.
- *
+ *
* @param n The node to test.
* @return true if the specified node is access controlled and holds a
* non-empty policy child node.
@@ -206,7 +290,46 @@ class CachingEntryCollector extends Entr
super.notifyListeners(modifications);
}
- //--------------------------------------------------------------------------
+ /**
+ * A place holder for a yet to be computed {@link Entries} result
+ */
+ private class FutureEntries {
+
+ private boolean ready = false;
+ private Entries result = null;
+ private Throwable problem = null;
+
+ synchronized public Entries get() throws RepositoryException {
+ while (!ready) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ if (problem != null) {
+ if (problem instanceof RepositoryException) {
+ throw new RepositoryException(problem);
+ }
+ else {
+ throw new RuntimeException(problem);
+ }
+ }
+ return result;
+ }
+
+ synchronized public void setResult(Entries e) {
+ result = e;
+ ready = true;
+ notifyAll();
+ }
+
+ synchronized public void setProblem(Throwable t) {
+ problem = t;
+ ready = true;
+ notifyAll();
+ }
+ }
+
/**
* A cache to lookup the ACEs defined on a given (access controlled)
* node. The internal map uses the ID of the node as key while the value