qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gordon Sim <g...@redhat.com>
Subject [java] build broken by svn commit: r496753?
Date Tue, 16 Jan 2007 18:49:47 GMT
FYI: org.apache.qpid.util.concurrent.BooleanLatch seems to be missing 
from the repository.

rgreig@apache.org wrote:
> Author: rgreig
> Date: Tue Jan 16 08:44:17 2007
> New Revision: 496753
> 
> URL: http://svn.apache.org/viewvc?view=rev&rev=496753
> Log:
> (Patch submitted by Rupert Smith) Refactored the test ping classes to share common code. Made them runnable from the command line (the same as they already were). Also made it possible to instantiate them in other code more easily. A unit test class has been added that performs a ping. This is so that it can be run with the TKTestRunner to scale it up and take performance measurements.
> 
> Junit-toolkit added as a dependency to the maven build system.
> 
> Added:
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
>     incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
> Modified:
>     incubator/qpid/trunk/qpid/java/perftests/bin/testPingClient.sh
>     incubator/qpid/trunk/qpid/java/perftests/bin/testPingProducer.sh
>     incubator/qpid/trunk/qpid/java/perftests/pom.xml
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
>     incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
>     incubator/qpid/trunk/qpid/java/pom.xml
> 
> Modified: incubator/qpid/trunk/qpid/java/perftests/bin/testPingClient.sh
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/testPingClient.sh?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/bin/testPingClient.sh (original)
> +++ incubator/qpid/trunk/qpid/java/perftests/bin/testPingClient.sh Tue Jan 16 08:44:17 2007
> @@ -30,4 +30,4 @@
>  # XXX -Xms1024m -XX:NewSize=300m
>  . ./setupclasspath.sh
>  echo $CP
> -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingClient $thehosts guest guest /test "$@"
> +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingClient $thehosts guest guest /test "$@"
> 
> Modified: incubator/qpid/trunk/qpid/java/perftests/bin/testPingProducer.sh
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/testPingProducer.sh?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/bin/testPingProducer.sh (original)
> +++ incubator/qpid/trunk/qpid/java/perftests/bin/testPingProducer.sh Tue Jan 16 08:44:17 2007
> @@ -30,4 +30,4 @@
>  # XXX -Xms1024m -XX:NewSize=300m
>  . ./setupclasspath.sh
>  echo $CP
> -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingProducer $thehosts /test
> +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingProducer $thehosts /test
> 
> Modified: incubator/qpid/trunk/qpid/java/perftests/pom.xml
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/pom.xml?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/pom.xml (original)
> +++ incubator/qpid/trunk/qpid/java/perftests/pom.xml Tue Jan 16 08:44:17 2007
> @@ -49,20 +49,16 @@
>              <artifactId>log4j</artifactId>
>          </dependency>
>  
> -	<!-- Here JUnit is a deliberate 'compile' scope dependency, it will be packaged with the perf testing tools. -->
> +        <!-- Test dependencies. -->
>          <dependency>
>              <groupId>junit</groupId>
>              <artifactId>junit</artifactId>
> -            <scope>compile</scope>
>          </dependency>
>  
> -        <!-- Will be added to maven repo soon. JUnit test runner that can add repeats/concurrenct/timing etc to tests.
>          <dependency>
>              <groupId>uk.co.thebadgerset</groupId>
>              <artifactId>junit-toolkit</artifactId>
> -            <scope>compile</scope>
>          </dependency>
> -        -->
>  
>      </dependencies>
>  
> @@ -78,6 +74,85 @@
>                  <configuration>
>                      <!--<skip>true</skip>-->
>                  </configuration>
> +            </plugin>
> +
> +            <!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. It will take a day or two from 16/1/2007.
> +
> +	         Configures the toolkit test runner for performance testing. These can be run from within maven, or by using the generated
> +                 scripts.
> +
> +		 To run from maven:
> +
> +		 mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tktest
> +
> +		 To run from the command line (after doing assembly:assembly goal):
> +
> +		 java -cp target/test_jar-jar-with-dependencies.jar uk.co.thebadgerset.junit.extensions.OldTKTestRunner -s 1 -r 100000 -o target org.apache.qpid.requestreply.PingPongTestPerf
> +
> +		 To generate the scripts do:
> +
> +		 mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:scripts
> +
> +		 Then to run the scripts do (after doing assembly:assembly goal):
> +
> +		 ./bin/script_name or ./bin/script_name.bat
> +
> +		 These scripts can find everything in the 'all test dependencies' jar.
> +		 -->	
> +            <!--	 
> +            <plugin>
> +              <groupId>uk.co.thebadgerset</groupId>
> +              <artifactId>junit-toolkit-maven-plugin</artifactId> 
> +              <version>0.3</version>
> +
> +              <configuration>
> +                <commands>
> +                  <!## Run the ping pong test once. This is just to check toolkit test runner is working. Real tests to follow. ##>
> +                  <param>-s 1 -r 1 -o target org.apache.qpid.requestreply.PingPongTestPerf</param>
> +                  <!## Add more here... <param> ... </param> ##>
> +
> +		  <!## A little bit of work needs to be done on TKTestRunner before the following syntax works. See Javadoc for details. ##>
> +		  <!## Thread ramp up: <param>-c [1,5,10,50,100] -r 1000 -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##>
> +		  <!## Run for a length of time: <param>-d1H -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##>
> +		  <!## Configure test parameters: <param>pingQueue=myping persistent=true transacted=true messageSize=1000 -d10M ... </param> ##>
> +		  <!## And so on. ##>
> +                </commands>
> +              </configuration>
> +
> +              <executions>
> +                <execution>
> +                  <phase>test</phase>
> +                  <!##<goals>
> +                    <goal>tktest</goal>
> +                  </goals>##>
> +                </execution>
> +              </executions>
> +            </plugin>
> +	    -->
> +
> +            <!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests.
> +
> +                 Usefull when bundling system, integration or performance tests into a convenient
> +                 package to hand over to testers. To use it run:
> +
> +                 java -cp target/your_app_name-all-test-deps.jar path.to.your.Class
> +     
> +                 or often:
> +
> +                 java -cp target/your_app_name-all-test-deps.jar junit.framework.textui.TestRunner path.to.your.test.Class
> +
> +                 or other JUnit test runner invocations.
> +                 -->
> +            <plugin>
> +              <groupId>org.apache.maven.plugins</groupId>
> +              <artifactId>maven-assembly-plugin</artifactId>
> +              <configuration>
> +                <descriptors>
> +                  <descriptor>jar-with-dependencies.xml</descriptor>
> +                </descriptors>
> +                <outputDirectory>target</outputDirectory>
> +                <workDirectory>target/assembly/work</workDirectory>
> +              </configuration>
>              </plugin>
>  
>          </plugins>
> 
> Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java?view=auto&rev=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java (added)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java Tue Jan 16 08:44:17 2007
> @@ -0,0 +1,70 @@
> +package org.apache.qpid.ping;
> +
> +import javax.jms.JMSException;
> +
> +import org.apache.log4j.Logger;
> +
> +import org.apache.qpid.jms.Session;
> +
> +/**
> + * Provides functionality common to all ping clients. Provides the ability to manage a session and a convenience method
> + * to commit on the current transaction.
> + *
> + * <p><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Commit the current transcation.
> + * </table>
> + *
> + * @author Rupert Smith
> + */
> +public abstract class AbstractPingClient
> +{
> +    private static final Logger _logger = Logger.getLogger(TestPingClient.class);
> +
> +    /** Used to keep a handle on the JMS session to send replies using. */
> +    protected Session _session;
> +
> +    /**
> +     * Creates an abstract ping client to manage the specified transcation.
> +     *
> +     * @param session The session.
> +     */
> +    public AbstractPingClient(Session session)
> +    {
> +        _session = session;
> +    }
> +
> +    /**
> +     * Convenience method to commit the transaction on the session associated with this bounce back client.
> +     *
> +     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
> +     */
> +    protected void commitTx() throws JMSException
> +    {
> +        if (_session.getTransacted())
> +        {
> +            try
> +            {
> +                _session.commit();
> +                _logger.trace("Session Commited.");
> +            }
> +            catch (JMSException e)
> +            {
> +                _logger.trace("JMSException on commit:" + e.getMessage(), e);
> +
> +                try
> +                {
> +                    _session.rollback();
> +                    _logger.debug("Message rolled back.");
> +                }
> +                catch (JMSException jmse)
> +                {
> +                    _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
> +
> +                    // Both commit and rollback failed. Throw the rollback exception.
> +                    throw jmse;
> +                }
> +            }
> +        }
> +    }
> +}
> 
> Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java?view=auto&rev=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java (added)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java Tue Jan 16 08:44:17 2007
> @@ -0,0 +1,193 @@
> +package org.apache.qpid.ping;
> +
> +import javax.jms.*;
> +
> +import org.apache.log4j.Logger;
> +
> +import org.apache.qpid.client.AMQNoConsumersException;
> +import org.apache.qpid.client.message.TestMessageFactory;
> +import org.apache.qpid.jms.Session;
> +
> +/**
> + * This abstract class captures functionality that is common to all ping producers. It provides functionality to
> + * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework
> + * for running a ping loop, and terminating that loop on exceptions or a shutdown handler.
> + *
> + * <p><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Manage session.
> + * <tr><td> Provide clean shutdown on exception or shutdown hook.
> + * <tr><td> Provide useable shutdown hook implementation.
> + * </table>
> + *
> + * @author Rupert Smith
> + */
> +public abstract class AbstractPingProducer implements Runnable, ExceptionListener
> +{
> +    private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
> +
> +    /** Holds the current Qpid session to send and receive pings on. */
> +    protected Session _session;
> +
> +    /** Used to tell the ping loop when to terminate, it only runs while this is true. */
> +    protected boolean _publish = true;
> +
> +    /**
> +     * Creates an AbstractPingProducer on a session.
> +     */
> +    public AbstractPingProducer(Session session)
> +    {
> +        _session = session;
> +    }
> +
> +    /**
> +     * Generates a test message of the specified size.
> +     *
> +     * @param session     The Qpid session under which to generate the message.
> +     * @param replyQueue  The reply-to destination for the message.
> +     * @param messageSize The desired size of the message in bytes.
> +     * @param currentTime The timestamp to add to the message as a "timestamp" property.
> +     *
> +     * @return A freshly generated test message.
> +     *
> +     * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
> +     */
> +    public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime,
> +                                               boolean persistent) throws JMSException
> +    {
> +        ObjectMessage msg;
> +
> +        if (messageSize != 0)
> +        {
> +            msg = TestMessageFactory.newObjectMessage(session, messageSize);
> +        }
> +        else
> +        {
> +            msg = session.createObjectMessage();
> +        }
> +
> +        // Set the messages persistent delivery flag.
> +        msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
> +
> +        // Timestamp the message.
> +        msg.setLongProperty("timestamp", currentTime);
> +
> +        // Ensure that the temporary reply queue is set as the reply to destination for the message.
> +        if (replyQueue != null)
> +        {
> +            msg.setJMSReplyTo(replyQueue);
> +        }
> +
> +        return msg;
> +    }
> +
> +    /**
> +     * Convenience method for a short pause.
> +     *
> +     * @param sleepTime The time in milliseconds to pause for.
> +     */
> +    public static void pause(long sleepTime)
> +    {
> +        if (sleepTime > 0)
> +        {
> +            try
> +            {
> +                Thread.sleep(sleepTime);
> +            }
> +            catch (InterruptedException ie)
> +            { }
> +        }
> +    }
> +
> +    public abstract void pingLoop();
> +
> +    /**
> +     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
> +     * flag has been cleared.
> +     */
> +    public void stop()
> +    {
> +        _publish = false;
> +    }
> +
> +    /**
> +     * Implements a ping loop that repeatedly pings until the publish flag becomes false.
> +     */
> +    public void run()
> +    {
> +        // Keep running until the publish flag is cleared.
> +        while (_publish)
> +        {
> +            pingLoop();
> +        }
> +    }
> +
> +    /**
> +     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
> +     * connection, this clears the publish flag which in turn will halt the ping loop.
> +     *
> +     * @param e The exception that triggered this callback method.
> +     */
> +    public void onException(JMSException e)
> +    {
> +        _publish = false;
> +        _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +    }
> +
> +    /**
> +     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
> +     * with the runtime system as a shutdown hook.
> +     *
> +     * @return A shutdown hook for the ping loop.
> +     */
> +    public Thread getShutdownHook()
> +    {
> +        return new Thread(new Runnable()
> +            {
> +                public void run()
> +                {
> +                    stop();
> +                }
> +            });
> +    }
> +
> +    /**
> +     * Convenience method to commit the transaction on the session associated with this pinger.
> +     *
> +     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
> +     */
> +    protected void commitTx() throws JMSException
> +    {
> +        if (_session.getTransacted())
> +        {
> +            try
> +            {
> +                _session.commit();
> +                _logger.trace("Session Commited.");
> +            }
> +            catch (JMSException e)
> +            {
> +                _logger.trace("JMSException on commit:" + e.getMessage(), e);
> +
> +                // Warn that the bounce back client is not available.
> +                if (e.getLinkedException() instanceof AMQNoConsumersException)
> +                {
> +                    _logger.debug("No consumers on queue.");
> +                }
> +
> +                try
> +                {
> +                    _session.rollback();
> +                    _logger.trace("Message rolled back.");
> +                }
> +                catch (JMSException jmse)
> +                {
> +                    _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
> +
> +                    // Both commit and rollback failed. Throw the rollback exception.
> +                    throw jmse;
> +                }
> +            }
> +        }
> +    }
> +}
> 
> Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java (original)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java Tue Jan 16 08:44:17 2007
> @@ -7,9 +7,9 @@
>   * to you under the Apache License, Version 2.0 (the
>   * "License"); you may not use this file except in compliance
>   * with the License.  You may obtain a copy of the License at
> - * 
> + *
>   *   http://www.apache.org/licenses/LICENSE-2.0
> - * 
> + *
>   * Unless required by applicable law or agreed to in writing,
>   * software distributed under the License is distributed on an
>   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -18,112 +18,104 @@
>   * under the License.
>   *
>   */
> -package org.apache.qpid.pingpong;
> +package org.apache.qpid.ping;
> +
> +import java.net.InetAddress;
> +
> +import javax.jms.*;
>  
>  import org.apache.log4j.Logger;
> -import org.apache.log4j.Level;
> +
>  import org.apache.qpid.client.AMQConnection;
>  import org.apache.qpid.client.AMQQueue;
>  import org.apache.qpid.jms.Session;
>  
> -import javax.jms.*;
> -import java.net.InetAddress;
> -
> -public class TestPingClient
> +/**
> + * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took,
> + * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm)
> + * as the ping producer.
> + *
> + * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
> + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
> + * be disabled for real timing tests as writing to the console will slow things down.
> + *
> + * <p><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
> + * </table>
> + */
> +class TestPingClient extends AbstractPingClient implements MessageListener
>  {
>      private static final Logger _logger = Logger.getLogger(TestPingClient.class);
>  
> -    private static class TestPingMessageListener implements MessageListener
> -    {
> -        public TestPingMessageListener()
> -        {
> -        }
> -
> -        long _lastTimestamp = 0L;
> -        long _lastTimestampString = 0L;
> -
> -        public void onMessage(javax.jms.Message message)
> -        {
> -            if (_logger.isInfoEnabled())
> -            {
> -                long timestamp = 0L;
> -                long timestampString = 0L;
> -
> -                try
> -                {
> -                    timestamp = message.getLongProperty("timestamp");
> -                    timestampString = Long.parseLong(message.getStringProperty("timestampString"));
> -
> -                    if (timestampString != timestamp)
> -                    {
> -                        _logger.info("Timetamps differ!:\n" +
> -                                     "timestamp:" + timestamp + "\n" +
> -                                     "timestampString:" + timestampString);
> -                    }
> -
> -                }
> -                catch (JMSException jmse)
> -                {
> -                    //ignore
> -                }
> +    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
> +    private boolean _verbose = false;
>  
> -                long diff = timestamp - _lastTimestamp;
> -                _lastTimestamp = timestamp;
> -
> -                long stringDiff = timestampString - _lastTimestampString;
> -
> -                _lastTimestampString = timestampString;
> -
> -                _logger.info("Ping: T:" + diff + "ms, TS:" + stringDiff);
> +    /**
> +     * Creates a PingPongClient on the specified session.
> +     *
> +     * @param session       The JMS Session for the ping pon client to run on.
> +     * @param consumer      The message consumer to receive the messages with.
> +     * @param verbose       If set to <tt>true</tt> will output timing information on every message.
> +     */
> +    public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
> +    {
> +        // Hang on to the session for the replies.
> +        super(session);
>  
> -                // _logger.info(_name + " got message '" + message + "\n");
> -            }
> -        }
> +        // Set this up to listen for messages on the queue.
> +        consumer.setMessageListener(this);
>      }
>  
> +    /**
> +     * Starts a stand alone ping-pong client running in verbose mode.
> +     *
> +     * @param args
> +     */
>      public static void main(String[] args)
>      {
> -        _logger.setLevel(Level.INFO);
> -
>          _logger.info("Starting...");
>  
> +        // Display help on the command line.
>          if (args.length < 4)
>          {
> -            System.out.println("Usage: brokerdetails username password virtual-path [selector] ");
> +            System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
>              System.exit(1);
>          }
> +
> +        // Extract all comman line parameters.
> +        String brokerDetails = args[0];
> +        String username = args[1];
> +        String password = args[2];
> +        String virtualpath = args[3];
> +        boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
> +        String selector = (args.length == 6) ? args[5] : null;
> +
>          try
>          {
>              InetAddress address = InetAddress.getLocalHost();
> -            AMQConnection con1 = new AMQConnection(args[0], args[1], args[2],
> -                                                   address.getHostName(), args[3]);
>  
> +            AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
>  
>              _logger.info("Connected with URL:" + con1.toURL());
> -            
> -            final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session)
> -                    con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
> -
>  
> -            String selector = null;
> +            // Create a transactional or non-transactional session depending on the command line parameter.
> +            Session session = null;
>  
> -            if (args.length == 5)
> +            if (transacted)
>              {
> -                selector = args[4];
> -                _logger.info("Message selector is <" + selector + ">...");
> +                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
>              }
> -            else
> +            else if (!transacted)
>              {
> -                _logger.info("Not using message selector");
> +                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
>              }
>  
> -
>              Queue q = new AMQQueue("ping");
>  
> -            MessageConsumer consumer1 = session1.createConsumer(q,
> -                                                                1, false, false, selector);
> +            MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
> +            new TestPingClient(session, consumer, true);
>  
> -            consumer1.setMessageListener(new TestPingMessageListener());
>              con1.start();
>          }
>          catch (Throwable t)
> @@ -134,5 +126,35 @@
>  
>          System.out.println("Waiting...");
>      }
> -}
>  
> +    /**
> +     * This is a callback method that is notified of all messages for which this has been registered as a message
> +     * listener on a message consumer.
> +     *
> +     * @param message The message that triggered this callback.
> +     */
> +    public void onMessage(javax.jms.Message message)
> +    {
> +        try
> +        {
> +            // Spew out some timing information if verbose mode is on.
> +            if (_verbose)
> +            {
> +                Long timestamp = message.getLongProperty("timestamp");
> +
> +                if (timestamp != null)
> +                {
> +                    long diff = System.currentTimeMillis() - timestamp;
> +                    _logger.info("Ping time: " + diff);
> +                }
> +            }
> +
> +            // Commit the transaction if running in transactional mode.
> +            commitTx();
> +        }
> +        catch (JMSException e)
> +        {
> +            _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +        }
> +    }
> +}
> 
> Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java (original)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java Tue Jan 16 08:44:17 2007
> @@ -7,9 +7,9 @@
>   * to you under the Apache License, Version 2.0 (the
>   * "License"); you may not use this file except in compliance
>   * with the License.  You may obtain a copy of the License at
> - * 
> + *
>   *   http://www.apache.org/licenses/LICENSE-2.0
> - * 
> + *
>   * Unless required by applicable law or agreed to in writing,
>   * software distributed under the License is distributed on an
>   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> @@ -18,226 +18,196 @@
>   * under the License.
>   *
>   */
> -package org.apache.qpid.pingpong;
> +package org.apache.qpid.ping;
> +
> +import java.net.InetAddress;
> +
> +import javax.jms.*;
>  
>  import org.apache.log4j.Logger;
> +
>  import org.apache.qpid.client.AMQConnection;
> -import org.apache.qpid.AMQException;
> -import org.apache.qpid.url.URLSyntaxException;
> -import org.apache.qpid.client.AMQNoConsumersException;
> -import org.apache.qpid.client.BasicMessageProducer;
>  import org.apache.qpid.client.AMQQueue;
> -import org.apache.qpid.client.message.TestMessageFactory;
>  import org.apache.qpid.jms.MessageProducer;
>  import org.apache.qpid.jms.Session;
>  
> -import javax.jms.*;
> -import java.net.InetAddress;
> -import java.net.UnknownHostException;
> -
>  /**
> - * A client that behaves as follows:
> - * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
> - * <li>Creates a temporary queue</li>
> - * <li>Creates messages containing a property that is the name of the temporary queue</li>
> - * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
> - * </ul>
> + * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line
> + * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and
> + * configured message producer.
> + *
> + * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
> + * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so
> + * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
> + * also registered to terminate the ping loop cleanly.
> + *
> + * <p/><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Provide a ping cycle.
> + * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
> + * </table>
>   */
> -public class TestPingProducer implements ExceptionListener
> +class TestPingProducer extends AbstractPingProducer
>  {
> -    private static final Logger _log = Logger.getLogger(TestPingProducer.class);
> +    private static final Logger _logger = Logger.getLogger(TestPingProducer.class);
>  
> -    private AMQConnection _connection;
> +    /** Used to set up a default message size. */
> +    private static final int DEFAULT_MESSAGE_SIZE = 0;
>  
> -    private static int _messageSize = 0;
> -    private boolean _publish;
> +    /** Used to define how long to wait between pings. */
> +    private static final long SLEEP_TIME = 250;
>  
> -    private long SLEEP_TIME = 250L;
> +    /** Used to define how long to wait before assuming that a ping has timed out. */
> +    private static final long TIMEOUT = 3000;
>  
> -//    private class CallbackHandler implements MessageListener
> -//    {
> -//
> -//        private int _actualMessageCount;
> -//
> -//
> -//        public void onMessage(Message m)
> -//        {
> -//            if (_log.isDebugEnabled())
> -//            {
> -//                _log.debug("Message received: " + m);
> -//            }
> -//            _actualMessageCount++;
> -//            if (_actualMessageCount % 1000 == 0)
> -//            {
> -//                _log.info("Received message count: " + _actualMessageCount);
> -//            }
> -//        }
> -//    }
> +    /** Holds the name of the queue to send pings on. */
> +    private static final String PING_QUEUE_NAME = "ping";
> +    private static TestPingProducer _pingProducer;
>  
> -    public TestPingProducer(boolean TRANSACTED, String brokerDetails, String clientID,
> -                            String virtualpath) throws AMQException, URLSyntaxException
> -    {
> -        try
> -        {
> -            createConnection(brokerDetails, clientID, virtualpath);
> +    /** Holds the message producer to send the pings through. */
> +    private MessageProducer _producer;
>  
> -            Session session;
> +    /** Determines whether this producer sends persistent messages from the run method. */
> +    private boolean _persistent;
>  
> -            if (TRANSACTED)
> -            {
> -                session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
> -            }
> -            else
> -            {
> -                session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> -            }
> -
> -            String queue = "ping";
> -            AMQQueue destination = new AMQQueue(queue);
> -            MessageProducer producer = (MessageProducer) session.createProducer(destination);
> -
> -            _connection.setExceptionListener(this);
> -
> -            _connection.start();
> -
> -            while (_publish)
> -            {
> -/*
> -                TextMessage msg = session.createTextMessage(
> -                        "Presented to in conjunction with Mahnah Mahnah and the Snowths: " + ++messageNumber);
> -*/
> -                ObjectMessage msg = null;
> -                if (_messageSize != 0)
> -                {
> -                    msg = TestMessageFactory.newObjectMessage(session, _messageSize);
> -                }
> -                else
> -                {
> -                    msg = session.createObjectMessage();
> -                }
> -                
> -                msg.setStringProperty("timestampString", Long.toString(System.currentTimeMillis()));
> -                msg.setLongProperty("timestamp", System.currentTimeMillis());
> -
> -                ((BasicMessageProducer) producer).send(msg, DeliveryMode.NON_PERSISTENT, true);
> -                _log.info("Message Sent.");
> -                _log.debug(msg);
> -
> -
> -                if (TRANSACTED)
> -                {
> -                    try
> -                    {
> -                        session.commit();
> -                        _log.debug("Session Commited.");
> -                    }
> -                    catch (JMSException e)
> -                    {
> -                        _log.trace("JMSException on commit:" + e);
> -                        try
> -                        {
> -                            session.rollback();
> -                            _log.debug("Message rolled back.");
> -                        }
> -                        catch (JMSException jsme)
> -                        {
> -                            _log.trace("JMSE on rollback:" + jsme);
> -                        }
> -
> -
> -                        if (e.getLinkedException() instanceof AMQNoConsumersException)
> -                        {
> -                            _log.info("No Consumers on queue:'" + queue + "'");
> -                            continue;
> -                        }
> -                    }
> -                }
> -
> -
> -                if (SLEEP_TIME > 0)
> -                {
> -                    try
> -                    {
> -                        Thread.sleep(SLEEP_TIME);
> -                    }
> -                    catch (InterruptedException ie)
> -                    {
> -                        //do nothing
> -                    }
> -                }
> +    /** Holds the message size to send, from the run method. */
> +    private int _messageSize;
>  
> -
> -            }
> -
> -        }
> -        catch (JMSException e)
> -        {
> -            _publish = false;
> -            e.printStackTrace();
> -        }
> +    public TestPingProducer(Session session, MessageProducer producer) throws JMSException
> +    {
> +        super(session);
> +        _producer = producer;
>      }
>  
> -    private void createConnection(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException
> +    public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize)
> +                     throws JMSException
>      {
> -        _publish = true;
> -        _connection = new AMQConnection(brokerDetails, "guest", "guest",
> -                                        clientID, virtualpath);
> -        _log.info("Connected with URL:" + _connection.toURL());
> +        this(session, producer);
> +
> +        _persistent = persistent;
> +        _messageSize = messageSize;
>      }
>  
>      /**
> -     * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
> -     *             means the server will allocate a name.
> +     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
> +     * to be started to bounce the pings back again.
> +     *
> +     * <p/>The command line takes from 2 to 4 arguments:
> +     * <p/><table>
> +     * <tr><td>brokerDetails <td> The broker connection string.
> +     * <tr><td>virtualPath   <td> The virtual path.
> +     * <tr><td>transacted    <td> A boolean flag, telling this client whether or not to use transactions.
> +     * <tr><td>size          <td> The size of ping messages to use, in bytes.
> +     * </table>
> +     *
> +     * @param args The command line arguments as defined above.
>       */
> -    public static void main(String[] args)
> +    public static void main(String[] args) throws Exception
>      {
> +        // Extract the command line.
>          if (args.length < 2)
>          {
> -            System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [message size in bytes]");
> +            System.err.println(
> +                "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
>              System.exit(0);
>          }
> -        try
> -        {
> -            InetAddress address = InetAddress.getLocalHost();
> -            String clientID = address.getHostName() + System.currentTimeMillis();
> -            boolean transacted = false;
> -            if (args.length == 3 )
> -            {
> -                transacted = Boolean.parseBoolean(args[2]);
> -            }
> -            else if (args.length > 3 )
> -            {
> -                transacted = Boolean.parseBoolean(args[2]);
> -                _messageSize = Integer.parseInt(args[3]);
> -            }
>  
> -            new TestPingProducer(transacted, args[0], clientID, args[1]);
> -        }
> -        catch (UnknownHostException e)
> -        {
> -            e.printStackTrace();
> -        }
> -        catch (AMQException e)
> +        String brokerDetails = args[0];
> +        String virtualpath = args[1];
> +        boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
> +        boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
> +        int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
> +
> +        // Create a connection to the broker.
> +        InetAddress address = InetAddress.getLocalHost();
> +        String clientID = address.getHostName() + System.currentTimeMillis();
> +
> +        Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
> +
> +        // Create a transactional or non-transactional session, based on the command line arguments.
> +        Session session;
> +
> +        if (transacted)
>          {
> -            System.err.println("Error in client: " + e);
> -            e.printStackTrace();
> +            session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
>          }
> -        catch (URLSyntaxException e)
> +        else
>          {
> -            System.err.println("Error in connection arguments : " + e);
> +            session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>          }
>  
> -        //System.exit(0);
> +        // Create a queue to send the pings on.
> +        Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
> +        MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
> +
> +        // Create a ping producer to handle the request/wait/reply cycle.
> +        _pingProducer = new TestPingProducer(session, producer, persistent, messageSize);
> +
> +        // Start the message consumers running.
> +        _connection.start();
> +
> +        // Create a shutdown hook to terminate the ping-pong producer.
> +        Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
> +
> +        // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
> +        _connection.setExceptionListener(_pingProducer);
> +        Thread pingThread = new Thread(_pingProducer);
> +        pingThread.run();
> +
> +        // Run until the ping loop is terminated.
> +        pingThread.join();
>      }
>  
>      /**
> -     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
> +     * Sends the specified ping message.
> +     *
> +     * @param message The message to send.
> +     *
> +     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
>       */
> -    public void onException(JMSException e)
> +    public void ping(Message message) throws JMSException
>      {
> -        System.err.println(e.getMessage());
> +        _producer.send(message);
> +
> +        // Keep the messageId to correlate with the reply.
> +        String messageId = message.getJMSMessageID();
> +
> +        // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
> +        // this method, as the message will not be sent until the transaction is committed.
> +        commitTx();
> +    }
>  
> +    /**
> +     * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
> +     * flag has been cleared.
> +     */
> +    public void stop()
> +    {
>          _publish = false;
> -        e.printStackTrace(System.err);
> +    }
> +
> +    /**
> +     * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
> +     * waits for short pauses in between each.
> +     */
> +    public void pingLoop()
> +    {
> +        try
> +        {
> +            // Generate a sample message and time stamp it.
> +            ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent);
> +            msg.setLongProperty("timestamp", System.currentTimeMillis());
> +
> +            // Send the message.
> +            ping(msg);
> +
> +            // Introduce a short pause if desired.
> +            pause(SLEEP_TIME);
> +        }
> +        catch (JMSException e)
> +        {
> +            _publish = false;
> +            _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +        }
>      }
>  }
> 
> Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java?view=auto&rev=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java (added)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java Tue Jan 16 08:44:17 2007
> @@ -0,0 +1,175 @@
> +/*
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +package org.apache.qpid.requestreply;
> +
> +import java.net.InetAddress;
> +
> +import javax.jms.*;
> +
> +import org.apache.log4j.Logger;
> +
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.jms.Session;
> +import org.apache.qpid.ping.AbstractPingClient;
> +
> +/**
> + * PingPongClient is a message listener the bounces back messages to their reply to destination. This is used to return
> + * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes too.
> + *
> + * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
> + * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
> + * temporary queue or the correlation id to correlate the original message to the reply.
> + *
> + * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
> + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
> + * be disabled for real timing tests as writing to the console will slow things down.
> + *
> + * <p><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Bounce back messages to their reply to destination.
> + * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
> + * </table>
> + *
> + * @todo Rename this to BounceBackClient or something similar.
> + */
> +public class PingPongClient extends AbstractPingClient implements MessageListener
> +{
> +    private static final Logger _logger = Logger.getLogger(PingPongClient.class);
> +
> +    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
> +    private boolean _verbose = false;
> +
> +    /**
> +     * Creates a PingPongClient on the specified session.
> +     *
> +     * @param session       The JMS Session for the ping pon client to run on.
> +     * @param consumer      The message consumer to receive the messages with.
> +     * @param verbose       If set to <tt>true</tt> will output timing information on every message.
> +     */
> +    public PingPongClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
> +    {
> +        // Hang on to the session for the replies.
> +        super(session);
> +
> +        // Set this up to listen for messages on the queue.
> +        consumer.setMessageListener(this);
> +    }
> +
> +    /**
> +     * Starts a stand alone ping-pong client running in verbose mode.
> +     *
> +     * @param args
> +     */
> +    public static void main(String[] args)
> +    {
> +        _logger.info("Starting...");
> +
> +        // Display help on the command line.
> +        if (args.length < 4)
> +        {
> +            System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
> +            System.exit(1);
> +        }
> +
> +        // Extract all comman line parameters.
> +        String brokerDetails = args[0];
> +        String username = args[1];
> +        String password = args[2];
> +        String virtualpath = args[3];
> +        boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
> +        String selector = (args.length == 6) ? args[5] : null;
> +
> +        try
> +        {
> +            InetAddress address = InetAddress.getLocalHost();
> +
> +            AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
> +
> +            _logger.info("Connected with URL:" + con1.toURL());
> +
> +            // Create a transactional or non-transactional session depending on the command line parameter.
> +            Session session = null;
> +
> +            if (transacted)
> +            {
> +                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
> +            }
> +            else if (!transacted)
> +            {
> +                session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +            }
> +
> +            Queue q = new AMQQueue("ping");
> +
> +            MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
> +            new PingPongClient(session, consumer, true);
> +
> +            con1.start();
> +        }
> +        catch (Throwable t)
> +        {
> +            System.err.println("Fatal error: " + t);
> +            t.printStackTrace();
> +        }
> +
> +        System.out.println("Waiting...");
> +    }
> +
> +    /**
> +     * This is a callback method that is notified of all messages for which this has been registered as a message
> +     * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
> +     * destination of the message.
> +     *
> +     * @param message The message that triggered this callback.
> +     */
> +    public void onMessage(javax.jms.Message message)
> +    {
> +        try
> +        {
> +            // Spew out some timing information if verbose mode is on.
> +            if (_verbose)
> +            {
> +                Long timestamp = message.getLongProperty("timestamp");
> +
> +                if (timestamp != null)
> +                {
> +                    long diff = System.currentTimeMillis() - timestamp;
> +                    _logger.info("Ping time: " + diff);
> +                }
> +            }
> +
> +            // Correlate the reply to the original.
> +            message.setJMSCorrelationID(message.getJMSMessageID());
> +
> +            // Send the receieved message as the pong reply.
> +            MessageProducer producer = _session.createProducer(message.getJMSReplyTo());
> +            producer.send(message);
> +
> +            // Commit the transaction if running in transactional mode.
> +            commitTx();
> +        }
> +        catch (JMSException e)
> +        {
> +            _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +        }
> +    }
> +}
> 
> Added: incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=auto&rev=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (added)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Jan 16 08:44:17 2007
> @@ -0,0 +1,301 @@
> +/*
> + *
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + *
> + */
> +package org.apache.qpid.requestreply;
> +
> +import java.net.InetAddress;
> +import java.util.HashMap;
> +import java.util.Map;
> +
> +import javax.jms.*;
> +
> +import org.apache.log4j.Logger;
> +
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQNoConsumersException;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.client.message.TestMessageFactory;
> +import org.apache.qpid.jms.MessageProducer;
> +import org.apache.qpid.jms.Session;
> +import org.apache.qpid.ping.AbstractPingProducer;
> +import org.apache.qpid.util.concurrent.BooleanLatch;
> +
> +/**
> + * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
> + * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line
> + * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
> + * message producer and message consumer to run the ping-pong cycle on.
> + *
> + * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
> + * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
> + * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
> + * this correlation would not need to be done.
> + *
> + * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
> + * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
> + * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
> + * also registered to terminate the ping-pong loop cleanly.
> + *
> + * <p/><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * <tr><td> Provide a ping and wait for response cycle.
> + * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
> + * </table>
> + *
> + * @todo Make temp queue per ping a command line option.
> + *
> + * @todo Make the queue name a command line option.
> + */
> +public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
> +{
> +    private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
> +
> +    /** Used to set up a default message size. */
> +    private static final int DEFAULT_MESSAGE_SIZE = 0;
> +
> +    /** Used to define how long to wait between pings. */
> +    private static final long SLEEP_TIME = 250;
> +
> +    /** Used to define how long to wait before assuming that a ping has timed out. */
> +    private static final long TIMEOUT = 3000;
> +
> +    /** Holds the name of the queue to send pings on. */
> +    private static final String PING_QUEUE_NAME = "ping";
> +
> +    /** Keeps track of the ping producer instance used in the run loop. */
> +    private static PingPongProducer _pingProducer;
> +
> +    /** Holds the message producer to send the pings through. */
> +    private MessageProducer _producer;
> +
> +    /** Holds the queue to send the ping replies to. */
> +    private Queue _replyQueue;
> +
> +    /** Determines whether this producer sends persistent messages from the run method. */
> +    private boolean _persistent;
> +
> +    /** Holds the message size to send, from the run method. */
> +    private int _messageSize;
> +
> +    /** Holds a map from message ids to latches on which threads wait for replies. */
> +    private Map<String, BooleanLatch> trafficLights = new HashMap<String, BooleanLatch>();
> +
> +    /** Holds a map from message ids to correlated replies. */
> +    private Map<String, Message> replies = new HashMap<String, Message>();
> +
> +    public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer)
> +                     throws JMSException
> +    {
> +        super(session);
> +        _producer = producer;
> +        _replyQueue = replyQueue;
> +
> +        consumer.setMessageListener(this);
> +    }
> +
> +    public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer,
> +                            boolean persistent, int messageSize) throws JMSException
> +    {
> +        this(session, replyQueue, producer, consumer);
> +
> +        _persistent = persistent;
> +        _messageSize = messageSize;
> +    }
> +
> +    /**
> +     * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
> +     * to be started to bounce the pings back again.
> +     *
> +     * <p/>The command line takes from 2 to 4 arguments:
> +     * <p/><table>
> +     * <tr><td>brokerDetails <td> The broker connection string.
> +     * <tr><td>virtualPath   <td> The virtual path.
> +     * <tr><td>transacted    <td> A boolean flag, telling this client whether or not to use transactions.
> +     * <tr><td>size          <td> The size of ping messages to use, in bytes.
> +     * </table>
> +     *
> +     * @param args The command line arguments as defined above.
> +     */
> +    public static void main(String[] args) throws Exception
> +    {
> +        // Extract the command line.
> +        if (args.length < 2)
> +        {
> +            System.err.println(
> +                "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
> +            System.exit(0);
> +        }
> +
> +        String brokerDetails = args[0];
> +        String virtualpath = args[1];
> +        boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
> +        boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
> +        int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
> +
> +        // Create a connection to the broker.
> +        InetAddress address = InetAddress.getLocalHost();
> +        String clientID = address.getHostName() + System.currentTimeMillis();
> +
> +        Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
> +
> +        // Create a transactional or non-transactional session, based on the command line arguments.
> +        Session session;
> +
> +        if (transacted)
> +        {
> +            session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
> +        }
> +        else
> +        {
> +            session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        }
> +
> +        // Create a queue to send the pings on.
> +        Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
> +        MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
> +
> +        // Create a temporary queue to reply with the pongs on.
> +        Queue replyQueue = session.createTemporaryQueue();
> +
> +        // Create a message consumer to get the replies with.
> +        MessageConsumer consumer = session.createConsumer(replyQueue);
> +
> +        // Create a ping producer to handle the request/wait/reply cycle.
> +        _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize);
> +
> +        // Start the message consumers running.
> +        _connection.start();
> +
> +        // Create a shutdown hook to terminate the ping-pong producer.
> +        Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
> +
> +        // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
> +        _connection.setExceptionListener(_pingProducer);
> +        Thread pingThread = new Thread(_pingProducer);
> +        pingThread.run();
> +
> +        // Run until the ping loop is terminated.
> +        pingThread.join();
> +    }
> +
> +    /**
> +     * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
> +     * correlating reply may be waiting on.
> +     *
> +     * @param message The received message.
> +     */
> +    public void onMessage(Message message)
> +    {
> +        try
> +        {
> +            // Store the reply.
> +            String correlationID = message.getJMSCorrelationID();
> +            replies.put(correlationID, message);
> +
> +            // Turn the traffic light to green.
> +            BooleanLatch trafficLight = trafficLights.get(correlationID);
> +
> +            if (trafficLight != null)
> +            {
> +                trafficLight.signal();
> +            }
> +            else
> +            {
> +                _logger.debug("There was no thread waiting for reply: " + correlationID);
> +            }
> +        }
> +        catch (JMSException e)
> +        {
> +            _logger.warn("There was a JMSException: " + e.getMessage(), e);
> +        }
> +    }
> +
> +    /**
> +     * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply
> +     * arrives, then a null reply is returned from this method.
> +     *
> +     * @param message The message to send.
> +     * @param timeout The timeout in milliseconds.
> +     *
> +     * @return The reply, or null if no reply arrives before the timeout.
> +     *
> +     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
> +     */
> +    public Message pingAndWaitForReply(Message message, long timeout) throws JMSException
> +    {
> +        _producer.send(message);
> +
> +        // Keep the messageId to correlate with the reply.
> +        String messageId = message.getJMSMessageID();
> +
> +        // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
> +        // this method, as the message will not be sent until the transaction is committed.
> +        commitTx();
> +
> +        // Block the current thread until a reply to the message is received, or it times out.
> +        BooleanLatch trafficLight = new BooleanLatch();
> +        trafficLights.put(messageId, trafficLight);
> +
> +        // Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up.
> +        trafficLight.await(timeout * 1000);
> +
> +        // Check the replies to see if one was generated, if not then the reply timed out.
> +        Message result = replies.get(messageId);
> +
> +        return result;
> +    }
> +
> +    /**
> +     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
> +     * connection, this clears the publish flag which in turn will halt the ping loop.
> +     *
> +     * @param e The exception that triggered this callback method.
> +     */
> +    public void onException(JMSException e)
> +    {
> +        _publish = false;
> +        _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +    }
> +
> +    /**
> +     * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
> +     * waits for replies and inserts short pauses in between each.
> +     */
> +    public void pingLoop()
> +    {
> +        try
> +        {
> +            // Generate a sample message and time stamp it.
> +            ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent);
> +            msg.setLongProperty("timestamp", System.currentTimeMillis());
> +
> +            // Send the message and wait for a reply.
> +            pingAndWaitForReply(msg, TIMEOUT);
> +
> +            // Introduce a short pause if desired.
> +            pause(SLEEP_TIME);
> +        }
> +        catch (JMSException e)
> +        {
> +            _publish = false;
> +            _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +        }
> +    }
> +}
> 
> Added: incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java?view=auto&rev=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (added)
> +++ incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Tue Jan 16 08:44:17 2007
> @@ -0,0 +1,245 @@
> +package org.apache.qpid.requestreply;
> +
> +import java.net.InetAddress;
> +import java.util.Properties;
> +
> +import javax.jms.*;
> +
> +import junit.framework.Assert;
> +import junit.framework.TestCase;
> +
> +import org.apache.log4j.Logger;
> +import org.apache.log4j.NDC;
> +
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.jms.Connection;
> +import org.apache.qpid.jms.MessageProducer;
> +import org.apache.qpid.jms.Session;
> +
> +/**
> + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
> + * many times simultaneously. A full round trip ping sends a message from a producer to a conumer, then the consumer
> + * replies to the message on a temporary queue.
> + *
> + * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
> + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. See {@link TKTestRunner} or
> + * {@link PPTestRunner} for more information on how to do this.
> + *
> + * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
> + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
> + * except if the connection is lost in which case an attempt to re-establish the setup is made.
> + *
> + * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
> + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
> + * temporary queue.
> + *
> + * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
> + *
> + * <p><table id="crc"><caption>CRC Card</caption>
> + * <tr><th> Responsibilities <th> Collaborations
> + * </table>
> + *
> + * @author Rupert Smith
> + */
> +public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware
> +{
> +    private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
> +
> +    /** Holds the name of the property to get the test message size from. */
> +    private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
> +
> +    /** Holds the name of the property to get the ping queue name from. */
> +    private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
> +
> +    /** Holds the name of the property to get the test delivery mode from. */
> +    private static final String PERSISTENT_MODE_PROPNAME = "persistent";
> +
> +    /** Holds the name of the property to get the test transactional mode from. */
> +    private static final String TRANSACTED_PROPNAME = "transacted";
> +
> +    /** Holds the name of the property to get the test broker url from. */
> +    private static final String BROKER_PROPNAME = "broker";
> +
> +    /** Holds the name of the property to get the test broker virtual path. */
> +    private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
> +
> +    /** Holds the size of message body to attach to the ping messages. */
> +    private static final int MESSAGE_SIZE_DEFAULT = 0;
> +
> +    /** Holds the name of the queue to which pings are sent. */
> +    private static final String PING_QUEUE_NAME_DEFAULT = "ping";
> +
> +    /** Holds the message delivery mode to use for the test. */
> +    private static final boolean PERSISTENT_MODE_DEFAULT = false;
> +
> +    /** Holds the transactional mode to use for the test. */
> +    private static final boolean TRANSACTED_DEFAULT = false;
> +
> +    /** Holds the default broker url for the test. */
> +    private static final String BROKER_DEFAULT = "tcp://localhost:5672";
> +
> +    /** Holds the default virtual path for the test. */
> +    private static final String VIRTUAL_PATH_DEFAULT = "/test";
> +
> +    /** Sets a default ping timeout. */
> +    private static final long TIMEOUT = 3000;
> +
> +    // Sets up the test parameters with defaults.
> +    static
> +    {
> +        setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
> +        setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
> +        setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
> +        setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
> +        setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
> +        setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
> +    }
> +
> +    /** Holds the test ping-pong producer. */
> +    private PingPongProducer _testPingProducer;
> +
> +    // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
> +    // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
> +    // of the test parameters to log with the results.
> +    private Properties testParameters = System.getProperties();
> +    //private Properties testParameters = new ContextualProperties(System.getProperties());
> +
> +    /** Holds the connection to the broker. */
> +    private Connection _connection = null;
> +
> +    /** Holds the current session to the broker. */
> +    private Session _session;
> +
> +    /** Holds the destination to send the ping messages to. */
> +    private Queue _pingQueue;
> +
> +    /** Holds the destination to send replies to. */
> +    private Queue _replyQueue;
> +
> +    /** Holds a message producer, set up on the ping destination, to send messages through. */
> +    private MessageProducer _producer;
> +
> +    /** Holds a message consumer, set up on the ping destination, to receive pings through. */
> +    private MessageConsumer _pingConsumer;
> +
> +    /** Holds a message consumer, set up on the pong destination, to receive replies through. */
> +    private MessageConsumer _pongConsumer;
> +
> +    /** Holds a failure flag, which gets set if the connection to the broker goes down. */
> +    private boolean _failure;
> +
> +    public PingPongTestPerf(String name)
> +    {
> +        super(name);
> +    }
> +
> +    private static void setSystemPropertyIfNull(String propName, String propValue)
> +    {
> +        if (System.getProperty(propName) == null)
> +        {
> +            System.setProperty(propName, propValue);
> +        }
> +    }
> +
> +    public void testPingPongOk() throws Exception
> +    {
> +        // Generate a sample message. This message is already time stamped and has its reply-to destination set.
> +        ObjectMessage msg =
> +            PingPongProducer.getTestMessage(_session, _replyQueue,
> +                                            Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
> +                                            System.currentTimeMillis(),
> +                                            Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
> +
> +        // Use the test timing controller to reset the test timer now and obtain the current time.
> +        // This can be used to remove the message creation time from the test.
> +        //TestTimingController timingUtils = getTimingController();
> +        //long startTime = timingUtils.restart();
> +
> +        // Send the message and wait for a reply.
> +        Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT);
> +
> +        // Fail the test if the timeout was exceeded.
> +        if (reply == null)
> +        {
> +            Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
> +        }
> +    }
> +
> +    /**
> +     * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to
> +     * the broker. It sets a failure flag to indicate that there is an error condition.
> +     *
> +     * @param e The JMSException that triggered this callback method.
> +     *
> +     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
> +     */
> +    public void onException(JMSException e)
> +    {
> +        // Set the failure flag.
> +        _failure = true;
> +
> +        _logger.debug("There was a JMSException: " + e.getMessage(), e);
> +    }
> +
> +    protected void setUp() throws Exception
> +    {
> +        // Log4j will propagate the test name as a thread local in all log output.
> +        NDC.push(getName());
> +
> +        // Ensure that the connection, session and ping queue are established, if they have not already been.
> +        if (_connection == null)
> +        {
> +            // Create a client id that identifies the client machine.
> +            String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
> +
> +            // Connect to the broker.
> +            _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID,
> +                                            testParameters.getProperty(VIRTUAL_PATH_PROPNAME));
> +            _connection.setExceptionListener(this);
> +
> +            // Create a transactional or non-transactional session, based on the test properties, if a session has not
> +            // already been created.
> +            if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)))
> +            {
> +                _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
> +            }
> +            else
> +            {
> +                _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +            }
> +
> +            // Create a queue to send the pings on.
> +            _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME));
> +            _producer = (MessageProducer) _session.createProducer(_pingQueue);
> +
> +            // Create a temporary queue to reply with the pongs on.
> +            _replyQueue = _session.createTemporaryQueue();
> +
> +            // Create the ping and pong consumers on their respective destinations.
> +            _pingConsumer = _session.createConsumer(_pingQueue);
> +            _pongConsumer = _session.createConsumer(_replyQueue);
> +
> +            // Establish a bounce back client on the ping queue to bounce back the pings.
> +            new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false);
> +
> +            // Establish a ping-pong client on the ping queue to send pings and wait for replies.
> +            _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer,
> +                                                                                  _pongConsumer);
> +
> +            _connection.start();
> +        }
> +    }
> +
> +    protected void tearDown() throws Exception
> +    {
> +        try
> +        {
> +            _connection.close();
> +        }
> +        finally
> +        {
> +            NDC.pop();
> +        }
> +    }
> +}
> 
> Modified: incubator/qpid/trunk/qpid/java/pom.xml
> URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=496753&r1=496752&r2=496753
> ==============================================================================
> --- incubator/qpid/trunk/qpid/java/pom.xml (original)
> +++ incubator/qpid/trunk/qpid/java/pom.xml Tue Jan 16 08:44:17 2007
> @@ -337,6 +337,15 @@
>                          </execution>
>                      </executions>
>                  </plugin>
> +
> +		<!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. It will take a day or two from 16/1/2007.
> +                <plugin>
> +                    <groupId>uk.co.thebadgerset</groupId>
> +                    <artifactId>junit-toolkit-maven-plugin</artifactId>
> +                    <version>0.3</version>
> +		</plugin>
> +		-->
> +
>              </plugins>
>          </pluginManagement>
>  
> @@ -437,14 +446,12 @@
>                  <scope>test</scope>
>              </dependency>
>  
> -            <!-- Will be added to maven repo soon. JUnit test runner that can add repeats/concurrenct/timing etc to tests.
>              <dependency>
>                  <groupId>uk.co.thebadgerset</groupId>
>                  <artifactId>junit-toolkit</artifactId>
>                  <version>0.3</version>
>                  <scope>test</scope>
>              </dependency>
> -            -->
>  
>              <!-- Qpid Version Dependencies -->
>              <dependency>
> 
> 
> 


Mime
View raw message