From Alan Conway <acon...@redhat.com>
Subject Re: Proposed QIP: Support for Message Grouping in the broker.
Date Mon, 25 Jul 2011 13:46:30 GMT
Responses inline.

>>> In this case, it probably would be sufficient to maintain a counter
>>> in the group's state. I'm assuming that we'll need a group lookup on
>>> message arrival to find that state (or create it on the first
>>> received message). Increment the counter when a message in that
>>> group is enqueued, decrement when the message is dequeued. If
>>> decremented to zero, delete the group. Performance impact of such an
>>> approach is TBD, of course - probably need to pool the state objects
>>> to limit memory thrashing, etc.
>> That doesn't sound right. The first N messages of a group could be dequeued
>> before the (N+1)th message is enqueued, which would put your counter to 0
>> before the entire group is processed. I think we need an explicit end-of-group
>> marker on the last message in the group to allow the user to close a group in
>> a scenario with lots of dynamic group ids. You could ignore this in cases where
>> there is a small fixed set of groups that are used over a long time.
> Ah, yes - sorry. The lifetime of the group's state depends on the type of "Policy"
> that is being used (see below). For the "Sequenced Consumers" policy - which is what I was
> thinking of in my last reply - the broker doesn't need to maintain state across all messages
> of the group. With that policy, the state can be dropped once there are no more messages
> for that group present in the broker.

Again, I don't get that. You're putting group boundaries at arbitrary,
unpredictable points that depend on the relative speed of producer and consumer.
The producer may send the first N messages of a group containing N+M messages,
then the consumer consumes those N messages. Now the queue is empty, so the next
M messages are considered a new group, and N is impossible to predict in
advance. So you randomly cut up the groups.
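
To make the problem concrete, here is a minimal Python sketch of the counter-based tracking quoted above, under my own assumptions: a single in-memory queue, and hypothetical class and field names (this is not broker code). The `generation` field counts how many times state was created for a group id; if the queue drains in the middle of a group, the rest of the group gets fresh state, so the broker effectively sees two groups.

```python
from collections import deque


class CountingGroupTracker:
    """Per-group message counter: state is created on the first enqueue and
    dropped when no messages of that group remain in the broker."""

    def __init__(self):
        self.queue = deque()   # (group id, body) in arrival order
        self.counts = {}       # group id -> messages of that group still queued
        self.generation = {}   # group id -> number of times state was created

    def enqueue(self, group_id, body):
        if group_id not in self.counts:
            # First message of the group, or first since its state was dropped.
            self.counts[group_id] = 0
            self.generation[group_id] = self.generation.get(group_id, 0) + 1
        self.counts[group_id] += 1
        self.queue.append((group_id, body))

    def dequeue(self):
        group_id, body = self.queue.popleft()
        self.counts[group_id] -= 1
        if self.counts[group_id] == 0:
            del self.counts[group_id]   # counter hit zero: group state deleted
        return group_id, body


# Producer sends 2 of the 4 messages in group "A"; the consumer drains the
# queue before the producer sends the remaining 2.
tracker = CountingGroupTracker()
tracker.enqueue("A", "a1")
tracker.enqueue("A", "a2")
tracker.dequeue()
tracker.dequeue()
tracker.enqueue("A", "a3")
tracker.enqueue("A", "a4")
print(tracker.generation["A"])  # 2: one logical group was split in two
```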

> For the other proposed policy - Exclusive Consumer - the group state would be associated
> with a particular Consumer instance, and remain present as long as the Consumer exists. So
> in that case, yes - there is the potential that an unbounded number of group states could
> exist without some kind of reclaim strategy. An end-of-group marker could be used, but
> that wouldn't prevent a DOS by a misbehaving app that creates endless groups without setting
> the end marker. Perhaps a hard max-groups-per-Consumer limit, or a TTL for idle groups?

I'm not so worried about that; it's no worse than the fact that a client can
create a huge number of sessions or consumers etc. Solving that family of DOS
attacks is a separate issue, I think.
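
For reference, the two reclaim strategies suggested above (a per-Consumer cap and an idle TTL) could look roughly like this hypothetical sketch. `max_groups` and `idle_ttl` are illustrative parameters, not existing Qpid configuration options, and the sketch ignores whether a reclaimed group still has unacknowledged messages, which a real broker would have to handle.

```python
import time


class ConsumerGroupTable:
    """Groups owned by one consumer under the Exclusive Consumer policy,
    bounded by a hard cap and an idle TTL (both hypothetical settings)."""

    def __init__(self, max_groups, idle_ttl, clock=time.monotonic):
        self.max_groups = max_groups
        self.idle_ttl = idle_ttl      # seconds a group may sit idle
        self.clock = clock            # injectable so the example is deterministic
        self.last_used = {}           # group id -> time of last delivery

    def reclaim_idle(self):
        now = self.clock()
        expired = [g for g, t in self.last_used.items() if now - t > self.idle_ttl]
        for gid in expired:
            del self.last_used[gid]

    def touch(self, group_id):
        """Record a delivery for group_id, refusing a new group over the cap."""
        self.reclaim_idle()
        if group_id not in self.last_used and len(self.last_used) >= self.max_groups:
            raise RuntimeError(f"consumer already owns {self.max_groups} groups")
        self.last_used[group_id] = self.clock()


fake_now = [0.0]
table = ConsumerGroupTable(max_groups=2, idle_ttl=10.0, clock=lambda: fake_now[0])
table.touch("A")
table.touch("B")
try:
    table.touch("C")          # third group: over the cap
    rejected = False
except RuntimeError:
    rejected = True
fake_now[0] = 20.0            # A and B have now been idle for 20 s
table.touch("C")              # idle groups reclaimed, so C is accepted
```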

>>>>> For #2:
>>>>> The key for the header that contains the group identifier would be
>>>>> provided to the broker via configuration.
>>>> A fixed name like 'qpid.group' seems less likely to confuse. Perhaps have it
>>>> default to 'qpid.group' but be configurable to another name in the
>>>> (unlikely?) event that it is required.
>>> That's a good idea. The current AMQP 1.0 draft defines a message
>>> property header called "group-id", perhaps we should use
>>> "qpid.group-id" as the default instead of "qpid.group"?
>> Should we be using the AMQP group-id? What's it for?
> It appears to be for the exact same purpose - identify the group a message belongs to.
> Set by the application and preserved to destination. But I don't see any specific use cases
> proposed in the spec.
>>>>> From the broker's perspective, the number of different group id values
>>>>> would be unlimited. And the value of group identifiers would not be
>>>>> provided ahead of time by configuration: the broker must learn them at
>>>>> runtime.
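
A minimal sketch of the lookup described in #2, assuming the header key is a per-queue setting that defaults to the "qpid.group-id" name suggested above (neither that default nor the `GroupIndex` class is an existing Qpid API). Group state is created the first time a group id is seen, so no id needs to be configured in advance. The QIP does not say how to treat messages that lack the header; here they are simply collected under `None`.

```python
DEFAULT_GROUP_KEY = "qpid.group-id"  # default proposed in this thread, not a confirmed setting


class GroupIndex:
    """Learns group ids at runtime from a configurable header key."""

    def __init__(self, group_key=DEFAULT_GROUP_KEY):
        self.group_key = group_key
        self.groups = {}   # group id -> list of message bodies, in arrival order

    def group_of(self, headers):
        # Messages without the header map to None (an assumption; the QIP is silent).
        return headers.get(self.group_key)

    def enqueue(self, headers, body):
        # setdefault creates the group's state on its first message.
        self.groups.setdefault(self.group_of(headers), []).append(body)


index = GroupIndex()
index.enqueue({"qpid.group-id": "order-17"}, "m1")
index.enqueue({"qpid.group-id": "order-42"}, "m2")
index.enqueue({"qpid.group-id": "order-17"}, "m3")
index.enqueue({}, "m4")
print(index.groups)
# {'order-17': ['m1', 'm3'], 'order-42': ['m2'], None: ['m4']}

custom = GroupIndex(group_key="app.session")   # hypothetical per-queue override
custom.enqueue({"app.session": "s1", "qpid.group-id": "ignored"}, "x")
```
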
>>>>> For #3:
>>>>> This QIP defines two message group policies. Additional policies may be
>>>>> defined in the future.
>>>>> Policy #1: Exclusive Consumer
>>>>> With this policy, the broker would guarantee that all messages in a group
>>>>> would be delivered to the same client.
>>>>> This policy would be configured on a per-queue basis.
>> We could configure policy on a per-group basis with the value of the
>> "qpid.group" property. E.g. qpid.group=exclusive.
>> That's more flexible and puts control of the policy in the hands of the
>> producer, rather than requiring an admin convention that associates a queue
>> name with a policy.
> I'm not sure I completely understand: are you proposing the producer sets both the group
> identifier and a policy type identifier on a per-message basis?
>>>>> When the first message of a new message group becomes available for
>>>>> delivery, the broker will associate that group with the next available
>>>>> consumer. The broker would then guarantee that all messages from that
>>>>> group are delivered to that consumer only.
>>>>> The broker will maintain the group/client association for the lifetime of
>>>>> the client. Should the client die or cancel its subscription, any
>>>>> unacknowledged messages in the group will be assigned to a different
>>>>> client (preserving message order). Group/client associations are not
>>>>> maintained across broker restart. These associations must be replicated in
>>>>> a clustered broker.
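
A rough single-process sketch of Policy #1 as described above, using hypothetical names and a plain deque for the queue; clustering and broker restart are out of scope. A group is bound to whichever consumer receives its first message, other consumers skip that group's messages, and when the owner cancels, its unacknowledged messages go back to the front in their original order and the group is released for another client.

```python
from collections import deque


class ExclusiveConsumerQueue:
    """Policy #1: every message of a group goes to the consumer that
    received the group's first message, for that consumer's lifetime."""

    def __init__(self):
        self.messages = deque()   # (group id, body) awaiting delivery
        self.owner = {}           # group id -> consumer name
        self.unacked = {}         # consumer -> [(group id, body)] delivered, not acked

    def enqueue(self, gid, body):
        self.messages.append((gid, body))

    def next_for(self, consumer):
        """Deliver the first message whose group is unowned or owned by consumer."""
        for i, (gid, body) in enumerate(self.messages):
            if self.owner.get(gid, consumer) == consumer:
                self.owner[gid] = consumer           # bind a new group on first delivery
                del self.messages[i]
                self.unacked.setdefault(consumer, []).append((gid, body))
                return gid, body
        return None

    def ack_all(self, consumer):
        self.unacked[consumer] = []

    def cancel(self, consumer):
        """Consumer died or unsubscribed: put its unacked messages back at the
        front in their original order and release its groups."""
        for msg in reversed(self.unacked.pop(consumer, [])):
            self.messages.appendleft(msg)
        for gid in [g for g, c in self.owner.items() if c == consumer]:
            del self.owner[gid]


q = ExclusiveConsumerQueue()
for gid, body in [("A", "a1"), ("B", "b1"), ("A", "a2")]:
    q.enqueue(gid, body)
print(q.next_for("c1"))   # ('A', 'a1')  c1 now owns group A
print(q.next_for("c2"))   # ('B', 'b1')  a2 is skipped: A belongs to c1
print(q.next_for("c2"))   # None
q.cancel("c1")            # a1 was never acked, so it is redelivered
print(q.next_for("c2"))   # ('A', 'a1')
```
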
>>>>> Policy #2: Sequenced Consumers
>>>>> With this policy, the broker would guarantee that the order in which
>>>>> messages in a group are processed by consumers is the same order in which
>>>>> the messages were enqueued. This guarantee would be upheld even if the
>>>>> messages of the group are processed by different consumers. No two
>>>>> messages from the same group would be processed in parallel by different
>>>>> consumers.
>>>>> Specifically, for any given group, the broker allows only the first N
>>>>> messages in the group to be available for delivery to a particular
>>>>> consumer. The value of N would be determined by the selected consumer's
>>>>> configured prefetch capacity. The broker blocks access to the remaining
>>>>> messages in that group by any other consumer. Once the selected consumer
>>>>> has acknowledged that first set of delivered messages, the broker allows
>>>>> the next messages in the group to be available for delivery. The next set
>>>>> of messages may be delivered to a different consumer.
>>>>> This policy would be configured on a per-queue basis.
>>>>> Configuration would include designating the key of the application header
>>>>> that specifies the group id.
>>>>> Note well that, with this policy, the consuming application has to:
>>>>> 1. ensure that it has completely processed the data in a received message
>>>>> before accepting that message, as described in Section 2.6.2, Transfer of
>>>>> Responsibility, of the AMQP 0-10 specification.
>>>>> 2. ensure that messages are not selectively acknowledged or released -
>>>>> order must be preserved in both cases.
>>>> What happens if the consuming application fails to respect these
>>>> requirements?
>>> Hmmm... good question. My initial thoughts are that this would be
>>> considered an application error, since preserving order is important
>>> for this feature. Perhaps the broker should respond with an error
>>> exception?
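
A minimal sketch of Policy #2 together with the error-on-misuse idea raised just above, under these assumptions: each `fetch` hands at most `prefetch` messages of one group to one consumer, the group stays blocked until every message in that batch is acknowledged in order, and an out-of-order acknowledgement raises a hypothetical `OrderViolation`. Groups are scanned in first-seen order rather than true queue position, and release is not modeled.

```python
from collections import deque


class OrderViolation(Exception):
    """Hypothetical error returned to a consumer that acks out of order."""


class SequencedGroupQueue:
    """Policy #2: at most one batch of a group is in flight at a time; the
    batch size is the fetching consumer's prefetch, and the next batch may
    go to any consumer once the current one is fully acknowledged."""

    def __init__(self):
        self.pending = {}     # group id -> deque of bodies not yet delivered
        self.in_flight = {}   # group id -> (consumer, deque of unacked bodies)

    def enqueue(self, gid, body):
        self.pending.setdefault(gid, deque()).append(body)

    def fetch(self, consumer, prefetch):
        """Deliver up to `prefetch` messages from the first group that has
        pending messages and no batch in flight."""
        for gid, waiting in self.pending.items():
            if waiting and gid not in self.in_flight:
                count = min(prefetch, len(waiting))
                batch = deque(waiting.popleft() for _ in range(count))
                self.in_flight[gid] = (consumer, batch)
                return gid, list(batch)
        return None

    def ack(self, consumer, gid, body):
        """Accept one message; it must be the oldest unacked one of the batch."""
        owner, batch = self.in_flight.get(gid, (None, None))
        if owner != consumer or not batch or batch[0] != body:
            raise OrderViolation(f"{consumer} acked {body!r} out of order in group {gid!r}")
        batch.popleft()
        if not batch:
            del self.in_flight[gid]   # batch complete: group unblocked


q = SequencedGroupQueue()
for body in ["a1", "a2", "a3"]:
    q.enqueue("A", body)
print(q.fetch("c1", prefetch=2))   # ('A', ['a1', 'a2'])
print(q.fetch("c2", prefetch=2))   # None: group A is blocked until c1 acks
q.ack("c1", "A", "a1")
q.ack("c1", "A", "a2")
print(q.fetch("c2", prefetch=2))   # ('A', ['a3']): next batch, different consumer
```
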
>>>>> Note well that in the case of either of these proposed policies, distinct
>>>>> message groups would not block each other from delivery. For example,
>>>>> assume a queue contains messages from two different message groups - say
>>>>> group "A" and group "B" - and they are enqueued such that "A"'s messages
>>>>> are in front of "B". If the first message of group "A" is in the process
>>>>> of being consumed by a client, then the remaining "A" messages are
>>>>> blocked, but the messages of the "B" group are available for consumption -
>>>>> even though group "B" is "behind" group "A" in the queue.
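
The rule above amounts to a delivery scan that skips blocked groups instead of stopping at the head of the queue. A hypothetical sketch, with a plain list standing in for the queue and a set of blocked group ids; a broker would likely want an index of group heads rather than a linear scan, so this only illustrates the ordering rule.

```python
def next_available(queue, blocked_groups):
    """Index of the first message whose group is not blocked, or None.

    A blocked group is one with a message currently being consumed;
    messages of other groups behind it stay deliverable."""
    for i, (gid, _body) in enumerate(queue):
        if gid not in blocked_groups:
            return i
    return None


# "A"'s messages are in front of "B"; a1 has been acquired by a client,
# so group A is blocked.
queue = [("A", "a2"), ("A", "a3"), ("B", "b1")]
print(queue[next_available(queue, blocked_groups={"A"})])   # ('B', 'b1')
```
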
>>>>> ## Rationale
>>>>> The solution described above allows an application to designate a set of
>>>>> related messages, and provides policies for controlling the consumption of
>>>>> the message group.
>>>>>     * Goal: allow dynamic values for the group identifiers (no
>>>>>     preconfiguration of identifiers necessary)
>>>>>     * Goal: the number of message groups "in flight" should only be
>>>>>     limited by the available resources (no need to configure a hard
>>>>>     limit).
>>>>>     * Goal: manageability: visibility into the message groups
>>>>>     currently on a given queue
>>>>>     * Goal: manageability: purge, move, reroute messages at the group
>>>>>     level.
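
The two manageability goals could be served by operations along these lines. The function names are hypothetical, the queue is again a plain list of (group id, body) pairs, and messages already acquired by consumers are not modeled; move or reroute would follow the same pattern as purge, appending the removed messages to another queue instead of discarding them.

```python
from collections import Counter


def group_summary(queue):
    """Visibility: number of messages per group currently on the queue."""
    return Counter(gid for gid, _body in queue)


def purge_group(queue, gid):
    """Group-level purge: remove every message of `gid` in place and return
    how many were removed; other groups keep their relative order."""
    kept = [msg for msg in queue if msg[0] != gid]
    removed = len(queue) - len(kept)
    queue[:] = kept
    return removed


queue = [("A", "a1"), ("B", "b1"), ("A", "a2")]
print(dict(group_summary(queue)))   # {'A': 2, 'B': 1}
print(purge_group(queue, "A"))      # 2
print(queue)                        # [('B', 'b1')]
```
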
>>>>> ## Implementation Notes
>>>>>     * Queues: support configuration of the group identifier header key.
>>>>>     * Messages: provide access to group identifier.
>>>>>     * Queues: identify head message of next available message group.
>>>>>     * Queues: block the trailing messages in a given message group
>>>>>     from being consumed.
>>>>>     * Consumers: track the message group of the currently acquired
>>>>>     message(s).
>>>>>     * Clustering: ensure state is replicated within the cluster as needed.
>>>>> ## Consequences
>>>>>     * __Development:__ No changes to the development process.
>>>>>     * __Release:__ No changes to the release process.
>>>>>     * __Documentation:__ User documentation will be needed to explain
>>>>>     the feature, and the steps to configure and manage it.
>>>>>     * __Configuration:__ Yes: the per-queue group identifier header key
>>>>>     is configurable. Queue state with regard to in-flight message
>>>>>     groups needs to be visible. Additional methods are needed to
>>>>>     purge/move/reroute a message group.
>>>>>     * __Compatibility:__ Issues unlikely - new feature that would need
>>>>>     to be enabled manually. Applications wishing to use the feature
>>>>>     would need to implement a message grouping policy, and ensure the
>>>>>     processing of received message data is compliant with the desired
>>>>>     policy.
>>>>> ## References
>>>>>     * None.
>>>>> ## Contributor-in-Charge
>>>>> Kenneth Giusti,<kgiusti@redhat.com>
>>>>> ## Contributors
>>>>>     * Ted Ross,<tross@redhat.com>
>>>>> ## Version
>>>>> 0.2
