qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jiri Daněk (JIRA) <j...@apache.org>
Subject [jira] [Updated] (PROTON-1851) [python] Unable to send messages to newly connected server after reconnect has happened
Date Wed, 23 May 2018 11:33:00 GMT

     [ https://issues.apache.org/jira/browse/PROTON-1851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Jiri Daněk updated PROTON-1851:
-------------------------------
    Description: 
If I specify multiple URLs via {{container.connect(urls=[...])}}, my client reconnects fine in case
of server failure. What does not seem to work for me is making the client send messages to
the new server. I think PROTON-1515 may be related to my issue.
h4. Test case

Can be run as {{python -m unittest proton_tests.engine.ServerTest.testFailover}} in {{qpid-proton/tests/python}}
{code:title=tests/python/proton_tests/engine.py}
class ServerTest(Test):
  # Reproducer for PROTON-1851: a client connected with a list of URLs fails
  # over to the second server, but no deliveries are recorded by that server.

  def testFailover(self):
    """ Verify that messages continue to be delivered
    (to the new broker) after failover happens.

    Two ``common.TestServer2`` instances are started, a client is connected
    with both server addresses as failover URLs, and the servers are then
    stopped one after the other with one-second pauses in between.  The test
    passes only if both servers recorded at least one delivery tag.
    """
    server1 = common.TestServer2()
    server1.start()
    server2 = common.TestServer2()
    server2.start()
    print("testFailover")
    # NOTE(review): nothing here waits for the servers to finish starting
    # before the client reads server1/server2 host and port below -- confirm
    # that TestServer2 has its port assigned by then.

    class Program(MessagingHandler):
      """Client handler that sends an empty message whenever it is sendable."""

      # Not read anywhere in this test.
      first = True

      def on_start(self, event):
        """Open one connection that lists both servers as failover URLs."""
        print("sender: on start")
        # Single-URL variant, kept for reference:
        # self.conn = event.container.connect(url="%s:%s" % (server1.host, server1.port))
 # , allowed_mechs="ANONYMOUS")
        self.conn = event.container.connect(urls=["%s:%s" % (server1.host, server1.port),
"%s:%s" % (server2.host, server2.port)])

      def on_sendable(self, event):
        """Send one default-constructed ``Message`` on the current sender."""
        print("sender: on sendable")
        message = Message()
        self.sender.send(message)

      def on_connection_opened(self, event):
        """Create a sender to "some_address" on ``self.conn``.

        The sender is re-created on every invocation, replacing
        ``self.sender``.
        """
        self.sender = event.container.create_sender(self.conn, "some_address")
        print("sender: on connection opened")

    p = Program()
    c = Container(p)
    # Run the client container on its own thread so this thread can stop the
    # servers while the client is active.
    t = Thread(target=c.run)
    t.start()
    # Fixed one-second pauses give the client time to connect/send before
    # each server is taken down.
    sleep(1)
    server1.stop()
    print("server 1 stopped")
    sleep(1)
    server2.stop()
    sleep(1)
    c.stop()
    # NOTE(review): thread ``t`` is never joined after ``c.stop()``.
    print("msgs: ", len(server1.tags), len(server2.tags))
    assert len(server1.tags) > 0
    assert len(server2.tags) > 0  # this assert fails, len == 0
{code}
{code:title=tests/python/proton_tests/common.py}
class TestServer2(MessagingHandler):
  """ Base class for creating test-specific message servers.

  The server owns its own ``Container`` and runs it on a daemon thread.
  Every delivery it receives is settled and its tag is appended to
  ``self.tags`` so that tests can check what arrived.
  """
  def __init__(self, **kwargs):
    """Create the server without starting it.

    :param kwargs: optional ``host`` (default ``"127.0.0.1"``) and ``port``
        (default ``0``, which makes ``on_start`` pick random ports).  The
        full mapping is kept in ``self.args``.
    """
    super(TestServer2, self).__init__()
    self.args = kwargs
    self.reactor = Container(self)
    self.host = kwargs.get("host", "127.0.0.1")
    self.port = kwargs.get("port", 0)
    self.handlers = [CFlowController(10), CHandshaker()]
    self.thread = Thread(name="server-thread", target=self.run)
    self.thread.daemon = True
    self.running = True
    self.tags = []
    # Set by on_start once listening succeeds; None until then so that
    # stop() can be called safely even if the listener was never created.
    self.acceptor = None

  def start(self):
    """Start the server thread (which runs the container)."""
    self.thread.start()

  def stop(self):
    """Close the listener, stop the container and join the server thread."""
    self.running = False
    self.reactor.wakeup()
    if self.acceptor is not None:
      self.acceptor.close()
    self.reactor.stop()
    self.thread.join()

  # Note: all of the following methods run under the server thread:

  def run(self):
    """Thread target: run the container until it is stopped."""
    self.reactor.run()

  def on_start(self, event):
    """Start listening on ``self.host``/``self.port``.

    With the default port ``0`` a random port in 49152-65535 is chosen and
    up to ten attempts are made, picking a new random port after each
    ``IOError``.  An explicitly configured port gets exactly one attempt
    and is never replaced.  (Previously an explicit port resulted in zero
    attempts, so the server never listened and the assertion below always
    fired.)

    :type event: proton.Event
    :raises AssertionError: if no attempt succeeded.
    """
    print("on start server")
    pick_random = self.port == 0
    if pick_random:
      self.port = str(randint(49152, 65535))
    retry = 10 if pick_random else 1
    while retry > 0:
      try:
        self.acceptor = event.container.listen(Url("%s:%s" % (self.host, self.port)))
        break
      except IOError as e:
        print(e)
        if pick_random:
          self.port = str(randint(49152, 65535))
        retry -= 1
    # retry only reaches 0 when every attempt raised IOError.
    assert retry > 0, "No free port for server to listen on!"

  def on_delivery(self, event):
    """Settle the incoming delivery and record its tag in ``self.tags``.

    :type event: proton.Event
    """
    print("on delivery")
    event.delivery.settle()
    self.tags.append(event.delivery.tag)
{code}

  was:
If I specify multiple URLs via {{container.connect(urls=[...])}}, my client reconnects fine in case
of server failure. What does not seem to work for me is making the client send messages to
the new server. I think PROTON-1515 may be related to my issue.

h4. Test case

Can be run as {{python -m unittest proton_tests.engine.ServerTest.testFailover}} in {{qpid-proton/tests/python}}

{code:python|title=tests/python/proton_tests/engine.py}
class ServerTest(Test):
  # Reproducer for PROTON-1851: a client connected with a list of URLs fails
  # over to the second server, but no deliveries are recorded by that server.

  def testFailover(self):
    """ Verify that messages continue to be delivered
    (to the new broker) after failover happens.

    Two ``common.TestServer2`` instances are started, a client is connected
    with both server addresses as failover URLs, and the servers are then
    stopped one after the other with one-second pauses in between.  The test
    passes only if both servers recorded at least one delivery tag.
    """
    server1 = common.TestServer2()
    server1.start()
    server2 = common.TestServer2()
    server2.start()
    print("testFailover")
    # NOTE(review): nothing here waits for the servers to finish starting
    # before the client reads server1/server2 host and port below -- confirm
    # that TestServer2 has its port assigned by then.

    class Program(MessagingHandler):
      """Client handler that sends an empty message whenever it is sendable."""

      # Not read anywhere in this test.
      first = True

      def on_start(self, event):
        """Open one connection that lists both servers as failover URLs."""
        print("sender: on start")
        # Single-URL variant, kept for reference:
        # self.conn = event.container.connect(url="%s:%s" % (server1.host, server1.port))
 # , allowed_mechs="ANONYMOUS")
        self.conn = event.container.connect(urls=["%s:%s" % (server1.host, server1.port),
"%s:%s" % (server2.host, server2.port)])

      def on_sendable(self, event):
        """Send one default-constructed ``Message`` on the current sender."""
        print("sender: on sendable")
        message = Message()
        self.sender.send(message)
        # Remember the connection the event arrived on; it is not read
        # anywhere else in this test.
        self.connection = event.connection

      def on_connection_opened(self, event):
        """Create a sender to "some_address" on ``self.conn``.

        The sender is re-created on every invocation, replacing
        ``self.sender``.
        """
        self.sender = event.container.create_sender(self.conn, "some_address")
        print("sender: on connection opened")

    p = Program()
    c = Container(p)
    # Run the client container on its own thread so this thread can stop the
    # servers while the client is active.
    t = Thread(target=c.run)
    t.start()
    # Fixed one-second pauses give the client time to connect/send before
    # each server is taken down.
    sleep(1)
    server1.stop()
    print("server 1 stopped")
    sleep(1)
    server2.stop()
    sleep(1)
    c.stop()
    # NOTE(review): thread ``t`` is never joined after ``c.stop()``.
    print("msgs: ", len(server1.tags), len(server2.tags))
    assert len(server1.tags) > 0
    assert len(server2.tags) > 0  # this assert fails, len == 0
{code}

{code:python|title=tests/python/proton_tests/common.py}
class TestServer2(MessagingHandler):
  """ Base class for creating test-specific message servers.

  The server owns its own ``Container`` and runs it on a daemon thread.
  Every delivery it receives is settled and its tag is appended to
  ``self.tags`` so that tests can check what arrived.
  """
  def __init__(self, **kwargs):
    """Create the server without starting it.

    :param kwargs: optional ``host`` (default ``"127.0.0.1"``) and ``port``
        (default ``0``, which makes ``on_start`` pick random ports).  The
        full mapping is kept in ``self.args``.
    """
    super(TestServer2, self).__init__()
    self.args = kwargs
    self.reactor = Container(self)
    self.host = kwargs.get("host", "127.0.0.1")
    self.port = kwargs.get("port", 0)
    self.handlers = [CFlowController(10), CHandshaker()]
    self.thread = Thread(name="server-thread", target=self.run)
    self.thread.daemon = True
    self.running = True
    self.tags = []
    # Set by on_start once listening succeeds; None until then so that
    # stop() can be called safely even if the listener was never created.
    self.acceptor = None

  def start(self):
    """Start the server thread (which runs the container)."""
    self.thread.start()

  def stop(self):
    """Close the listener, stop the container and join the server thread."""
    self.running = False
    self.reactor.wakeup()
    if self.acceptor is not None:
      self.acceptor.close()
    self.reactor.stop()
    self.thread.join()

  # Note: all of the following methods run under the server thread:

  def run(self):
    """Thread target: run the container until it is stopped."""
    self.reactor.run()

  def on_start(self, event):
    """Start listening on ``self.host``/``self.port``.

    With the default port ``0`` a random port in 49152-65535 is chosen and
    up to ten attempts are made, picking a new random port after each
    ``IOError``.  An explicitly configured port gets exactly one attempt
    and is never replaced.  (Previously an explicit port resulted in zero
    attempts, so the server never listened and the assertion below always
    fired.)

    :type event: proton.Event
    :raises AssertionError: if no attempt succeeded.
    """
    print("on start server")
    pick_random = self.port == 0
    if pick_random:
      self.port = str(randint(49152, 65535))
    retry = 10 if pick_random else 1
    while retry > 0:
      try:
        self.acceptor = event.container.listen(Url("%s:%s" % (self.host, self.port)))
        break
      except IOError as e:
        print(e)
        if pick_random:
          self.port = str(randint(49152, 65535))
        retry -= 1
    # retry only reaches 0 when every attempt raised IOError.
    assert retry > 0, "No free port for server to listen on!"

  def on_delivery(self, event):
    """Settle the incoming delivery and record its tag in ``self.tags``.

    :type event: proton.Event
    """
    print("on delivery")
    event.delivery.settle()
    self.tags.append(event.delivery.tag)
{code}


> [python] Unable to send messages to newly connected server after reconnect has happened
> ---------------------------------------------------------------------------------------
>
>                 Key: PROTON-1851
>                 URL: https://issues.apache.org/jira/browse/PROTON-1851
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: python-binding
>    Affects Versions: proton-c-0.22.0
>            Reporter: Jiri Daněk
>            Priority: Major
>
> If I specify multiple URLs via {{container.connect(urls=[...])}}, my client reconnects fine
in case of server failure. What does not seem to work for me is making the client send messages
to the new server. I think PROTON-1515 may be related to my issue.
> h4. Test case
> Can be run as {{python -m unittest proton_tests.engine.ServerTest.testFailover}} in {{qpid-proton/tests/python}}
> {code:title=tests/python/proton_tests/engine.py}
> class ServerTest(Test):
>   def testFailover(self):
>     """ Verify that messages continue to be delivered
>     (to the new broker) after failover happens"""
>     server1 = common.TestServer2()
>     server1.start()
>     server2 = common.TestServer2()
>     server2.start()
>     print("testFailover")
>     class Program(MessagingHandler):
>       first = True
>       def on_start(self, event):
>         print("sender: on start")
>         # self.conn = event.container.connect(url="%s:%s" % (server1.host, server1.port))
 # , allowed_mechs="ANONYMOUS")
>         self.conn = event.container.connect(urls=["%s:%s" % (server1.host, server1.port),
"%s:%s" % (server2.host, server2.port)])
>       def on_sendable(self, event):
>         print("sender: on sendable")
>         message = Message()
>         self.sender.send(message)
>       def on_connection_opened(self, event):
>         self.sender = event.container.create_sender(self.conn, "some_address")
>         print("sender: on connection opened")
>     p = Program()
>     c = Container(p)
>     t = Thread(target=c.run)
>     t.start()
>     sleep(1)
>     server1.stop()
>     print("server 1 stopped")
>     sleep(1)
>     server2.stop()
>     sleep(1)
>     c.stop()
>     print("msgs: ", len(server1.tags), len(server2.tags))
>     assert len(server1.tags) > 0
>     assert len(server2.tags) > 0  # this assert fails, len == 0
> {code}
> {code:title=tests/python/proton_tests/common.py}
> class TestServer2(MessagingHandler):
>   """ Base class for creating test-specific message servers.
>   """
>   def __init__(self, **kwargs):
>     super(TestServer2, self).__init__()
>     self.args = kwargs
>     self.reactor = Container(self)
>     self.host = "127.0.0.1"
>     self.port = 0
>     if "host" in kwargs:
>       self.host = kwargs["host"]
>     if "port" in kwargs:
>       self.port = kwargs["port"]
>     self.handlers = [CFlowController(10), CHandshaker()]
>     self.thread = Thread(name="server-thread", target=self.run)
>     self.thread.daemon = True
>     self.running = True
>     self.tags = []
>   def start(self):
>     self.thread.start()
>   def stop(self):
>     self.running = False
>     self.reactor.wakeup()
>     self.acceptor.close()
>     self.reactor.stop()
>     self.thread.join()
>   # Note: all following methods all run under the thread:
>   def run(self):
>     self.reactor.run()
>   def on_start(self, event):
>       print("on start server")
>       retry = 0
>       if self.port == 0:
>           self.port = str(randint(49152, 65535))
>           retry = 10
>       while retry > 0:
>           try:
>               # self.acceptor = self.reactor.acceptor(self.host, self.port)
>               self.acceptor = event.container.listen(Url("%s:%s" % (self.host, self.port)))
>               break
>           except IOError as e:
>               print(e)
>               self.port = str(randint(49152, 65535))
>               retry -= 1
>       assert retry > 0, "No free port for server to listen on!"
>   def on_delivery(self, event):
>     """
>     :type event: proton.Event
>     """
>     print("on delivery")
>     event.delivery.settle()
>     self.tags.append(event.delivery.tag)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message