james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Charles <e...@apache.org>
Subject Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Date Sat, 24 Dec 2011 12:58:38 GMT
Hi Stephano, Thx for the inputs. I'm fine to have volatile fields and to 
use them is synchronized blocks when needed, and out-of synchronized 
blocks when possible.

I still have to find the few hours to better understand the usage and 
context.

Maybe you can give me a hint about about the 2 user threads?

Is there a test case that ensures the class is thread-safe (or sould I 
ask 'is it feasible to have a test for this')?

How did you came to the conclusion it was not thread-safe: pure code 
review, or exceptions/abnormal behavior in an operational deployment?

I believe the beauty resides in the easy-understanding. For example, if 
this class is designed to be used by only 2 threads, it should come out 
for the reader from javadoc, method, fields... namings.

Thx,

Eric


On 24/12/11 12:19, Stefano Bagnara wrote:
> 2011/12/24 Eric Charles<eric@apache.org>:
>> Hi Stephano,
>>
>> Opening the discussion to learn more :)
>>
>> - Why are you considering that 2 threads is a criteria to use standard
>> synchronization rather than some atomic fields.
>
> It is a criteria to not use the ConcurrentLinkedQueue that is a
> structure thought to handle many concurrent threads and is overkill
> for 2 threads.
>
>> - I can understand you replace a concurrent by a non-concurrent queue.
>> However, you now have a blocking queue. Is there an impact due to this
>> blocking aspect?
>
> I think the answer is no. That's why I did that.
> Remember we have 2 threads and they do 2 different things, they simply
> block each other when they add or remove from the queue.
>
>> - You defined isAsync as volatile and sometimes encapsulate access to
>> isAsync in a synchronized block, sometime not. Why using 2 different
>> thread-safety strategies in this class?
>
> Because some times it needed sincronization, other times I felt it was
> not needed (the access to the volatile doesn't need synchronization. I
> just synchronize to ensure that the change to the list happens
> together with the change in the volatile var).
> If you can find a better solution you're welcome to provide one. It
> took a couple of hours to reach a working solution.
>
> The previous one was not thread safe at this line:
> ------
> if (listenerAdded == false) {
>    write.set(false);
> -----
> It could happen another thread already added a new item to the queue
> but skipped to process it because write was true. So we ended up with
> an item in the queue never written.
>
> I don't like too much my solution and I felt it a bit hackish, but
> that was my best solution for my limited time, so if you can provide a
> more elegant solution while still being thread safe, I'm more than
> happy :-)
>
> Stefano
>
>>
>> Thx,
>>
>> Eric
>>
>>
>>
>> On 21/12/11 15:47, bago@apache.org wrote:
>>>
>>> Author: bago
>>> Date: Wed Dec 21 14:47:25 2011
>>> New Revision: 1221748
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>>> Log:
>>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>>> moved back to standard synchronization as we only have max 2 threads
>>> competing for the queue so it doesn't make sense to use a non blocking
>>> queue. Norman, please overview, and feel free to revert if you don't like
>>> the solution (i thought it was better to simply commit instead of opening a
>>> JIRA to show you this).
>>>
>>> Modified:
>>>
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>>
>>> Modified:
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> URL:
>>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>>
>>> ==============================================================================
>>> ---
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> (original)
>>> +++
>>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>> Wed Dec 21 14:47:25 2011
>>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>>   import java.io.InputStream;
>>>   import java.io.UnsupportedEncodingException;
>>>   import java.util.List;
>>> -import java.util.concurrent.ConcurrentLinkedQueue;
>>> -import java.util.concurrent.atomic.AtomicBoolean;
>>> -
>>> +import java.util.Queue;
>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>
>>>   import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>>
>>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>>
>>>
>>>       // TODO: Should we limit the size ?
>>> -    private final ConcurrentLinkedQueue<Response>    responses = new
>>> ConcurrentLinkedQueue<Response>();
>>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>>> +    private final Queue<Response>    responses = new
>>> LinkedBlockingQueue<Response>();
>>> +    private volatile boolean isAsync = false;
>>>
>>>       /**
>>>        * @see
>>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>>> org.apache.james.protocols.api.ProtocolSession)
>>>        */
>>>       public final void writeResponse(Response response, final
>>> ProtocolSession session) {
>>> -        // just add the response to the queue. We will trigger the write
>>> operation later
>>> -        responses.add(response);
>>> -
>>> -        // trigger the write
>>> -        writeQueuedResponses(session);
>>> +        // if we already in asynchrnous mode we simply enqueue the
>>> response
>>> +        // we do this synchronously because we may have a dequeuer thread
>>> working on
>>> +        // isAsync and responses.
>>> +        boolean enqueued = false;
>>> +        synchronized(this) {
>>> +            if (isAsync == true) {
>>> +                responses.offer(response);
>>> +                enqueued = true;
>>> +            }
>>> +        }
>>> +
>>> +        // if we didn't enqueue then we check if the response is writable
>>> or we have to
>>> +        // set us "asynchrnous" and wait for response to be ready.
>>> +        if (!enqueued) {
>>> +            if (isResponseWritable(response)) {
>>> +                writeResponseToClient(response, session);
>>> +            } else {
>>> +                addDequeuerListener(response, session);
>>> +                isAsync = true;
>>> +            }
>>> +        }
>>>       }
>>>
>>>       /**
>>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>>        * @param session
>>>        */
>>>       private  void writeQueuedResponses(final ProtocolSession session) {
>>> -        Response queuedResponse = null;
>>>
>>> -        if (write.compareAndSet(false, true)){
>>> -            boolean listenerAdded = false;
>>> -            // dequeue Responses until non is left
>>> -            while ((queuedResponse = responses.poll()) != null) {
>>> -
>>> -                // check if we need to take special care of
>>> FutureResponses
>>> -                if (queuedResponse instanceof FutureResponse) {
>>> -                    FutureResponse futureResponse =(FutureResponse)
>>> queuedResponse;
>>> -                    if (futureResponse.isReady()) {
>>> -                        // future is ready so we can write it without
>>> blocking the IO-Thread
>>> -                        writeResponseToClient(queuedResponse, session);
>>> -                    } else {
>>> -
>>> -                        // future is not ready so we need to write it via
>>> a ResponseListener otherwise we MAY block the IO-Thread
>>> -                        futureResponse.addListener(new ResponseListener()
>>> {
>>> -
>>> -                            public void onResponse(FutureResponse
>>> response) {
>>> -                                writeResponseToClient(response, session);
>>> -                                if (write.compareAndSet(true, false)) {
>>> -                                    writeQueuedResponses(session);
>>> -                                }
>>> -                            }
>>> -                        });
>>> -                        listenerAdded = true;
>>> -                        // just break here as we will trigger the dequeue
>>> later
>>> -                        break;
>>> -                    }
>>> -
>>> -                } else {
>>> -                    // the Response is not a FutureResponse, so just
>>> write it back the the remote peer
>>> -                    writeResponseToClient(queuedResponse, session);
>>> +        // dequeue Responses until non is left
>>> +        while (true) {
>>> +
>>> +            Response queuedResponse = null;
>>> +
>>> +            // synchrnously we check responses and if it is empty we move
>>> back to non asynch
>>> +            // behaviour
>>> +            synchronized(this) {
>>> +                queuedResponse = responses.poll();
>>> +                if (queuedResponse == null) {
>>> +                    isAsync = false;
>>> +                    break;
>>>                   }
>>> -
>>>               }
>>> -            // Check if a ResponseListener was added before. If not we
>>> can allow to write
>>> -            // responses again. Otherwise the writing will get triggered
>>> from the listener
>>> -            if (listenerAdded == false) {
>>> -                write.set(false);
>>> +
>>> +            // if we have something in the queue we continue writing
>>> until we
>>> +            // find something asynchronous.
>>> +            if (isResponseWritable(queuedResponse)) {
>>> +                writeResponseToClient(queuedResponse, session);
>>> +            } else {
>>> +                addDequeuerListener(queuedResponse, session);
>>> +                // no changes to isAsync here, because in this method we
>>> are always already async.
>>> +                break;
>>>               }
>>>           }
>>> -
>>> -
>>> +    }
>>> +
>>> +    private boolean isResponseWritable(Response response) {
>>> +        return !(response instanceof FutureResponse) || ((FutureResponse)
>>> response).isReady();
>>> +    }
>>> +
>>> +    private void addDequeuerListener(Response response, final
>>> ProtocolSession session) {
>>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>>> +
>>> +            public void onResponse(FutureResponse response) {
>>> +                writeResponseToClient(response, session);
>>> +                writeQueuedResponses(session);
>>> +            }
>>> +        });
>>>       }
>>>
>>>       /**
>>>
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>>> For additional commands, e-mail: server-dev-help@james.apache.org
>>>
>>
>> --
>> Eric http://about.echarles.net
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

-- 
Eric http://about.echarles.net

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message