james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Charles <e...@apache.org>
Subject Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Date Sat, 24 Dec 2011 09:05:33 GMT
Hi Stephano,

Opening the discussion to learn more :)

- Why do you consider having only 2 threads a criterion for using standard 
synchronization rather than atomic fields?

- I can understand replacing a concurrent queue with a non-concurrent one. 
However, you now have a blocking queue. Is there an impact due to this 
blocking aspect?

- You defined isAsync as volatile, and you sometimes wrap access to 
isAsync in a synchronized block and sometimes not. Why use 2 different 
thread-safety strategies in this class?

Thx,

Eric


On 21/12/11 15:47, bago@apache.org wrote:
> Author: bago
> Date: Wed Dec 21 14:47:25 2011
> New Revision: 1221748
>
> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
> Log:
> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back to standard
synchronization as we only have max 2 threads competing for the queue so it doesn't make sense
to use a non blocking queue. Norman, please overview, and feel free to revert if you don't
like the solution (i thought it was better to simply commit instead of opening a JIRA to show
you this).
>
> Modified:
>      james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>
> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
> ==============================================================================
> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
(original)
> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Wed Dec 21 14:47:25 2011
> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>   import java.io.InputStream;
>   import java.io.UnsupportedEncodingException;
>   import java.util.List;
> -import java.util.concurrent.ConcurrentLinkedQueue;
> -import java.util.concurrent.atomic.AtomicBoolean;
> -
> +import java.util.Queue;
> +import java.util.concurrent.LinkedBlockingQueue;
>
>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>
> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>
>
>       // TODO: Should we limit the size ?
> -    private final ConcurrentLinkedQueue<Response>  responses = new ConcurrentLinkedQueue<Response>();
> -    private final AtomicBoolean write = new AtomicBoolean(false);
> +    private final Queue<Response>  responses = new LinkedBlockingQueue<Response>();
> +    private volatile boolean isAsync = false;
>
>       /**
>        * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
>        */
>       public final void writeResponse(Response response, final ProtocolSession session)
{
> -        // just add the response to the queue. We will trigger the write operation later
> -        responses.add(response);
> -
> -        // trigger the write
> -        writeQueuedResponses(session);
> +        // if we already in asynchrnous mode we simply enqueue the response
> +        // we do this synchronously because we may have a dequeuer thread working on
> +        // isAsync and responses.
> +        boolean enqueued = false;
> +        synchronized(this) {
> +            if (isAsync == true) {
> +                responses.offer(response);
> +                enqueued = true;
> +            }
> +        }
> +
> +        // if we didn't enqueue then we check if the response is writable or we have
to
> +        // set us "asynchrnous" and wait for response to be ready.
> +        if (!enqueued) {
> +            if (isResponseWritable(response)) {
> +                writeResponseToClient(response, session);
> +            } else {
> +                addDequeuerListener(response, session);
> +                isAsync = true;
> +            }
> +        }
>       }
>
>       /**
> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>        * @param session
>        */
>       private  void writeQueuedResponses(final ProtocolSession session) {
> -        Response queuedResponse = null;
>
> -        if (write.compareAndSet(false, true)){
> -            boolean listenerAdded = false;
> -            // dequeue Responses until non is left
> -            while ((queuedResponse = responses.poll()) != null) {
> -
> -                // check if we need to take special care of FutureResponses
> -                if (queuedResponse instanceof FutureResponse) {
> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
> -                    if (futureResponse.isReady()) {
> -                        // future is ready so we can write it without blocking the IO-Thread
> -                        writeResponseToClient(queuedResponse, session);
> -                    } else {
> -
> -                        // future is not ready so we need to write it via a ResponseListener
otherwise we MAY block the IO-Thread
> -                        futureResponse.addListener(new ResponseListener() {
> -
> -                            public void onResponse(FutureResponse response) {
> -                                writeResponseToClient(response, session);
> -                                if (write.compareAndSet(true, false)) {
> -                                    writeQueuedResponses(session);
> -                                }
> -                            }
> -                        });
> -                        listenerAdded = true;
> -                        // just break here as we will trigger the dequeue later
> -                        break;
> -                    }
> -
> -                } else {
> -                    // the Response is not a FutureResponse, so just write it back the
the remote peer
> -                    writeResponseToClient(queuedResponse, session);
> +        // dequeue Responses until non is left
> +        while (true) {
> +
> +            Response queuedResponse = null;
> +
> +            // synchrnously we check responses and if it is empty we move back to non
asynch
> +            // behaviour
> +            synchronized(this) {
> +                queuedResponse = responses.poll();
> +                if (queuedResponse == null) {
> +                    isAsync = false;
> +                    break;
>                   }
> -
>               }
> -            // Check if a ResponseListener was added before. If not we can allow to
write
> -            // responses again. Otherwise the writing will get triggered from the listener
> -            if (listenerAdded == false) {
> -                write.set(false);
> +
> +            // if we have something in the queue we continue writing until we
> +            // find something asynchronous.
> +            if (isResponseWritable(queuedResponse)) {
> +                writeResponseToClient(queuedResponse, session);
> +            } else {
> +                addDequeuerListener(queuedResponse, session);
> +                // no changes to isAsync here, because in this method we are always
already async.
> +                break;
>               }
>           }
> -
> -
> +    }
> +
> +    private boolean isResponseWritable(Response response) {
> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
> +    }
> +
> +    private void addDequeuerListener(Response response, final ProtocolSession session)
{
> +        ((FutureResponse) response).addListener(new ResponseListener() {
> +
> +            public void onResponse(FutureResponse response) {
> +                writeResponseToClient(response, session);
> +                writeQueuedResponses(session);
> +            }
> +        });
>       }
>
>       /**
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message