mina-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Trustin Lee" <trus...@gmail.com>
Subject Re: svn commit: r417502 - /directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
Date Mon, 03 Jul 2006 08:48:58 GMT
Hi Peter!

Your addition of ReadThrottleFilter looks good to me.  It is very creative!
:)

Here are my comments:

* The class name ends with 'Filter', but it's not a filter.  Wouldn't there
be a better name?  This is very close to IoFilterChainBuilder, but it's too
long.  Any idea?
* We have to increase the counter by remaining() instead of capacity() to be
more accurate.
* Isn't it possible to use getTrafficMask() instead of the session
attribute?  Was there any reason for doing so?

That's all.  Everything is superb except that. :D

Cheers,
Trustin

On 6/28/06, proyal@apache.org <proyal@apache.org> wrote:
>
> Author: proyal
> Date: Tue Jun 27 09:31:30 2006
> New Revision: 417502
>
> URL: http://svn.apache.org/viewvc?rev=417502&view=rev
> Log:
> DIRMINA-206 - Filter that watches the total amount queue'd for a
> connection in the ThreadPoolFilter and will disable reads for a connection
> when a configurable (default 1mb) limit is reached.
>
> Added:
>
>     directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
>
> Added:
> directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
> URL:
> http://svn.apache.org/viewvc/directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java?rev=417502&view=auto
>
> ==============================================================================
> ---
> directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
> (added)
> +++
> directory/trunks/mina/core/src/main/java/org/apache/mina/filter/ReadThrottleFilter.java
> Tue Jun 27 09:31:30 2006
> @@ -0,0 +1,192 @@
> +/*
> + * @(#) $Id: StreamWriteFilterTest.java 389042 2006-03-27 07:49:41Z
> trustin $
> + *
> + * Copyright 2004 The Apache Software Foundation
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.mina.filter;
> +
> +import java.util.Iterator;
> +import java.util.List;
> +
> +import org.apache.mina.common.ByteBuffer;
> +import org.apache.mina.common.DefaultIoFilterChainBuilder;
> +import org.apache.mina.common.IoFilterAdapter;
> +import org.apache.mina.common.IoFilterChain;
> +import org.apache.mina.common.IoSession;
> +import org.apache.mina.filter.thread.ThreadPoolFilter;
> +
> +/**
> + * This filter will automatically disable reads on an
> <code>IoSession</code> once the data batched for that session in
> + * the <code>ThreadPoolFilter</code> reaches a defined threshold (the
> default is 1mb). It accomplishes this by being in
> + * the filter chain before <strong>and</strong> after the
> <code>ThreadPoolFilter</code>. It is possible to subvert the
> + * behavior of this filter by adding filters immediately after the
> <code>ThreadPoolFilter</code> after adding this
> + * filter. Thus, it is recommended to add this filter towards the end of
> your filter chain construction, if you need to
> + * ensure that other filters need to be right after the
> <code>ThreadPoolFilter</code>.
> + *
> + * <p>Usage:
> + *
> + * <pre><code>
> + * DefaultFilterChainBuilder builder = ...
> + * ReadThrottleFilter filter = new ReadThrottleFilter();
> + * filter.attach( builder );
> + * </code></pre>
> + *
> + * or
> + *
> + * <pre><code>
> + * IoFilterChain chain = ...
> + * ReadThrottleFilter filter = new ReadThrottleFilter();
> + * filter.attach( chain );
> + * </code></pre>
> + *
> + * @author The Apache Directory Project (mina-dev@directory.apache.org)
> + * @version $Rev: 406554 $, $Date: 2006-05-15 06:46:02Z $
> + */
> +public class ReadThrottleFilter
> +{
> +    public static final String COUNTER = ReadThrottleFilter.class.getName()
> + ".counter";
> +    public static final String SUSPENDED_READS =
> ReadThrottleFilter.class.getName() + ".suspendedReads";
> +
> +    private volatile int maximumConnectionBufferSize = 1024 * 1024; //
> 1mb
> +
> +    /**
> +     * Set the maximum amount of data to buffer in the ThreadPoolFilter
> prior to disabling reads. Changing the value
> +     * will only take effect when new data is received for a connection,
> including existing connections. Default value
> +     * is 1mb
> +     *
> +     * @param maximumConnectionBufferSize New buffer size. Must be > 0
> +     */
> +    public void setMaximumConnectionBufferSize( int
> maximumConnectionBufferSize )
> +    {
> +        this.maximumConnectionBufferSize = maximumConnectionBufferSize;
> +    }
> +
> +    /**
> +     * Attach this filter to the specified filter chain. It will search
> for the ThreadPoolFilter, and attach itself
> +     * before and after that filter.
> +     *
> +     * @param chain <code>IoFilterChain</code> to attach self to.
> +     */
> +    public void attach( IoFilterChain chain )
> +    {
> +        String name = getThreadPoolFilterEntryName( chain.getAll() );
> +
> +        chain.addBefore( name, getClass().getName() + ".add", new Add()
> );
> +        chain.addAfter( name, getClass().getName() + ".release", new
> Release() );
> +    }
> +
> +    /**
> +     * Attach this filter to the specified builder. It will search for
> the ThreadPoolFilter, and attach itself before
> +     * and after that filter.
> +     *
> +     * @param builder <code>DefaultIoFilterChainBuilder</code> to attach
> self to.
> +     */
> +    public void attach( DefaultIoFilterChainBuilder builder )
> +    {
> +        String name = getThreadPoolFilterEntryName( builder.getAll() );
> +
> +        builder.addBefore( name, getClass().getName() + ".add", new Add()
> );
> +        builder.addAfter( name, getClass().getName() + ".release", new
> Release() );
> +    }
> +
> +    private String getThreadPoolFilterEntryName( List entries )
> +    {
> +        Iterator i = entries.iterator();
> +
> +        while( i.hasNext() )
> +        {
> +            IoFilterChain.Entry entry = (IoFilterChain.Entry)i.next();
> +
> +            if( entry.getFilter().getClass().isAssignableFrom(
> ThreadPoolFilter.class ) )
> +            {
> +                return entry.getName();
> +            }
> +        }
> +
> +        throw new IllegalStateException( "Chain does not contain a
> ThreadPoolFilter" );
> +    }
> +
> +    private void add( IoSession session, int size )
> +    {
> +        synchronized( session )
> +        {
> +            int counter = getCounter( session ) + size;
> +
> +            session.setAttribute( COUNTER, new Integer( counter ) );
> +
> +            if( counter >= maximumConnectionBufferSize &&
> session.getTrafficMask().isReadable() )
> +            {
> +                session.suspendRead();
> +                session.setAttribute( SUSPENDED_READS );
> +            }
> +        }
> +    }
> +
> +    private void release( IoSession session, int size )
> +    {
> +        synchronized( session )
> +        {
> +            int counter = Math.max( 0, getCounter( session ) - size );
> +
> +            session.setAttribute( COUNTER, new Integer( counter ) );
> +
> +            if( counter < maximumConnectionBufferSize &&
> isSuspendedReads( session ) )
> +            {
> +                session.resumeRead();
> +            }
> +
> +            session.removeAttribute( SUSPENDED_READS );
> +        }
> +    }
> +
> +    private boolean isSuspendedReads( IoSession session )
> +    {
> +        Boolean flag = (Boolean)session.getAttribute( SUSPENDED_READS );
> +
> +        return null != flag && flag.booleanValue();
> +    }
> +
> +    private int getCounter( IoSession session )
> +    {
> +        Integer i = (Integer)session.getAttribute( COUNTER );
> +        return null == i ? 0 : i.intValue();
> +    }
> +
> +    private class Add extends IoFilterAdapter
> +    {
> +        public void messageReceived( NextFilter nextFilter, IoSession
> session, Object message ) throws Exception
> +        {
> +            if( message instanceof ByteBuffer )
> +            {
> +                add( session, ( (ByteBuffer)message ).capacity() );
> +            }
> +
> +            nextFilter.messageReceived( session, message );
> +        }
> +    }
> +
> +    private class Release extends IoFilterAdapter
> +    {
> +        public void messageReceived( NextFilter nextFilter, IoSession
> session, Object message ) throws Exception
> +        {
> +            if( message instanceof ByteBuffer )
> +            {
> +                release( session, ( (ByteBuffer)message ).capacity() );
> +            }
> +
> +            nextFilter.messageReceived( session, message );
> +        }
> +    }
> +}
>
>
>


-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP key fingerprints:
* E167 E6AF E73A CBCE EE41  4A29 544D DE48 FE95 4E7E
* B693 628E 6047 4F8F CFA4  455E 1C62 A7DC 0255 ECA6

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message