Author: reschke Date: Mon Jun 25 12:43:55 2012 New Revision: 1353495 URL: http://svn.apache.org/viewvc?rev=1353495&view=rev Log: JCR-2950: avoid parallel updates for the same node id (note this patch implements three different strategies for testing purposes) Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java?rev=1353495&r1=1353494&r2=1353495&view=diff ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/security/authorization/acl/CachingEntryCollector.java Mon Jun 25 12:43:55 2012 @@ -16,6 +16,12 @@ */ package org.apache.jackrabbit.core.security.authorization.acl; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.jcr.RepositoryException; + import org.apache.jackrabbit.core.NodeImpl; import org.apache.jackrabbit.core.SessionImpl; import org.apache.jackrabbit.core.cache.GrowingLRUMap; @@ -24,9 +30,6 @@ import org.apache.jackrabbit.core.securi import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jcr.RepositoryException; -import java.util.Map; - /** * CachingEntryCollector extends EntryCollector by * keeping a cache of ACEs per access controlled nodeId. @@ -45,9 +48,12 @@ class CachingEntryCollector extends Entr */ private final EntryCache cache; + private ConcurrentMap futures = new ConcurrentHashMap(); + private final String strategy; + /** * Create a new instance. - * + * * @param systemSession A system session. * @param rootID The id of the root node. * @throws RepositoryException If an error occurs. @@ -55,6 +61,13 @@ class CachingEntryCollector extends Entr CachingEntryCollector(SessionImpl systemSession, NodeId rootID) throws RepositoryException { super(systemSession, rootID); cache = new EntryCache(); + + // for testing purposes, see JCR-2950 + String propname = "org.apache.jackrabbit.core.security.authorization.acl.CachingEntryCollector.strategy"; + strategy = System.getProperty(propname, "T"); + if (!("S".equals(strategy) || "T".equals(strategy) || "P".equals(strategy))) { + throw new RepositoryException("Invalid value " + strategy + " specified for system property " + propname); + } } @Override @@ -95,12 +108,12 @@ class CachingEntryCollector extends Entr /** * Read the entries defined for the specified node and update the cache * accordingly. - * + * * @param node The target node * @return The list of entries present on the specified node or an empty list. * @throws RepositoryException If an error occurs. */ - private Entries updateCache(NodeImpl node) throws RepositoryException { + private Entries internalUpdateCache(NodeImpl node) throws RepositoryException { Entries entries = super.getEntries(node); if (!entries.isEmpty()) { // adjust the 'nextId' to point to the next access controlled @@ -112,6 +125,77 @@ class CachingEntryCollector extends Entr } /** + * Update cache for the given node id + * @param node The target node + * @return The list of entries present on the specified node or an empty list. + * @throws RepositoryException + */ + private Entries updateCache(NodeImpl node) throws RepositoryException { + if ("T".equals(strategy)) { + return throttledUpdateCache(node); + } else if ("S".equals(strategy)) { + return synchronizedUpdateCache(node); + } else if ("P".equals(strategy)) { + return parallelUpdateCache(node); + } else { + // panic + throw new RuntimeException("invalid value for updateCacheStrategy: " + strategy); + } + } + + /** + * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant runs fully synchronized + */ + synchronized private Entries synchronizedUpdateCache(NodeImpl node) throws RepositoryException { + return internalUpdateCache(node); + } + + /** + * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant runs fully parallel + */ + private Entries parallelUpdateCache(NodeImpl node) throws RepositoryException { + return internalUpdateCache(node); + } + + /** + * See {@link CachingEntryCollector#updateCache(NodeImpl)} ; this variant blocks the current + * thread if a concurrent update for the same node id takes place + */ + private Entries throttledUpdateCache(NodeImpl node) throws RepositoryException { + NodeId id = node.getNodeId(); + FutureEntries fe = null; + FutureEntries nfe = new FutureEntries(); + boolean found = true; + + fe = futures.putIfAbsent(id, nfe); + if (fe == null) { + found = false; + fe = nfe; + } + + if (found) { + // we have found a previous FutureEntries object, so use it + return fe.get(); + } else { + // otherwise obtain result and when done notify waiting FutureEntries + try { + Entries e = internalUpdateCache(node); + futures.remove(id); + fe.setResult(e); + return e; + } catch (Throwable problem) { + futures.remove(id); + fe.setProblem(problem); + if (problem instanceof RepositoryException) { + throw (RepositoryException)problem; + } else { + throw new RuntimeException(problem); + } + } + } + } + + /** * Find the next access control ancestor in the hierarchy 'null' indicates * that there is no ac-controlled ancestor. * @@ -153,7 +237,7 @@ class CachingEntryCollector extends Entr /** * Evaluates if the given node is access controlled and holds a non-empty * rep:policy child node. - * + * * @param n The node to test. * @return true if the specified node is access controlled and holds a * non-empty policy child node. @@ -206,7 +290,46 @@ class CachingEntryCollector extends Entr super.notifyListeners(modifications); } - //-------------------------------------------------------------------------- + /** + * A place holder for a yet to be computed {@link Entries} result + */ + private class FutureEntries { + + private boolean ready = false; + private Entries result = null; + private Throwable problem = null; + + synchronized public Entries get() throws RepositoryException { + while (!ready) { + try { + wait(); + } catch (InterruptedException e) { + } + } + if (problem != null) { + if (problem instanceof RepositoryException) { + throw new RepositoryException(problem); + } + else { + throw new RuntimeException(problem); + } + } + return result; + } + + synchronized public void setResult(Entries e) { + result = e; + ready = true; + notifyAll(); + } + + synchronized public void setProblem(Throwable t) { + problem = t; + ready = true; + notifyAll(); + } + } + /** * A cache to lookup the ACEs defined on a given (access controlled) * node. The internal map uses the ID of the node as key while the value