james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Charles <e...@apache.org>
Subject Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Date Sat, 24 Dec 2011 12:50:17 GMT
Hi Norman,
Thx for inputs. comment/confirmation inside.
Eric

On 24/12/11 11:53, Norman Maurer wrote:
> Hi Eric,
>
> comments inside....
>
> Am 24.12.2011 um 10:05 schrieb Eric Charles<eric@apache.org>:
>
>> Hi Stephano,
>>
>> Opening the discussion to learn more :)
>>
>> - Why are you considering that 2 threads is a criteria to use standard synchronization
rather than some atomic fields.
>>
> If you only have a small count of concurrent threads its not slower to use synchronization
as the context switching will not happen often..
>
>

Not slower, but not faster.


>> - I can understand you replace a concurrent by a non-concurrent queue. However, you
now have a blocking queue. Is there an impact due to this blocking aspect?
>
> Nope there is not as we not use the blocking methods. We could even replace it with a
LinkedList.
>

So why not replace with a LinkedList to make things crystal clear :)

>>
>> - You defined isAsync as volatile and sometimes encapsulate access to isAsync in
a synchronized block, sometime not. Why using 2 different thread-safety strategies in this
class?
>>
> If you only need to access a "status flag" ina concurrent way then its more cheap to
just use a volatile for it. If you need to update more then one field in a "atomic" way you
need synchronized. Updating a volatile in a synchronized is not a problem...
>

Sure. I will further look at the usage context (the 2 user threads) to 
have a better idea.

Thx again Norman,

Eric

>
>> Thx,
>>
>> Eric
>>
>>
> Hope it helps,
> Norman
>
>
>> On 21/12/11 15:47, bago@apache.org wrote:
>>> Author: bago
>>> Date: Wed Dec 21 14:47:25 2011
>>> New Revision: 1221748
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>>> Log:
>>> An attempt to refactor AbstractProtocolTransport to be thread safe. I moved back
to standard synchronization as we only have max 2 threads competing for the queue so it doesn't
make sense to use a non blocking queue. Norman, please overview, and feel free to revert if
you don't like the solution (i thought it was better to simply commit instead of opening a
JIRA to show you this).
>>>
>>> Modified:
>>>      james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>>
>>> Modified: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>> ==============================================================================
>>> --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
(original)
>>> +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Wed Dec 21 14:47:25 2011
>>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>>   import java.io.InputStream;
>>>   import java.io.UnsupportedEncodingException;
>>>   import java.util.List;
>>> -import java.util.concurrent.ConcurrentLinkedQueue;
>>> -import java.util.concurrent.atomic.AtomicBoolean;
>>> -
>>> +import java.util.Queue;
>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>
>>>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>>
>>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>>
>>>
>>>       // TODO: Should we limit the size ?
>>> -    private final ConcurrentLinkedQueue<Response>   responses = new ConcurrentLinkedQueue<Response>();
>>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>>> +    private final Queue<Response>   responses = new LinkedBlockingQueue<Response>();
>>> +    private volatile boolean isAsync = false;
>>>
>>>       /**
>>>        * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
>>>        */
>>>       public final void writeResponse(Response response, final ProtocolSession
session) {
>>> -        // just add the response to the queue. We will trigger the write operation
later
>>> -        responses.add(response);
>>> -
>>> -        // trigger the write
>>> -        writeQueuedResponses(session);
>>> +        // if we already in asynchrnous mode we simply enqueue the response
>>> +        // we do this synchronously because we may have a dequeuer thread working
on
>>> +        // isAsync and responses.
>>> +        boolean enqueued = false;
>>> +        synchronized(this) {
>>> +            if (isAsync == true) {
>>> +                responses.offer(response);
>>> +                enqueued = true;
>>> +            }
>>> +        }
>>> +
>>> +        // if we didn't enqueue then we check if the response is writable or
we have to
>>> +        // set us "asynchrnous" and wait for response to be ready.
>>> +        if (!enqueued) {
>>> +            if (isResponseWritable(response)) {
>>> +                writeResponseToClient(response, session);
>>> +            } else {
>>> +                addDequeuerListener(response, session);
>>> +                isAsync = true;
>>> +            }
>>> +        }
>>>       }
>>>
>>>       /**
>>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>>        * @param session
>>>        */
>>>       private  void writeQueuedResponses(final ProtocolSession session) {
>>> -        Response queuedResponse = null;
>>>
>>> -        if (write.compareAndSet(false, true)){
>>> -            boolean listenerAdded = false;
>>> -            // dequeue Responses until non is left
>>> -            while ((queuedResponse = responses.poll()) != null) {
>>> -
>>> -                // check if we need to take special care of FutureResponses
>>> -                if (queuedResponse instanceof FutureResponse) {
>>> -                    FutureResponse futureResponse =(FutureResponse) queuedResponse;
>>> -                    if (futureResponse.isReady()) {
>>> -                        // future is ready so we can write it without blocking
the IO-Thread
>>> -                        writeResponseToClient(queuedResponse, session);
>>> -                    } else {
>>> -
>>> -                        // future is not ready so we need to write it via a
ResponseListener otherwise we MAY block the IO-Thread
>>> -                        futureResponse.addListener(new ResponseListener() {
>>> -
>>> -                            public void onResponse(FutureResponse response)
{
>>> -                                writeResponseToClient(response, session);
>>> -                                if (write.compareAndSet(true, false)) {
>>> -                                    writeQueuedResponses(session);
>>> -                                }
>>> -                            }
>>> -                        });
>>> -                        listenerAdded = true;
>>> -                        // just break here as we will trigger the dequeue later
>>> -                        break;
>>> -                    }
>>> -
>>> -                } else {
>>> -                    // the Response is not a FutureResponse, so just write it
back the the remote peer
>>> -                    writeResponseToClient(queuedResponse, session);
>>> +        // dequeue Responses until non is left
>>> +        while (true) {
>>> +
>>> +            Response queuedResponse = null;
>>> +
>>> +            // synchrnously we check responses and if it is empty we move back
to non asynch
>>> +            // behaviour
>>> +            synchronized(this) {
>>> +                queuedResponse = responses.poll();
>>> +                if (queuedResponse == null) {
>>> +                    isAsync = false;
>>> +                    break;
>>>                   }
>>> -
>>>               }
>>> -            // Check if a ResponseListener was added before. If not we can allow
to write
>>> -            // responses again. Otherwise the writing will get triggered from
the listener
>>> -            if (listenerAdded == false) {
>>> -                write.set(false);
>>> +
>>> +            // if we have something in the queue we continue writing until we
>>> +            // find something asynchronous.
>>> +            if (isResponseWritable(queuedResponse)) {
>>> +                writeResponseToClient(queuedResponse, session);
>>> +            } else {
>>> +                addDequeuerListener(queuedResponse, session);
>>> +                // no changes to isAsync here, because in this method we are
always already async.
>>> +                break;
>>>               }
>>>           }
>>> -
>>> -
>>> +    }
>>> +
>>> +    private boolean isResponseWritable(Response response) {
>>> +        return !(response instanceof FutureResponse) || ((FutureResponse) response).isReady();
>>> +    }
>>> +
>>> +    private void addDequeuerListener(Response response, final ProtocolSession
session) {
>>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>>> +
>>> +            public void onResponse(FutureResponse response) {
>>> +                writeResponseToClient(response, session);
>>> +                writeQueuedResponses(session);
>>> +            }
>>> +        });
>>>       }
>>>
>>>       /**
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>>> For additional commands, e-mail: server-dev-help@james.apache.org
>>>
>>
>> --
>> Eric http://about.echarles.net
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message