synapse-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Supun Kamburugamuva <supu...@gmail.com>
Subject Re: svn commit: r1072963 - in /synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp: HttpCoreNIOListener.java ListenerContext.java ServerHandler.java ServerWorker.java
Date Tue, 22 Feb 2011 14:00:11 GMT
+1, I too feel that the Nhttp code quality has gone down over time. It may
be time for a major re-factor in the code base.

Thanks,
Supun..

On Mon, Feb 21, 2011 at 10:31 PM, indika kumara <indika.kuma@gmail.com>wrote:

> Spun, Is the ListenerContext just a holder of a set of parameters - a way
> to reduce the number of parameters passing for a method ?  I am extremely
> sorry for my ignorance.
>
> The large number of parameters most probably is a bad smell of having a lot
> of responsibility. A solution would be to identify and clearly separate
> orthogonal responsibilities thorugh correct abstractions.  Such a design
> would mostly improve modifiability and readability (A correct abstraction is
> the best self documentation).  I am not sure how much a solution like
> 'ListenerContext' can provide a suitable long term solution.  I would rather
> go for a radical refratoring based on pure OOP and responsibility driven
> approach.
>
> BTW, I never want to go against a community decision. This is just a
> personal thought and you can ignore it.
>
> Thanks,
>
> Indika
>
>
> On Mon, Feb 21, 2011 at 7:23 PM, <supun@apache.org> wrote:
>
>> Author: supun
>> Date: Mon Feb 21 13:23:54 2011
>> New Revision: 1072963
>>
>> URL: http://svn.apache.org/viewvc?rev=1072963&view=rev
>> Log:
>> adding a listener context to hold some of the configurations of the
>> listener, this re-factors some methods to have less parameters
>>
>> Added:
>>
>>  synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
>> Modified:
>>
>>  synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
>>
>>  synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
>>
>>  synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
>>
>> Modified:
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=1072963&r1=1072962&r2=1072963&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
>> Mon Feb 21 13:23:54 2011
>> @@ -19,8 +19,6 @@
>>  package org.apache.synapse.transport.nhttp;
>>
>>  import org.apache.axiom.om.OMElement;
>> -import org.apache.axiom.om.OMAttribute;
>> -import org.apache.axiom.om.impl.builder.StAXOMBuilder;
>>  import org.apache.axis2.AxisFault;
>>  import org.apache.axis2.addressing.EndpointReference;
>>  import org.apache.axis2.context.ConfigurationContext;
>> @@ -39,36 +37,20 @@ import org.apache.commons.logging.LogFac
>>  import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
>>  import org.apache.http.impl.nio.reactor.SSLIOSessionHandler;
>>  import org.apache.http.nio.NHttpServiceHandler;
>> -import org.apache.http.nio.params.NIOReactorPNames;
>>  import org.apache.http.nio.reactor.IOEventDispatch;
>>  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
>>  import org.apache.http.nio.reactor.ListenerEndpoint;
>> -import org.apache.http.params.BasicHttpParams;
>> -import org.apache.http.params.HttpConnectionParams;
>>  import org.apache.http.params.HttpParams;
>> -import org.apache.http.params.HttpProtocolParams;
>> -import org.apache.synapse.commons.executors.PriorityExecutor;
>> -import org.apache.synapse.commons.executors.ExecutorConstants;
>> -import
>> org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
>> -import org.apache.synapse.commons.evaluators.Parser;
>> -import org.apache.synapse.commons.evaluators.EvaluatorException;
>> -import org.apache.synapse.commons.evaluators.EvaluatorConstants;
>>  import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
>>
>>  import javax.net.ssl.SSLContext;
>> -import javax.xml.stream.XMLStreamException;
>> -import javax.xml.namespace.QName;
>>  import java.io.IOException;
>>  import java.io.InterruptedIOException;
>> -import java.io.FileInputStream;
>> -import java.io.FileNotFoundException;
>>  import java.net.InetAddress;
>>  import java.net.InetSocketAddress;
>> -import java.net.UnknownHostException;
>>  import java.util.ArrayList;
>>  import java.util.HashMap;
>>  import java.util.Map;
>> -import java.util.Properties;
>>
>>  /**
>>  * NIO transport listener for Axis2 based on HttpCore and NIO extensions
>> @@ -76,11 +58,6 @@ import java.util.Properties;
>>  public class HttpCoreNIOListener implements TransportListener,
>> ManagementSupport {
>>
>>     private static final Log log =
>> LogFactory.getLog(HttpCoreNIOListener.class);
>> -
>> -    /** The Axis2 configuration context */
>> -    private ConfigurationContext cfgCtx;
>> -    /** The Axis2 Transport In Description for the transport */
>> -    private TransportInDescription transportIn;
>>     /** The IOReactor */
>>     private DefaultListeningIOReactor ioReactor = null;
>>
>> @@ -94,32 +71,20 @@ public class HttpCoreNIOListener impleme
>>     private Map<String, String> eprToServiceNameMap = new HashMap<String,
>> String>();
>>     /** the axis observer that gets notified of service life cycle
>> events*/
>>     private final AxisObserver axisObserver = new GenericAxisObserver();
>> -    /** The port to listen on, defaults to 8280 */
>> -    private int port = 8280;
>> -    /** The hostname to use, defaults to localhost */
>> -    private String host = "localhost";
>> -    /** The bind addresses as (address, port) pairs */
>> -    private String bindAddress = null;
>>     /** SSLContext if this listener is a SSL listener */
>>     private SSLContext sslContext = null;
>>     /** The SSL session handler that manages client authentication etc */
>>     private SSLIOSessionHandler sslIOSessionHandler = null;
>>     /** JMX support */
>>     private TransportMBeanSupport mbeanSupport;
>> -    /** Metrics collector for this transport */
>> -    private NhttpMetricsCollector metrics = null;
>>     /** state of the listener */
>>     private volatile int state = BaseConstants.STOPPED;
>>     /** The ServerHandler */
>>     private ServerHandler handler = null;
>> -    /** This will execute the requests based on calculate priority */
>> -    private PriorityExecutor executor = null;
>> -    /** parser for calculating the priority of incoming messages */
>> -    private Parser parser = null;
>> -    /** if falses we won't dispatch to axis2 service in case of rest
>> scenarios */
>> -    private boolean restDispatching = true;
>> -    /** WSDL processor for Get requests*/
>> -    private HttpGetRequestProcessor httpGetRequestProcessor = null;
>> +    /** Listener configurations */
>> +    private ListenerContext listenerContext;
>> +    /** Metrics */
>> +    private NhttpMetricsCollector metrics = null;
>>
>>     protected IOEventDispatch getEventDispatch(
>>         NHttpServiceHandler handler, SSLContext sslContext,
>> @@ -128,73 +93,33 @@ public class HttpCoreNIOListener impleme
>>     }
>>
>>     /**
>> -     * get HTTP protocol parameters to which the listener must adhere to
>> -     * @return the applicable HTTP protocol parameters
>> -     */
>> -    private HttpParams getServerParameters() {
>> -        HttpParams params = new BasicHttpParams();
>> -        NHttpConfiguration cfg = NHttpConfiguration.getInstance();
>> -        params
>> -            .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
>> -                cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
>> -            .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
>> -                cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE,
>> 8 * 1024))
>> -
>>  .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
>> -
>>  cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK, 0) == 1)
>> -            .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
>> -                cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) ==
>> 1)
>> -            .setParameter(HttpProtocolParams.ORIGIN_SERVER,
>> "Synapse-HttpComponents-NIO");
>> -
>> -        if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING,
>> false)) {
>> -
>>  params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING, true);
>> -        }
>> -        return params;
>> -    }
>> -
>> -    /**
>>      * Initialize the transport listener, and execute reactor in new
>> seperate thread
>>      * @param cfgCtx the Axis2 configuration context
>> -     * @param transprtIn the description of the http/s transport from
>> Axis2 configuration
>> +     * @param transportIn the description of the http/s transport from
>> Axis2 configuration
>>      * @throws AxisFault on error
>>      */
>> -    public void init(ConfigurationContext cfgCtx, TransportInDescription
>> transprtIn)
>> +    public void init(ConfigurationContext cfgCtx, TransportInDescription
>> transportIn)
>>             throws AxisFault {
>>
>> -        this.cfgCtx = cfgCtx;
>> -        this.transportIn = transprtIn;
>>         cfgCtx.setProperty(NhttpConstants.EPR_TO_SERVICE_NAME_MAP,
>> eprToServiceNameMap);
>> -        Parameter param = transprtIn.getParameter(PARAM_PORT);
>> -        if (param != null) {
>> -            port = Integer.parseInt((String) param.getValue());
>> -        }
>>
>> -        param = transprtIn.getParameter(NhttpConstants.BIND_ADDRESS);
>> -        if (param != null) {
>> -            bindAddress = ((String) param.getValue()).trim();
>> -        }
>> +        // is this an SSL listener?
>> +        sslContext = getSSLContext(transportIn);
>> +        sslIOSessionHandler = getSSLIOSessionHandler(transportIn);
>>
>> -        param = transprtIn.getParameter(HOST_ADDRESS);
>> -        if (param != null) {
>> -            host = ((String) param.getValue()).trim();
>> -        } else {
>> -            try {
>> -                host = java.net.InetAddress.getLocalHost().getHostName();
>> -            } catch (UnknownHostException e) {
>> -                log.warn("Unable to lookup local host name, using
>> 'localhost'");
>> -            }
>> -        }
>> +        listenerContext = new ListenerContext(cfgCtx, transportIn,
>> sslContext != null);
>> +        listenerContext.build();
>>
>> -        // is this an SSL listener?
>> -        sslContext = getSSLContext(transprtIn);
>> -        sslIOSessionHandler = getSSLIOSessionHandler(transprtIn);
>> +        metrics = listenerContext.getMetrics();
>>
>> -        param = transprtIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
>> +        Parameter param =
>> transportIn.getParameter(NhttpConstants.WSDL_EPR_PREFIX);
>>         if (param != null) {
>>             serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, (String)
>> param.getValue());
>>             customEPRPrefix = (String) param.getValue();
>>         } else {
>> -            serviceEPRPrefix = getServiceEPRPrefix(cfgCtx, host, port);
>> -            customEPRPrefix = transprtIn.getName() + "://" + host + ":" +
>> (port == 80 ? "" : port) + "/";
>> +            serviceEPRPrefix = getServiceEPRPrefix(cfgCtx,
>> listenerContext.getHost(), listenerContext.getPort());
>> +            customEPRPrefix = transportIn.getName() + "://" +
>> listenerContext.getHost() +
>> +                    ":" + (listenerContext.getPort() == 80 ? "" :
>> listenerContext.getPort()) + "/";
>>         }
>>
>>         // register to receive updates on services for lifetime management
>> @@ -204,115 +129,12 @@ public class HttpCoreNIOListener impleme
>>         mbeanSupport
>>             = new TransportMBeanSupport(this, "nio-http" + (sslContext ==
>> null ? "" : "s"));
>>         mbeanSupport.register();
>> -        metrics = new NhttpMetricsCollector(true, sslContext != null);
>> -
>> -        // create the priority based executor and parser
>> -        param =
>> transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
>> -        if (param != null && param.getValue() != null) {
>> -            createPriorityConfiguration(param.getValue().toString());
>> -        }
>> -
>> -        param =
>> transprtIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
>> -        if (param != null && param.getValue() != null) {
>> -            if (param.getValue().equals("true")) {
>> -                restDispatching = false;
>> -            }
>> -        }
>> -
>> -        // create http Get processor
>> -        param =
>> transprtIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
>> -        if (param != null && param.getValue() != null) {
>> -            httpGetRequestProcessor =
>> createHttpGetProcessor(param.getValue().toString());
>> -            if (httpGetRequestProcessor == null) {
>> -                handleException("Cannot create HttpGetRequestProcessor");
>> -            }
>> -        } else {
>> -            httpGetRequestProcessor = new DefaultHttpGetProcessor();
>> -        }
>> -    }
>> -
>> -    private HttpGetRequestProcessor createHttpGetProcessor(String str)
>> throws AxisFault {
>> -        Object obj = null;
>> -        try {
>> -            obj = Class.forName(str).newInstance();
>> -        } catch (ClassNotFoundException e) {
>> -            handleException("Error creating WSDL processor", e);
>> -        } catch (InstantiationException e) {
>> -            handleException("Error creating WSDL processor", e);
>> -        } catch (IllegalAccessException e) {
>> -            handleException("Error creating WSDL processor", e);
>> -        }
>> -
>> -        if (obj instanceof HttpGetRequestProcessor) {
>> -            return (HttpGetRequestProcessor) obj;
>> -        } else {
>> -            handleException("Error creating WSDL processor. The
>> HttpProcessor should be of type " +
>> -
>>  "org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
>> -        }
>> -
>> -        return null;
>>     }
>>
>>     public int getActiveConnectionsSize() {
>>         return handler.getActiveConnectionsSize();
>>     }
>>
>> -    /**
>> -     * Create a priority executor from the given file
>> -     *
>> -     * @param fileName file name of the executor configuration
>> -     * @throws AxisFault if an error occurs
>> -     */
>> -    private void createPriorityConfiguration(String fileName) throws
>> AxisFault {
>> -        OMElement definitions = null;
>> -        try {
>> -            FileInputStream fis = new FileInputStream(fileName);
>> -            definitions = new StAXOMBuilder(fis).getDocumentElement();
>> -            definitions.build();
>> -        } catch (FileNotFoundException e) {
>> -            handleException("Priority configuration file cannot be found
>> : " + fileName, e);
>> -        } catch (XMLStreamException e) {
>> -            handleException("Error parsing priority configuration xml
>> file " + fileName, e);
>> -        }
>> -
>> -        assert definitions != null;
>> -        OMElement executorElem = definitions.getFirstChildWithName(
>> -                new QName(ExecutorConstants.PRIORITY_EXECUTOR));
>> -
>> -        if (executorElem == null) {
>> -            handleException(ExecutorConstants.PRIORITY_EXECUTOR +
>> -                    " configuration is mandatory for priority based
>> routing");
>> -        }
>> -
>> -        executor = PriorityExecutorFactory.createExecutor(
>> -                null, executorElem, false, new Properties());
>> -        OMElement conditionsElem = definitions.getFirstChildWithName(
>> -                new QName(EvaluatorConstants.CONDITIONS));
>> -        if (conditionsElem == null) {
>> -            handleException("Conditions configuration is mandatory for
>> priority based routing");
>> -        }
>> -
>> -        executor.init();
>> -
>> -        assert conditionsElem != null;
>> -        OMAttribute defPriorityAttr = conditionsElem.getAttribute(
>> -                new QName(EvaluatorConstants.DEFAULT_PRIORITY));
>> -        if (defPriorityAttr != null) {
>> -            parser = new
>> Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
>> -        } else {
>> -            parser = new Parser();
>> -        }
>> -
>> -        try {
>> -            parser.init(conditionsElem);
>> -        } catch (EvaluatorException e) {
>> -            handleException("Invalid " + EvaluatorConstants.CONDITIONS +
>> -                    " configuration for priority based mediation", e);
>> -        }
>> -
>> -        log.info("Created a priority based executor from the
>> configuration: " +
>> -                fileName);
>> -    }
>>
>>     /**
>>      * Return the EPR prefix for services made available over this
>> transport
>> @@ -374,7 +196,7 @@ public class HttpCoreNIOListener impleme
>>         }
>>
>>         // configure the IO reactor on the specified port
>> -        HttpParams params = getServerParameters();
>> +        HttpParams params = listenerContext.getParams();
>>         try {
>>             String prefix = (sslContext == null ? "http" : "https") +
>> "-Listener I/O dispatcher";
>>             ioReactor = new DefaultListeningIOReactor(
>> @@ -398,25 +220,26 @@ public class HttpCoreNIOListener impleme
>>             handleException("Error starting the IOReactor", e);
>>         }
>>
>> +        ConfigurationContext cfgCtx = listenerContext.getCfgCtx();
>> +
>>         for (Object obj :
>> cfgCtx.getAxisConfiguration().getServices().values()) {
>>             addToServiceURIMap((AxisService) obj);
>>         }
>> -
>> -        handler = new ServerHandler(cfgCtx, params, sslContext != null
>> -                , metrics, parser, executor, restDispatching,
>> httpGetRequestProcessor);
>> +
>> +        handler = new ServerHandler(listenerContext);
>>         final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
>>                 sslContext, sslIOSessionHandler, params);
>>         state = BaseConstants.STARTED;
>>
>> -        httpGetRequestProcessor.init(cfgCtx,handler);
>> +        listenerContext.getHttpGetRequestProcessor().init(cfgCtx,
>> handler);
>>
>>         ListenerEndpoint endpoint;
>>         try {
>> -            if (bindAddress == null) {
>> -                endpoint = ioReactor.listen(new InetSocketAddress(port));
>> +            if (listenerContext.getBindAddress() == null) {
>> +                endpoint = ioReactor.listen(new
>> InetSocketAddress(listenerContext.getPort()));
>>             } else {
>>                 endpoint = ioReactor.listen(new InetSocketAddress(
>> -                    InetAddress.getByName(bindAddress), port));
>> +
>>  InetAddress.getByName(listenerContext.getBindAddress()),
>> listenerContext.getPort()));
>>             }
>>         } catch (IOException e) {
>>             handleException("Encountered an I/O error: " + e.getMessage(),
>> e);
>> @@ -448,9 +271,10 @@ public class HttpCoreNIOListener impleme
>>         } catch (InterruptedException e) {
>>             log.warn("HttpCoreNIOListener#start() was interrupted");
>>         }
>> -
>> +
>>         log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener
>> started on" +
>> -            (bindAddress != null ? " address : " + bindAddress : "") + "
>> port : " + port);
>> +                (listenerContext.getBindAddress() != null ? " address : "
>> + listenerContext.getBindAddress() : "") +
>> +                " port : " + listenerContext.getPort());
>>     }
>>
>>     private void addToServiceURIMap(AxisService service) {
>> @@ -480,7 +304,7 @@ public class HttpCoreNIOListener impleme
>>             ioReactor.shutdown();
>>             handler.stop();
>>             state = BaseConstants.STOPPED;
>> -            for (Object obj :
>> cfgCtx.getAxisConfiguration().getServices().values()) {
>> +            for (Object obj :
>> listenerContext.getCfgCtx().getAxisConfiguration().getServices().values()) {
>>                 removeServiceFfromURIMap((AxisService) obj);
>>             }
>>         } catch (IOException e) {
>> @@ -626,7 +450,7 @@ public class HttpCoreNIOListener impleme
>>
>>     public void destroy() {
>>         ioReactor = null;
>> -
>>  cfgCtx.getAxisConfiguration().getObserversList().remove(axisObserver);
>> +
>>  listenerContext.getCfgCtx().getAxisConfiguration().getObserversList().remove(axisObserver);
>>         mbeanSupport.unregister();
>>         metrics.destroy();
>>     }
>> @@ -644,7 +468,7 @@ public class HttpCoreNIOListener impleme
>>         public void serviceUpdate(AxisEvent event, AxisService service) {
>>
>>             if (!ignoreService(service)
>> -                    && BaseUtils.isUsingTransport(service,
>> transportIn.getName())) {
>> +                    && BaseUtils.isUsingTransport(service,
>> listenerContext.getTransportIn().getName())) {
>>                 switch (event.getEventType()) {
>>                     case AxisEvent.SERVICE_DEPLOY :
>>                         addToServiceURIMap(service);
>> @@ -684,11 +508,6 @@ public class HttpCoreNIOListener impleme
>>         throw new AxisFault(msg, e);
>>     }
>>
>> -    private void handleException(String msg) throws AxisFault {
>> -        log.error(msg);
>> -        throw new AxisFault(msg);
>> -    }
>> -
>>     // -- jmx/management methods--
>>     public long getMessagesReceived() {
>>         if (metrics != null) {
>>
>> Added:
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java?rev=1072963&view=auto
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
>> (added)
>> +++
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ListenerContext.java
>> Mon Feb 21 13:23:54 2011
>> @@ -0,0 +1,305 @@
>> +/*
>> + *  Licensed to the Apache Software Foundation (ASF) under one
>> + *  or more contributor license agreements.  See the NOTICE file
>> + *  distributed with this work for additional information
>> + *  regarding copyright ownership.  The ASF licenses this file
>> + *  to you under the Apache License, Version 2.0 (the
>> + *  "License"); you may not use this file except in compliance
>> + *  with the License.  You may obtain a copy of the License at
>> + *
>> + *   http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + *  Unless required by applicable law or agreed to in writing,
>> + *  software distributed under the License is distributed on an
>> + *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + *  KIND, either express or implied.  See the License for the
>> + *  specific language governing permissions and limitations
>> + *  under the License.
>> + */
>> +
>> +package org.apache.synapse.transport.nhttp;
>> +
>> +import org.apache.axiom.om.OMAttribute;
>> +import org.apache.axiom.om.OMElement;
>> +import org.apache.axiom.om.impl.builder.StAXOMBuilder;
>> +import org.apache.axis2.AxisFault;
>> +import org.apache.axis2.context.ConfigurationContext;
>> +import org.apache.axis2.description.Parameter;
>> +import org.apache.axis2.description.TransportInDescription;
>> +import org.apache.axis2.transport.TransportListener;
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>> +import org.apache.http.nio.params.NIOReactorPNames;
>> +import org.apache.http.params.BasicHttpParams;
>> +import org.apache.http.params.HttpConnectionParams;
>> +import org.apache.http.params.HttpParams;
>> +import org.apache.http.params.HttpProtocolParams;
>> +import org.apache.synapse.commons.evaluators.EvaluatorConstants;
>> +import org.apache.synapse.commons.evaluators.EvaluatorException;
>> +import org.apache.synapse.commons.evaluators.Parser;
>> +import org.apache.synapse.commons.executors.ExecutorConstants;
>> +import org.apache.synapse.commons.executors.PriorityExecutor;
>> +import
>> org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
>> +import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
>> +
>> +import javax.xml.namespace.QName;
>> +import javax.xml.stream.XMLStreamException;
>> +import java.io.FileInputStream;
>> +import java.io.FileNotFoundException;
>> +import java.net.InetAddress;
>> +import java.net.UnknownHostException;
>> +import java.util.Properties;
>> +
>> +/**
>> + * This class is being used to hold the different runtime objects used by
>> the Listeners
>> + */
>> +public class ListenerContext {
>> +    private Log log = LogFactory.getLog(ListenerContext.class);
>> +
>> +    /** The Axis2 configuration context */
>> +    private ConfigurationContext cfgCtx;
>> +    /** The Axis2 Transport In Description for the transport */
>> +    private TransportInDescription transportIn;
>> +    /** SSLContext if this listener is a SSL listener */
>> +    private boolean ssl = false;
>> +    /** JMX support */
>> +    private NhttpMetricsCollector metrics = null;
>> +    /** This will execute the requests based on calculate priority */
>> +    private PriorityExecutor executor = null;
>> +    /** parser for calculating the priority of incoming messages */
>> +    private Parser parser = null;
>> +    /** if false we won't dispatch to axis2 service in case of rest
>> scenarios */
>> +    private boolean restDispatching = true;
>> +    /** WSDL processor for Get requests*/
>> +    private HttpGetRequestProcessor httpGetRequestProcessor = null;
>> +    /** The port to listen on, defaults to 8280 */
>> +    private int port = 8280;
>> +    /** The hostname to use, defaults to localhost */
>> +    private String host = "localhost";
>> +    /** The bind addresses as (address, port) pairs */
>> +    private String bindAddress = null;
>> +
>> +    private HttpParams params = null;
>> +
>> +
>> +    public ListenerContext(ConfigurationContext cfgCtx,
>> +                           TransportInDescription transportIn,
>> +                           boolean ssl) {
>> +        this.cfgCtx = cfgCtx;
>> +        this.transportIn = transportIn;
>> +        this.ssl = ssl;
>> +    }
>> +
>> +    public void build() throws AxisFault {
>> +        Parameter param =
>> transportIn.getParameter(TransportListener.PARAM_PORT);
>> +        if (param != null) {
>> +            port = Integer.parseInt((String) param.getValue());
>> +        }
>> +
>> +        param = transportIn.getParameter(NhttpConstants.BIND_ADDRESS);
>> +        if (param != null) {
>> +            bindAddress = ((String) param.getValue()).trim();
>> +        }
>> +
>> +        param = transportIn.getParameter(TransportListener.HOST_ADDRESS);
>> +        if (param != null) {
>> +            host = ((String) param.getValue()).trim();
>> +        } else {
>> +            try {
>> +                host = InetAddress.getLocalHost().getHostName();
>> +            } catch (UnknownHostException e) {
>> +                log.warn("Unable to lookup local host name, using
>> 'localhost'");
>> +            }
>> +        }
>> +
>> +        metrics = new NhttpMetricsCollector(true, ssl);
>> +
>> +        // create the priority based executor and parser
>> +        param =
>> transportIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
>> +        if (param != null && param.getValue() != null) {
>> +            createPriorityConfiguration(param.getValue().toString());
>> +        }
>> +
>> +        param =
>> transportIn.getParameter(NhttpConstants.DISABLE_REST_SERVICE_DISPATCHING);
>> +        if (param != null && param.getValue() != null) {
>> +            if (param.getValue().equals("true")) {
>> +                restDispatching = false;
>> +            }
>> +        }
>> +
>> +        // create http Get processor
>> +        param =
>> transportIn.getParameter(NhttpConstants.HTTP_GET_PROCESSOR);
>> +        if (param != null && param.getValue() != null) {
>> +            httpGetRequestProcessor =
>> createHttpGetProcessor(param.getValue().toString());
>> +            if (httpGetRequestProcessor == null) {
>> +                handleException("Cannot create HttpGetRequestProcessor");
>> +            }
>> +        } else {
>> +            httpGetRequestProcessor = new DefaultHttpGetProcessor();
>> +        }
>> +
>> +        params = getListenerParameters();
>> +    }
>> +
>> +/**
>> +     * Create a priority executor from the given file
>> +     *
>> +     * @param fileName file name of the executor configuration
>> +     * @throws org.apache.axis2.AxisFault if an error occurs
>> +     */
>> +    private void createPriorityConfiguration(String fileName) throws
>> AxisFault {
>> +        OMElement definitions = null;
>> +        try {
>> +            FileInputStream fis = new FileInputStream(fileName);
>> +            definitions = new StAXOMBuilder(fis).getDocumentElement();
>> +            definitions.build();
>> +        } catch (FileNotFoundException e) {
>> +            handleException("Priority configuration file cannot be found
>> : " + fileName, e);
>> +        } catch (XMLStreamException e) {
>> +            handleException("Error parsing priority configuration xml
>> file " + fileName, e);
>> +        }
>> +
>> +        assert definitions != null;
>> +        OMElement executorElem = definitions.getFirstChildWithName(
>> +                new QName(ExecutorConstants.PRIORITY_EXECUTOR));
>> +
>> +        if (executorElem == null) {
>> +            handleException(ExecutorConstants.PRIORITY_EXECUTOR +
>> +                    " configuration is mandatory for priority based
>> routing");
>> +        }
>> +
>> +        executor = PriorityExecutorFactory.createExecutor(
>> +                null, executorElem, false, new Properties());
>> +        OMElement conditionsElem = definitions.getFirstChildWithName(
>> +                new QName(EvaluatorConstants.CONDITIONS));
>> +        if (conditionsElem == null) {
>> +            handleException("Conditions configuration is mandatory for
>> priority based routing");
>> +        }
>> +
>> +        executor.init();
>> +
>> +        assert conditionsElem != null;
>> +        OMAttribute defPriorityAttr = conditionsElem.getAttribute(
>> +                new QName(EvaluatorConstants.DEFAULT_PRIORITY));
>> +        if (defPriorityAttr != null) {
>> +            parser = new
>> Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
>> +        } else {
>> +            parser = new Parser();
>> +        }
>> +
>> +        try {
>> +            parser.init(conditionsElem);
>> +        } catch (EvaluatorException e) {
>> +            handleException("Invalid " + EvaluatorConstants.CONDITIONS +
>> +                    " configuration for priority based mediation", e);
>> +        }
>> +
>> +        log.info("Created a priority based executor from the
>> configuration: " +
>> +                fileName);
>> +    }
>> +
>> +    private HttpGetRequestProcessor createHttpGetProcessor(String str)
>> throws AxisFault {
>> +        Object obj = null;
>> +        try {
>> +            obj = Class.forName(str).newInstance();
>> +        } catch (ClassNotFoundException e) {
>> +            handleException("Error creating WSDL processor", e);
>> +        } catch (InstantiationException e) {
>> +            handleException("Error creating WSDL processor", e);
>> +        } catch (IllegalAccessException e) {
>> +            handleException("Error creating WSDL processor", e);
>> +        }
>> +
>> +        if (obj instanceof HttpGetRequestProcessor) {
>> +            return (HttpGetRequestProcessor) obj;
>> +        } else {
>> +            handleException("Error creating WSDL processor. The
>> HttpProcessor should be of type " +
>> +
>>  "org.apache.synapse.transport.nhttp.HttpGetRequestProcessor");
>> +        }
>> +
>> +        return null;
>> +    }
>> +
>> +    /**
>> +     * get HTTP protocol parameters to which the listener must adhere to
>> +     * @return the applicable HTTP protocol parameters
>> +     */
>> +    private HttpParams getListenerParameters() {
>> +        HttpParams params = new BasicHttpParams();
>> +        NHttpConfiguration cfg = NHttpConfiguration.getInstance();
>> +        params
>> +            .setIntParameter(HttpConnectionParams.SO_TIMEOUT,
>> +                cfg.getProperty(HttpConnectionParams.SO_TIMEOUT, 60000))
>> +            .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE,
>> +                cfg.getProperty(HttpConnectionParams.SOCKET_BUFFER_SIZE,
>> 8 * 1024))
>> +
>>  .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
>> +
>>  cfg.getProperty(HttpConnectionParams.STALE_CONNECTION_CHECK, 0) == 1)
>> +            .setBooleanParameter(HttpConnectionParams.TCP_NODELAY,
>> +                cfg.getProperty(HttpConnectionParams.TCP_NODELAY, 1) ==
>> 1)
>> +            .setParameter(HttpProtocolParams.ORIGIN_SERVER,
>> "Synapse-HttpComponents-NIO");
>> +
>> +        if (cfg.getBooleanValue(NIOReactorPNames.INTEREST_OPS_QUEUEING,
>> false)) {
>> +
>>  params.setBooleanParameter(NIOReactorPNames.INTEREST_OPS_QUEUEING, true);
>> +        }
>> +        return params;
>> +    }
>> +
>> +
>> +    public ConfigurationContext getCfgCtx() {
>> +        return cfgCtx;
>> +    }
>> +
>> +    public TransportInDescription getTransportIn() {
>> +        return transportIn;
>> +    }
>> +
>> +    public boolean isSsl() {
>> +        return ssl;
>> +    }
>> +
>> +    public NhttpMetricsCollector getMetrics() {
>> +        return metrics;
>> +    }
>> +
>> +    public PriorityExecutor getExecutor() {
>> +        return executor;
>> +    }
>> +
>> +    public Parser getParser() {
>> +        return parser;
>> +    }
>> +
>> +    public boolean isRestDispatching() {
>> +        return restDispatching;
>> +    }
>> +
>> +    public HttpGetRequestProcessor getHttpGetRequestProcessor() {
>> +        return httpGetRequestProcessor;
>> +    }
>> +
>> +    public int getPort() {
>> +        return port;
>> +    }
>> +
>> +    public String getHost() {
>> +        return host;
>> +    }
>> +
>> +    public String getBindAddress() {
>> +        return bindAddress;
>> +    }
>> +
>> +    public HttpParams getParams() {
>> +        return params;
>> +    }
>> +
>> +    private void handleException(String msg, Exception e) throws
>> AxisFault {
>> +        log.error(msg, e);
>> +        throw new AxisFault(msg, e);
>> +    }
>> +
>> +    private void handleException(String msg) throws AxisFault {
>> +        log.error(msg);
>> +        throw new AxisFault(msg);
>> +    }
>> +}
>>
>> Modified:
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=1072963&r1=1072962&r2=1072963&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
>> Mon Feb 21 13:23:54 2011
>> @@ -121,15 +121,15 @@ public class ServerHandler implements NH
>>     public static final String CONNECTION_CREATION_TIME =
>> "synapse.connectionCreationTime";
>>     public static final String SERVER_CONNECTION_DEBUG =
>> "synapse.server-connection-debug";
>>
>> -    public ServerHandler(final ConfigurationContext cfgCtx, final
>> HttpParams params,
>> -        final boolean isHttps, final NhttpMetricsCollector metrics,
>> -        Parser parser, PriorityExecutor executor, boolean
>> restDispatching,
>> -        HttpGetRequestProcessor httpGetRequestProcessor) {
>> +    private ListenerContext listenerContext = null;
>> +
>> +    public ServerHandler(ListenerContext listenerContext) {
>>         super();
>> -        this.cfgCtx = cfgCtx;
>> -        this.params = params;
>> -        this.isHttps = isHttps;
>> -        this.metrics = metrics;
>> +        this.listenerContext = listenerContext;
>> +        this.cfgCtx = listenerContext.getCfgCtx();
>> +        this.params = listenerContext.getParams();
>> +        this.isHttps = listenerContext.isSsl();
>> +        this.metrics = listenerContext.getMetrics();
>>         this.responseFactory = new DefaultHttpResponseFactory();
>>         this.httpProcessor = getHttpProcessor();
>>         this.connStrategy = new DefaultConnectionReuseStrategy();
>> @@ -137,10 +137,10 @@ public class ServerHandler implements NH
>>         this.activeConnections = new ArrayList<NHttpServerConnection>();
>>         this.latencyView = new LatencyView(isHttps);
>>         this.threadingView = new ThreadingView("HttpServerWorker", true,
>> 50);
>> -        this.restDispatching = restDispatching;
>> +        this.restDispatching = listenerContext.isRestDispatching();
>>
>>         this.cfg = NHttpConfiguration.getInstance();
>> -        if (executor == null)  {
>> +        if (listenerContext.getExecutor() == null)  {
>>             this.workerPool = WorkerPoolFactory.getWorkerPool(
>>                 cfg.getServerCoreThreads(),
>>                 cfg.getServerMaxThreads(),
>> @@ -148,11 +148,11 @@ public class ServerHandler implements NH
>>                 cfg.getServerQueueLen(),
>>                 "Server Worker thread group", "HttpServerWorker");
>>         } else {
>> -            this.executor = executor;
>> -            this.parser = parser;
>> +            this.executor = listenerContext.getExecutor();
>> +            this.parser = listenerContext.getParser();
>>         }
>>
>> -        this.httpGetRequestProcessor = httpGetRequestProcessor;
>> +        this.httpGetRequestProcessor =
>> listenerContext.getHttpGetRequestProcessor();
>>     }
>>
>>     /**
>> @@ -209,8 +209,8 @@ public class ServerHandler implements NH
>>                 metrics.incrementMessagesReceived();
>>             }
>>             // hand off processing of the request to a thread off the pool
>> -            ServerWorker worker = new ServerWorker(cfgCtx, conn, isHttps,
>> metrics, this,
>> -                        request, is, response, os, restDispatching,
>> httpGetRequestProcessor);
>> +            ServerWorker worker = new ServerWorker(listenerContext, conn,
>> this,
>> +                    request, is, response, os);
>>
>>             if (workerPool != null) {
>>                 workerPool.execute(worker);
>>
>> Modified:
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
>> URL:
>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java?rev=1072963&r1=1072962&r2=1072963&view=diff
>>
>> ==============================================================================
>> ---
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
>> (original)
>> +++
>> synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerWorker.java
>> Mon Feb 21 13:23:54 2011
>> @@ -95,38 +95,33 @@ public class ServerWorker implements Run
>>      * its output. This however does not force the processor to write a
>> response back as the
>>      * traditional servlet service() method, but creates the background
>> required to write the
>>      * response, if one would be created.
>> -     * @param cfgCtx the Axis2 configuration context
>> +     *
>> +     * @param listenerContext the listener configuration
>>      * @param conn the underlying http connection
>> -     * @param isHttps whether https or not
>> -     * @param metrics metrics for the transport
>>      * @param serverHandler the handler of the server side messages
>>      * @param request the http request received (might still be in the
>> process of being streamed)
>>      * @param is the stream input stream to read the request body
>>      * @param response the response to be populated if applicable
>>      * @param os the output stream to write the response body if one is
>> applicable
>> -     * @param isRestDispatching weather we should dispatch in case of
>> rest
>>      */
>> -    public ServerWorker(final ConfigurationContext cfgCtx, final
>> NHttpServerConnection conn,
>> -        final boolean isHttps,
>> -        final MetricsCollector metrics,
>> +    public ServerWorker(ListenerContext listenerContext,
>> +                        final NHttpServerConnection conn,
>>         final ServerHandler serverHandler,
>>         final HttpRequest request, final InputStream is,
>> -        final HttpResponse response, final OutputStream os,
>> -        final boolean isRestDispatching,
>> -        final HttpGetRequestProcessor httpGetRequestProcessor) {
>> +        final HttpResponse response, final OutputStream os) {
>>
>> -        this.cfgCtx = cfgCtx;
>> +        this.cfgCtx = listenerContext.getCfgCtx();
>>         this.conn = conn;
>> -        this.isHttps = isHttps;
>> -        this.metrics = metrics;
>> +        this.isHttps = listenerContext.isSsl();
>> +        this.metrics = listenerContext.getMetrics();
>>         this.serverHandler = serverHandler;
>>         this.request = request;
>>         this.response = response;
>>         this.is = is;
>>         this.os = os;
>>         this.msgContext = createMessageContext(request);
>> -        this.isRestDispatching = isRestDispatching;
>> -        this.httpGetRequestProcessor = httpGetRequestProcessor;
>> +        this.isRestDispatching = listenerContext.isRestDispatching();
>> +        this.httpGetRequestProcessor =
>> listenerContext.getHttpGetRequestProcessor();
>>     }
>>
>>     /**
>>
>>
>>
>


-- 
Supun Kamburugamuva
Technical Lead &  Product Manager, WSO2 Inc.; http://wso2.com
Member, Apache Software Foundation; http://www.apache.org
WSO2 Inc.;  http://wso2.org
E-mail: supun@wso2.com;  Mobile: +94 77 431 3585
Blog: http://supunk.blogspot.com

Mime
View raw message