james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stefano Bagnara <apa...@bago.org>
Subject Re: svn commit: r1221748 - /james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Date Sat, 24 Dec 2011 11:19:07 GMT
2011/12/24 Eric Charles <eric@apache.org>:
> Hi Stephano,
>
> Opening the discussion to learn more :)
>
> - Why are you considering that 2 threads is a criteria to use standard
> synchronization rather than some atomic fields.

It is a criteria to not use the ConcurrentLinkedQueue that is a
structure thought to handle many concurrent threads and is overkill
for 2 threads.

> - I can understand you replace a concurrent by a non-concurrent queue.
> However, you now have a blocking queue. Is there an impact due to this
> blocking aspect?

I think the answer is no. That's why I did that.
Remember we have 2 threads and they do 2 different things, they simply
block each other when they add or remove from the queue.

> - You defined isAsync as volatile and sometimes encapsulate access to
> isAsync in a synchronized block, sometime not. Why using 2 different
> thread-safety strategies in this class?

Because some times it needed sincronization, other times I felt it was
not needed (the access to the volatile doesn't need synchronization. I
just synchronize to ensure that the change to the list happens
together with the change in the volatile var).
If you can find a better solution you're welcome to provide one. It
took a couple of hours to reach a working solution.

The previous one was not thread safe at this line:
------
if (listenerAdded == false) {
  write.set(false);
-----
It could happen another thread already added a new item to the queue
but skipped to process it because write was true. So we ended up with
an item in the queue never written.

I don't like too much my solution and I felt it a bit hackish, but
that was my best solution for my limited time, so if you can provide a
more elegant solution while still being thread safe, I'm more than
happy :-)

Stefano

>
> Thx,
>
> Eric
>
>
>
> On 21/12/11 15:47, bago@apache.org wrote:
>>
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>>
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>> moved back to standard synchronization as we only have max 2 threads
>> competing for the queue so it doesn't make sense to use a non blocking
>> queue. Norman, please overview, and feel free to revert if you don't like
>> the solution (i thought it was better to simply commit instead of opening a
>> JIRA to show you this).
>>
>> Modified:
>>
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>
>> Modified:
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL:
>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>
>> ==============================================================================
>> ---
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> (original)
>> +++
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>>  import java.io.InputStream;
>>  import java.io.UnsupportedEncodingException;
>>  import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>>
>>  import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>
>>
>>      // TODO: Should we limit the size ?
>> -    private final ConcurrentLinkedQueue<Response>  responses = new
>> ConcurrentLinkedQueue<Response>();
>> -    private final AtomicBoolean write = new AtomicBoolean(false);
>> +    private final Queue<Response>  responses = new
>> LinkedBlockingQueue<Response>();
>> +    private volatile boolean isAsync = false;
>>
>>      /**
>>       * @see
>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>> org.apache.james.protocols.api.ProtocolSession)
>>       */
>>      public final void writeResponse(Response response, final
>> ProtocolSession session) {
>> -        // just add the response to the queue. We will trigger the write
>> operation later
>> -        responses.add(response);
>> -
>> -        // trigger the write
>> -        writeQueuedResponses(session);
>> +        // if we already in asynchrnous mode we simply enqueue the
>> response
>> +        // we do this synchronously because we may have a dequeuer thread
>> working on
>> +        // isAsync and responses.
>> +        boolean enqueued = false;
>> +        synchronized(this) {
>> +            if (isAsync == true) {
>> +                responses.offer(response);
>> +                enqueued = true;
>> +            }
>> +        }
>> +
>> +        // if we didn't enqueue then we check if the response is writable
>> or we have to
>> +        // set us "asynchrnous" and wait for response to be ready.
>> +        if (!enqueued) {
>> +            if (isResponseWritable(response)) {
>> +                writeResponseToClient(response, session);
>> +            } else {
>> +                addDequeuerListener(response, session);
>> +                isAsync = true;
>> +            }
>> +        }
>>      }
>>
>>      /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>>       * @param session
>>       */
>>      private  void writeQueuedResponses(final ProtocolSession session) {
>> -        Response queuedResponse = null;
>>
>> -        if (write.compareAndSet(false, true)){
>> -            boolean listenerAdded = false;
>> -            // dequeue Responses until non is left
>> -            while ((queuedResponse = responses.poll()) != null) {
>> -
>> -                // check if we need to take special care of
>> FutureResponses
>> -                if (queuedResponse instanceof FutureResponse) {
>> -                    FutureResponse futureResponse =(FutureResponse)
>> queuedResponse;
>> -                    if (futureResponse.isReady()) {
>> -                        // future is ready so we can write it without
>> blocking the IO-Thread
>> -                        writeResponseToClient(queuedResponse, session);
>> -                    } else {
>> -
>> -                        // future is not ready so we need to write it
via
>> a ResponseListener otherwise we MAY block the IO-Thread
>> -                        futureResponse.addListener(new ResponseListener()
>> {
>> -
>> -                            public void onResponse(FutureResponse
>> response) {
>> -                                writeResponseToClient(response,
session);
>> -                                if (write.compareAndSet(true, false))
{
>> -                                    writeQueuedResponses(session);
>> -                                }
>> -                            }
>> -                        });
>> -                        listenerAdded = true;
>> -                        // just break here as we will trigger the dequeue
>> later
>> -                        break;
>> -                    }
>> -
>> -                } else {
>> -                    // the Response is not a FutureResponse, so just
>> write it back the the remote peer
>> -                    writeResponseToClient(queuedResponse, session);
>> +        // dequeue Responses until non is left
>> +        while (true) {
>> +
>> +            Response queuedResponse = null;
>> +
>> +            // synchrnously we check responses and if it is empty we move
>> back to non asynch
>> +            // behaviour
>> +            synchronized(this) {
>> +                queuedResponse = responses.poll();
>> +                if (queuedResponse == null) {
>> +                    isAsync = false;
>> +                    break;
>>                  }
>> -
>>              }
>> -            // Check if a ResponseListener was added before. If not we
>> can allow to write
>> -            // responses again. Otherwise the writing will get triggered
>> from the listener
>> -            if (listenerAdded == false) {
>> -                write.set(false);
>> +
>> +            // if we have something in the queue we continue writing
>> until we
>> +            // find something asynchronous.
>> +            if (isResponseWritable(queuedResponse)) {
>> +                writeResponseToClient(queuedResponse, session);
>> +            } else {
>> +                addDequeuerListener(queuedResponse, session);
>> +                // no changes to isAsync here, because in this method we
>> are always already async.
>> +                break;
>>              }
>>          }
>> -
>> -
>> +    }
>> +
>> +    private boolean isResponseWritable(Response response) {
>> +        return !(response instanceof FutureResponse) || ((FutureResponse)
>> response).isReady();
>> +    }
>> +
>> +    private void addDequeuerListener(Response response, final
>> ProtocolSession session) {
>> +        ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> +            public void onResponse(FutureResponse response) {
>> +                writeResponseToClient(response, session);
>> +                writeQueuedResponses(session);
>> +            }
>> +        });
>>      }
>>
>>      /**
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
>> For additional commands, e-mail: server-dev-help@james.apache.org
>>
>
> --
> Eric http://about.echarles.net
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
> For additional commands, e-mail: server-dev-help@james.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message