river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1554739 - in /river/jtsk/skunk/qa_refactor/trunk: src/org/apache/river/impl/thread/SynchronousExecutors.java test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java
Date Thu, 02 Jan 2014 05:19:25 GMT
Author: peter_firmstone
Date: Thu Jan  2 05:19:25 2014
New Revision: 1554739

URL: http://svn.apache.org/r1554739
Log:
Started working on ScheduledExecutors to take advantage of a ScheduledExecutorService to allow
sharing of a thread pool for tasks that have dependencies, require order and must not execute
until their dependencies have completed.

Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
    river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1554739&r1=1554738&r2=1554739&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Thu Jan  2 05:19:25 2014
@@ -25,11 +25,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +43,9 @@ import java.util.logging.Logger;
 import org.apache.river.api.util.Startable;
 
 /**
- * The intent of this Executor is to groups tasks into synchronous queues.
+ * The intent of this Executor is to share a single thread pool among tasks with
+ * dependencies that prevent them running concurrently.
+ * 
  * @author peter
  */
 public class SynchronousExecutors implements Startable {
@@ -89,31 +91,44 @@ public class SynchronousExecutors implem
         distributorThread.interrupt();
     }
     
-    public <T> ExecutorService newExecutor(){
-        QueueWrapper que = new QueueWrapper<T>(new LinkedBlockingQueue<Callable<T>>());
-        ExecutorService serv = new SynchronousExecutor<T>(que, distributorWaiting, distributorLock, workToDo, pool);
+    /**
+     * The ExecutorService returned, supports a subset of ExecutorService
+     * methods, the intent of this executor is to serialize the execution
+     * of tasks, it is up to the BlockingQueue or caller to ensure order, only 
+     * one task will execute at a time, that task will be retried if it fails,
+     * using a back off strategy of 1, 5 and 10 seconds, followed by 1, 1 and 5
+     * minutes thereafter forever, no other task will execute until the task
+     * at the head of the queue is completed successfully.
+     * 
+     * Tasks submitted must implement Callable, Runnable is not supported.
+     * 
+     * @param <T>
+     * @param queue
+     * @return 
+     */
+    public <T> ExecutorService newSerialExecutor(BlockingQueue<Callable<T>> queue){
+        QueueWrapper que = new QueueWrapper<T>(queue);
+        ExecutorService serv = new SerialExecutor<T>(que, distributorWaiting, distributorLock, workToDo);
         addQueue(que);
         return serv;
     }
     
-    private static class SynchronousExecutor<T> implements ExecutorService {
+    private static class SerialExecutor<T> implements ExecutorService {
         
         QueueWrapper<T> queue;
         AtomicBoolean waiting;
         final Lock lock;
         final Condition workToDo;
-        final ScheduledExecutorService pool;
 
-        SynchronousExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock lock, Condition cond, ScheduledExecutorService pool){
+        SerialExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock lock, Condition cond){
             this.queue = queue;
             this.waiting = waiting;
             this.lock = lock;
             workToDo = cond;
-            this.pool = pool;
         }
         @Override
         public void shutdown() {
-            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+            
         }
 
         @Override
@@ -139,7 +154,7 @@ public class SynchronousExecutors implem
         @Override
         public <T> Future<T> submit(Callable<T> task) {
             if (task == null) throw new NullPointerException("task cannot be null");
-            Task t = new Task<T>(task, queue, lock, workToDo, pool);
+            Task t = new Task<T>(task, queue, lock, workToDo);
             if (queue.offer(t)){
                 if (waiting.get() && !queue.stalled){
                     lock.lock();
@@ -350,18 +365,16 @@ public class SynchronousExecutors implem
         private final Lock executorLock;
         private final Condition waiting;
         private final Condition resultAwait;
-        private final ScheduledExecutorService exec;
         private int attempt;
         private volatile long retryTime;
         
-        Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition distributorWaiting, ScheduledExecutorService exec){
+        Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition distributorWaiting){
             task = c;
             queue = wrapper;
             this.waiting = distributorWaiting;
             resultAwait = queue.lock.newCondition();
             attempt = 0;
             this.executorLock = executorLock;
-            this.exec = exec;
         }
         
         /**

Modified: river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java?rev=1554739&r1=1554738&r2=1554739&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java Thu Jan  2 05:19:25 2014
@@ -1,7 +1,19 @@
 /*
- * To change this license header, choose License Headers in Project Properties.
- * To change this template file, choose Tools | Templates
- * and open the template in the editor.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.river.impl.thread;
@@ -14,6 +26,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -59,9 +72,9 @@ public class SynchronouExecutorsTest {
         } catch (Exception ex) {
             ex.printStackTrace(System.out);
         }
-        ExecutorService exec = instance.newExecutor();
-        Future future = exec.submit(new Exceptional());
-        Object result = null;
+        ExecutorService exec = instance.newSerialExecutor(new LinkedBlockingQueue<Callable<String>>());
+        Future<String> future = exec.submit(new Exceptional());
+        String result = null;
         try {
             result = future.get(8, TimeUnit.MINUTES);
         } catch (InterruptedException ex) {
@@ -171,15 +184,15 @@ public class SynchronouExecutorsTest {
         
     }
     
-    private static class Exceptional implements Callable {
+    private static class Exceptional<String> implements Callable<String> {
         private final AtomicInteger tries = new AtomicInteger(0);
         @Override
-        public Object call() throws Exception {
+        public String  call() throws Exception {
             System.out.println("Task called at:");
             System.out.println(System.currentTimeMillis());
             int tri = tries.incrementAndGet();
             if (tri < 7) throw new RemoteException("Dummy communication problem");
-            return "success";
+            return (String) "success";
         }
         
     }



Mime
View raw message