qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ken Giusti <kgiu...@redhat.com>
Subject Re: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
Date Wed, 08 Jul 2015 14:32:04 GMT
Hi Bozzo,

Can you please revert this change?

It is causing a segfault in the python unit tests when they are run under python3.4.

I haven't hit the segfault on python2.7, only on python3.4.

thanks,

-K

----- Original Message -----
> From: bozzo@apache.org
> To: commits@qpid.apache.org
> Sent: Tuesday, July 7, 2015 3:50:16 PM
> Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks
> 
> PROTON-928: cancellable tasks
> 
> A scheduled task can be cancelled.
> A cancelled task does not prevent the reactor from stopping.
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
> Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3
> Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3
> Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3
> 
> Branch: refs/heads/master
> Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48
> Parents: 09af375
> Author: Bozo Dragojevic <bozzo@digiverse.si>
> Authored: Tue Jul 7 10:17:40 2015 +0200
> Committer: Bozo Dragojevic <bozzo@digiverse.si>
> Committed: Tue Jul 7 21:49:44 2015 +0200
> 
> ----------------------------------------------------------------------
>  proton-c/bindings/python/proton/reactor.py      |  5 +++-
>  proton-c/include/proton/reactor.h               |  1 +
>  proton-c/src/reactor/timer.c                    | 25 +++++++++++++++++++-
>  proton-c/src/tests/reactor.c                    | 15 ++++++++++++
>  .../org/apache/qpid/proton/reactor/Task.java    |  4 ++++
>  .../qpid/proton/reactor/impl/TaskImpl.java      | 10 ++++++++
>  .../apache/qpid/proton/reactor/impl/Timer.java  | 19 ++++++++++++---
>  proton-j/src/main/resources/creactor.py         |  3 +++
>  tests/python/proton_tests/reactor.py            | 14 +++++++++++
>  9 files changed, 91 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py
> ----------------------------------------------------------------------
> diff --git a/proton-c/bindings/python/proton/reactor.py
> b/proton-c/bindings/python/proton/reactor.py
> index c66334b..d019554 100644
> --- a/proton-c/bindings/python/proton/reactor.py
> +++ b/proton-c/bindings/python/proton/reactor.py
> @@ -53,6 +53,9 @@ class Task(Wrapper):
>      def _init(self):
>          pass
>  
> +    def cancel(self):
> +        pn_task_cancel(self._impl)
> +
>  class Acceptor(Wrapper):
>  
>      def __init__(self, impl):
> @@ -112,7 +115,7 @@ class Reactor(Wrapper):
>          pn_reactor_yield(self._impl)
>  
>      def mark(self):
> -        pn_reactor_mark(self._impl)
> +        return pn_reactor_mark(self._impl)
>  
>      def _get_handler(self):
>          return WrappedHandler.wrap(pn_reactor_get_handler(self._impl),
>          self.on_error)
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h
> ----------------------------------------------------------------------
> diff --git a/proton-c/include/proton/reactor.h
> b/proton-c/include/proton/reactor.h
> index 59b2282..6f52d22 100644
> --- a/proton-c/include/proton/reactor.h
> +++ b/proton-c/include/proton/reactor.h
> @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer,
> pn_timestamp_t deadlin
>  PN_EXTERN int pn_timer_tasks(pn_timer_t *timer);
>  
>  PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task);
> +PN_EXTERN void pn_task_cancel(pn_task_t *task);
>  
>  PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void
>  *object);
>  PN_EXTERN pn_reactor_t *pn_object_reactor(void *object);
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c
> ----------------------------------------------------------------------
> diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c
> index 1ad0821..61efd31 100644
> --- a/proton-c/src/reactor/timer.c
> +++ b/proton-c/src/reactor/timer.c
> @@ -27,12 +27,14 @@ struct pn_task_t {
>    pn_list_t *pool;
>    pn_record_t *attachments;
>    pn_timestamp_t deadline;
> +  bool cancelled;
>  };
>  
>  void pn_task_initialize(pn_task_t *task) {
>    task->pool = NULL;
>    task->attachments = pn_record();
>    task->deadline = 0;
> +  task->cancelled = false;
>  }
>  
>  void pn_task_finalize(pn_task_t *task) {
> @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) {
>    return task->attachments;
>  }
>  
> +void pn_task_cancel(pn_task_t *task) {
> +    assert(task);
> +    task->cancelled = true;
> +}
> +
>  //
>  // timer
>  //
> @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer,
> pn_timestamp_t deadline) {
>    return task;
>  }
>  
> +void pni_timer_flush_cancelled(pn_timer_t *timer) {
> +    while (pn_list_size(timer->tasks)) {
> +        pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
> +        if (task->cancelled) {
> +            pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
> +            assert(min == task);
> +            pn_decref(min);
> +        } else {
> +            break;
> +        }
> +    }
> +}
> +
>  pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) {
>    assert(timer);
> +  pni_timer_flush_cancelled(timer);
>    if (pn_list_size(timer->tasks)) {
>      pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0);
>      return task->deadline;
> @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
> {
>      if (now >= task->deadline) {
>        pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks);
>        assert(min == task);
> -      pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
> +      if (!min->cancelled)
> +          pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK);
>        pn_decref(min);
>      } else {
>        break;
> @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now)
> {
>  
>  int pn_timer_tasks(pn_timer_t *timer) {
>    assert(timer);
> +  pni_timer_flush_cancelled(timer);
>    return pn_list_size(timer->tasks);
>  }
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c
> ----------------------------------------------------------------------
> diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c
> index fe6c769..059d099 100644
> --- a/proton-c/src/tests/reactor.c
> +++ b/proton-c/src/tests/reactor.c
> @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) {
>    pn_free(tevents);
>  }
>  
> +static void test_reactor_schedule_cancel(void) {
> +  pn_reactor_t *reactor = pn_reactor();
> +  pn_handler_t *root = pn_reactor_get_handler(reactor);
> +  pn_list_t *events = pn_list(PN_VOID, 0);
> +  pn_handler_add(root, test_handler(reactor, events));
> +  pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL);
> +  pn_task_cancel(task);
> +  pn_reactor_run(reactor);
> +  pn_reactor_free(reactor);
> +  expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED,
> +         PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END);
> +  pn_free(events);
> +}
> +
>  int main(int argc, char **argv)
>  {
>    test_reactor();
> @@ -461,5 +475,6 @@ int main(int argc, char **argv)
>    test_reactor_transfer(4*1024, 1024);
>    test_reactor_schedule();
>    test_reactor_schedule_handler();
> +  test_reactor_schedule_cancel();
>    return 0;
>  }
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
> ----------------------------------------------------------------------
> diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
> index 69701ab..7fb5964 100644
> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
> @@ -43,4 +43,8 @@ public interface Task extends Extendable {
>      /** @return the reactor that created this task. */
>      Reactor getReactor();
>  
> +    /**
> +     * Cancel the execution of this task. No-op if invoked after the task
> was already executed.
> +     */
> +    void cancel();
>  }
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
> index 00c9a84..11bb6b8 100644
> ---
> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
> +++
> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
> @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>  public class TaskImpl implements Task, Comparable<TaskImpl> {
>      private final long deadline;
>      private final int counter;
> +    private boolean cancelled = false;
>      private final AtomicInteger count = new AtomicInteger();
>      private Record attachments = new RecordImpl();
>      private Reactor reactor;
> @@ -58,6 +59,15 @@ public class TaskImpl implements Task,
> Comparable<TaskImpl> {
>          return deadline;
>      }
>  
> +    public boolean isCancelled() {
> +        return cancelled;
> +    }
> +
> +    @Override
> +    public void cancel() {
> +        cancelled = true;
> +    }
> +
>      public void setReactor(Reactor reactor) {
>          this.reactor = reactor;
>      }
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
> ----------------------------------------------------------------------
> diff --git
> a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
> b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
> index 32bb4f6..b8df19d 100644
> --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
> +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
> @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task;
>  public class Timer {
>  
>      private CollectorImpl collector;
> -    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
> +    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
>  
>      public Timer(Collector collector) {
>          this.collector = (CollectorImpl)collector;
> @@ -44,6 +44,7 @@ public class Timer {
>      }
>  
>      long deadline() {
> +        flushCancelled();
>          if (tasks.size() > 0) {
>              Task task = tasks.peek();
>              return task.deadline();
> @@ -52,12 +53,23 @@ public class Timer {
>          }
>      }
>  
> +    private void flushCancelled() {
> +        while (!tasks.isEmpty()) {
> +            TaskImpl task = tasks.peek();
> +            if (task.isCancelled())
> +                tasks.poll();
> +            else
> +                break;
> +        }
> +    }
> +
>      void tick(long now) {
>          while(!tasks.isEmpty()) {
> -            Task task = tasks.peek();
> +            TaskImpl task = tasks.peek();
>              if (now >= task.deadline()) {
>                  tasks.poll();
> -                collector.put(Type.TIMER_TASK, task);
> +                if (!task.isCancelled())
> +                    collector.put(Type.TIMER_TASK, task);
>              } else {
>                  break;
>              }
> @@ -65,6 +77,7 @@ public class Timer {
>      }
>  
>      int tasks() {
> +        flushCancelled();
>          return tasks.size();
>      }
>  }
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py
> ----------------------------------------------------------------------
> diff --git a/proton-j/src/main/resources/creactor.py
> b/proton-j/src/main/resources/creactor.py
> index e179b23..1f8514e 100644
> --- a/proton-j/src/main/resources/creactor.py
> +++ b/proton-j/src/main/resources/creactor.py
> @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd):
>  def pn_acceptor_close(a):
>      a.close()
>  
> +def pn_task_cancel(t):
> +    t.cancel()
> +
>  def pn_object_reactor(o):
>      if hasattr(o, "impl"):
>          if hasattr(o.impl, "getSession"):
> 
> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py
> ----------------------------------------------------------------------
> diff --git a/tests/python/proton_tests/reactor.py
> b/tests/python/proton_tests/reactor.py
> index 6afee30..067c5c0 100644
> --- a/tests/python/proton_tests/reactor.py
> +++ b/tests/python/proton_tests/reactor.py
> @@ -171,3 +171,17 @@ class ExceptionTest(Test):
>              assert False, "expected to barf"
>          except Barf:
>              pass
> +
> +    def test_schedule_cancel(self):
> +        barf = self.reactor.schedule(10, BarfOnTask())
> +        class CancelBarf:
> +            def on_timer_task(self, event):
> +                barf.cancel()
> +        self.reactor.schedule(0, CancelBarf())
> +        now = self.reactor.mark()
> +        try:
> +            self.reactor.run()
> +            elapsed = self.reactor.mark() - now
> +            assert elapsed < 10, "expected cancelled task to not delay the
> reactor by " + elapsed
> +        except Barf:
> +            assert False, "expected barf to be cancelled"
> 
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
> For additional commands, e-mail: commits-help@qpid.apache.org
> 
> 

-- 
-K

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message