river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1468119 [12/15] - in /river/jtsk/skunk/qa_refactor/trunk: qa/src/com/sun/jini/qa/harness/ qa/src/com/sun/jini/test/impl/mahalo/ qa/src/com/sun/jini/test/resources/ qa/src/com/sun/jini/test/share/ qa/src/com/sun/jini/test/spec/javaspace/con...
Date Mon, 15 Apr 2013 15:26:46 GMT
Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/StorableEventWatcher.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/StorableEventWatcher.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/StorableEventWatcher.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/StorableEventWatcher.java Mon Apr 15 15:26:44 2013
@@ -1,152 +1,152 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.outrigger;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.IOException;
-import java.io.StreamCorruptedException;
-import java.rmi.MarshalledObject;
-import java.rmi.RemoteException;
-import net.jini.core.event.RemoteEventListener;
-import net.jini.id.Uuid;
-import net.jini.id.UuidFactory;
-import net.jini.security.ProxyPreparer;
-
-/**
- * Subclass of <code>EventRegistrationWatcher</code> for non-transactional
- * persistent event registrations.
- */
-class StorableEventWatcher extends EventRegistrationWatcher
-    implements StorableResource
-{
-    /** The listener that should be notified of matches */
-    private StorableReference listener;
-
-    /**
-     * Used during log recovery to create a mostly empty
-     * <code>StorableEventWatcher</code>.  
-     * <p> 
-     * Note, we set the time stamp and tie-breaker here instead of
-     * getting them from the log. This means they will be inconstant
-     * with their value from the last VM we were in, but since they
-     * never leak out and events are read-only anyway this should not
-     * be a problem (this also allows us to keep the tie-breaker and
-     * time stamp in final fields).
-     *
-     * @param timestamp the value that is used
-     *        to sort <code>TransitionWatcher</code>s.
-     * @param startOrdinal the highest ordinal associated
-     *        with operations that are considered to have occurred 
-     *        before the operation associated with this watcher.
-     * @param currentSeqNum Sequence number to start with.
-     * @throws NullPointerException if the <code>notifier</code>
-     *         argument is null.  
-     */
-    StorableEventWatcher(long timestamp, long startOrdinal,
-			 long currentSeqNum) 
-    {
-	super(timestamp, startOrdinal, currentSeqNum);
-    }
-
-    /**
-     * Create a new <code>StorableEventWatcher</code>.
-     * @param timestamp the value that is used
-     *        to sort <code>TransitionWatcher</code>s.
-     * @param startOrdinal the highest ordinal associated
-     *        with operations that are considered to have occurred 
-     *        before the operation associated with this watcher.
-     * @param cookie The unique identifier associated
-     *        with this watcher. Must not be <code>null</code>.
-     * @param handback The handback object that
-     *        should be sent along with event
-     *        notifications to the the listener.
-     * @param eventID The event ID for event type
-     *        represented by this object. 
-     * @param listener The object to notify of
-     *        matches.
-     * @throws NullPointerException if the <code>cookie</code>,
-     *        or <code>listener</code> arguments are <code>null</code>.
-     */
-    StorableEventWatcher(long timestamp, long startOrdinal, Uuid cookie, 
-			 MarshalledObject handback, long eventID, 
-			 RemoteEventListener listener)
-    {
-	super(timestamp, startOrdinal, cookie, handback, eventID);
-
-	if (listener == null)
-	    throw new NullPointerException("listener must be non-null");
-	this.listener = new StorableReference(listener);
-    }
-    
-    boolean isInterested(EntryTransition transition, long ordinal) {
-	return ((ordinal > startOrdinal) &&
-		(transition.getTxn() == null) &&
-		(transition.isNewEntry()));
-    }
-
-    RemoteEventListener getListener(ProxyPreparer preparer) 
-	throws ClassNotFoundException, IOException
-    {
-	return (RemoteEventListener)listener.get(preparer);
-    }
-
-    /**
-     * Overridden by subclasses if there is any cleanup work they need
-     * to do as part of <code>cancel</code> or
-     * <code>removeIfExpired</code>. Called after releasing the lock
-     * on <code>this</code>.  Will be called at most once.  
-     * @param owner A reference to the owner.
-     * @param expired <code>true</code> if being called from 
-     *        <code>removeIfExpired</code> and false otherwise. 
-     */
-    void cleanup(TemplateHandle owner, boolean expired) {
-	if (expired)
-	    owner.getServer().scheduleCancelOp(cookie);
-	else 
-	    owner.getServer().cancelOp(cookie, false);
-    }
-
-
-    /**  
-     * Store the persistent fields 
-     */
-    public void store(ObjectOutputStream out) throws IOException {
-	cookie.write(out);
-	out.writeLong(expiration);
-	out.writeLong(eventID);
-	out.writeObject(handback);
-	out.writeObject(listener);
-    }
-
-    /**
-     * Restore the persistent fields
-     */
-    public void restore(ObjectInputStream in) 
-	throws IOException, ClassNotFoundException 
-    {
-	cookie = UuidFactory.read(in);
-	expiration = in.readLong();
-	eventID = in.readLong();
-	handback = (MarshalledObject)in.readObject();	
-	listener = (StorableReference)in.readObject();
-	if (listener == null)
-	    throw new StreamCorruptedException(
-		"Stream corrupted, should not be null");
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.rmi.MarshalledObject;
+import java.rmi.RemoteException;
+import net.jini.core.event.RemoteEventListener;
+import net.jini.id.Uuid;
+import net.jini.id.UuidFactory;
+import net.jini.security.ProxyPreparer;
+
+/**
+ * Subclass of <code>EventRegistrationWatcher</code> for non-transactional
+ * persistent event registrations.
+ */
+class StorableEventWatcher extends EventRegistrationWatcher
+    implements StorableResource
+{
+    /** The listener that should be notified of matches */
+    private volatile StorableReference listener;
+
+    /**
+     * Used during log recovery to create a mostly empty
+     * <code>StorableEventWatcher</code>.  
+     * <p> 
+     * Note, we set the time stamp and tie-breaker here instead of
+     * getting them from the log. This means they will be inconsistent
+     * with their value from the last VM we were in, but since they
+     * never leak out and events are read-only anyway this should not
+     * be a problem (this also allows us to keep the tie-breaker and
+     * time stamp in final fields).
+     *
+     * @param timestamp the value that is used
+     *        to sort <code>TransitionWatcher</code>s.
+     * @param startOrdinal the highest ordinal associated
+     *        with operations that are considered to have occurred 
+     *        before the operation associated with this watcher.
+     * @param currentSeqNum Sequence number to start with.
+     * @throws NullPointerException if the <code>notifier</code>
+     *         argument is null.  
+     */
+    StorableEventWatcher(long timestamp, long startOrdinal,
+			 long currentSeqNum) 
+    {
+	super(timestamp, startOrdinal, currentSeqNum);
+    }
+
+    /**
+     * Create a new <code>StorableEventWatcher</code>.
+     * @param timestamp the value that is used
+     *        to sort <code>TransitionWatcher</code>s.
+     * @param startOrdinal the highest ordinal associated
+     *        with operations that are considered to have occurred 
+     *        before the operation associated with this watcher.
+     * @param cookie The unique identifier associated
+     *        with this watcher. Must not be <code>null</code>.
+     * @param handback The handback object that
+     *        should be sent along with event
+     *        notifications to the listener.
+     * @param eventID The event ID for event type
+     *        represented by this object. 
+     * @param listener The object to notify of
+     *        matches.
+     * @throws NullPointerException if the <code>cookie</code>,
+     *        or <code>listener</code> arguments are <code>null</code>.
+     */
+    StorableEventWatcher(long timestamp, long startOrdinal, Uuid cookie, 
+			 MarshalledObject handback, long eventID, 
+			 RemoteEventListener listener)
+    {
+	super(timestamp, startOrdinal, cookie, handback, eventID);
+
+	if (listener == null)
+	    throw new NullPointerException("listener must be non-null");
+	this.listener = new StorableReference(listener);
+    }
+    
+    boolean isInterested(EntryTransition transition, long ordinal) {
+	return ((ordinal > startOrdinal) &&
+		(transition.getTxn() == null) &&
+		(transition.isNewEntry()));
+    }
+
+    RemoteEventListener getListener(ProxyPreparer preparer) 
+	throws ClassNotFoundException, IOException
+    {
+	return (RemoteEventListener)listener.get(preparer);
+    }
+
+    /**
+     * Overridden by subclasses if there is any cleanup work they need
+     * to do as part of <code>cancel</code> or
+     * <code>removeIfExpired</code>. Called after releasing the lock
+     * on <code>this</code>.  Will be called at most once.  
+     * @param owner A reference to the owner.
+     * @param expired <code>true</code> if being called from 
+     *        <code>removeIfExpired</code> and false otherwise. 
+     */
+    synchronized void cleanup(TemplateHandle owner, boolean expired) {
+	if (expired)
+	    owner.getServer().scheduleCancelOp(cookie);
+	else 
+	    owner.getServer().cancelOp(cookie, false);
+    }
+
+
+    /**  
+     * Store the persistent fields 
+     */
+    public synchronized void store(ObjectOutputStream out) throws IOException {
+	cookie.write(out);
+	out.writeLong(expiration);
+	out.writeLong(eventID);
+	out.writeObject(handback);
+	out.writeObject(listener);
+    }
+
+    /**
+     * Restore the persistent fields
+     */
+    public synchronized void restore(ObjectInputStream in) 
+	throws IOException, ClassNotFoundException 
+    {
+	cookie = UuidFactory.read(in);
+	expiration = in.readLong();
+	eventID = in.readLong();
+	handback = (MarshalledObject)in.readObject();	
+	listener = (StorableReference)in.readObject();
+	if (listener == null)
+	    throw new StreamCorruptedException(
+		"Stream corrupted, should not be null");
+    }
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java Mon Apr 15 15:26:44 2013
@@ -1,228 +1,237 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.outrigger;
-
-import com.sun.jini.config.Config;
-import com.sun.jini.thread.TaskManager;
-import com.sun.jini.thread.WakeupManager;
-
-import net.jini.config.Configuration;
-import net.jini.config.ConfigurationException;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * This class provides a driver for monitoring the state of transactions
- * that have blocked progress of other operations recently.  It creates
- * tasks that monitor each transaction by intermittently querying the
- * transaction's state.  If it finds that the transaction has aborted,
- * it makes sure that the local space aborts the transaction, too, so
- * that operations will cease to be blocked by the transaction.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see TxnMonitorTask
- * @see OutriggerServerImpl#monitor
- */
-class TxnMonitor implements Runnable {
-    /**
-     * Each <code>ToMonitor</code> object represents a need to monitor
-     * the given transactions, possibly under a lease.
-     *
-     * @see #pending
-     */
-    private static class ToMonitor {
-	QueryWatcher	query;         // query governing interest in txns
-	Collection	txns;	       // the transactions to monitor
-
-	ToMonitor(QueryWatcher query, Collection txns) {
-	    this.query = query;
-	    this.txns = txns;
-	}
-    }
-
-    /**
-     * This list is used to contain requests to monitor interfering
-     * transactions.  We use a list like this so that the
-     * <code>getMatch</code> request that detected the conflict
-     * doesn't have to wait for all the setup before returning -- it
-     * just puts the data on this list and the <code>TxnMonitor</code>
-     * pulls it off using its own thread.
-     * 
-     * @see OutriggerServerImpl#getMatch 
-     */
-    // @see #ToMonitor
-    private LinkedList pending = new LinkedList();
-
-    /** wakeup manager for <code>TxnMonitorTask</code>s */
-    private final WakeupManager wakeupMgr = 
-	new WakeupManager(new WakeupManager.ThreadDesc(null, true));
-
-    /**
-     * The manager for <code>TxnMonitorTask</code> objects.
-     */
-    private TaskManager taskManager;
-
-    /**
-     * The space we belong to.  Needed for aborts.
-     */
-    private OutriggerServerImpl	space;
-
-    /**
-     * The thread running us.
-     */
-    private Thread ourThread;
-
-    /** Set when we are told to stop */
-    private boolean die = false;
-
-    /** Logger for logging transaction related information */
-    private static final Logger logger = 
-	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
-    /**
-     * Create a new TxnMonitor.
-     */
-    TxnMonitor(OutriggerServerImpl space, Configuration config)
-	throws ConfigurationException 
-    {
-	if (space == null)
-	    throw new NullPointerException("space must be non-null");
-	this.space = space;
-
-	taskManager = (TaskManager)Config.getNonNullEntry(config,
-	    OutriggerServerImpl.COMPONENT_NAME, "txnMonitorTaskManager", 
-	    TaskManager.class, new TaskManager());
-
-        ourThread = new Thread(this, "TxnMonitor");
-	ourThread.setDaemon(true);
-        ourThread.start();
-    }
-
-    public void destroy() {
-        taskManager.terminate();
-	wakeupMgr.stop();	
-
-	synchronized (this) {
-	    die = true;
-	    notifyAll();
-	}
-
-        try {
-	    ourThread.join();
-	} catch(InterruptedException ie) {
-	    // ignore
-	}
-    }
-
-    /**
-     * Return the space we're part of.
-     */
-    OutriggerServerImpl space() {
-	return space;
-    }
-
-    /**
-     * Add a set of <code>transactions</code> to be monitored under the
-     * given query.
-     */
-    synchronized void add(QueryWatcher query, Collection transactions) {
-	if (logger.isLoggable(Level.FINEST)) {
-	    final StringBuffer buf = new StringBuffer();
-	    buf.append("Setting up monitor for ");
-	    buf.append(query);
-	    buf.append(" toMonitor:");
-	    boolean notFirst = false;
-	    for (Iterator i=transactions.iterator(); i.hasNext();) {
-		if (notFirst) {
-		    buf.append(",");
-		    notFirst = true;
-		}
-		buf.append(i.next());
-	    }
-	    logger.log(Level.FINEST, buf.toString());
-	}
-
-	pending.add(new ToMonitor(query, transactions));
-	notifyAll();
-    }
-
-    /**
-     * Add a set of <code>transactions</code> to be monitored under no
-     * lease.
-     */
-    void add(Collection transactions) {
-	add(null, transactions);
-    }
-
-    /**
-     * Take pending monitor requests off the queue, creating the
-     * required <code>TxnMonitorTask</code> objects and scheduling them.
-     */
-    public void run() {
-	try {
-	    ToMonitor tm;
-	    for (;;)  {
-		synchronized (this) {
-		
-		    // Sleep if nothing is pending.
-		    while (pending.isEmpty() && !die)
-			wait();
-
-		    if (die)
-			return;
-
-		    tm = (ToMonitor)pending.removeFirst();
-		}
-
-		logger.log(Level.FINER, "creating monitor tasks for {0}",
-			   tm.query);
-
-		Iterator it = tm.txns.iterator();
-		while (it.hasNext()) {
-		    Txn txn = (Txn) it.next();
-		    TxnMonitorTask task = taskFor(txn);
-		    task.add(tm.query);
-		}
-	    }
-	} catch (InterruptedException e) {
-	    return;
-	}
-    }
-
-    /**
-     * Return the monitor task for this transaction, creating it if
-     * necessary.
-     */
-    private TxnMonitorTask taskFor(Txn txn) {
-	TxnMonitorTask task = txn.monitorTask();
-	if (task == null) {
-	    logger.log(Level.FINER, "creating TxnMonitorTask for {0}", 
-			   txn);
-
-	    task = new TxnMonitorTask(txn, this, taskManager, wakeupMgr);
-	    txn.monitorTask(task);
-	    taskManager.add(task);  // add it after we've set it in the txn
-	}
-	return task;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import com.sun.jini.config.Config;
+import com.sun.jini.thread.TaskManager;
+import com.sun.jini.thread.WakeupManager;
+
+import net.jini.config.Configuration;
+import net.jini.config.ConfigurationException;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class provides a driver for monitoring the state of transactions
+ * that have blocked progress of other operations recently.  It creates
+ * tasks that monitor each transaction by intermittently querying the
+ * transaction's state.  If it finds that the transaction has aborted,
+ * it makes sure that the local space aborts the transaction, too, so
+ * that operations will cease to be blocked by the transaction.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see TxnMonitorTask
+ * @see OutriggerServerImpl#monitor
+ */
+class TxnMonitor implements Runnable {
+    /**
+     * Each <code>ToMonitor</code> object represents a need to monitor
+     * the given transactions, possibly under a lease.
+     *
+     * @see #pending
+     */
+    private static class ToMonitor {
+	final QueryWatcher	query;         // query governing interest in txns
+	final Collection	txns;	       // the transactions to monitor
+
+	ToMonitor(QueryWatcher query, Collection txns) {
+	    this.query = query;
+	    this.txns = txns;
+	}
+    }
+
+    /**
+     * This list is used to contain requests to monitor interfering
+     * transactions.  We use a list like this so that the
+     * <code>getMatch</code> request that detected the conflict
+     * doesn't have to wait for all the setup before returning -- it
+     * just puts the data on this list and the <code>TxnMonitor</code>
+     * pulls it off using its own thread.
+     * 
+     * @see OutriggerServerImpl#getMatch 
+     */
+    // @see #ToMonitor
+    private final LinkedList pending = new LinkedList();
+
+    /** wakeup manager for <code>TxnMonitorTask</code>s */
+    private final WakeupManager wakeupMgr = 
+	new WakeupManager(new WakeupManager.ThreadDesc(null, true));
+
+    /**
+     * The manager for <code>TxnMonitorTask</code> objects.
+     */
+    private final TaskManager taskManager;
+
+    /**
+     * The space we belong to.  Needed for aborts.
+     */
+    private final OutriggerServerImpl	space;
+
+    /**
+     * The thread running us.
+     */
+    private final Thread ourThread;
+
+    /** Set when we are told to stop */
+    private volatile boolean die = false;
+
+    private volatile boolean started = false;
+
+    /** Logger for logging transaction related information */
+    private static final Logger logger = 
+	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+    /**
+     * Create a new TxnMonitor.
+     */
+    TxnMonitor(OutriggerServerImpl space, Configuration config)
+	throws ConfigurationException 
+    {
+	if (space == null)
+	    throw new NullPointerException("space must be non-null");
+	this.space = space;
+
+	taskManager = (TaskManager)Config.getNonNullEntry(config,
+	    OutriggerServerImpl.COMPONENT_NAME, "txnMonitorTaskManager", 
+	    TaskManager.class, new TaskManager());
+
+        ourThread = new Thread(this, "TxnMonitor");
+	ourThread.setDaemon(true);
+//        ourThread.start();
+    }
+    
+    public void start(){
+        synchronized (this){
+        ourThread.start();
+            started = true;
+    }
+    }
+
+    public void destroy() {
+        taskManager.terminate();
+	wakeupMgr.stop();	
+
+	synchronized (this) {
+	    die = true;
+	    notifyAll();
+	}
+
+        try {
+	    if (started) ourThread.join();
+	} catch(InterruptedException ie) {
+	    // ignore
+	}
+    }
+
+    /**
+     * Return the space we're part of.
+     */
+    OutriggerServerImpl space() {
+	return space;
+    }
+
+    /**
+     * Queue <code>transactions</code> to be monitored under the given
+     * <code>query</code> (may be <code>null</code>), then notify all
+     * threads waiting on this object.
+     */
+    synchronized void add(QueryWatcher query, Collection transactions) {
+	if (logger.isLoggable(Level.FINEST)) {
+	    final StringBuffer buf = new StringBuffer();
+	    buf.append("Setting up monitor for ");
+	    buf.append(query);
+	    buf.append(" toMonitor:");
+	    boolean notFirst = false;
+	    for (Iterator i=transactions.iterator(); i.hasNext();) {
+		if (notFirst) {
+		    buf.append(",");
+		}
+		notFirst = true; // set outside the guard so later elements get a separator
+		buf.append(i.next());
+	    }
+	    logger.log(Level.FINEST, buf.toString());
+	}
+	pending.add(new ToMonitor(query, transactions));
+	notifyAll();
+    }
+
+    /**
+     * Add a set of <code>transactions</code> to be monitored under no
+     * lease.
+     */
+    void add(Collection transactions) {
+	add(null, transactions);
+    }
+
+    /**
+     * Take pending monitor requests off the queue, creating the
+     * required <code>TxnMonitorTask</code> objects and scheduling them.
+     */
+    public void run() {
+	try {
+	    ToMonitor tm;
+	    for (;;)  {
+		synchronized (this) {
+		
+		    // Sleep if nothing is pending.
+		    while (pending.isEmpty() && !die)
+			wait();
+
+		    if (die)
+			return;
+
+		    tm = (ToMonitor)pending.removeFirst();
+		}
+
+		logger.log(Level.FINER, "creating monitor tasks for {0}",
+			   tm.query);
+
+		Iterator it = tm.txns.iterator();
+		while (it.hasNext()) {
+		    Txn txn = (Txn) it.next();
+		    TxnMonitorTask task = taskFor(txn);
+		    task.add(tm.query);
+		}
+	    }
+	} catch (InterruptedException e) {
+	    return;
+	}
+    }
+
+    /**
+     * Return the monitor task for this transaction, creating it if
+     * necessary.
+     */
+    private TxnMonitorTask taskFor(Txn txn) {
+	TxnMonitorTask task = txn.monitorTask();
+	if (task == null) {
+	    logger.log(Level.FINER, "creating TxnMonitorTask for {0}", 
+			   txn);
+
+	    task = new TxnMonitorTask(txn, this, taskManager, wakeupMgr);
+	    txn.monitorTask(task);
+	    taskManager.add(task);  // add it after we've set it in the txn
+	}
+	return task;
+    }
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TypeTree.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TypeTree.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TypeTree.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TypeTree.java Mon Apr 15 15:26:44 2013
@@ -1,247 +1,249 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.outrigger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.Vector;
-
-/**
- * A type tree for entries.  It maintains, for each class, a list of
- * known subclasses so that we can walk down the relevant subpart of
- * the subtype tree for a template, looking for matching entries.  This
- * list of subtypes is not current garbage collected -- if a subtype
- * was once written, it's subtype entry will never be removed from this
- * tree.  All operations are done via class name.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see OutriggerServerImpl
- */
-class TypeTree {
-    /** For each type, a vector of known subtypes */
-    private Hashtable subclasses = new Hashtable();
-
-    /**
-     * A generator used to randomize the order of iterator returns
-     */
-    // @see #RandomizedIterator
-    static final private Random numgen = new Random();
-
-    /**
-     * Name of the root bucket of the type tree
-     */
-    static final private String ROOT = 
-	net.jini.core.entry.Entry.class.getName();
-
-    /**
-     * Return the vector of subclasses for the given class.
-     */
-    private Vector classVector(String whichClass) {
-	return (Vector) subclasses.get(whichClass);
-    }
-
-    /**
-     * An iterator that will walk through a list of known types.
-     */
-    // @see #RandomizedIterator
-    private abstract class TypeTreeIterator implements Iterator {
-	protected int cursor;		// the current position in the list
-	protected Object[] typearray;	// the list of types as an array
-
-	// inherit doc comment
-        public boolean hasNext() {
-            if (cursor < typearray.length)
-                return true;
- 
-            return false;
-        }
- 
-	// inherit doc comment
-        public Object next() throws NoSuchElementException {
-            Object val = null;
- 
-            if (cursor >= typearray.length)
-                throw new NoSuchElementException("TypeTreeIterator: next");
- 
-            try {
-                val = typearray[cursor];
-                cursor++;
-            } catch (ArrayIndexOutOfBoundsException e) {
-                throw new NoSuchElementException("TypeTreeIterator: next" +
-                                                        e.getMessage());
-            }
-
-            return val;
-        }
- 
-	/**
-	 * Unimplemented operations
-	 * @throws UnsupportedOperationException Always
-	 */
-        public void remove() throws UnsupportedOperationException {
-            throw new UnsupportedOperationException(
-		"TypeTreeIterator: remove not supported");
-        }
-    }
-
-    /**
-     * This class implements a randomized iterator over the
-     * <code>TypeTree</code>.  Given a <code>className</code>, it
-     * maintains a randomized list of subtypes for the given
-     * <code>className</code>, including the class itself.
-     */
-    class RandomizedIterator extends TypeTreeIterator {
-	/**
-	 * Create a new <code>RandomizedIterator</code> for the given
-	 * class.
-	 */
-	RandomizedIterator(String className) {
-	    super();
-	    init(className);
-	}
-
-
-	/*
-	 * Traverse the given type tree and add to the list all the
-	 * subtypes encountered within.
-	 */
-	private void walkTree(Collection children, Collection list) {
-	    if (children != null) {
-		list.addAll(children);
-	        Object[] kids = children.toArray();
-		for (int i = 0; i< kids.length; i++) {
-		    walkTree(classVector((String)kids[i]), list);
-		}
-	    }
-	}
-
-	/**
-	 * Set up this iterator to walk over the subtypes of this class,
-	 * including the class itself.  It then randomizes the list.
-	 */
-	private void init(String className) {
-            Collection types = new ArrayList();
-
-	    if (className.equals(EntryRep.matchAnyClassName())) {
-		// handle "match any" specially" -- search from ROOT
-		// Simplification suggested by 
-		// Lutz Birkhahn <lutz.birkhahn@GMX.DE>
-		className = ROOT;
-	    } else {
-		// add this class
-		types.add(className);
-	    }
-
-	    // add all subclasses
-	    walkTree(classVector(className), types);
-
-	    // Convert it to an array and then randomize
-	    typearray = types.toArray();
-	    int randnum = 0;
-	    Object tmpobj = null;
-
-	    for (int i = 0; i < typearray.length; i++) {
-		randnum = numgen.nextInt(typearray.length - i);
-		tmpobj = typearray[i];
-		typearray[i] = typearray[randnum];
-		typearray[randnum] = tmpobj;
-	    }
-	}
-    }
-
-    /**
-     * Return an iterator over the subtypes of the given class
-     * (including the type itself).  This implementation always returns
-     * an iterator that randomizes the order of the classes returned.
-     * In other words, it returns the names of all classes that are
-     * instances of the class that named, in a random ordering.
-     */
-    Iterator subTypes(String className) {
-	return new RandomizedIterator(className);
-    }
-
-    /**
-     * Update the type tree with the given bits.  This will traverse the
-     * given EntryRep's list of superclasses, retrieve the subclass list
-     * at each list item and update it with the given EntryRep's type.
-     *
-     *  SupClass List
-     *   |
-     *   V
-     *   SupC1-->Sub1OfSupC1--Sub2OfSupC1...SubNOfSupC1--EntryRep
-     *   |
-     *   |
-     *   SupC2-->Sub1OfSupC2--Sub2OfSupC2...SubNOfSupC2--EntryRep
-     *   .
-     *   .
-     *   .
-     *   SupCN-->Sub1OfSupCN--Sub2OfSupCN...SubNOfSupCN--EntryRep
-     */
-    void addTypes(EntryRep bits) {
-	String classFor = bits.classFor();
-	String[] superclasses = bits.superclasses();
-
-	//The given EntryRep will add its className to the
-	//subtype list of all its supertypes.
-
-	String prevClass = classFor;
-	for (int i = 0; i < superclasses.length; i++) {
-	    if (!addKnown(superclasses[i], prevClass)) {
-		return;
-	    }
-	    prevClass = superclasses[i];
-	}
-
-	// If we are here prevClass must have java.Object as its
-	// direct superclass (we don't store "java.Object" in
-	// EntryRep.superclasses since that would be redundant) and
-	// prevClass is not already in the the tree.  Place it in the
-	// "net.jini.core.entry.Entry" bucket so it does not get lost
-	// if it does not have any sub-classes.
-	//
-	// Fix suggested by Lutz Birkhahn <lutz.birkhahn@GMX.DE>
-	addKnown(ROOT, prevClass);
-    }
-
-    /**
-     * Add the subclass to the list of known subclasses of this superclass.  
-     */
-    private boolean addKnown(String superclass, String subclass) {
-	Vector v;
-
-	synchronized (subclasses) {
-	    v = classVector(superclass);
-	    if (v == null) {
-		v = new Vector();
-		subclasses.put(superclass, v);
-	    }
-	}
-
-	synchronized (v) {
-	    if (v.contains(subclass))
-		return false;
-	    v.addElement(subclass);
-	}
-	return true;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.Vector;
+
+/**
+ * A type tree for entries.  It maintains, for each class, a list of
+ * known subclasses so that we can walk down the relevant subpart of
+ * the subtype tree for a template, looking for matching entries.  This
+ * list of subtypes is not currently garbage collected -- if a subtype
+ * was once written, its subtype entry will never be removed from this
+ * tree.  All operations are done via class name.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see OutriggerServerImpl
+ */
+class TypeTree {
+    /** For each type (by class name), a vector of known subtype names */
+    private final Hashtable<String,Vector<String>> subclasses =
+	new Hashtable<String,Vector<String>>();
+
+    /**
+     * A generator used to randomize the order of iterator returns
+     */
+    // @see #RandomizedIterator
+    static final private Random numgen = new Random();
+
+    /**
+     * Name of the root bucket of the type tree
+     */
+    static final private String ROOT = 
+	net.jini.core.entry.Entry.class.getName();
+
+    /**
+     * Return the vector of subclasses for the given class, or
+     * <code>null</code> if no subclass has been recorded for it.
+     * The lookup is done while holding the lock on
+     * <code>subclasses</code>, the same lock <code>addKnown</code>
+     * holds while it creates a missing vector.
+     */
+    private Vector<String> classVector(String whichClass) {
+	synchronized (subclasses) {
+	    return subclasses.get(whichClass);
+	}
+    }
+
+    /**
+     * An iterator that will walk through a list of known types.
+     * Subclasses are expected to fill in <code>typearray</code>;
+     * <code>cursor</code> starts at zero and only ever advances.
+     */
+    // @see #RandomizedIterator
+    private abstract class TypeTreeIterator implements Iterator {
+	protected int cursor;		// the current position in the list
+	protected Object[] typearray;	// the list of types as an array
+
+	// inherit doc comment
+        public boolean hasNext() {
+            return cursor < typearray.length;
+        }
+ 
+	// inherit doc comment
+        public Object next() throws NoSuchElementException {
+            Object val = null;
+ 
+            if (cursor >= typearray.length)
+                throw new NoSuchElementException("TypeTreeIterator: next");
+ 
+            // The bounds were checked just above; the catch is kept as
+            // a defensive fallback so callers only ever see
+            // NoSuchElementException from this method.
+            try {
+                val = typearray[cursor];
+                cursor++;
+            } catch (ArrayIndexOutOfBoundsException e) {
+                throw new NoSuchElementException("TypeTreeIterator: next" +
+                                                        e.getMessage());
+            }
+
+            return val;
+        }
+ 
+	/**
+	 * Unimplemented operations
+	 * @throws UnsupportedOperationException Always
+	 */
+        public void remove() throws UnsupportedOperationException {
+            throw new UnsupportedOperationException(
+		"TypeTreeIterator: remove not supported");
+        }
+    }
+
+    /**
+     * This class implements a randomized iterator over the
+     * <code>TypeTree</code>.  Given a <code>className</code>, it
+     * maintains a randomized list of subtypes for the given
+     * <code>className</code>, including the class itself.
+     */
+    class RandomizedIterator extends TypeTreeIterator {
+	/**
+	 * Create a new <code>RandomizedIterator</code> for the given
+	 * class.
+	 */
+	RandomizedIterator(String className) {
+	    super();
+	    init(className);
+	}
+
+
+	/*
+	 * Traverse the given type tree and add to the list all the
+	 * subtypes encountered within.  A null <code>children</code>
+	 * (no recorded subtypes) ends the recursion.
+	 */
+	private void walkTree(Collection<String> children,
+			      Collection<String> list)
+	{
+	    if (children == null)
+		return;
+
+	    // Take ONE snapshot of the children and use it both for
+	    // the listing and for the descent.  The vector may be
+	    // appended to concurrently by addKnown(); with two separate
+	    // snapshots a subtype added in between would be descended
+	    // into without being listed itself.
+	    final Object[] kids = children.toArray();
+	    for (int i = 0; i < kids.length; i++) {
+		list.add((String) kids[i]);
+	    }
+	    for (int i = 0; i < kids.length; i++) {
+		walkTree(classVector((String) kids[i]), list);
+	    }
+	}
+
+	/**
+	 * Set up this iterator to walk over the subtypes of this class,
+	 * including the class itself.  It then randomizes the list.
+	 */
+	private void init(String className) {
+            Collection<String> types = new ArrayList<String>();
+
+	    if (className.equals(EntryRep.matchAnyClassName())) {
+		// handle "match any" specially -- search from ROOT
+		// Simplification suggested by 
+		// Lutz Birkhahn <lutz.birkhahn@GMX.DE>
+		className = ROOT;
+	    } else {
+		// add this class
+		types.add(className);
+	    }
+
+	    // add all subclasses
+	    walkTree(classVector(className), types);
+
+	    // Convert it to an array and then randomize with a
+	    // Fisher-Yates shuffle: position i is swapped with a
+	    // position chosen uniformly from [i, length).  The "i +"
+	    // offset matters -- choosing from [0, length - i) instead
+	    // makes some orderings more likely than others and some
+	    // impossible.
+	    typearray = types.toArray();
+	    for (int i = 0; i < typearray.length - 1; i++) {
+		final int pick = i + numgen.nextInt(typearray.length - i);
+		final Object tmpobj = typearray[i];
+		typearray[i] = typearray[pick];
+		typearray[pick] = tmpobj;
+	    }
+	}
+    }
+
+    /**
+     * Return an iterator over the subtypes of the given class
+     * (including the type itself).  This implementation always returns
+     * an iterator that randomizes the order of the classes returned.
+     * In other words, it returns the names of all known classes that
+     * are instances of the named class, in a random ordering.
+     */
+    Iterator subTypes(String className) {
+	return new RandomizedIterator(className);
+    }
+
+    /**
+     * Update the type tree with the given bits.  This will traverse the
+     * given EntryRep's list of superclasses, retrieve the subclass list
+     * at each list item and update it with the given EntryRep's type.
+     *
+     *  SupClass List
+     *   |
+     *   V
+     *   SupC1-->Sub1OfSupC1--Sub2OfSupC1...SubNOfSupC1--EntryRep
+     *   |
+     *   |
+     *   SupC2-->Sub1OfSupC2--Sub2OfSupC2...SubNOfSupC2--EntryRep
+     *   .
+     *   .
+     *   .
+     *   SupCN-->Sub1OfSupCN--Sub2OfSupCN...SubNOfSupCN--EntryRep
+     */
+    void addTypes(EntryRep bits) {
+	String classFor = bits.classFor();
+	String[] superclasses = bits.superclasses();
+
+	// Each class in the chain is added to the subtype list of the
+	// next entry in superclasses.  As soon as a link is found to be
+	// already present we stop: the remaining links are taken to
+	// have been recorded when that link was first added.
+
+	String prevClass = classFor;
+	for (int i = 0; i < superclasses.length; i++) {
+	    if (!addKnown(superclasses[i], prevClass)) {
+		return;
+	    }
+	    prevClass = superclasses[i];
+	}
+
+	// If we are here prevClass must have java.lang.Object as its
+	// direct superclass (we don't store "java.lang.Object" in
+	// EntryRep.superclasses since that would be redundant) and
+	// prevClass is not already in the tree.  Place it in the
+	// "net.jini.core.entry.Entry" bucket so it does not get lost
+	// if it does not have any sub-classes.
+	//
+	// Fix suggested by Lutz Birkhahn <lutz.birkhahn@GMX.DE>
+	addKnown(ROOT, prevClass);
+    }
+
+    /**
+     * Add the subclass to the list of known subclasses of this superclass.  
+     *
+     * @return <code>true</code> if the subclass was added,
+     *         <code>false</code> if it was already in the list
+     */
+    private boolean addKnown(String superclass, String subclass) {
+	Vector<String> v;
+
+	// Look up or create the bucket under the map lock so that two
+	// threads cannot each install a different vector for the same
+	// superclass.
+	synchronized (subclasses) {
+	    v = classVector(superclass);
+	    if (v == null) {
+		v = new Vector<String>();
+		subclasses.put(superclass, v);
+	    }
+	}
+
+	// contains + addElement must be one atomic step per bucket.
+	synchronized (v) {
+	    if (v.contains(subclass))
+		return false;
+	    v.addElement(subclass);
+	}
+	return true;
+    }
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/RetryTask.java Mon Apr 15 15:26:44 2013
@@ -1,253 +1,260 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.thread;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import com.sun.jini.constants.TimeConstants;
-
-/**
- * An abstract class for building tasks that retry on failure after a
- * timeout period.  This builds upon <code>TaskManager</code> for task
- * execution and <code>WakeupManager</code> for retry scheduling.  You
- * extend <code>RetryTask</code>, implementing <code>tryOnce</code> to
- * represent a single attempt at the task.  If <code>tryOnce</code>
- * returns <code>true</code>, the attempt was successful and the task
- * is complete.  If <code>tryOnce</code> returns <code>false</code>,
- * the task is scheduled for a future retry using <code>WakeupManager</code>.
- * <p>
- * The default retry times are defined by this class's implementation
- * of <code>retryTime</code></code>.  You can override this method to
- * change the retry times.
- * <p>
- * It is legal to reuse the same task again and again, but only after
- * the task is complete and <code>reset</code> has been called.
- * Inserting the same task multiple times before it has either been
- * cancelled or completed successfully will generate unpredictable
- * behavior.<p>
- *
- * This class uses the {@link Logger} named
- * <code>com.sun.jini.thread.RetryTask</code> to log information at
- * the following logging levels: <p>
- *
- * <table border=1 cellpadding=5
- *       summary="Describes logging performed by RetryTask at different
- *	          logging levels">
- *
- * <tr> <th> Level <th> Description
- *
- * <tr> <td> FINEST <td> after a failed attempt, when should 
- *                       the task be scheduled for a re-try
- *
- * </table>
- *
- * @author Sun Microsystems, Inc.
- *
- * @see TaskManager
- * @see WakeupManager
- */
-import com.sun.jini.thread.WakeupManager.Ticket;
-
-public abstract class RetryTask implements TaskManager.Task, TimeConstants {
-    private TaskManager	  manager;	// the TaskManager for this task
-    private RetryTime	  retry;	// the retry object for this task
-    private boolean	  cancelled;	// have we been cancelled?
-    private boolean	  complete;	// have we completed successfully?
-    private Ticket	  ticket;	// the WakeupManager ticket
-    private long	  startTime;	// the time when we were created or 
-                                        //   last reset
-    private int		  attempt;	// the current attempt number
-    private WakeupManager wakeup;       // WakeupManager for retry scheduling
-
-    /**
-     * Default delay backoff times.  These are converted from
-     * intervals to "time since start" by the static block below.
-     *
-     * @see #retryTime 
-     */
-    private static final long[] delays = {
-	 0, // First value is never read
-	 1 * SECONDS,
-	 5 * SECONDS,
-	10 * SECONDS,
-	 1 * MINUTES,
-	 1 * MINUTES,
-	 5 * MINUTES,
-    };
-
-    /** Logger for this class */
-    private static final Logger logger = 
-	Logger.getLogger("com.sun.jini.thread.RetryTask");
-
-    /**
-     * Create a new <code>RetryTask</code> that will be scheduled with
-     * the given task manager, and which will perform retry scheduling 
-     * using the given wakeup manager.
-     */
-    public RetryTask(TaskManager manager, WakeupManager wakeupManager) {
-	this.manager = manager;
-        this.wakeup = wakeupManager;
-	reset();
-    }
-
-    /**
-     * Make a single attempt.  Return <code>true</code> if the attempt
-     * was successful.  If the attempt is not successful, the task
-     * will be scheduled for a future retry.
-     */
-    public abstract boolean tryOnce();
-
-    /**
-     * The <code>run</code> method used as a
-     * <code>TaskManager&#046;Task</code>.  This invokes
-     * <code>tryOnce</code>.  If it is not successful, it schedules
-     * the task for a future retry at the time it gets by invoking
-     * <code>retryTime</code>.
-     *
-     * @see #tryOnce
-     * @see #startTime 
-     */
-    public void run() {
-	synchronized (this) {		// avoid retry if cancelled
-	    if (cancelled)		// if they cancelled
-		return;			// do nothing
-	}
-
-	boolean success = tryOnce();
-
-	synchronized (this) {
-	    if (!success) {		// if at first we don't succeed ...
-		attempt++;
-		long at = retryTime();	// ... try, try again
-
-		if (logger.isLoggable(Level.FINEST)) {
-		    logger.log(Level.FINEST, "retry of {0} in {1} ms", 
-		        new Object[]{this, 
-			    Long.valueOf(at - System.currentTimeMillis())});
-		}
-
-		if (retry == null)	// only create it if we need to
-		    retry = new RetryTime();
-		ticket = wakeup.schedule(at, retry);
-	    } else {
-		complete = true;
-		notifyAll();		// see waitFor()
-	    }
-	}
-    }
-
-    /**
-     * Return the next time at which we should make another attempt.
-     * This is <em>not</em> an interval, but the actual time.
-     * <p>
-     * The implementation is free to do as it pleases with the policy
-     * here.  The default implementation is to delay using intervals of
-     * 1 second, 5 seconds, 10 seconds, 1 minute, and 1 minute between
-     * attempts, and then retrying every five minutes forever.
-     * <p>
-     * The default implementation assumes it is being called from
-     * the default <code>run</code> method and that the current thread
-     * holds the lock on this object. If the caller does
-     * not own the lock the result is undefined and could result in an
-     * exception.
-     */
-    public synchronized long retryTime() {
-	int index = (attempt < delays.length ? attempt : delays.length - 1); 
-	return delays[index] + System.currentTimeMillis();
-    }
-
-    /**
-     * Return the time this task was created, or the last
-     * time {@link #reset reset} was called.
-     */
-    public synchronized long startTime() {
-	return startTime;
-    }
-
-    /**
-     * Return the attempt number, starting with zero.
-     */
-    public synchronized int attempt() {
-	return attempt;
-    }
-
-    /**
-     * Cancel the retrying of the task.  This ensures that there will be
-     * no further attempts to invoke <code>tryOnce</code>.  It will not
-     * interfere with any ongoing invocation of <code>tryOnce</code>
-     * unless a subclass overrides this to do so.  Any override of this
-     * method should invoke <code>super.cancel()</code>.
-     */
-    public synchronized void cancel() {
-	cancelled = true;
-	if (ticket != null)
-	    wakeup.cancel(ticket);
-	notifyAll();		// see waitFor()
-    }
-
-    /**
-     * Return <code>true</code> if <code>cancel</code> has been invoked.
-     */
-    public synchronized boolean cancelled() {
-	return cancelled;
-    }
-
-    /**
-     * Return <code>true</code> if <code>tryOnce</code> has returned
-     * successfully.
-     */
-    public synchronized boolean complete() {
-	return complete;
-    }
-
-    public synchronized boolean waitFor() throws InterruptedException {
-	while (!cancelled && !complete)
-	    wait();
-	return complete;
-    }
-
-    /**
-     * Reset values for a new use of this task.
-     */
-    public synchronized void reset() {
-	cancel();		// remove from the wakeup queue
-	startTime = System.currentTimeMillis();
-	cancelled = false;
-	complete = false;
-	ticket = null;
-	attempt = 0;
-    }
-
-    /**
-     * This is the runnable class for the <code>WakeupManager</code>,
-     * since we need different implementations of
-     * <code>WakeupManager&#046;run</code> and
-     * <code>TaskManager&#046;run</code>.  
-     */
-    private class RetryTime implements Runnable {
-	/**
-	 * Time to retry the task.
-	 */
-	public void run() {
-	    synchronized (RetryTask.this) {
-		ticket = null;
-	    }
-	    manager.add(RetryTask.this);
-	}
-    };
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.thread;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import com.sun.jini.constants.TimeConstants;
+
+import com.sun.jini.thread.WakeupManager.Ticket;
+
+/**
+ * An abstract class for building tasks that retry on failure after a
+ * timeout period.  This builds upon <code>TaskManager</code> for task
+ * execution and <code>WakeupManager</code> for retry scheduling.  You
+ * extend <code>RetryTask</code>, implementing <code>tryOnce</code> to
+ * represent a single attempt at the task.  If <code>tryOnce</code>
+ * returns <code>true</code>, the attempt was successful and the task
+ * is complete.  If <code>tryOnce</code> returns <code>false</code>,
+ * the task is scheduled for a future retry using <code>WakeupManager</code>.
+ * <p>
+ * The default retry times are defined by this class's implementation
+ * of <code>retryTime</code>.  You can override this method to
+ * change the retry times.
+ * <p>
+ * It is legal to reuse the same task again and again, but only after
+ * the task is complete and <code>reset</code> has been called.
+ * Inserting the same task multiple times before it has either been
+ * cancelled or completed successfully will generate unpredictable
+ * behavior.<p>
+ *
+ * This class uses the {@link Logger} named
+ * <code>com.sun.jini.thread.RetryTask</code> to log information at
+ * the following logging levels: <p>
+ *
+ * <table border=1 cellpadding=5
+ *       summary="Describes logging performed by RetryTask at different
+ *	          logging levels">
+ *
+ * <tr> <th> Level <th> Description
+ *
+ * <tr> <td> WARNING <td> <code>tryOnce</code> terminated by throwing
+ *                        instead of returning
+ *
+ * <tr> <td> FINEST <td> after a failed attempt, when should 
+ *                       the task be scheduled for a re-try
+ *
+ * </table>
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see TaskManager
+ * @see WakeupManager
+ */
+public abstract class RetryTask implements TaskManager.Task, TimeConstants {
+    private final TaskManager	  manager;	// the TaskManager for this task
+    private RetryTime	  retry;	// the retry object for this task
+    private boolean	  cancelled;	// have we been cancelled?
+    private boolean	  complete;	// have we completed successfully?
+    private Ticket	  ticket;	// the WakeupManager ticket
+    private long	  startTime;	// the time when we were created or 
+                                        //   last reset
+    private int		  attempt;	// the current attempt number
+    private final WakeupManager wakeup;       // WakeupManager for retry scheduling
+
+    /**
+     * Default delay backoff times.  These are intervals; the default
+     * <code>retryTime</code> adds the selected interval to the current
+     * time to obtain the absolute time of the next attempt.
+     *
+     * @see #retryTime 
+     */
+    private static final long[] delays = {
+	 0, // First value is never read
+	 1 * SECONDS,
+	 5 * SECONDS,
+	10 * SECONDS,
+	 1 * MINUTES,
+	 1 * MINUTES,
+	 5 * MINUTES,
+    };
+
+    /** Logger for this class */
+    protected static final Logger logger = 
+	Logger.getLogger("com.sun.jini.thread.RetryTask");
+
+    /**
+     * Create a new <code>RetryTask</code> that will be scheduled with
+     * the given task manager, and which will perform retry scheduling 
+     * using the given wakeup manager.
+     */
+    public RetryTask(TaskManager manager, WakeupManager wakeupManager) {
+	this.manager = manager;
+        this.wakeup = wakeupManager;
+	// NOTE(review): reset() and the cancel() it calls are both
+	// overridable, so a subclass override runs here before the
+	// subclass constructor body has executed.  Left unchanged
+	// because existing subclasses may depend on it -- confirm
+	// before altering.
+	reset();
+    }
+
+    /**
+     * Make a single attempt.  Return <code>true</code> if the attempt
+     * was successful.  If the attempt is not successful, the task
+     * will be scheduled for a future retry.
+     */
+    public abstract boolean tryOnce();
+
+    /**
+     * The <code>run</code> method used as a
+     * <code>TaskManager&#046;Task</code>.  This invokes
+     * <code>tryOnce</code>.  If it is not successful, it schedules
+     * the task for a future retry at the time it gets by invoking
+     * <code>retryTime</code>.
+     * <p>
+     * If <code>tryOnce</code> throws, the throwable is logged at
+     * <code>WARNING</code>.  An <code>Error</code> or
+     * <code>RuntimeException</code> is then rethrown to the caller, in
+     * which case no retry is scheduled and the task is not marked
+     * complete.  Any other throwable is treated like a
+     * <code>false</code> return and the task is retried.
+     *
+     * @see #tryOnce
+     * @see #startTime 
+     */
+    public void run() {
+	synchronized (this) {		// avoid retry if cancelled
+	    if (cancelled)		// if they cancelled
+		return;			// do nothing
+	}
+
+	// tryOnce is deliberately invoked without holding the lock
+	boolean success = false;
+        try {
+            success = tryOnce();
+        } catch (Throwable t) {
+            // Report through the class logger rather than writing the
+            // stack trace straight to System.err, so the failure ends
+            // up wherever the rest of this class's logging goes.
+            logger.log(Level.WARNING,
+                       "RetryTask.tryOnce threw an unexpected exception", t);
+            if (t instanceof Error) throw (Error) t;
+            if (t instanceof RuntimeException) throw (RuntimeException) t;
+            // Neither an Error nor a RuntimeException: fall through
+            // with success == false so the attempt counts as failed.
+        }
+
+	synchronized (this) {
+	    if (!success) {		// if at first we don't succeed ...
+		attempt++;
+		long at = retryTime();	// ... try, try again
+
+		if (logger.isLoggable(Level.FINEST)) {
+		    logger.log(Level.FINEST, "retry of {0} in {1} ms", 
+		        new Object[]{this, 
+			    Long.valueOf(at - System.currentTimeMillis())});
+		}
+
+		if (retry == null)	// only create it if we need to
+		    retry = new RetryTime();
+		ticket = wakeup.schedule(at, retry);
+	    } else {
+		complete = true;
+		notifyAll();		// see waitFor()
+	    }
+	}
+    }
+
+    /**
+     * Return the next time at which we should make another attempt.
+     * This is <em>not</em> an interval, but the actual time.
+     * <p>
+     * The implementation is free to do as it pleases with the policy
+     * here.  The default implementation is to delay using intervals of
+     * 1 second, 5 seconds, 10 seconds, 1 minute, and 1 minute between
+     * attempts, and then retrying every five minutes forever.
+     * <p>
+     * The default implementation assumes it is being called from
+     * the default <code>run</code> method and that the current thread
+     * holds the lock on this object. If the caller does
+     * not own the lock the result is undefined and could result in an
+     * exception.
+     */
+    public synchronized long retryTime() {
+	// attempts beyond the end of the table reuse the last interval
+	int index = (attempt < delays.length ? attempt : delays.length - 1); 
+	return delays[index] + System.currentTimeMillis();
+    }
+
+    /**
+     * Return the time this task was created, or the last
+     * time {@link #reset reset} was called.
+     */
+    public synchronized long startTime() {
+	return startTime;
+    }
+
+    /**
+     * Return the attempt number, starting with zero.
+     */
+    public synchronized int attempt() {
+	return attempt;
+    }
+
+    /**
+     * Cancel the retrying of the task.  This ensures that there will be
+     * no further attempts to invoke <code>tryOnce</code>.  It will not
+     * interfere with any ongoing invocation of <code>tryOnce</code>
+     * unless a subclass overrides this to do so.  Any override of this
+     * method should invoke <code>super.cancel()</code>.
+     */
+    public synchronized void cancel() {
+	cancelled = true;
+	if (ticket != null)
+	    wakeup.cancel(ticket);
+	notifyAll();		// see waitFor()
+    }
+
+    /**
+     * Return <code>true</code> if <code>cancel</code> has been invoked.
+     */
+    public synchronized boolean cancelled() {
+	return cancelled;
+    }
+
+    /**
+     * Return <code>true</code> if <code>tryOnce</code> has returned
+     * successfully.
+     */
+    public synchronized boolean complete() {
+	return complete;
+    }
+
+    /**
+     * Block until this task has either been cancelled or has completed
+     * successfully.
+     *
+     * @return <code>true</code> if the task completed,
+     *         <code>false</code> if the wait ended because of a cancel
+     * @throws InterruptedException if the calling thread is interrupted
+     *         while waiting
+     */
+    public synchronized boolean waitFor() throws InterruptedException {
+	while (!cancelled && !complete)
+	    wait();
+	return complete;
+    }
+
+    /**
+     * Reset values for a new use of this task.
+     */
+    public synchronized void reset() {
+	cancel();		// remove from the wakeup queue
+	startTime = System.currentTimeMillis();
+	cancelled = false;
+	complete = false;
+	ticket = null;
+	attempt = 0;
+    }
+
+    /**
+     * This is the runnable class for the <code>WakeupManager</code>,
+     * since we need different implementations of
+     * <code>WakeupManager&#046;run</code> and
+     * <code>TaskManager&#046;run</code>.  
+     */
+    private class RetryTime implements Runnable {
+	/**
+	 * Time to retry the task.
+	 */
+	public void run() {
+	    synchronized (RetryTask.this) {
+		ticket = null;	// the wakeup has fired; nothing left to cancel
+	    }
+	    manager.add(RetryTask.this);
+	}
+    }
+}

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/TaskManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/TaskManager.java?rev=1468119&r1=1468118&r2=1468119&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/TaskManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/TaskManager.java Mon Apr 15 15:26:44 2013
@@ -1,342 +1,346 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.sun.jini.thread;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A task manager manages a single queue of tasks, and some number of
- * worker threads.  New tasks are added to the tail of the queue.  Each
- * thread loops, taking a task from the queue and running it.  Each
- * thread looks for a task by starting at the head of the queue and
- * taking the first task (that is not already being worked on) that is
- * not required to run after any of the tasks that precede it in
- * the queue (including tasks that are currently being worked on).
- * <p>
- * This class uses the {@link Logger} named
- * <code>com.sun.jini.thread.TaskManager</code> to log information at
- * the following logging levels:
- * <p>
- * <table border=1 cellpadding=5
- *       summary="Describes logging performed by TaskManager at different
- *	          logging levels">
- * <caption halign="center" valign="top"><b><code>
- *	   com.sun.jini.thread.TaskManager</code></b></caption>
- * <tr><th>Level<th>Description
- * <tr><td>{@link Level#SEVERE SEVERE}<td>
- * failure to create a worker thread when no other worker threads exist
- * <tr><td>{@link Level#WARNING WARNING}<td>
- * exceptions thrown by {@link TaskManager.Task} methods, and failure
- * to create a worker thread when other worker threads exist
- * </table>
- *
- * @author Sun Microsystems, Inc.
- * @deprecated will be removed from River 2.4.0 onward
- */
-@Deprecated
-public class TaskManager {
-
-    /** The interface that tasks must implement */
-    public interface Task extends Runnable {
-	/**
-	 * Return true if this task must be run after at least one task
-	 * in the given task list with an index less than size (size may be
-	 * less then tasks.size()).  Using List.get will be more efficient
-	 * than List.iterator.
-	 *
-	 * @param tasks the tasks to consider.  A read-only List, with all
-	 * elements instanceof Task.
-	 * @param size elements with index less than size should be considered
-	 */
-	boolean runAfter(List tasks, int size);
-    }
-
-    /** Logger */
-    protected static final Logger logger =
-	Logger.getLogger("com.sun.jini.thread.TaskManager");
-
-    /** Active and pending tasks */
-    protected final ArrayList tasks = new ArrayList();
-    /** Index of the first pending task; all earlier tasks are active */
-    protected int firstPending = 0;
-    /** Read-only view of tasks */
-    protected final List roTasks = Collections.unmodifiableList(tasks);
-    /** Active threads */
-    protected final List threads = new ArrayList();
-    /** Maximum number of threads allowed */
-    protected final int maxThreads;
-    /** Idle time before a thread should exit */
-    protected final long timeout;
-    /** Threshold for creating new threads */
-    protected final float loadFactor;
-    /** True if manager has been terminated */
-    protected boolean terminated = false;
-
-    /**
-     * Create a task manager with maxThreads = 10, timeout = 15 seconds,
-     * and loadFactor = 3.0.
-     */
-    public TaskManager() {
-	this(10, 1000 * 15, 3.0f);
-    }
-
-    /**
-     * Create a task manager.
-     *
-     * @param maxThreads maximum number of threads to use on tasks
-     * @param timeout idle time before a thread exits 
-     * @param loadFactor threshold for creating new threads.  A new
-     * thread is created if the total number of runnable tasks (both active
-     * and pending) exceeds the number of threads times the loadFactor,
-     * and the maximum number of threads has not been reached.
-     */
-    public TaskManager(int maxThreads, long timeout, float loadFactor) {
-	this.maxThreads = maxThreads;
-	this.timeout = timeout;
-	this.loadFactor = loadFactor;
-    }
-
-    /**
-     * Add a new task if it is not equal to (using the equals method)
-     * to any existing active or pending task.
-     */
-    public synchronized void addIfNew(Task t) {
-	if (!tasks.contains(t))
-	    add(t);
-    }
-
-    /** Add a new task. */
-    public synchronized void add(Task t) {
-	tasks.add(t);
-	boolean poke = true;
-	while (threads.size() < maxThreads && needThread()) {
-	    Thread th;
-	    try {
-		th = new TaskThread();
-		th.start();
-	    } catch (Throwable tt) {
-		try {
-		    logger.log(threads.isEmpty() ?
-			       Level.SEVERE : Level.WARNING,
-			       "thread creation exception", tt);
-		} catch (Throwable ttt) {
-		}
-		break;
-	    }
-	    threads.add(th);
-	    poke = false;
-	}
-	if (poke &&
-	    threads.size() > firstPending &&
-	    !runAfter(t, tasks.size() - 1))
-	{
-	    notify();
-	}
-    }
-
-    /** Add all tasks in a collection, in iterator order. */
-    public synchronized void addAll(Collection c) {
-	for (Iterator iter = c.iterator(); iter.hasNext(); ) {
-	    add((Task)iter.next());
-	}
-    }
-
-    /** Return true if a new thread should be created (ignoring maxThreads). */
-    protected boolean needThread() {
-	int bound = (int)(loadFactor * threads.size());
-	int max = tasks.size();
-	if (max < bound)
-	    return false;
-	max--;
-	if (runAfter((Task)tasks.get(max), max))
-	    return false;
-	int ready = firstPending + 1;
-	if (ready > bound)
-	    return true;
-	for (int i = firstPending; i < max; i++) {
-	    if (!runAfter((Task)tasks.get(i), i)) {
-		ready++;
-		if (ready > bound)
-		    return true;
-	    }
-	}
-	return false;
-    }
-
-    /**
-     * Returns t.runAfter(i), or false if an exception is thrown.
-     */
-    private boolean runAfter(Task t, int i) {
-	try {
-	    return t.runAfter(roTasks, i);
-	} catch (Throwable tt) {
-	    try {
-		logger.log(Level.WARNING, "Task.runAfter exception", tt);
-	    } catch (Throwable ttt) {
-	    }
-	    return false;
-	}
-    }
-
-    /**
-     * Remove a task if it is pending (not active).  Object identity (==)
-     * is used, not the equals method.  Returns true if the task was
-     * removed.
-     */
-    public synchronized boolean removeIfPending(Task t) {
-	return removeTask(t, firstPending);
-    }
-
-    /*
-     * Remove a task if it is pending or active.  If it is active and not being
-     * executed by the calling thread, interrupt the thread executing the task,
-     * but do not wait for the thread to terminate.  Object identity (==) is
-     * used, not the equals method.  Returns true if the task was removed.
-     */
-    public synchronized boolean remove(Task t) {
-	return removeTask(t, 0);
-    }
-
-    /**
-     * Remove a task if it has index >= min.  If it is active and not being
-     * executed by the calling thread, interrupt the thread executing the task.
-     */
-    private boolean removeTask(Task t, int min) {
-	for (int i = tasks.size(); --i >= min; ) {
-	    if (tasks.get(i) == t) {
-		tasks.remove(i);
-		if (i < firstPending) {
-		    firstPending--;
-		    for (int j = threads.size(); --j >= 0; ) {
-			TaskThread thread = (TaskThread)threads.get(j);
-			if (thread.task == t) {
-			    if (thread != Thread.currentThread())
-				thread.interrupt();
-			    break;
-			}
-		    }
-		}
-		return true;
-	    }
-	}
-	return false;
-    }
-
-    /**
-     * Interrupt all threads, and stop processing tasks.  Only getPending
-     * should be used afterwards.
-     */
-    public synchronized void terminate() {
-	terminated = true;
-	for (int i = threads.size(); --i >= 0; ) {
-	    ((Thread)threads.get(i)).interrupt();
-	}
-    }
-
-    /** Return all pending tasks.  A new list is returned each time. */
-    public synchronized ArrayList getPending() {
-	ArrayList tc = (ArrayList)tasks.clone();
-	for (int i = firstPending; --i >= 0; ) {
-	    tc.remove(0);
-	}
-	return tc;
-    }
-
-    /** Return the maximum number of threads to use on tasks. */
-    public int getMaxThreads() {
-	return maxThreads;
-    }
-
-    private class TaskThread extends Thread {
-
-	/** The task being run, if any */
-	public Task task = null;
-
-	public TaskThread() {
-	    super("task");
-	    setDaemon(true);
-	}
-
-	/**
-	 * Find the next task that can be run, and mark it taken by
-	 * moving firstPending past it (and moving the task in front of
-	 * any pending tasks that are skipped due to execution constraints).
-	 * If a task is found, set task to it and return true.
-	 */
-	private boolean takeTask() {
-	    int size = tasks.size();
-	    for (int i = firstPending; i < size; i++) {
-		Task t = (Task)tasks.get(i);
-		if (!runAfter(t, i)) {
-		    if (i > firstPending) {
-			tasks.remove(i);
-			tasks.add(firstPending, t);
-		    }
-		    firstPending++;
-		    task = t;
-		    return true;
-		}
-	    }
-	    return false;
-	}
-
-	public void run() {
-	    while (true) {
-		synchronized (TaskManager.this) {
-		    if (terminated)
-			return;
-		    if (task != null) {
-			for (int i = firstPending; --i >= 0; ) {
-			    if (tasks.get(i) == task) {
-				tasks.remove(i);
-				firstPending--;
-				break;
-			    }
-			}
-			task = null;
-			interrupted(); // clear interrupt bit
-		    }
-		    if (!takeTask()) {
-			try {
-			    TaskManager.this.wait(timeout);
-			} catch (InterruptedException e) {
-			}
-			if (terminated || !takeTask()) {
-			    threads.remove(this);
-			    return;
-			}
-		    }
-		}
-		try {
-		    task.run();
-		} catch (Throwable t) {
-		    try {
-			logger.log(Level.WARNING, "Task.run exception", t);
-		    } catch (Throwable tt) {
-		    }
-		}
-	    }
-	}
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.thread;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A task manager manages a single queue of tasks, and some number of
+ * worker threads.  New tasks are added to the tail of the queue.  Each
+ * thread loops, taking a task from the queue and running it.  Each
+ * thread looks for a task by starting at the head of the queue and
+ * taking the first task (that is not already being worked on) that is
+ * not required to run after any of the tasks that precede it in
+ * the queue (including tasks that are currently being worked on).
+ * <p>
+ * This class uses the {@link Logger} named
+ * <code>com.sun.jini.thread.TaskManager</code> to log information at
+ * the following logging levels:
+ * <p>
+ * <table border=1 cellpadding=5
+ *       summary="Describes logging performed by TaskManager at different
+ *	          logging levels">
+ * <caption halign="center" valign="top"><b><code>
+ *	   com.sun.jini.thread.TaskManager</code></b></caption>
+ * <tr><th>Level<th>Description
+ * <tr><td>{@link Level#SEVERE SEVERE}<td>
+ * failure to create a worker thread when no other worker threads exist
+ * <tr><td>{@link Level#WARNING WARNING}<td>
+ * exceptions thrown by {@link TaskManager.Task} methods, and failure
+ * to create a worker thread when other worker threads exist
+ * </table>
+ *
+ * @author Sun Microsystems, Inc.
+ * @deprecated will be removed from River 2.4.0 onward
+ */
+@Deprecated
+public class TaskManager {
+
+    /** The interface that tasks must implement */
+    public interface Task extends Runnable {
+	/**
+	 * Return true if this task must be run after at least one task
+	 * in the given task list with an index less than size (size may be
+	 * less than tasks.size()).  Using List.get will be more efficient
+	 * than List.iterator.
+	 *
+	 * @param tasks the tasks to consider.  A read-only List, with all
+	 * elements instanceof Task.
+	 * @param size elements with index less than size should be considered
+	 */
+	boolean runAfter(List tasks, int size);
+    }
+
+    /** Logger */
+    protected static final Logger logger =
+	Logger.getLogger("com.sun.jini.thread.TaskManager");
+
+    /** Active and pending tasks */
+    protected final ArrayList tasks = new ArrayList(); //sync on this
+    /** Index of the first pending task; all earlier tasks are active */
+    protected int firstPending = 0;//sync on this
+    /** Read-only view of tasks */
+    protected final List roTasks = Collections.unmodifiableList(tasks); // sync on this
+    /** Active threads */
+    protected final List threads = new ArrayList(); //sync on this
+    /** Maximum number of threads allowed */
+    protected final int maxThreads;
+    /** Idle time before a thread should exit */
+    protected final long timeout;
+    /** Threshold for creating new threads */
+    protected final float loadFactor;
+    /** True if manager has been terminated */
+    protected boolean terminated = false; //sync on this
+
+    /**
+     * Create a task manager with maxThreads = 10, timeout = 15 seconds,
+     * and loadFactor = 3.0.
+     */
+    public TaskManager() {
+	this(10, 1000 * 15, 3.0f);
+    }
+
+    /**
+     * Create a task manager.
+     *
+     * @param maxThreads maximum number of threads to use on tasks
+     * @param timeout idle time before a thread exits 
+     * @param loadFactor threshold for creating new threads.  A new
+     * thread is created if the total number of runnable tasks (both active
+     * and pending) exceeds the number of threads times the loadFactor,
+     * and the maximum number of threads has not been reached.
+     */
+    public TaskManager(int maxThreads, long timeout, float loadFactor) {
+	this.maxThreads = maxThreads;
+	this.timeout = timeout;
+	this.loadFactor = loadFactor;
+    }
+
+    /**
+     * Add a new task if it is not equal (using the equals method)
+     * to any existing active or pending task.
+     */
+    public synchronized void addIfNew(Task t) {
+	if (!tasks.contains(t))
+	    add(t);
+    }
+
+    /** Add a new task. */
+    public synchronized void add(Task t) {
+	tasks.add(t);
+	boolean poke = true;
+	while (threads.size() < maxThreads && needThread()) {
+	    Thread th;
+	    try {
+		th = new TaskThread();
+		th.start();
+	    } catch (Throwable tt) {
+		try {
+		    logger.log(threads.isEmpty() ?
+			       Level.SEVERE : Level.WARNING,
+			       "thread creation exception", tt);
+		} catch (Throwable ttt) {
+		}
+		break;
+	    }
+	    threads.add(th);
+	    poke = false;
+	}
+	if (poke &&
+	    threads.size() > firstPending &&
+	    !runAfter(t, tasks.size() - 1))
+	{
+	    notify();
+	}
+    }
+
+    /** Add all tasks in a collection, in iterator order. */
+    public synchronized void addAll(Collection c) {
+	for (Iterator iter = c.iterator(); iter.hasNext(); ) {
+	    add((Task)iter.next());
+	}
+    }
+
+    /** Return true if a new thread should be created (ignoring maxThreads). */
+    protected boolean needThread() {
+	int bound = (int)(loadFactor * threads.size());
+	int max = tasks.size();
+	if (max < bound)
+	    return false;
+	max--;
+	if (runAfter((Task)tasks.get(max), max))
+	    return false;
+	int ready = firstPending + 1;
+	if (ready > bound)
+	    return true;
+	for (int i = firstPending; i < max; i++) {
+	    if (!runAfter((Task)tasks.get(i), i)) {
+		ready++;
+		if (ready > bound)
+		    return true;
+	    }
+	}
+	return false;
+    }
+
+    /**
+     * Returns t.runAfter(roTasks, i), or false if an exception is thrown.
+     */
+    private boolean runAfter(Task t, int i) {
+	try {
+	    return t.runAfter(roTasks, i);
+	} catch (Throwable tt) {
+	    try {
+		logger.log(Level.WARNING, "Task.runAfter exception", tt);
+	    } catch (Throwable ttt) {
+	    }
+	    return false;
+	}
+    }
+
+    /**
+     * Remove a task if it is pending (not active).  Object identity (==)
+     * is used, not the equals method.  Returns true if the task was
+     * removed.
+     */
+    public synchronized boolean removeIfPending(Task t) {
+	return removeTask(t, firstPending);
+    }
+
+    /**
+     * Remove a task if it is pending or active.  If it is active and not being
+     * executed by the calling thread, interrupt the thread executing the task,
+     * but do not wait for the thread to terminate.  Object identity (==) is
+     * used, not the equals method.  Returns true if the task was removed.
+     */
+    public synchronized boolean remove(Task t) {
+	return removeTask(t, 0);
+    }
+
+    /**
+     * Remove a task if it has index >= min.  If it is active and not being
+     * executed by the calling thread, interrupt the thread executing the task.
+     */
+    private boolean removeTask(Task t, int min) {
+	for (int i = tasks.size(); --i >= min; ) {
+	    if (tasks.get(i) == t) {
+		tasks.remove(i);
+		if (i < firstPending) {
+		    firstPending--;
+		    for (int j = threads.size(); --j >= 0; ) {
+			TaskThread thread = (TaskThread)threads.get(j);
+			if (thread.task == t) {
+			    if (thread != Thread.currentThread())
+				thread.interrupt();
+			    break;
+			}
+		    }
+		}
+		return true;
+	    }
+	}
+	return false;
+    }
+
+    /**
+     * Interrupt all threads, and stop processing tasks.  Only getPending
+     * should be used afterwards.
+     */
+    public synchronized void terminate() {
+	terminated = true;
+	for (int i = threads.size(); --i >= 0; ) {
+	    ((Thread)threads.get(i)).interrupt();
+	}
+    }
+
+    /** Return all pending tasks.  A new list is returned each time. */
+    public synchronized ArrayList getPending() {
+	ArrayList tc = new ArrayList(tasks);
+	for (int i = firstPending; --i >= 0; ) {
+	    tc.remove(0);
+	}
+	return tc;
+    }
+
+    /** Return the maximum number of threads to use on tasks. */
+    public int getMaxThreads() {
+	return maxThreads;
+    }
+
+    private class TaskThread extends Thread {
+
+	/** The task being run, if any */
+	public Task task = null; // sync access on TaskManager.this
+
+	public TaskThread() {
+	    super("task");
+	    setDaemon(true);
+	}
+
+	/**
+	 * Find the next task that can be run, and mark it taken by
+	 * moving firstPending past it (and moving the task in front of
+	 * any pending tasks that are skipped due to execution constraints).
+	 * If a task is found, set task to it and return true.
+	 */
+	private boolean takeTask() {
+	    int size = tasks.size();
+	    for (int i = firstPending; i < size; i++) {
+		Task t = (Task)tasks.get(i);
+		if (!runAfter(t, i)) {
+		    if (i > firstPending) {
+			tasks.remove(i);
+			tasks.add(firstPending, t);
+		    }
+		    firstPending++;
+		    task = t;
+		    return true;
+		}
+	    }
+	    return false;
+	}
+
+	public void run() {
+	    while (true) {
+                Task tsk = null;
+		synchronized (TaskManager.this) {
+		    if (terminated)
+			return;
+		    if (task != null) {
+			for (int i = firstPending; --i >= 0; ) {
+			    if (tasks.get(i) == task) {
+				tasks.remove(i);
+				firstPending--;
+				break;
+			    }
+			}
+			task = null;
+			interrupted(); // clear interrupt bit
+		    }
+		    if (!takeTask()) {
+			try {
+			    TaskManager.this.wait(timeout);
+			} catch (InterruptedException e) {
+			}
+			if (terminated || !takeTask()) {
+			    threads.remove(this);
+			    return;
+			}
+		    }
+                    tsk = task;  
+		}
+		try {
+		    tsk.run();
+		} catch (Throwable t) {
+                    if (t instanceof Error) throw (Error) t;
+		    try {
+			logger.log(Level.WARNING, "Task.run exception", t);
+		    } catch (Throwable tt) {
+		    }
+		}
+	    }
+	}
+    }
+}



Mime
View raw message