james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Norman Maurer <norman.mau...@googlemail.com>
Subject Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Date Sat, 24 Dec 2011 10:53:18 GMT
Hi Eric,

comments inside....

Am 24.12.2011 um 10:05 schrieb Eric Charles <eric@apache.org>:

> Hi Stephano,
> 
> Opening the discussion to learn more :)
> 
> - Why are you considering that 2 threads is a criteria to use standard synchronization
rather than some atomic fields.
> 
If you only have a small count of concurrent threads its not slower to use synchronization
as the context switching will not happen often..


> - I can understand you replace a concurrent by a non-concurrent queue. However, you now
have a blocking queue. Is there an impact due to this blocking aspect?

Nope there is not as we not use the blocking methods. We could even replace it with a LinkedList.

> 
> - You defined isAsync as volatile and sometimes encapsulate access to isAsync in a synchronized
block, sometime not. Why using 2 different thread-safety strategies in this class?
> 
If you only need to access a "status flag" ina concurrent way then its more cheap to just
use a volatile for it. If you need to update more then one field in a "atomic" way you need
synchronized. Updating a volatile in a synchronized is not a problem...


> Thx,
> 
> Eric
> 
> 
Hope it helps,
Norman


> On 21/12/11 15:47, bago@apache.org wrote:
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>> 
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back
to standard synchronization as we only have max 2 threads competing for the queue so it doesn't
make sense to use a non blocking queue. Norman, please overview, and feel free to revert if
you don't like the solution (i thought it was better to simply commit instead of opening a
JIRA to show you this).
>> 
>> Modified:
>>     james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> 
>> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>> ==============================================================================
>> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
(original)
>> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>  import java.io.InputStream;
>>  import java.io.UnsupportedEncodingException;
>>  import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>> 
>>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>> 
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>> 
>> 
>>      // TODO: Should we limit the size ?
>> -    private final ConcurrentLinkedQueue<Response>  responses = new ConcurrentLinkedQueue<Response>();
>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>> +    private final Queue<Response>  responses = new LinkedBlockingQueue<Response>();
>> +    private volatile boolean isAsync = false;
>> 
>>      /**
>>       * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
>>       */
>>      public final void writeResponse(Response response, final ProtocolSession session)
{
>> -        // just add the response to the queue. We will trigger the write operation
later
>> -        responses.add(response);
>> -
>> -        // trigger the write
>> -        writeQueuedResponses(session);
>> +        // if we already in asynchrnous mode we simply enqueue the response
>> +        // we do this synchronously because we may have a dequeuer thread working
on
>> +        // isAsync and responses.
>> +        boolean enqueued = false;
>> +        synchronized(this) {
>> +            if (isAsync == true) {
>> +                responses.offer(response);
>> +                enqueued = true;
>> +            }
>> +        }
>> +
>> +        // if we didn't enqueue then we check if the response is writable or we
have to
>> +        // set us "asynchrnous" and wait for response to be ready.
>> +        if (!enqueued) {
>> +            if (isResponseWritable(response)) {
>> +                writeResponseToClient(response, session);
>> +            } else {
>> +                addDequeuerListener(response, session);
>> +                isAsync = true;
>> +            }
>> +        }
>>      }
>> 
>>      /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>       * @param session
>>       */
>>      private  void writeQueuedResponses(final ProtocolSession session) {
>> -        Response queuedResponse = null;
>> 
>> -        if (write.compareAndSet(false, true)){
>> -            boolean listenerAdded = false;
>> -            // dequeue Responses until non is left
>> -            while ((queuedResponse = responses.poll()) != null) {
>> -
>> -                // check if we need to take special care of FutureResponses
>> -                if (queuedResponse instanceof FutureResponse) {
>> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
>> -                    if (futureResponse.isReady()) {
>> -                        // future is ready so we can write it without blocking the
IO-Thread
>> -                        writeResponseToClient(queuedResponse, session);
>> -                    } else {
>> -
>> -                        // future is not ready so we need to write it via a ResponseListener
otherwise we MAY block the IO-Thread
>> -                        futureResponse.addListener(new ResponseListener() {
>> -
>> -                            public void onResponse(FutureResponse response) {
>> -                                writeResponseToClient(response, session);
>> -                                if (write.compareAndSet(true, false)) {
>> -                                    writeQueuedResponses(session);
>> -                                }
>> -                            }
>> -                        });
>> -                        listenerAdded = true;
>> -                        // just break here as we will trigger the dequeue later
>> -                        break;
>> -                    }
>> -
>> -                } else {
>> -                    // the Response is not a FutureResponse, so just write it back
the the remote peer
>> -                    writeResponseToClient(queuedResponse, session);
>> +        // dequeue Responses until non is left
>> +        while (true) {
>> +
>> +            Response queuedResponse = null;
>> +
>> +            // synchrnously we check responses and if it is empty we move back to
non asynch
>> +            // behaviour
>> +            synchronized(this) {
>> +                queuedResponse = responses.poll();
>> +                if (queuedResponse == null) {
>> +                    isAsync = false;
>> +                    break;
>>                  }
>> -
>>              }
>> -            // Check if a ResponseListener was added before. If not we can allow
to write
>> -            // responses again. Otherwise the writing will get triggered from the
listener
>> -            if (listenerAdded == false) {
>> -                write.set(false);
>> +
>> +            // if we have something in the queue we continue writing until we
>> +            // find something asynchronous.
>> +            if (isResponseWritable(queuedResponse)) {
>> +                writeResponseToClient(queuedResponse, session);
>> +            } else {
>> +                addDequeuerListener(queuedResponse, session);
>> +                // no changes to isAsync here, because in this method we are always
already async.
>> +                break;
>>              }
>>          }
>> -
>> -
>> +    }
>> +
>> +    private boolean isResponseWritable(Response response) {
>> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
>> +    }
>> +
>> +    private void addDequeuerListener(Response response, final ProtocolSession session)
{
>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> +            public void onResponse(FutureResponse response) {
>> +                writeResponseToClient(response, session);
>> +                writeQueuedResponses(session);
>> +            }
>> +        });
>>      }
>> 
>>      /**
>> 
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>> 
> 
> -- 
> Eric http://about.echarles.net
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
> 

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message