Author: cmacnaug
Date: Thu Jun 18 03:36:41 2009
New Revision: 785887
URL: http://svn.apache.org/viewvc?rev=785887&view=rev
Log:
Updating OpenWireProtocolHandler to use count based window limiter rather than size based.
This allows the the protocol handler to work with the activemq-client.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Thu Jun 18 03:36:41 2009
@@ -478,7 +478,6 @@
private Broker createBroker(String name, String bindURI, String connectUri) throws Exception
{
Broker broker = new Broker();
- broker.setDefaultVirtualHost(new VirtualHost(name));
broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
broker.addConnectUri(connectUri);
broker.setDispatcher(dispatcher);
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
Thu Jun 18 03:36:41 2009
@@ -193,7 +193,7 @@
}
}
}
-
+
if (deleteAllMessages) {
getJournal().start();
journal.delete();
@@ -247,7 +247,6 @@
try {
open();
-
store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())),
null);
} finally {
indexLock.writeLock().unlock();
@@ -441,7 +440,7 @@
try {
indexLock.writeLock().lock();
long start = System.currentTimeMillis();
-
+
try {
if (!opened.get()) {
return;
@@ -718,11 +717,13 @@
public final void rollback() {
try {
- if (updateCount > 1) {
- journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
- }
if (tx != null) {
+ if (updateCount > 1) {
+ journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
+ }
tx.rollback();
+ } else {
+ throw new IllegalStateException("Not in Transaction");
}
} catch (IOException e) {
throw new FatalStoreException(e);
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Thu Jun 18 03:36:41 2009
@@ -103,6 +103,7 @@
public OpenwireProtocolHandler() {
setStoreWireFormat(new OpenWireFormat());
+
visitor = new CommandVisitor() {
// /////////////////////////////////////////////////////////////////
@@ -333,10 +334,7 @@
Command command = (Command) o;
boolean responseRequired = command.isResponseRequired();
try {
-
command.visit(visitor);
-
-
} catch (Exception e) {
if (responseRequired) {
ExceptionResponse response = new ExceptionResponse(e);
@@ -449,7 +447,7 @@
limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(),
info.getPrefetchSize() / 2) {
@Override
public int getElementSize(MessageDelivery m) {
- return m.getFlowLimiterSize();
+ return 1;
}
};
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java?rev=785887&r1=785886&r2=785887&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/openwire/OpenwireRemoteConsumer.java
Thu Jun 18 03:36:41 2009
@@ -39,40 +39,48 @@
private ConsumerInfo consumerInfo;
private Message lastMessage;
-
+
protected void initialize() {
+ inputWindowSize = 1000;
+ inputResumeThreshold = 500;
// Setup the input processing..
- final Flow flow = new Flow("client-"+name+"-inbound", false);
- inputResumeThreshold = inputWindowSize/2;
+ final Flow flow = new Flow("client-" + name + "-inbound", false);
+ inputResumeThreshold = inputWindowSize / 2;
WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false,
flow, inputWindowSize, inputResumeThreshold) {
@Override
protected void sendCredit(int credit) {
MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit,
MessageAck.STANDARD_ACK_TYPE);
write(ack);
}
+
+ public int getElementSize(MessageDelivery md) {
+ return 1;
+ }
};
inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>()
{
public void flowElemAccepted(ISourceController<MessageDelivery> controller,
MessageDelivery elem) {
messageReceived(controller, elem);
}
+
public String toString() {
return flow.getFlowName();
}
+
public IFlowResource getFlowResource() {
return null;
}
}, flow, limiter, inboundMutex);
inboundController.setExecutor(getDispatcher().createPriorityExecutor(getDispatcher().getDispatchPriorities()
- 1));
-
+
}
-
+
protected void setupSubscription() throws Exception, IOException {
- if( destination.getDomain().equals( Router.QUEUE_DOMAIN ) ) {
+ if (destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
activemqDestination = new ActiveMQQueue(destination.getName().toString());
} else {
activemqDestination = new ActiveMQTopic(destination.getName().toString());
}
-
+
connectionInfo = createConnectionInfo(name);
transport.oneway(connectionInfo);
sessionInfo = createSessionInfo(connectionInfo);
@@ -81,7 +89,7 @@
consumerInfo.setPrefetchSize(inputWindowSize);
transport.oneway(consumerInfo);
}
-
+
public void onCommand(Object command) {
try {
if (command.getClass() == WireFormatInfo.class) {
|