synapse-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andreas Veithen <andreas.veit...@gmail.com>
Subject Re: svn commit: r773818 - in /synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix: FIXApplicationFactory.java FIXSessionFactory.java FIXTransportListener.java FIXTransportSender.java
Date Wed, 13 May 2009 09:11:25 GMT
We definitely need to make the WorkerPool created by
AbstractTransportListener configurable. The question whether this
should be done using a property file or using parameters in
TransportInDescription (as with all other configuration settings).
What are the arguments in favor of doing it in a separate property
file?

Andreas

On Wed, May 13, 2009 at 08:31, Hiranya Jayathilaka <hiranya911@gmail.com> wrote:
>
>
> On Wed, May 13, 2009 at 10:30 AM, Ruwan Linton <ruwan.linton@gmail.com>
> wrote:
>>
>> I think we need to make that configurable as well.... currently hard
>> codded setting will work in 98% of the cases, but there can be a scenario
>> where it requires a tune up.
>>
>> Can we do this in a manner that we can configure them per transport.
>
> One simple solution would be to read a transport specific configuration file
> at AbstractTransportListener#init(). The init method gets a
> TransportInDescription object as an argument and from that we can retrieve
> the transport name to construct a file name unique to a given transport (eg:
> mail.properties, vfs.properties). This approach has the benefit that it
> doesn't require changes to any of the actual transport implementations.
> Everything is taken care of by the abstract class.
>
> However this class now belongs to the WS-Commons transports project. So the
> enhancement should be made there.
>
> Thanks,
> Hiranya
>
>
>>
>> Thanks,
>> Ruwan
>>
>> On Wed, May 13, 2009 at 10:06 AM, Hiranya Jayathilaka
>> <hiranya911@gmail.com> wrote:
>>>
>>> Hi Ruwan,
>>>
>>> On Wed, May 13, 2009 at 12:43 AM, Ruwan Linton <ruwan.linton@gmail.com>
>>> wrote:
>>>>
>>>> Hiranya,
>>>>
>>>> If you can make the worker pool configurable that would be of much
>>>> importance... you may have a look at the nhttp transport thread pool, which
>>>> can be configurable via the nhttp.properties file.
>>>
>>> Currently the FIX sender initializes the WorkerPool in a manner similar
>>> to the AbstractTransportListener. The WorkerPool in
>>> AbstractTransportListener is used by several transports (JMS, Mail etc) via
>>> inheritance. FIX listener also makes use of the same thread pool. Do we have
>>> any plans to make that thread pool configurable too? Othrewise I don't think
>>> it mkes much sense just to make the FIX sender's thread pool configurable.
>>>
>>> Thanks,
>>> Hiranya
>>>
>>>>
>>>>
>>>> Thanks,
>>>> Ruwan
>>>>
>>>> On Tue, May 12, 2009 at 1:30 PM, <hiranya@apache.org> wrote:
>>>>>
>>>>> Author: hiranya
>>>>> Date: Tue May 12 08:00:27 2009
>>>>> New Revision: 773818
>>>>>
>>>>> URL: http://svn.apache.org/viewvc?rev=773818&view=rev
>>>>> Log:
>>>>> Enhancements and code cleanup in the FIX transport:
>>>>> * FIX sender now has its own worker pool and hence does not rely on the
>>>>> FIX listener any more. Therefore listener and sender can be enabled
>>>>> individually
>>>>> * Made FIXSessionFactory a singleton to effectively share session data
>>>>> among the listener and the sender
>>>>> * Cleanup logic for initiators during sender shutdown
>>>>> * Minor bug fix at FIXSessionFactory for a bug which prevented the
>>>>> sample 259 and similar scenarios from operating properly
>>>>>
>>>>> Modified:
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>>
>>>>>  synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXApplicationFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -27,15 +27,12 @@
>>>>>  public class FIXApplicationFactory {
>>>>>
>>>>>     private ConfigurationContext cfgCtx;
>>>>> -    private WorkerPool workerPool;
>>>>> -
>>>>> -    public FIXApplicationFactory(ConfigurationContext cfgCtx,
>>>>> WorkerPool workerPool) {
>>>>>
>>>>> +    public FIXApplicationFactory(ConfigurationContext cfgCtx) {
>>>>>         this.cfgCtx = cfgCtx;
>>>>> -        this.workerPool = workerPool;
>>>>>     }
>>>>>
>>>>> -    public Application getFIXApplication(AxisService service, boolean
>>>>> acceptor) {
>>>>> +    public Application getFIXApplication(AxisService service,
>>>>> WorkerPool workerPool, boolean acceptor) {
>>>>>         return new FIXIncomingMessageHandler(cfgCtx, workerPool,
>>>>> service, acceptor);
>>>>>     }
>>>>>  }
>>>>> \ No newline at end of file
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXSessionFactory.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -23,6 +23,7 @@
>>>>>  import org.apache.axis2.description.AxisService;
>>>>>  import org.apache.axis2.description.Parameter;
>>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>>  import org.apache.commons.logging.Log;
>>>>>  import org.apache.commons.logging.LogFactory;
>>>>>  import quickfix.*;
>>>>> @@ -65,16 +66,29 @@
>>>>>     /** A Map containing all the FIX applications created for
>>>>> initiators, keyed by FIX EPR */
>>>>>     private Map<String, Application> applicationStore;
>>>>>     /** An ApplicationFactory handles creating FIX Applications
>>>>> (FIXIncomingMessageHandler Objects) */
>>>>> -    private FIXApplicationFactory applicationFactory;
>>>>> +    private static FIXApplicationFactory applicationFactory = null;
>>>>> +
>>>>> +    private WorkerPool listenerThreadPool;
>>>>> +    private WorkerPool senderThreadPool;
>>>>>
>>>>>     private Log log;
>>>>>
>>>>> -    public FIXSessionFactory(FIXApplicationFactory applicationFactory)
>>>>> {
>>>>> -        this.applicationFactory = applicationFactory;
>>>>> +    private static FIXSessionFactory INSTANCE = new
>>>>> FIXSessionFactory();
>>>>> +
>>>>> +    public static FIXSessionFactory getInstance(FIXApplicationFactory
>>>>> af) {
>>>>> +        if (applicationFactory == null) {
>>>>> +            applicationFactory = af;
>>>>> +        }
>>>>> +        return INSTANCE;
>>>>> +    }
>>>>> +
>>>>> +    private FIXSessionFactory() {
>>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>>         this.acceptorStore = new HashMap<String,Acceptor>();
>>>>>         this.initiatorStore = new HashMap<String, Initiator>();
>>>>>         this.applicationStore = new HashMap<String, Application>();
>>>>> +        this.listenerThreadPool = null;
>>>>> +        this.senderThreadPool = null;
>>>>>     }
>>>>>
>>>>>     /**
>>>>> @@ -101,7 +115,7 @@
>>>>>                 MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>>                 //Get a new FIX Application
>>>>> -                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, true);
>>>>> +                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, listenerThreadPool, true);
>>>>>                 //Create a new FIX Acceptor
>>>>>                 Acceptor acceptor = new SocketAcceptor(
>>>>>                         messageHandler,
>>>>> @@ -174,7 +188,7 @@
>>>>>         MessageStoreFactory storeFactory =
>>>>> getMessageStoreFactory(service, settings, false);
>>>>>         MessageFactory messageFactory = new DefaultMessageFactory();
>>>>>         //Get a new FIX application
>>>>> -        Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> +        Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>>         try {
>>>>>            //Create a new FIX initiator
>>>>> @@ -216,7 +230,7 @@
>>>>>                 MessageFactory messageFactory = new
>>>>> DefaultMessageFactory();
>>>>>                 quickfix.LogFactory logFactory = getLogFactory(service,
>>>>> settings, true);
>>>>>                 //Get a new FIX Application
>>>>> -                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, false);
>>>>> +                Application messageHandler =
>>>>> applicationFactory.getFIXApplication(service, senderThreadPool, false);
>>>>>
>>>>>                 Initiator initiator = new SocketInitiator(
>>>>>                     messageHandler,
>>>>> @@ -246,10 +260,10 @@
>>>>>             }
>>>>>
>>>>>         } else {
>>>>> -            String msg = "The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> -                    "not specified. Unable to initialize the
initiator
>>>>> session at this stage.";
>>>>> -            log.info(msg);
>>>>> -            throw new AxisFault(msg);
>>>>> +            // FIX initiator session is not configured
>>>>> +            // It could be intentional - So not an error (we don't
>>>>> need initiators at all times)
>>>>> +            log.info("The " +
>>>>> FIXConstants.FIX_INITIATOR_CONFIG_URL_PARAM + " parameter is " +
>>>>> +                    "not specified. Unable to initialize the
initiator
>>>>> session at this stage.");
>>>>>         }
>>>>>     }
>>>>>
>>>>> @@ -276,6 +290,24 @@
>>>>>     }
>>>>>
>>>>>     /**
>>>>> +     * Stops all the FIX initiators created so far and cleans up all
>>>>> the mappings
>>>>> +     * related to them
>>>>> +     */
>>>>> +    public void disposeFIXInitiators() {
>>>>> +        boolean debugEnabled = log.isDebugEnabled();
>>>>> +
>>>>> +        for (String key : initiatorStore.keySet()) {
>>>>> +            initiatorStore.get(key).stop();
>>>>> +            if (debugEnabled) {
>>>>> +                log.debug("FIX initiator to the EPR " + key
+ "
>>>>> stopped");
>>>>> +            }
>>>>> +        }
>>>>> +
>>>>> +        initiatorStore.clear();
>>>>> +        applicationStore.clear();
>>>>> +    }
>>>>> +
>>>>> +    /**
>>>>>      * Returns an array of Strings representing EPRs for the specified
>>>>> service
>>>>>      *
>>>>>      * @param serviceName the name of the service
>>>>> @@ -444,6 +476,14 @@
>>>>>         }
>>>>>         return app;
>>>>>     }
>>>>> +
>>>>> +    public void setListenerThreadPool(WorkerPool listenerThreadPool)
{
>>>>> +        this.listenerThreadPool = listenerThreadPool;
>>>>> +    }
>>>>> +
>>>>> +    public void setSenderThreadPool(WorkerPool senderThreadPool) {
>>>>> +        this.senderThreadPool = senderThreadPool;
>>>>> +    }
>>>>>  }
>>>>>
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportListener.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -60,14 +60,8 @@
>>>>>                      TransportInDescription trpInDesc) throws
AxisFault
>>>>> {
>>>>>
>>>>>         super.init(cfgCtx, trpInDesc);
>>>>> -        //initialize the FIXSessionFactory
>>>>> -        fixSessionFactory = new FIXSessionFactory(
>>>>> -                new FIXApplicationFactory(this.cfgCtx,
>>>>> this.workerPool));
>>>>> -        FIXTransportSender sender = (FIXTransportSender) cfgCtx.
>>>>> -
>>>>>  getAxisConfiguration().getTransportOut(FIXConstants.TRANSPORT_NAME).getSender();
>>>>> -        if (sender != null) {
>>>>> -            sender.setSessionFactory(fixSessionFactory);
>>>>> -        }
>>>>> +        fixSessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> +        fixSessionFactory.setListenerThreadPool(this.workerPool);
>>>>>         log.info("FIX transport listener initialized...");
>>>>>     }
>>>>>
>>>>>
>>>>> Modified:
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java?rev=773818&r1=773817&r2=773818&view=diff
>>>>>
>>>>> ==============================================================================
>>>>> ---
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> (original)
>>>>> +++
>>>>> synapse/trunk/java/modules/transports/optional/fix/src/main/java/org/apache/synapse/transport/fix/FIXTransportSender.java
>>>>> Tue May 12 08:00:27 2009
>>>>> @@ -28,6 +28,8 @@
>>>>>  import org.apache.axis2.transport.OutTransportInfo;
>>>>>  import org.apache.axis2.transport.base.AbstractTransportSender;
>>>>>  import org.apache.axis2.transport.base.BaseUtils;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPool;
>>>>> +import org.apache.axis2.transport.base.threads.WorkerPoolFactory;
>>>>>  import org.apache.commons.logging.LogFactory;
>>>>>  import quickfix.*;
>>>>>  import quickfix.field.*;
>>>>> @@ -51,17 +53,12 @@
>>>>>
>>>>>     private FIXSessionFactory sessionFactory;
>>>>>     private FIXOutgoingMessageHandler messageSender;
>>>>> +    private WorkerPool workerPool;
>>>>>
>>>>>     public FIXTransportSender() {
>>>>>         this.log = LogFactory.getLog(this.getClass());
>>>>>     }
>>>>>
>>>>> -
>>>>> -    public void setSessionFactory(FIXSessionFactory sessionFactory)
{
>>>>> -        this.sessionFactory = sessionFactory;
>>>>> -        this.messageSender.setSessionFactory(sessionFactory);
>>>>> -    }
>>>>> -
>>>>>     /**
>>>>>      * @param cfgCtx       the axis2 configuration context
>>>>>      * @param transportOut the Out Transport description
>>>>> @@ -69,10 +66,25 @@
>>>>>      */
>>>>>     public void init(ConfigurationContext cfgCtx,
>>>>> TransportOutDescription transportOut) throws AxisFault {
>>>>>         super.init(cfgCtx, transportOut);
>>>>> +        this.sessionFactory = FIXSessionFactory.getInstance(new
>>>>> FIXApplicationFactory(cfgCtx));
>>>>> +        this.workerPool = WorkerPoolFactory.getWorkerPool(
>>>>> +                            10, 20, 5, -1, "FIX Sender
Worker thread
>>>>> group", "FIX-Worker");
>>>>> +        this.sessionFactory.setSenderThreadPool(this.workerPool);
>>>>>         messageSender = new FIXOutgoingMessageHandler();
>>>>> +        messageSender.setSessionFactory(this.sessionFactory);
>>>>>         log.info("FIX transport sender initialized...");
>>>>>     }
>>>>>
>>>>> +    public void stop() {
>>>>> +        try {
>>>>> +            this.workerPool.shutdown(10000);
>>>>> +        } catch (InterruptedException e) {
>>>>> +            log.warn("Thread interrupted while waiting for worker
pool
>>>>> to shut down");
>>>>> +        }
>>>>> +        sessionFactory.disposeFIXInitiators();
>>>>> +        super.stop();
>>>>> +    }
>>>>> +
>>>>>     /**
>>>>>      * Performs the actual sending of the message.
>>>>>      *
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Ruwan Linton
>>>> Senior Software Engineer & Product Manager; WSO2 ESB;
>>>> http://wso2.org/esb
>>>> WSO2 Inc.; http://wso2.org
>>>> email: ruwan@wso2.com; cell: +94 77 341 3097
>>>> blog: http://ruwansblog.blogspot.com
>>>
>>>
>>>
>>> --
>>> Hiranya Jayathilaka
>>> Software Engineer;
>>> WSO2 Inc.;  http://wso2.org
>>> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
>>> Blog: http://techfeast-hiranya.blogspot.com
>>
>>
>>
>> --
>> Ruwan Linton
>> Senior Software Engineer & Product Manager; WSO2 ESB; http://wso2.org/esb
>> WSO2 Inc.; http://wso2.org
>> email: ruwan@wso2.com; cell: +94 77 341 3097
>> blog: http://ruwansblog.blogspot.com
>
>
>
> --
> Hiranya Jayathilaka
> Software Engineer;
> WSO2 Inc.;  http://wso2.org
> E-mail: hiranya@wso2.com;  Mobile: +94 77 633 3491
> Blog: http://techfeast-hiranya.blogspot.com
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@synapse.apache.org
For additional commands, e-mail: dev-help@synapse.apache.org


Mime
View raw message