qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (QPID-6397) [C++ broker] segfault when processing QMF method during periodic processing
Date Thu, 05 Mar 2015 11:40:38 GMT

    [ https://issues.apache.org/jira/browse/QPID-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348642#comment-14348642
] 

ASF subversion and git services commented on QPID-6397:
-------------------------------------------------------

Commit 1664313 from [~pmoravec] in branch 'qpid/trunk'
[ https://svn.apache.org/r1664313 ]

QPID-6397: [C++ broker] segfault when processing QMF method during periodic processing

> [C++ broker] segfault when processing QMF method during periodic processing
> ---------------------------------------------------------------------------
>
>                 Key: QPID-6397
>                 URL: https://issues.apache.org/jira/browse/QPID-6397
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Broker
>    Affects Versions: 0.31
>            Reporter: Pavel Moravec
>
> There is a race condition causing a segfault when:
> - one thread executes periodic processing with traces enabled (at least for qpid::management::ManagementAgent::debugSnapshot)
> - a second thread is concurrently processing a QMF method from a client
> The root cause is that the first thread iterates through the managementObjects map in dumpMap
(or through newManagementObjects in dumpVector) while the second thread is executing moveNewObjects,
which moves objects from newManagementObjects to managementObjects.
> See the backtraces that were hit (the dumpMap case is shown; the dumpVector case was hit as well):
> (gdb) bt # of thread 1
> #0  0x0000003f0c632885 in raise (sig=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> #1  0x0000003f0c634065 in abort () at abort.c:92
> #2  0x0000003f0c62b9fe in __assert_fail_base (fmt=<value optimized out>, assertion=0x7ff117bd5c94
"px != 0", 
>     file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", line=<value
optimized out>, function=<value optimized out>)
>     at assert.c:96
> #3  0x0000003f0c62bac0 in __assert_fail (assertion=0x7ff117bd5c94 "px != 0", 
>     file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", line=418, 
>     function=0x7ff117bd5e80 "T* boost::shared_ptr< <template-parameter-1-1>
>::operator->() const [with T = qpid::management::ManagementObject]") at assert.c:105
> #4  0x00007ff117947139 in boost::shared_ptr<qpid::management::ManagementObject>::operator->()
const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5  0x00007ff117bafcd9 in qpid::management::(anonymous namespace)::dumpMap(std::map<qpid::management::ObjectId,
boost::shared_ptr<qpid::management::ManagementObject>, std::less<qpid::management::ObjectId>,
std::allocator<std::pair<qpid::management::ObjectId const, boost::shared_ptr<qpid::management::ManagementObject>
> > > const&) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6  0x00007ff117bb06ba in qpid::management::ManagementAgent::debugSnapshot(char const*)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7  0x00007ff117b954ed in qpid::management::ManagementAgent::periodicProcessing() ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8  0x00007ff117bc5c1f in boost::_mfi::mf0<void, qpid::management::ManagementAgent>::operator()(qpid::management::ManagementAgent*)
const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9  0x00007ff117bc4384 in void boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*>
>::operator()<boost::_mfi::mf0<void, qpid::management::ManagementAgent>, boost::_bi::list0>(boost::_bi::type<void>,
boost::_mfi::mf0<void, qpid::management::ManagementAgent>&, boost::_bi::list0&,
int) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117bc1269 in boost::_bi::bind_t<void, boost::_mfi::mf0<void, qpid::management::ManagementAgent>,
boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > >::operator()()
() from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117bbc1e0 in boost::detail::function::void_function_obj_invoker0<boost::_bi::bind_t<void,
boost::_mfi::mf0<void, qpid::management::ManagementAgent>, boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*>
> >, void>::invoke(boost::detail::function::function_buffer&) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117a5e2af in boost::function0<void>::operator()() const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117b8e400 in qpid::management::(anonymous namespace)::Periodic::fire() ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #14 0x00007ff1173b518f in qpid::sys::TimerTask::fireTask() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> ---Type <return> to continue, or q <return> to quit---
> #15 0x00007ff1173b63e9 in qpid::sys::Timer::fire(boost::intrusive_ptr<qpid::sys::TimerTask>)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #16 0x00007ff1173b5d1d in qpid::sys::Timer::run() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #17 0x00007ff1173280ef in qpid::sys::(anonymous namespace)::runRunnable(void*) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #18 0x0000003f0ce077f1 in start_thread (arg=0x7ff116c8f700) at pthread_create.c:301
> #19 0x0000003f0c6e570d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:115
> (gdb) bt  # of thread 2
> #0  0x00007ff116ca99c7 in qpid::types::VariantImpl::~VariantImpl() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #1  0x00007ff116caf2e9 in qpid::types::Variant::~Variant() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #2  0x00007ff117967876 in qmf::org::apache::qpid::broker::Connection::mapEncodeValues(std::map<std::basic_string<char,
std::char_traits<char>, std::allocator<char> >, qpid::types::Variant, std::less<std::basic_string<char,
std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const, qpid::types::Variant>
> >&, bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #3  0x00007ff117bb118d in qpid::management::ManagementAgent::DeletedObject::DeletedObject(boost::shared_ptr<qpid::management::ManagementObject>,
bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #4  0x00007ff117b94ec1 in qpid::management::ManagementAgent::moveNewObjects() ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5  0x00007ff117b9e79a in qpid::management::ManagementAgent::handleMethodRequest(std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&, bool) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6  0x00007ff117bacb70 in qpid::management::ManagementAgent::dispatchAgentCommand(qpid::broker::Message&,
bool) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7  0x00007ff117b9ccc3 in qpid::management::ManagementAgent::dispatchCommand(qpid::broker::Deliverable&,
std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&,
qpid::framing::FieldTable const*, bool, int) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8  0x00007ff117bca08a in qpid::broker::ManagementDirectExchange::route(qpid::broker::Deliverable&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9  0x00007ff117b49d65 in qpid::broker::SemanticState::route(qpid::broker::Message&,
qpid::broker::Deliverable&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117b6a8f3 in qpid::broker::SessionState::handleContent(qpid::framing::AMQFrame&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117b6af31 in qpid::broker::SessionState::handleIn(qpid::framing::AMQFrame&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117b72dc1 in qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
&(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
() from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117376ba3 in qpid::amqp_0_10::SessionHandler::handleIn(qpid::framing::AMQFrame&)
()
> ---Type <return> to continue, or q <return> to quit---
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #14 0x00007ff117b72dc1 in qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
&(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
() from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #15 0x00007ff117ac640a in qpid::framing::Handler<qpid::framing::AMQFrame&>::operator()(qpid::framing::AMQFrame&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #16 0x00007ff117ac1a8c in qpid::broker::ConnectionHandler::handle(qpid::framing::AMQFrame&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #17 0x00007ff117ab8444 in qpid::broker::amqp_0_10::Connection::received(qpid::framing::AMQFrame&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #18 0x00007ff117a23408 in qpid::amqp_0_10::Connection::decode(char const*, unsigned long)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #19 0x00007ff117b34581 in qpid::broker::SecureConnection::decode(char const*, unsigned
long) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #20 0x00007ff1173abc2f in qpid::sys::AsynchIOHandler::readbuff(qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #21 0x00007ff117bd2ce6 in boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIOHandler*, qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*) const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #22 0x00007ff117bd1a24 in void boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>,
boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler,
qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>, boost::_bi::list2<qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*&> >(boost::_bi::type<void>, boost::_mfi::mf2<void,
qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>&,
boost::_bi::list2<qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*&>&,
int) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #23 0x00007ff117bd0eb6 in void boost::_bi::bind_t<void, boost::_mfi::mf2<void,
qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>,
boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, boost::arg<1>,
boost::arg<2> > >::operator()<qpid::sys::AsynchIO, qpid::sys::AsynchIOBufferBase*>(qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #24 0x00007ff117bd018f in boost::detail::function::void_function_obj_invoker2<boost::_bi::bind_t<void,
boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>,
boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, boost::arg<1>,
boost::arg<2> > >, void, qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>::invoke(boost::detail::function::function_buffer&,
qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> ---Type <return> to continue, or q <return> to quit---
> #25 0x00007ff11730f204 in boost::function2<void, qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIO&,
qpid::sys::AsynchIOBufferBase*) const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #26 0x00007ff11730c28b in qpid::sys::posix::AsynchIO::readable(qpid::sys::DispatchHandle&)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #27 0x00007ff117314202 in boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>::operator()(qpid::sys::posix::AsynchIO*,
qpid::sys::DispatchHandle&) const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #28 0x00007ff117313553 in void boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>,
boost::arg<1> >::operator()<boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO,
qpid::sys::DispatchHandle&>, boost::_bi::list1<qpid::sys::DispatchHandle&>
>(boost::_bi::type<void>, boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>&,
boost::_bi::list1<qpid::sys::DispatchHandle&>&, int) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #29 0x00007ff117312844 in void boost::_bi::bind_t<void, boost::_mfi::mf1<void,
qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>, boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>,
boost::arg<1> > >::operator()<qpid::sys::DispatchHandle>(qpid::sys::DispatchHandle&)
() from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #30 0x00007ff117311821 in boost::detail::function::void_function_obj_invoker1<boost::_bi::bind_t<void,
boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>,
boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, boost::arg<1>
> >, void, qpid::sys::DispatchHandle&>::invoke(boost::detail::function::function_buffer&,
qpid::sys::DispatchHandle&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #31 0x00007ff1173afc7a in boost::function1<void, qpid::sys::DispatchHandle&>::operator()(qpid::sys::DispatchHandle&)
const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #32 0x00007ff1173af240 in qpid::sys::DispatchHandle::processEvent(qpid::sys::Poller::EventType)
()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #33 0x00007ff1173355d6 in qpid::sys::Poller::Event::process() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #34 0x00007ff1173348d4 in qpid::sys::Poller::run() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #35 0x00007ff1173ade21 in qpid::sys::Dispatcher::run() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #36 0x00007ff117a30177 in qpid::broker::Broker::run() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #37 0x000000000040760c in qpid::broker::QpiddBroker::execute(qpid::broker::QpiddOptions*)
()
> #38 0x000000000040b461 in qpid::broker::run_broker(int, char**, bool) ()
> #39 0x00000000004076a6 in main ()
> Reproducer: a little bit tricky, but basically:
> 1) create as many objects as possible (and keep creating them, so that there is always something in the newManagementObjects
map)
> 2) invoke some QMF method just when periodic processing is starting
> In particular, I used this reproducer:
> 1) populate_provisioning.sh:
> for i in $(seq 1 50); do qpid-config add exchange fanout ex_${i} & done
> wait
> while true; do
> 	j=$(uuidgen)
> 	echo "$(date): starting with queue ${j}_q_1"
> 	for i in $(seq 1 200); do qpid-receive -a "${j}_q_${i}; {create:always, node:{durable:true,
x-bindings:[{exchange:'amq.direct', queue:'${j}_q_${i}', key:''}, {exchange:'amq.fanout',
queue:'${j}_q_${i}'}, {exchange:'ex_1', queue:'${j}_q_${i}'}, {exchange:'ex_2', queue:'${j}_q_${i}'},
{exchange:'ex_3', queue:'${j}_q_${i}'}, {exchange:'ex_4', queue:'${j}_q_${i}'}, {exchange:'ex_5',
queue:'${j}_q_${i}'}, {exchange:'ex_6', queue:'${j}_q_${i}'}, {exchange:'ex_7', queue:'${j}_q_${i}'},
{exchange:'ex_8', queue:'${j}_q_${i}'}, {exchange:'ex_9', queue:'${j}_q_${i}'}, {exchange:'ex_10',
queue:'${j}_q_${i}'}, {exchange:'ex_11', queue:'${j}_q_${i}'}, {exchange:'ex_12', queue:'${j}_q_${i}'},
{exchange:'ex_13', queue:'${j}_q_${i}'}, {exchange:'ex_14', queue:'${j}_q_${i}'}, {exchange:'ex_15',
queue:'${j}_q_${i}'}, {exchange:'ex_16', queue:'${j}_q_${i}'}, {exchange:'ex_17', queue:'${j}_q_${i}'},
{exchange:'ex_18', queue:'${j}_q_${i}'}, {exchange:'ex_19', queue:'${j}_q_${i}'}, {exchange:'ex_20',
queue:'${j}_q_${i}'}, {exchange:'ex_21', queue:'${j}_q_${i}'}, {exchange:'ex_22', queue:'${j}_q_${i}'},
{exchange:'ex_23', queue:'${j}_q_${i}'}, {exchange:'ex_24', queue:'${j}_q_${i}'}, {exchange:'ex_25',
queue:'${j}_q_${i}'}, {exchange:'ex_26', queue:'${j}_q_${i}'}, {exchange:'ex_27', queue:'${j}_q_${i}'},
{exchange:'ex_28', queue:'${j}_q_${i}'}, {exchange:'ex_29', queue:'${j}_q_${i}'}, {exchange:'ex_30',
queue:'${j}_q_${i}'}, {exchange:'ex_31', queue:'${j}_q_${i}'}, {exchange:'ex_32', queue:'${j}_q_${i}'},
{exchange:'ex_33', queue:'${j}_q_${i}'}, {exchange:'ex_34', queue:'${j}_q_${i}'}, {exchange:'ex_35',
queue:'${j}_q_${i}'}, {exchange:'ex_36', queue:'${j}_q_${i}'}, {exchange:'ex_37', queue:'${j}_q_${i}'},
{exchange:'ex_38', queue:'${j}_q_${i}'}, {exchange:'ex_39', queue:'${j}_q_${i}'}, {exchange:'ex_40',
queue:'${j}_q_${i}'}, {exchange:'ex_41', queue:'${j}_q_${i}'}, {exchange:'ex_42', queue:'${j}_q_${i}'},
{exchange:'ex_43', queue:'${j}_q_${i}'}, {exchange:'ex_44', queue:'${j}_q_${i}'}, {exchange:'ex_45',
queue:'${j}_q_${i}'}, {exchange:'ex_46', queue:'${j}_q_${i}'}, {exchange:'ex_47', queue:'${j}_q_${i}'},
{exchange:'ex_48', queue:'${j}_q_${i}'}, {exchange:'ex_49', queue:'${j}_q_${i}'}, {exchange:'ex_50',
queue:'${j}_q_${i}'}] }}" & sleep 0.1; done
> 	wait
> done
> 2) while true; do date "+%T.%N"; ./purge_queue whateverQueue 0 > /dev/null 2>&1;
sleep 9.83; done
> purge_queue.cpp:
> /* To the extent possible under law, Red Hat, Inc. has dedicated all copyright
>  * to this software to the public domain worldwide, pursuant to the CC0 Public
>  * Domain Dedication. This software is distributed without any warranty.  See
>  *  <http://creativecommons.org/publicdomain/zero/1.0/>.
> */
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Session.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Address.h>
> #include <iostream>
> #include <cstdlib>
> using namespace std;
> using namespace qpid::messaging;
> using namespace qpid::types;
> int main(int argc, char** argv) {
>   if (argc < 3) {
>     cerr << "Invalid number of parameters, expecting: queue_name, quantity." <<
endl;
>     return 1;
>   }
>   string queue_name = argv[1];
>   uint32_t qty = atoi(argv[2]);
>   Connection connection(argc>3?argv[3]:"localhost:5672");
>   connection.open();
>   Session session = connection.createSession();
>   Sender sender = session.createSender("qmf.default.direct/broker");
>   Address responseQueue("#reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}");
>   Receiver receiver = session.createReceiver(responseQueue);
>   Message message;
>   Variant::Map content;
>   Variant::Map OID;
>   Variant::Map arguments;
>   string object_name = "org.apache.qpid.broker:queue:" + queue_name;
>   OID["_object_name"] = object_name;
>   arguments["request"] = qty;
>   content["_object_id"] = OID;
>   content["_method_name"] = "purge";
>   content["_arguments"] = arguments;
>   encode(content, message);
>   message.setReplyTo(responseQueue);
>   message.setProperty("x-amqp-0-10.app-id", "qmf2");
>   message.setProperty("qmf.opcode", "_method_request");
>   message.setContentType("amqp/map");
>   Variant::Map map;
>   decode(message, map);
>   std::cout << map << std::endl;
>   sender.send(message, true);
>   Message response;
>   if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true) {
>     qpid::types::Variant::Map recv_props = response.getProperties();
>     if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
>       if (recv_props["qmf.opcode"] == "_method_response")
>         std::cout << "Response: OK" << std::endl;
>       else if (recv_props["qmf.opcode"] == "_exception")
>         std::cerr << "Error: " << response.getContent() << std::endl;
>       else
>         std::cerr << "Invalid response received!" << std::endl;
>     else
>       std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
>   }
>   else
>     std::cout << "Timeout: No response received within 30 seconds!" << std::endl;
>   receiver.close();
>   sender.close();
>   session.close();
>   connection.close();
>   return 0;
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message