qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alan Conway" <acon...@redhat.com>
Subject Re: Review Request: qpidd refactor
Date Tue, 17 Jul 2012 15:01:17 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/5833/#review9213
-----------------------------------------------------------


Looked at the HA stuff - looks good, improved in several places e.g. avoiding queue scans
now you're using a map rather than a SequenceSet and using seek.  One serious issue - code
in an assert, the remaining comments you can take or leave, non of them is important.


/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp
<https://reviews.apache.org/r/5833/#comment19704>

    const Message&?



/trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp
<https://reviews.apache.org/r/5833/#comment19705>

    Could say just "return m.getState() ==..."



/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h
<https://reviews.apache.org/r/5833/#comment19708>

    Could be simplified with boost::operators. You only have to define ==, <, +=, -= and
the template generates the rest.



/trunk/qpid/cpp/src/qpid/broker/QueueDepth.h
<https://reviews.apache.org/r/5833/#comment19706>

    Could use boost::optional



/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
<https://reviews.apache.org/r/5833/#comment19701>

    Oops! Code inside an assert()! Will explode on in release build.



/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
<https://reviews.apache.org/r/5833/#comment19700>

    Not important but: why pass SequenceNumber by reference? It's just an int32_t



/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
<https://reviews.apache.org/r/5833/#comment19702>

    Maybe move this down closer to its use, its a bit puzzling when seen alone.



/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
<https://reviews.apache.org/r/5833/#comment19703>

    There  is still a bug here (not introduced by you though.) Seek from 0 won't get the front
of the queue if the sequence numbers have wrapped. Want to make sure we leave a comment or
note of that somewhere.


- Alan Conway


On July 17, 2012, 12:38 p.m., Gordon Sim wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/5833/
> -----------------------------------------------------------
> 
> (Updated July 17, 2012, 12:38 p.m.)
> 
> 
> Review request for qpid, Alan Conway and Kenneth Giusti.
> 
> 
> Description
> -------
> 
> == Background ==
> 
> I've been looking at what would be required to get AMQP 1.0 support in
> the qpidd broker (using proton-c). In that context I felt there was a
> need to refactor the broker code, particularly that part that would be
> shared between different protocol versions. Part of the motivation was
> clearer separation of 0-10 specific logic, so that 1.0 logic could be
> introduced as an alternative. However part of it was also simply the
> recognition of some long-standing problems that we have never stopped
> to address.
> 
> So, here is a patch representing my ideas on what is needed. This is
> already a little stale (patch was generated against r1331342) and
> needs a rebase. However I think the basic ideas involved are clear
> enough that it would be worth getting some early feedback.
> 
> == Key Changes ==
> 
> qpid::broker::Message
> 
> This is now supposed to be a protocol neutral representation of a
> message. It no longer exposes qpid::framing::FrameSet. It can be based
> on data received in different encodings (this patch only includes the
> existing 0-10 encoding).
> 
> The immutable, sharable state is separated from the mutable
> queue-specific state. Messages themselves are no longer held through a
> shared pointer but are passed by reference or copied if needed. The
> immutable state (essentially the data as received) *is* still shared
> and referenced internally through an intrusive pointer. There is no
> longer a message level lock. A message instance is 'owned' by
> someother entity (usually the queue it is on) which controls
> concurrent access/modification if necessary.
> 
> The persistence context is a separate part of the message
> also. Currently that can be shared between two message instances if
> desired.
> 
> qpid::broker::Messages
> 
> Switched from using qpid::broker::QueuedMessage (which relied on
> shared pointer to message itself and made sequence number the explicit
> - and only - way to refer to a specific message) to using modified
> Message class directly and a new qpid::broker::QueueCursor.
> 
> The cursor is opaque outside the Messages implementation to which it
> relates. It provides a way to refer to a specific message (without
> directly using sequence number, though at present that is what is used
> 'under the covers') and/or to track progress through a sequence of
> messages (for consumers or other iterating entities).
> 
> I.e. its an iterator to a Message within its containing Messages
> instance that is not invalidated by changes to that container.
> 
> A Messages instance *owns* the Message instances within it. Other
> classes access this through a reference or (raw) pointer, or if needed
> copy it (the immutable part can be - and is - safely shared).
> 
> The codepath for browse/consume is a lot more unified now. You use a
> cursor and call Messages::next() in each case. This also lays the
> foundation for selectors.
> 
> The simplified Messages interface led to a simplied
> MessageDistributor. There is still a little more to do to clarify
> these separate roles (or indeed perhaps unify them?) but more on that
> later.
> 
> qpid::broker::amqp_0_10::MessageTransfer
> 
> This represents the familiar 0-10 encoding of a message. This class is
> broadly similar to the old Message class, based on a FrameSet. However
> it represents the shared and essentially immutable state. The
> sendHeader() method now explicitly takes a copy of the original
> headers and adds to it or otherwise modifies it if needed (e.g. for
> redelivered flag, ttl, annotations etc).
> 
> [Ideally I'd like to move more of the 0-10 specific classes out of
> qpid::broker and into qpid::broker::amqp_0_10, but that has no
> functional relevance so I've left existing classes alone for now.]
> 
> qpid::broker::Consumer
> 
> The deliver() method now takes a QueueCursor (representing a 'handle'
> to this message for use in subsequent operations such as accept,
> relese etc) and a *constant reference* to the Message itself
> (i.e. consumers can't alter the state of the message on the queue
> directly, but only through operations on the queue itself).
> 
> qpid::broker::QueueRegistry
> 
> The actual queue creation has been pulled out into a base class,
> QueueFactory. The actual class of the Queue returned can now be varied
> and there are two subclasses in the current patch. The first is a
> replacement for the ring policy logic, whereby messages are removed
> from the queue in order to keep the queue from growing above a
> configured limit. The second is for last value queues and simply pulls
> the special case behaviour out of the common code path.
> 
> The handling of queue configuration has also been made cleaner and
> more uniform, based on the QueueSettings class.
> 
> qpid::broker::QueuePolicy
> 
> This class has been removed. There is a new QueueDepth utility used
> for configuring limits, tracking current depth and testing the latter
> against the former. This is used directly by Queue. The behaviour at
> the limit can be varied by subclassing queue.
> 
> == Limitations etc ==
> 
> clustering
> 
> This breaks clustering. Indeed it will not compile unless clustering
> is disabled (--without-cpg in configure). Keeping the cluster code in
> sync was distracting me from the core goal, given its entanglement
> with the broker code.
> 
> My assumption is that the new ha code will eventually replace the
> cluster anyway and the amount of change that would be required to get
> the cluster working with this refactor may not be worth it and may in
> fact undermine its stability anyway (which seem the only good argument
> for using it).
> 
> I don't believe there is anything insurmountable to do to re-enable
> cluster if that was desired however.
> 
> old & nasty features removed
> 
> I have removed support for flow to disk, the legacy version of lvq
> with two modes (the updated version of lvq is of course still
> functional), the last-man-standing persistence in clustering and the
> old async queue replication. They are really quite horribly
> implemented and/or are no longer necessary in my view.
> 
> == Still To Do ==
> 
> * rebase again to latest trunk
> 
> * update cmake build (and test on windows)
> 
> * fix xml exchange
> 
> * c++ unit test cleanup (remove or re-enable those tests that have
>   been commented out)
> 
> * test against store(s), consider how this might merge with store
>   interface redesign
> 
> * rethink/clarify role of MessageDistributor v. Messages?
> 
> * can QueueCursors be made more like std iterators?
> 
> 
> Diffs
> -----
> 
>   /trunk/qpid/cpp/src/CMakeLists.txt 1362014 
>   /trunk/qpid/cpp/src/Makefile.am 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/AsyncCompletion.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Broker.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Broker.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Consumer.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Deliverable.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/DtxAck.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Exchange.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Exchange.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Fairshare.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Fairshare.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/FifoDistributor.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/IndexedDeque.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Link.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/LossyQueue.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/LossyQueue.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/Lvq.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/Lvq.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/MapHandler.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/Message.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Message.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageDeque.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageMap.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Messages.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Persistable.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/PriorityQueue.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Queue.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/Queue.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueCursor.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueCursor.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueDepth.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueDepth.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueEvents.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueFactory.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueFactory.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueObserver.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueueSettings.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SemanticState.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SessionHandler.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SessionState.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/SessionState.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxAccept.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxBuffer.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxOp.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxPublish.h 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/qpid/ha/Backup.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/Primary.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/QueueGuard.h 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h 1362014 
>   /trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp 1362014 
>   /trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h 1362014 
>   /trunk/qpid/cpp/src/qpid/replication/constants.h 1362014 
>   /trunk/qpid/cpp/src/qpid/xml/XmlExchange.h 1362014 
>   /trunk/qpid/cpp/src/qpid/xml/XmlExchange.cpp 1362014 
>   /trunk/qpid/cpp/src/replication.mk 1362014 
>   /trunk/qpid/cpp/src/tests/ClientSessionTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/ExchangeTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/Makefile.am 1362014 
>   /trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/MessageTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/QueueDepth.cpp PRE-CREATION 
>   /trunk/qpid/cpp/src/tests/QueueEvents.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/QueuePolicyTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/QueueRegistryTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/QueueTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/ReplicationTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/TxMocks.h 1362014 
>   /trunk/qpid/cpp/src/tests/TxPublishTest.cpp 1362014 
>   /trunk/qpid/cpp/src/tests/ha_tests.py 1362014 
>   /trunk/qpid/cpp/src/tests/test_store.cpp 1362014 
>   /trunk/qpid/specs/management-schema.xml 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/alternate_exchange.py 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/msg_groups.py 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/priority.py 1362014 
>   /trunk/qpid/tests/src/py/qpid_tests/broker_0_10/threshold.py 1362014 
> 
> Diff: https://reviews.apache.org/r/5833/diff/
> 
> 
> Testing
> -------
> 
> With clustering disabled, make check passes for me. However I have just realised I've
not got the xml exchange enabled
> either - though that was inadvertant - and there is some work needed there (similar to
what was done in the HeadersExchange).
> 
> 
> Thanks,
> 
> Gordon Sim
> 
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message