mina-dev mailing list archives

From Brad Harvey <harve...@gmail.com>
Subject Re: ReadFuture (Was: Re: New feature in 2.0: Request-Response Filter)
Date Sat, 12 May 2007 12:21:37 GMT
On 4/12/07, Maarten Bosteels <mbosteels.dns@...> wrote:
 > Suppose you have a request-response protocol and you want to implement the
 > client synchronously:
 > connect, send a request and wait until the response comes in.
 >
 > What is currently the best way to go ?

Hi All,

Just saw this while I was doing some browsing - hope I'm not jumping in 
too late.  I had a crack at a client-side synchronous API for MINA a 
while ago (pre 1.0), but never had a real project to try it out on, so it 
fell by the wayside.  Below is an overview of how a client would use it. 
Let me know if you'd like the implementation and I'll submit a JIRA issue.

Regards,
Brad.

Synchronous send & receive is achieved by wrapping an IoSession with a 
"Connection".  Client code then simply calls send and receive methods on 
the connection.  Both blocking and non-blocking calls are supported; the 
non-blocking receive methods return the new ReadFuture and TimedReadFuture 
interfaces.

Here's an example client that runs against the EchoServer.

// Setup an IoServiceConfig for your client connection as per normal.
InetSocketAddress socketAddress = new InetSocketAddress("localhost", 8080);
IoServiceConfig config = new SocketConnectorConfig();
DefaultIoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
filterChainBuilder.addLast("TextLine", new ProtocolCodecFilter(new TextLineCodecFactory()));
config.setFilterChainBuilder(filterChainBuilder);

// The ConnectionFactory creates new synchronous connections with the
// given address & config.
ConnectionFactory cf = new ConnectionFactoryImpl(socketAddress, config);
// createConnection does the actual connect (blocking only).
NonBlockingConnection connection = cf.createConnection();
try {

    // blocking send and receive
    connection.send("Hello There!");
    System.out.println(connection.receive());

    // blocking with timeout
    connection.send("Hi", 1000);
    System.out.println(connection.receive(1000));

    // or non blocking. 
    WriteFuture writeFuture = connection.sendNoWait("Hello There!");
    writeFuture.join();
    ReadFuture readFuture = connection.receiveNoWait();
    readFuture.join();
    System.out.println(readFuture.getMessage());
   
    // All styles can be mixed and matched.
    connection.sendNoWait("1");
    connection.sendNoWait("2");
    connection.send("3");
    readFuture = connection.receiveNoWait();
    System.out.println(connection.receive());
    System.out.println(connection.receive(1000));
    readFuture.join();
    System.out.println(readFuture.getMessage());
} finally {
    connection.close();
}

Output of above:

Hello There!
Hi
Hello There!
2
3
1

Under the covers, Connection wraps an IoHandler that uses a blocking 
message queue.  If you're wondering about the naming 
(ConnectionFactory/Connection), I was also planning a JCA resource 
adapter which would offer pooled connections.  Unfortunately I only got 
half way or so with that part.
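
To illustrate the queue idea, here is a minimal sketch of how incoming 
messages might be paired with reads so that the mix-and-match example 
prints 2, 3, 1.  It is not the actual implementation: QueueBackedReceiver 
and ReceiverDemo are hypothetical names, there is no MINA dependency, and 
java.util.concurrent.CompletableFuture (Java 8+) is swapped in for 
ReadFuture.  The messageReceived method stands in for the IoHandler 
callback of the same name.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: pairs incoming messages with reads in FIFO order. */
class QueueBackedReceiver {
    // Messages that arrived before anyone asked for them.
    private final Queue<Object> unread = new ArrayDeque<>();
    // Reads that were requested before a message was available.
    private final Queue<CompletableFuture<Object>> pendingReads = new ArrayDeque<>();

    /** Assumed to be called from the I/O thread when a message arrives. */
    public void messageReceived(Object message) {
        CompletableFuture<Object> waiting;
        synchronized (this) {
            waiting = pendingReads.poll();
            if (waiting == null) {
                unread.add(message);   // nobody is waiting, keep it for later
                return;
            }
        }
        // Complete outside the lock so callbacks cannot block the receiver.
        waiting.complete(message);
    }

    /** Returns immediately; the future completes with the next unclaimed message. */
    public synchronized CompletableFuture<Object> receiveNoWait() {
        Object message = unread.poll();
        if (message != null) {
            return CompletableFuture.completedFuture(message);
        }
        CompletableFuture<Object> future = new CompletableFuture<>();
        pendingReads.add(future);      // reserve the next message to arrive
        return future;
    }

    /** Blocks until a message is available. */
    public Object receive() {
        return receiveNoWait().join();
    }
}

class ReceiverDemo {
    public static void main(String[] args) {
        QueueBackedReceiver receiver = new QueueBackedReceiver();
        CompletableFuture<Object> first = receiver.receiveNoWait();
        receiver.messageReceived("1");
        receiver.messageReceived("2");
        receiver.messageReceived("3");
        System.out.println(receiver.receive());  // 2
        System.out.println(receiver.receive());  // 3
        System.out.println(first.join());        // 1
    }
}
```

The earliest read request claims message "1" even though its result is 
printed last, which matches the ordering in the output above.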

Here are the important interfaces. 

/**
 * A connection over which messages can be sent and received in a
 * synchronous manner.
 */
public interface Connection {

    /**
     * Send a message.
     * @param message message to send
     * @throws RuntimeIOException if the message cannot be sent.
     */
    public void send(Object message) throws RuntimeIOException;

    /**
     * Send a message.
     * @param message message to send.
     * @param timeoutInMillis ms to wait for the message to be sent
     * @throws RuntimeIOException if the message cannot be sent within
     *         the specified timeout.
     */
    public void send(Object message, long timeoutInMillis)
            throws RuntimeIOException;
   
    /**
     * Wait for a message to arrive.
     * @return message received.
     * @throws RuntimeIOException if no message could be read.
     */
    public Object receive() throws RuntimeIOException;

    /**
     * Wait for a message to arrive for up to timeoutInMillis.
     * @param timeoutInMillis ms to wait for message.
     * @return the received message
     * @throws RuntimeIOException if no message could be read within the
     *         specified timeout.
     */
    public Object receive(final long timeoutInMillis)
            throws RuntimeIOException;

    /**
     * Close the connection.  Further operations on this connection will
     * fail.
     */
    public void close();
   
    /**
     * @return true if the connection is closed or in the process of
     *         closing.
     */
    public boolean isClosing();
}


/**
 * Adds non blocking read and write methods to the standard connections.
 */
public interface NonBlockingConnection extends Connection {

    /**
     * @param message message to send
     * @return a WriteFuture representing pending completion of the write.
     */
    public WriteFuture sendNoWait(Object message);

    /**
     * @return a read future representing pending completion of the read.
     * @throws RuntimeIOException if it isn't possible to receive.
     */
    public ReadFuture receiveNoWait() throws RuntimeIOException;

    /**
     * The "noWait" in the method name refers to this method returning
     * immediately.  The receive will wait in the background for up to
     * timeoutInMillis for a message to arrive.
     * @param timeoutInMillis milliseconds to wait for the receive to
     *        complete.
     * @return a timed read future representing pending completion of
     *         the read.
     * @throws RuntimeIOException if it isn't possible to receive.
     */
    public TimedReadFuture receiveNoWait(final long timeoutInMillis)
            throws RuntimeIOException;

    /**
     * Close the connection without waiting for completion.
     * @return a close future representing pending completion of the close.
     */
    public CloseFuture closeNoWait();
   
    /**
     * @return the underlying IoSession
     */
    public IoSession getIoSession();

}



/**
 * The result of an asynchronous read operation.
 */
public interface ReadFuture extends IoFuture {
    /**
     * @return true if something has been read.  Either a message was
     *         received or an exception occurred.
     */
    public boolean isValueRead();
   
    /**
     * @return the object received.
     * @throws RuntimeIOException if isException is true.
     */
    public Object getMessage() throws RuntimeIOException;
   
    /**
     * @return true if an exception was encountered.  This could be due
     *         to a socket event or a problem with filters.
     */
    public boolean isException();
   
    /**
     * @return true if connection is closed (isException will also be true).
     */
    public boolean isClose();
   
}
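
As a rough illustration of how calling code might inspect a ReadFuture, 
here is a small sketch.  ReadFutureView is a local stand-in for the 
interface above (without IoFuture) so that it compiles on its own; 
ReadOutcome, describe and stub are hypothetical names, and 
IllegalStateException stands in for RuntimeIOException.  Since isClose 
implies isException, the sketch tests isClose first.

```java
/** Local stand-in for ReadFuture, minus IoFuture, so the sketch compiles alone. */
interface ReadFutureView {
    boolean isValueRead();
    Object getMessage();
    boolean isException();
    boolean isClose();
}

final class ReadOutcome {
    private ReadOutcome() {}

    /** Classifies a future; isClose is checked before isException. */
    static String describe(ReadFutureView future) {
        if (!future.isValueRead()) {
            return "pending";
        }
        if (future.isClose()) {
            return "closed";
        }
        if (future.isException()) {
            return "failed";
        }
        return "message: " + future.getMessage();
    }

    /** Hypothetical fixed-state future, for demonstration only. */
    static ReadFutureView stub(Object message, boolean exception, boolean close) {
        return new ReadFutureView() {
            public boolean isValueRead() { return message != null || exception; }
            public Object getMessage() {
                if (exception) {
                    throw new IllegalStateException("read failed");
                }
                return message;
            }
            public boolean isException() { return exception; }
            public boolean isClose() { return close; }
        };
    }
}
```

For example, ReadOutcome.describe(ReadOutcome.stub("hi", false, false)) 
yields "message: hi", while a stub with both flags set yields "closed".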

