trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aku...@apache.org
Subject svn commit: r954525 [1/2] - in /trafficserver/traffic/branches/UserFiber/core: include/ src/
Date Mon, 14 Jun 2010 15:55:08 GMT
Author: akundu
Date: Mon Jun 14 15:55:08 2010
New Revision: 954525

URL: http://svn.apache.org/viewvc?rev=954525&view=rev
Log:
- Bryan Call's bringing in of uf_swapcontext to avoid calling sigprocmask on every fiber context switch
- Raghav's first version of UFConf to provide a config system for UF
- Vijaya's fix to UFConnectionPoolImpl.C to reset the group's TTL on every DNS call to the resolver
- Steve Jiang's makefile changes to better integrate UFAres and getting around undefined symbols from the c-ares lib. in the archive libUF.a
- Anirban's fixes
    - UF
        - support 64-bit makecontext
        - addFiberToScheduler now allows to pass a list or a single UF to be added
        - call uf_swapcontext in 64-bit mode
    - UFPC
        - support for both streaming and non-streaming modes of data exchange between prod./consumer. (wake up only after all the data has been gotten by the producer)
        - support to not use any locks or signaling mechanisms while interacting between producer and consumer, if the developer knows that the producer and consumer are being executed in the same thread
        - optimization of not creating the iterator to iterate through consumers if there is only one consumer
        - UFPC now returns an int for status instead of UFPC code (This allows the dev. to specify random error codes to cooperate between consumer + producer)
        - producer now sits on a condWait (instead of a timer) to get notified about consumers removing the producer from their list
        - broke out the c'tor of the producer into a init fxn and provided a reset fxn to be used by a factory and by the d'tor. Applies to consumer too.
    - UFIO
        - added the active flag to be able to not activate ufs for inactive ufios
        - using the above flag, the UFConnPool can now turn off the activation of UFIOs that are not being actively managed by UFs (especially if the conn. is waiting to be used and has some activity)
        - conn pool now respects the ttl setting for a name and sets the ttl to the lowest ttl of the resolved to A-records
        - follow a map of group names -> list of ips and each ip -> ipinfo map structure
        - implement the sleep processing for inactive connections
        - break-out c-ares usage inside #defines (since there is some unexpected crashes w/ the c-ares usage)
    - UFServer
        - automatically restart a child worker process that has died
        - only allow NETIO threads (not ACCEPT threads) to handle client communication
        - better error message outputs for the various tasks that threads are being allocated for in init time


Added:
    trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
    trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
    trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
    trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
    trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S   (with props)
Modified:
    trafficserver/traffic/branches/UserFiber/core/include/UF.H
    trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
    trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
    trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
    trafficserver/traffic/branches/UserFiber/core/src/Makefile
    trafficserver/traffic/branches/UserFiber/core/src/UF.C
    trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
    trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
    trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
    trafficserver/traffic/branches/UserFiber/core/src/UFServer.C

Modified: trafficserver/traffic/branches/UserFiber/core/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Mon Jun 14 15:55:08 2010
@@ -26,6 +26,8 @@ enum UFStatus
     COMPLETED           = 4
 };
 
+typedef unsigned long long int TIME_IN_US;
+
 //create the type of UF you'd like to pass into the accept handler
 typedef unsigned long long int UFId;
 struct UFScheduler;
@@ -162,7 +164,7 @@ struct UFScheduler
     bool addFiberToScheduler(UF* uf,      /* the UF to add */
                              pthread_t tid = 0); /* the thread to add the UF to */
     //add the fxn to add multiple ufs in one shot (if they're on one tid)
-    bool addFibersToScheduler(const std::list<UF*>& ufList, 
+    bool addFiberToScheduler(const std::list<UF*>& ufList, 
                               pthread_t tid = 0);
 
 
@@ -235,11 +237,12 @@ protected:
     list<UFWaitInfo*>  _availableWaitInfo;
     UFWaitInfo* getWaitInfo();
     void releaseWaitInfo(UFWaitInfo& ufsi);
+    bool addFiberToSelf(UF* uf);
+    bool addFiberToAnotherThread(const std::list<UF*>& ufList, pthread_t tid);
 
 public:
     UFMutex testingCondTimedWait;
 };
-
 inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
 inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
 inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Mon Jun 14 15:55:08 2010
@@ -31,6 +31,7 @@ class UFAresUFIO : public UFIO
 	
 	~UFAresUFIO()
 	{
+	    destroy();
 	    cerr<<"aresio destructor called"<<endl;
 	};
         
@@ -90,7 +91,7 @@ class UFAres : public UFDNS
     public:
 	 UFAres()  { };
 	 ~UFAres() { };
-	 unsigned long int GetHostByName(const char *name, uint32_t timeout = 0);
+	 UFHostEnt* GetHostByName(const char *name, uint32_t timeout = 0);
 	 hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0);
 	 hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) { return NULL; };
 	 hostent *GetSrvByName(const char *name, uint32_t timeout = 0) {  return NULL; };

Added: trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConf.H?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFConf.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFConf.H Mon Jun 14 15:55:08 2010
@@ -0,0 +1,93 @@
+#ifndef __UF_CONF_H__
+#define __UF_CONF_H__
+
+#include <string>
+#include <vector>
+#include <ext/hash_map>
+
+using namespace std;
+namespace std { using namespace __gnu_cxx; }
+
+namespace __gnu_cxx
+{
+    template<> struct hash< std::string >
+    {
+        size_t operator()( const std::string& x ) const
+            {
+                return hash< const char* >()( x.c_str() );
+            }
+    };
+}
+
+/** Base class for conf value type 
+ *  The sole purpose of this class is to act as a base class for conf value types
+ */
+class ConfValueBase
+{
+public:
+    virtual void dump(ostream &output)=0;
+    virtual ~ConfValueBase() { }
+};
+
+/** Template conf value 
+ *  This covers all built in types.
+ *  This is used by UFConf for string, int, bool and double
+ *  For specialized conf value types, derive from ConfValueBase
+ */
+template <class T>
+class ConfValue : public ConfValueBase {
+public:
+    T mElement;
+    void dump(ostream& output) { output << mElement; }
+    friend ostream& operator <<  (ostream& output, const ConfValue<T>& value)
+    {
+        output << value.mElement;
+        return output;
+    }
+};
+
+/** Holds config data for a given file
+ *  The class has a parent conf variable.
+ *  the get* functions look at _data. If the requested key is not found, they lookup the parent
+ */
+class UFConf
+{
+public:
+    UFConf() : _parent(NULL) { }
+    ConfValueBase *get(const std::string &key);
+    int *getInt(const std::string &key);
+    double *getDouble(const std::string &key);
+    string *getString(const std::string &key);
+    bool *getBool(const std::string &key);
+
+    void setInt(const std::string &key, int value);
+    void setDouble(const std::string &key, double value);
+    void setString(const std::string &key, const std::string &value);
+    void setBool(const std::string &key, bool value);
+    
+    void setParent(UFConf *parent) { _parent = parent; }
+    bool parse(const std::string &conf_file);
+    ~UFConf();
+
+    friend ostream& operator<<(ostream& output, const UFConf &conf);
+    
+private:
+    UFConf *_parent;
+    std::hash_map<std::string, ConfValueBase *> _data;
+};
+
+/** Manages config objects in the system 
+ * 
+ */
+class UFConfManager
+{
+public:
+    static UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
+    static UFConf* addConf(const std::string &conf_file);
+    static UFConf* getConf(const std::string &conf_file);
+    static void dump();
+private:
+    static std::hash_map<std::string, UFConf *> _configs;
+};
+
+#endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Mon Jun 14 15:55:08 2010
@@ -3,7 +3,7 @@
 
 #include "UF.H"
 #include "UFIO.H"
-
+class UFHostEnt;
 class UFDNS : public UFIO
 {
    public:
@@ -19,7 +19,7 @@ class UFDNS : public UFIO
        virtual ~UFDNS() { };
        
        virtual struct hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0) = 0 ;
-       virtual unsigned long int GetHostByName(const char *name, uint32_t timeout = 0) = 0;
+       virtual UFHostEnt* GetHostByName(const char *name, uint32_t timeout = 0) = 0;
        virtual struct hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) = 0 ;
        virtual struct hostent *GetSrvByName(const char *name, uint32_t = 0) = 0 ;
        

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Mon Jun 14 15:55:08 2010
@@ -11,8 +11,6 @@ namespace std { using namespace __gnu_cx
 
 
 
-typedef unsigned long long int TIME_IN_US;
-
 
 
 struct UFIOAcceptThreadChooser
@@ -68,7 +66,7 @@ struct UFIO
 
     //The fxn will call isSetup which will make the connection request non-blocking and setup the socket
     //TODO: support ipv6 connect
-    int connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
+    bool connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
 
     //since this is most likely going to be run on an edge trigger system
     //reads should be done in such a manner so that if the read call
@@ -113,6 +111,7 @@ struct UFIO
 
     UFSleepInfo*                _sleepInfo;
     bool                        _markedActive;
+    bool                        _active;
 
 protected:
     int                         _fd;

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Mon Jun 14 15:55:08 2010
@@ -2,6 +2,7 @@
 #define UFPC_H
 
 #include <set>
+#include <stack>
 #include "UF.H"
 
 struct UFMutex;
@@ -9,15 +10,10 @@ struct UFProducerData;
 struct UFConsumer;
 struct UFProducer;
 
-enum UFProducerDataCode
-{
-    ADD = 1,
-    END = 2
-};
-
 struct UFConsumer
 {
-    UFConsumer(bool shouldLockForInternalMods = true);
+    friend class UFProducer;
+    UFConsumer(bool notifyOnExitOnly = false);
     virtual ~UFConsumer();
     //user needs to call input->releaseMe after consuming the data
     UFProducerData* waitForData(UF* uf = 0);
@@ -25,45 +21,57 @@ struct UFConsumer
 
     bool joinProducer(UFProducer* ufp);
     bool removeProducer(UFProducer* ufp);
+    void reset();
 
     std::list<UFProducerData*>      _queueOfDataToConsume;
     UFMutex                         _queueOfDataToConsumeLock;
+    bool getNotifyOnExitOnly() const;
 
+    UF* getUF() const;
+    bool                            _requireLockToWaitForUpdate; //if the developer is aware that both the producer and all the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
 protected:
     std::set<UFProducer*>           _consumersProducerSet;
     UFMutex                         _consumersProducerSetLock;
-    bool                            _shouldLockForInternalMods;
+    bool                            _notifyOnExitOnly;
+    UF*                             _currUF;
 };
+inline UF* UFConsumer::getUF() const { return _currUF; }
+inline bool UFConsumer::getNotifyOnExitOnly() const { return _notifyOnExitOnly; }
 
 struct UFProducer
 {
     friend class UFConsumer;
     UFProducer();
-    ~UFProducer();
+    virtual ~UFProducer();
     bool removeConsumer(UFConsumer* ufc);
-    bool produceData(void* data, size_t size, bool freeDataOnExit = true, UF* uf = 0);
+    bool produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
+    void reset();
+    void init();
+    bool                            _requireLockToUpdateConsumers;//if the developer is aware that both the producer and the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
 
 protected:
     std::set<UFConsumer*>           _producersConsumerSet;
+    size_t                          _producersConsumerSetSize;
+    UFConsumer*                     _mostRecentConsumerAdded;
     UFMutex                         _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
+    bool                            _acceptNewConsumers;
 
-    /*TODO: producer can keep a collection of the data
-    std::list<UFProducerData*>      _producerData;
-    UFMutex                         _producerDataMutex; //this mutex is needed to add to the producerdata
-    */
     bool addConsumer(UFConsumer* ufc);
+    UF*                             _uf;
 };
 
 struct UFProducerData
 {
     void*                           _data;
-    size_t                          _size;
+    unsigned int                    _size;
     UFProducer*                     _producerWhichInserted;
-    UFProducerDataCode              _ufpcCode;
+    int                             _ufpcCode;
     bool                            _freeDataOnExit;
+    bool                            _lockToUpdate;
 
     void addRef(size_t numToAdd = 1);
     void reduceRef();
+    void reset();
 
     static UFProducerData* getObj();
     static void releaseObj(UFProducerData* obj);
@@ -71,43 +79,80 @@ struct UFProducerData
 
 protected:
     ~UFProducerData();
-    UFProducerData() { _referenceCount = 0; _freeDataOnExit = true; }
+    UFProducerData() { reset(); }
     UFMutex                         _controlReferenceCount; //control the ref. count of this data
     size_t                          _referenceCount;
+    static std::stack<UFProducerData*> _objList;
+    static UFMutex                  _objListMutex;
 };
+inline void UFProducerData::reset()
+{
+    _referenceCount = 0; 
+    _freeDataOnExit = true; 
+    _lockToUpdate = true;
+}
 
 inline bool UFProducer::addConsumer(UFConsumer* ufc)
 {
-    _producersConsumerSetLock.getSpinLock();
+    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
+    if(!_acceptNewConsumers)
+    {
+        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
+        return false;
+    }
     _producersConsumerSet.insert(ufc); //check insertion
-    _producersConsumerSetLock.releaseSpinLock();
+    _producersConsumerSetSize++;
+    _mostRecentConsumerAdded = ufc;
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
     return true;
 }
 
 inline bool UFProducer::removeConsumer(UFConsumer* ufc)
 {
-    _producersConsumerSetLock.getSpinLock();
+    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
     _producersConsumerSet.erase(ufc);
-    _producersConsumerSetLock.releaseSpinLock();
+    _producersConsumerSetSize--;
+    if(_requireLockToUpdateConsumers) 
+        _producersConsumerSetLock.signal();
+    else
+    {
+        if(_uf)
+            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
+    }
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
     return true;
 }
 
 inline void UFProducerData::addRef(size_t numToAdd)
 {
-    _controlReferenceCount.getSpinLock();
+    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
     _referenceCount = numToAdd;
-    _controlReferenceCount.releaseSpinLock();
+    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
 }
 
 inline void UFProducerData::reduceRef()
 {
-    _controlReferenceCount.getSpinLock();
+    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
     --_referenceCount;
-    _controlReferenceCount.releaseSpinLock();
+    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
 }
 
 inline UFProducerData* UFProducerData::getObj()
 {
+    /*
+    _objListMutex.getSpinLock();
+    if(!_objList.empty())
+    {
+        UFProducerData* retVal = _objList.top();
+        _objList.pop();
+        _objListMutex.releaseSpinLock();
+        retVal->reset();
+        return retVal;
+    }
+    _objListMutex.releaseSpinLock();
+    */
     return new UFProducerData();
 }
 
@@ -116,11 +161,18 @@ inline void UFProducerData::releaseObj(U
     if(!obj)
         return;
 
-    obj->_controlReferenceCount.getSpinLock();
+    if(obj->_lockToUpdate) obj->_controlReferenceCount.getSpinLock();
     if(!--obj->_referenceCount)
+    {
+        /*
+        _objListMutex.getSpinLock();
+        _objList.push(obj);
+        _objListMutex.releaseSpinLock();
+        */
         delete obj;
-    else
-        obj->_controlReferenceCount.releaseSpinLock();
+        return;
+    }
+    if(obj->_lockToUpdate) obj->_controlReferenceCount.releaseSpinLock();
 }
 
 inline UFProducerData::~UFProducerData()
@@ -129,4 +181,16 @@ inline UFProducerData::~UFProducerData()
         free (_data);
 }
 
+inline UFConsumer::~UFConsumer() { reset(); } 
+
+inline UFProducer::~UFProducer() { reset(); }
+inline void UFProducer::init() 
+{ 
+    _acceptNewConsumers = true; 
+    _requireLockToUpdateConsumers = true; 
+    _producersConsumerSetSize = 0;
+    _mostRecentConsumerAdded = 0;
+}
+inline UFProducer::UFProducer() { init(); }
+
 #endif

Added: trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStats.H?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFStats.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStats.H Mon Jun 14 15:55:08 2010
@@ -0,0 +1,17 @@
+#ifndef _UF_STATS_H
+#define _UF_STATS_H
+
+#include <stdint.h>
+
+namespace UFStats
+{
+    void registerStats(bool lock_needed = false);
+    extern uint32_t connectionsHandled;
+    extern uint32_t txnSuccess;
+    extern uint32_t txnFail;
+    extern uint32_t txnReject;
+    extern uint32_t bytesRead;
+    extern uint32_t bytesWritten;
+}
+
+#endif

Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Mon Jun 14 15:55:08 2010
@@ -21,22 +21,25 @@ LIB_DIR=../lib
 all: $(LIB_DIR)/libUF.a
 
 $(ARES_LIB): $(ARES_SRC)
-	pushd $(ARESDIR)/$(ARES) && ./configure && gmake && popd
+	cd $(ARESDIR)/$(ARES) && ./configure && make
 
 $(ARES_SRC):
 	if [ ! -d $(ARESDIR)/$(ARES) ] ; then \
-	   pushd $(ARESDIR) && \
+	   rm $(ARESDIR)/$(ARES_SRC_FILE); \
+	   cd $(ARESDIR) && \
 	   wget http://c-ares.haxx.se/$(ARES_SRC_FILE) && \
-	   tar xzf ./$(ARES_SRC_FILE) && popd; \
+	   gpg --keyserver hkp://pgp.mit.edu --recv-keys 0x279D5C91 && \
+	   gpg --verify ./$(ARES_GPG) ./$(ARES_SRC_FILE) && \
+	   tar xzf ./$(ARES_SRC_FILE); \
 	fi
 
-$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
+$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H UFSwapContext.H
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
 
 $(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
 
-$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
+$(LIB_DIR)/UFConnectionPoolImpl.o: $(LIB_DIR)/UFAres.o UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
 
 $(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
@@ -48,14 +51,20 @@ $(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_
 $(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
 
-$(LIB_DIR)/UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFConnectionPoolImpl.o
+$(LIB_DIR)/UFStats.o: UFStats.C $(INCLUDE_DIR)/UFStats.H $(LIB_DIR)/UFStatSystem.o
+	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStats.o UFStats.C
+
+$(LIB_DIR)/UFConf.o: UFConf.C $(INCLUDE_DIR)/UFConf.H
+	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConf.o UFConf.C
+
+$(LIB_DIR)/UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFConnectionPoolImpl.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFServer.o UFServer.C
 
-$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
-	$(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
-	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARES_LIB) \
-	$(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
-	$(LIB_DIR)/UFConnectionPoolImpl.o $(LIB_DIR)/UFAres.o $(LIB_DIR)/UFPC.o
+$(LIB_DIR)/UFSwapContext.o: UFSwapContext.S
+	$(CC) -c -o $@ $^
+
+$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
+	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARESDIR)/$(ARES)/.libs/*.o $^
 	$(RANLIB) $(LIB_DIR)/libUF.a
 
 clean: 

Modified: trafficserver/traffic/branches/UserFiber/core/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Mon Jun 14 15:55:08 2010
@@ -2,33 +2,40 @@
 #include "UFConnectionPool.H"
 
 #include <string.h>
-
 #include <iostream>
 #include <errno.h>
 #include <stdlib.h>
-
 #include <unistd.h>
 #include <signal.h>
 #include <stdio.h>
 #include <malloc.h>
 #include <sys/mman.h>
+#include "UFSwapContext.H"
 
 using namespace std;
 
+#if __WORDSIZE == 64
+static void runFiber(unsigned int lo, unsigned int hi)
+#else
 static void runFiber(void* args)
+#endif
 {
+#if __WORDSIZE == 64
+    UF* uf = (UF*)((((unsigned long)hi)<<32)+(unsigned long)lo);
+#else
     if(!args)
         return;
-
     UF* uf = (UF*)args;
+#endif
     uf->run();
     uf->_status = COMPLETED;
 }
 
 ///////////////UF/////////////////////
-UFFactory* UFFactory::_instance = 0;;
+UFFactory* UFFactory::_instance = 0;
 const unsigned int DEFAULT_STACK_SIZE = 4*4096;
 UFId UF::_globalId = 0;
+
 UF::UF()
 { 
     _startingArgs = 0;
@@ -156,97 +163,119 @@ UFScheduler::UFScheduler()
     _amtToSleep = 0;
 }
 
-UFScheduler::~UFScheduler()
+UFScheduler::~UFScheduler() { /*pthread_key_delete(_specific_key);*/ }
+
+bool UFScheduler::addFiberToSelf(UF* uf)
 {
-    //pthread_key_delete(_specific_key);
+    if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+        return true;
+    uf->_status = WAITING_TO_RUN;
+    if(uf->_parentScheduler) //probably putting back an existing uf into the active list
+    {
+        if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
+        {
+            _activeRunningList.push_back(uf); ++_activeRunningListSize;
+            return true;
+        }
+        else
+        {
+            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
+            abort(); //TODO: remove the abort
+            return false;
+        }
+    }
+
+    //create a new context
+    uf->_parentScheduler = this;
+    uf->_UFContext.uc_link = &_mainContext;
+
+    getcontext(&(uf->_UFContext));
+    errno = 0;
+
+#if __WORDSIZE == 64
+    makecontext(&(uf->_UFContext), (void (*)(void)) runFiber, 2, (int)(ptrdiff_t)uf, (int)((ptrdiff_t)uf>>32));
+#else
+    makecontext(&(uf->_UFContext), (void (*)(void)) runFiber, 1, (void*)uf);
+#endif
+    if(errno != 0)
+    {
+        cerr<<"error while trying to run makecontext"<<endl;
+        return false;
+    }
+    _activeRunningList.push_back(uf); ++_activeRunningListSize;
+    return true;
 }
 
+bool UFScheduler::addFiberToAnotherThread(const list<UF*>& ufList, pthread_t tid)
+{
+    //find the other thread -- 
+    //TODO: have to lock before looking at this map - 
+    //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
+    UF* uf = 0;
+    list<UF*>::const_iterator beg = ufList.begin();
+    list<UF*>::const_iterator ending = ufList.end();
+    ThreadUFSchedulerMap::iterator index = _threadUFSchedulerMap.find(tid);
+    if(index == _threadUFSchedulerMap.end())
+    {
+        cerr<<"couldnt find the scheduler associated with "<<tid<<" for uf = "<<*beg<<endl;
+        ThreadUFSchedulerMap::iterator beg = _threadUFSchedulerMap.begin();
+        return false;
+    }
+
+    UFScheduler* ufs = index->second;
+    pthread_mutex_lock(&(ufs->_mutexToNominateToActiveList));
+    for(; beg != ending; ++beg)
+    {
+        uf = *beg;
+        ufs->_nominateToAddToActiveRunningList.push_back(uf);
+    }
+    pthread_cond_signal(&(ufs->_condToNominateToActiveList));
+    pthread_mutex_unlock(&(ufs->_mutexToNominateToActiveList));
+    ufs->notifyUF();
+    return true;
+}
 
 bool UFScheduler::addFiberToScheduler(UF* uf, pthread_t tid)
 {
     if(!uf)
     {
-        cerr<<"returning cause there is a scheduler already"<<endl;
+        cerr<<"null uf provided to scheduler"<<endl;
         return false;
     }
 
-    list<UF*> ufList;
-    ufList.push_back(uf);
-    return addFibersToScheduler(ufList, tid);
+    //adding to the same scheduler and as a result thread as the current job
+    if(!tid || (tid == pthread_self()))
+        return addFiberToSelf(uf);
+    else //adding to some other threads' scheduler
+    {
+        list<UF*> l;
+        l.push_back(uf);
+        return addFiberToAnotherThread(l, tid);
+    }
 }
 
-bool UFScheduler::addFibersToScheduler(const list<UF*>& ufList, pthread_t tid)
+bool UFScheduler::addFiberToScheduler(const list<UF*>& ufList, pthread_t tid)
 {
     if(ufList.empty())
         return true;
 
+    //adding to the same scheduler and as a result thread as the current job
     UF* uf = 0;
     list<UF*>::const_iterator beg = ufList.begin();
     list<UF*>::const_iterator ending = ufList.end();
-    //adding to the same scheduler and as a result thread as the current job
     if(!tid || (tid == pthread_self()))
     {
         for(; beg != ending; ++beg)
         {
             uf = *beg;
-            if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+            if(addFiberToSelf(uf))
                 continue;
-            uf->_status = WAITING_TO_RUN;
-            if(uf->_parentScheduler) //probably putting back an existing uf into the active list
-            {
-                if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
-                {
-                    _activeRunningList.push_back(uf); ++_activeRunningListSize;
-                    continue;
-                }
-                else
-                {
-                    cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
-                    abort(); //TODO: remove the abort
-                    return false;
-                }
-            }
-
-            //create a new context
-            uf->_parentScheduler = this;
-            uf->_UFContext.uc_link = &_mainContext;
-
-            getcontext(&(uf->_UFContext));
-            errno = 0;
-            makecontext(&(uf->_UFContext), (void (*)(void)) runFiber, 1, (void*)uf);
-            if(errno != 0)
-            {
-                cerr<<"error while trying to run makecontext"<<endl;
+            else
                 return false;
-            }
-            _activeRunningList.push_back(uf); ++_activeRunningListSize;
         }
     }
     else //adding to some other threads' scheduler
-    {
-        //find the other thread -- 
-        //TODO: have to lock before looking at this map - 
-        //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
-        ThreadUFSchedulerMap::iterator index = _threadUFSchedulerMap.find(tid);
-        if(index == _threadUFSchedulerMap.end())
-        {
-            cerr<<"couldnt find the scheduler associated with "<<tid<<" for uf = "<<*beg<<endl;
-            ThreadUFSchedulerMap::iterator beg = _threadUFSchedulerMap.begin();
-            return false;
-        }
-
-        UFScheduler* ufs = index->second;
-        pthread_mutex_lock(&(ufs->_mutexToNominateToActiveList));
-        for(; beg != ending; ++beg)
-        {
-            uf = *beg;
-            ufs->_nominateToAddToActiveRunningList.push_back(uf);
-        }
-        pthread_cond_signal(&(ufs->_condToNominateToActiveList));
-        pthread_mutex_unlock(&(ufs->_mutexToNominateToActiveList));
-        ufs->notifyUF();
-    }
-
+        return addFiberToAnotherThread(ufList, tid);
     return true;
 }
 
@@ -283,11 +312,14 @@ void UFScheduler::runScheduler()
             UF* uf = *ufBeg;
             _currentFiber = uf;
             uf->_status = RUNNING;
+#if __WORDSIZE == 64
+            uf_swapcontext(&_mainContext, &(uf->_UFContext));
+#else
             swapcontext(&_mainContext, &(uf->_UFContext));
+#endif
             _currentFiber = 0;
 
-            if(uf->_status == RUNNING) { }
-            else if(uf->_status == BLOCKED)
+            if(uf->_status == BLOCKED)
             {
                 ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
                 continue;
@@ -299,6 +331,7 @@ void UFScheduler::runScheduler()
                 continue;
             }
 
+            //else uf->_status == RUNNING
             uf->_status = WAITING_TO_RUN;
             ++ufBeg;
         }
@@ -771,7 +804,7 @@ void* setupThread(void* args)
 
     list<UF*>* ufsToStartWith = (list<UF*>*) args;
     UFScheduler ufs;
-    ufs.addFibersToScheduler(*ufsToStartWith, 0);
+    ufs.addFiberToScheduler(*ufsToStartWith, 0);
     delete ufsToStartWith;
 
     //run the scheduler

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Mon Jun 14 15:55:08 2010
@@ -3,7 +3,7 @@
 
 static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
 {
-    return;
+  //  return;
         int i;
 	for(i = 0 ;  host->h_addr_list[i] != NULL; i++)
 	{
@@ -97,6 +97,7 @@ UFAresUFIO* UFAres::GetNewAresUFIO()
     if( list_ares_ufio_.empty() == true)
     {
 	aresio = new UFAresUFIO();
+	aresio->Init(mycall,aresio);
     }
     else
     {
@@ -114,7 +115,7 @@ void UFAres::ReleaseAresUFIO(UFAresUFIO 
     if(aresio)
     {
 	aresio->set_complete(false);
-	aresio->destroy();
+	//aresio->destroy();
 	list_ares_ufio_.push_back(aresio);
     }
     else
@@ -123,7 +124,7 @@ void UFAres::ReleaseAresUFIO(UFAresUFIO 
 }
 
 
-unsigned long int  UFAres::GetHostByName(const char *name, uint32_t timeout)
+UFHostEnt* UFAres::GetHostByName(const char *name, uint32_t timeout)
 {
     UFHostEnt* myhostent_ = GetCachedHostent(name);
     if(myhostent_ == NULL)
@@ -140,14 +141,13 @@ unsigned long int  UFAres::GetHostByName
     if(ip != 0)
     {
             myhostent_->unlock(UFScheduler::getUF());
-	    return ip;
+	    return myhostent_;
     }
     else
     {
-	 myhostent_->ReleaseEntries();
+	myhostent_->ReleaseEntries();
 
 	UFAresUFIO *aresio = GetNewAresUFIO();
-	aresio->Init(mycall,aresio);
 	aresio->set_myhostent(myhostent_);
 	
 	ares_search(aresio->get_channel(),name,1, 1,arescallback,aresio);
@@ -165,7 +165,7 @@ unsigned long int  UFAres::GetHostByName
         myhostent_ = aresio->get_myhostent(); 
         ReleaseAresUFIO(aresio);
         myhostent_->unlock(UFScheduler::getUF());
-        return myhostent_->GetFirstIP();
+        return myhostent_;
 	
     }
 }

Added: trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConf.C?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConf.C (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConf.C Mon Jun 14 15:55:08 2010
@@ -0,0 +1,301 @@
+#include "UFConf.H"
+
+#include <fstream>
+#include <iostream>
+#include <sstream>
+
+/** set string value in conf
+ *  Converts the string value that is passed in to a ConfValue and stores it in the conf hash
+ */
+void UFConf::setString(const string &type, const string &value)
+{
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<string> *sharedString = new ConfValue<string>;
+    sharedString->mElement = value;
+    _data[type] = sharedString;
+}
+
+/** set int value in conf
+ *  Converts the int value that is passed in to a ConfValue and stores it in the conf hash
+ */
+void UFConf::setInt(const string &type, int value)
+{
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<int> *sharedInt = new ConfValue<int>;
+    sharedInt->mElement = value;
+    _data[type] = sharedInt;
+}
+
+/** set bool value in conf
+ *  Converts the bool value that is passed in to a ConfValue and stores it in the conf hash
+ */
+void UFConf::setBool(const string &type, bool value)
+{
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<bool> *sharedBool = new ConfValue<bool>;
+    sharedBool->mElement = value;
+    _data[type] = sharedBool;
+}
+
+/** set double value in conf
+ *  Converts double value that is passed in to a ConfValue and stores it in the conf hash
+ */
+void UFConf::setDouble(const string &type, double value)
+{
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<double> *sharedDouble = new ConfValue<double>;
+    sharedDouble->mElement = value;
+    _data[type] = sharedDouble;
+}
+
+/** Get the string value associated with the key
+ *  Looks at local conf hash for the key
+ *  If key is not found in the local conf, forwards request to parent conf
+ *  If key is not found in either local or parent conf, NULL is returned
+ */
+string *UFConf::getString(const string &key)
+{
+    ConfValue<string> *sharedString = (ConfValue<string> *)get(key);
+    if(sharedString != NULL) {
+        return &sharedString->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getString(key);
+}
+
+/** Get the int value associated with the key
+ *  Looks at local conf hash for the key
+ *  If key is not found in the local conf, forwards request to parent conf
+ *  If key is not found in either local or parent conf, NULL is returned
+ */
+int *UFConf::getInt(const string &key)
+{
+    ConfValue<int> *sharedInt = (ConfValue<int> *)get(key);
+    if(sharedInt != NULL) {
+        return &sharedInt->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getInt(key);
+}
+
+/** Get the bool value associated the with key
+ *  Looks at local conf hash for the key
+ *  If key is not found in the local conf, forwards request to parent conf
+ *  If key is not found in either local or parent conf, NULL is returned
+ */
+bool *UFConf::getBool(const string &key)
+{
+    ConfValue<bool> *sharedBool = (ConfValue<bool> *)get(key);
+    if(sharedBool != NULL) {
+        return &sharedBool->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getBool(key);
+}
+
+/** Get the double value associated with the key
+ *  Looks at local conf hash for the key
+ *  If key is not found in the local conf, forwards request to parent conf
+ *  If key is not found in either local or parent conf, NULL is returned
+ */
+double *UFConf::getDouble(const string &type)
+{
+    ConfValue<double> *sharedDouble = (ConfValue<double> *)get(type);
+    if(sharedDouble != NULL) {
+        return &sharedDouble->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getDouble(type);
+}
+
+/** get value associated with the key that is passed in
+ *  Looks at local conf hash for the key
+ *  If key is not found in the local conf, forwards request to parent conf
+ *  If key is not found in either local or parent conf, NULL is returned
+ */
+ConfValueBase *UFConf::get(const string &key)
+{
+    hash_map<string, ConfValueBase *>::iterator it = _data.find(key);
+    if(it != _data.end())
+        return it->second;
+    if(_parent == NULL)
+        return NULL;
+    return _parent->get(key);
+}
+
+/** Parse config file and store in conf hash
+ *  Looks for config values of type STRING, INT, DOUBLE and BOOL
+ *  Skips over lines beginning with '#'
+ */
+bool UFConf::parse(const std::string &conf_file)
+{
+    ifstream infile;
+    infile.open(conf_file.c_str());
+    if(!infile.is_open()) 
+        return false; // Could not open file
+
+    string line;
+    istringstream instream;
+    while(getline(infile, line))
+    {
+        instream.clear(); // Reset from possible previous errors.
+        instream.str(line);  // Use s as source of input.
+        string conf_key, conf_key_type;
+        if (instream >> conf_key >> conf_key_type) 
+        {
+            // skip lines starting with #
+            if(conf_key[0] == '#')
+                continue;
+
+            // get type from config file, read into corresponding value and store  
+            string string_value;
+            int int_value;
+            double double_value;
+            bool bool_value;
+            if(conf_key_type == "STRING")
+            {
+                if(instream >> string_value)
+                    setString(conf_key, string_value);
+            }
+            if(conf_key_type == "INT")
+            {
+                if(instream >> int_value)
+                    setInt(conf_key, int_value);
+            }
+            if(conf_key_type == "DOUBLE")
+            {
+                if(instream >> double_value)
+                    setDouble(conf_key, double_value);
+            }
+            if(conf_key_type == "BOOL")
+            {
+                if(instream >> bool_value)
+                    setBool(conf_key, bool_value);
+            }
+        }
+    }
+
+    infile.close();
+    return true;
+}
+
+/**
+ *  Dump out config
+ */
+ostream& operator<<(ostream& output, const UFConf &conf)
+{
+
+    for(std::hash_map<std::string, ConfValueBase *>::const_iterator it = conf._data.begin();
+        it != conf._data.end();
+        it++)
+    {
+        cerr << it->first << " ";
+        it->second->dump(cerr);
+        cerr << endl;
+    }
+    return output;
+}
+
+std::hash_map<std::string, UFConf *> UFConfManager::_configs;
+
+/** Add new child conf
+ *  Create new conf and set parent to conf object corresponding to the parent_conf that is passed in
+ */
+UFConf* UFConfManager::addChildConf(const string &conf_file, const string &parent_conf_file)
+{
+    hash_map<string, UFConf*>::iterator it = _configs.find(conf_file);
+    if(it != _configs.end())
+    {
+        // Conf already exists
+        return it->second;
+    }
+
+    // Check if parent config was created
+    it = _configs.find(parent_conf_file);
+    if(it == _configs.end())
+        return NULL; // Parent config was not created
+
+    // Create conf
+    UFConf *conf_created = addConf(conf_file);
+
+    // Set parent conf
+    if(conf_created != NULL)
+        conf_created->setParent(it->second);
+    return conf_created;
+}
+
+/** Add new conf
+ *  Creates new conf, sets parent if 'parent' key is present and stores in the conf in the config system
+ */
+UFConf* UFConfManager::addConf(const string &conf_file)
+{
+    hash_map<string, UFConf*>::iterator it = _configs.find(conf_file);
+    if(it != _configs.end())
+    {
+        // Conf already exists
+        return it->second;
+    }
+
+    // Create new UFConf
+    UFConf *conf = new UFConf;
+
+    // Parse default config
+    string conf_file_default = conf_file + ".default";
+    conf->parse(conf_file_default);
+    
+    // Parse overrides
+    conf->parse(conf_file);
+
+    string *conf_file_parent = conf->getString("parent");
+    if(conf_file_parent != NULL) 
+    {
+        conf->setParent(getConf(*conf_file_parent));
+    }
+
+    // Store in conf map
+    _configs[conf_file] = conf;
+
+    return conf;
+}
+
+/**
+ *  Get config object pointer associated with the conf file that is passed in
+ */
+UFConf* UFConfManager::getConf(const string &conf_file)
+{
+    hash_map<string, UFConf *>::iterator it = _configs.find(conf_file);
+    if(it == _configs.end())
+        return NULL; // config was not created
+    return it->second;
+}
+
+/**
+ *  Print out all configs in the system to cerr
+ */
+void UFConfManager::dump()
+{
+    for(hash_map<string, UFConf *>::iterator it = _configs.begin();
+        it != _configs.end();
+        it++)
+    {
+        cerr << "=============CONF " << it->first << " STARTS" << endl;
+        cerr << *(it->second);
+        cerr << "=============CONF " << it->first << " ENDS" << endl;
+    }
+}

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C Mon Jun 14 15:55:08 2010
@@ -17,145 +17,47 @@
 #include <netdb.h>
 #include <string.h>
 
+#ifdef USE_CARES
+#include "UFAres.H"
+#endif
+
 const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
 
 using namespace std;
 
-UFConnectionIpInfo::UFConnectionIpInfo(const string& ip, bool persistent, int maxSimultaneousConns, int timeOutPerTransaction)
+UFConnIPInfo::UFConnIPInfo(const string& ip, 
+                                       unsigned int port,
+                                       bool persistent, 
+                                       int maxSimultaneousConns, 
+                                       TIME_IN_US connectTimeout,
+                                       TIME_IN_US timeToFailOutIPAfterFailureInSecs)
 {
     _ip = ip;
+    _port = port;
     _persistent = persistent;
     _maxSimultaneousConns = maxSimultaneousConns;
-    if(timeOutPerTransaction > 0)
-        _timeOutPerTransaction = timeOutPerTransaction*1000;
-    else
-        _timeOutPerTransaction = -1;
+    _timeToFailOutIPAfterFailureInSecs = timeToFailOutIPAfterFailureInSecs;
+    _connectTimeout = connectTimeout;
     _timedOut = 0;
     _inProcessCount = 0;
+    _timeToExpireDNSInfo = 0;
 
-    size_t index = _ip.find_last_of(':'); 
-    string ip_to_connect = (index == string::npos ) ? _ip : _ip.substr(0, index);
-    string port = (index == string::npos ) ? "0" : _ip.substr(index+1);
-    
     memset(&_sin, 0, sizeof(_sin));
     _sin.sin_family = AF_INET;
-    _sin.sin_addr.s_addr = inet_addr(ip_to_connect.c_str());
-    _sin.sin_port = htons(atoi(port.c_str()));
-}
-
-static void read_address(const char *str, struct sockaddr_in *sin)
-{
-    char host[128], *p;
-    struct hostent *hp;
-    short port;
-
-    strcpy(host, str);
-    if ((p = strchr(host, ':')) == NULL)
-    {
-        cerr<<"invalid host: "<<host<<endl;
-        exit(1);
-    }
-    *p++ = '\0';
-    port = (short) atoi(p);
-    if (port < 1)
-    {
-
-        cerr<<"invalid port: "<<port<<endl;
-        exit(1);
-    }
-
-    memset(sin, 0, sizeof(struct sockaddr_in));
-    sin->sin_family = AF_INET;
-    sin->sin_port = htons(port);
-    if (host[0] == '\0')
-    {
-        sin->sin_addr.s_addr = INADDR_ANY;
-        return;
-    }
-    sin->sin_addr.s_addr = inet_addr(host);
-    if (sin->sin_addr.s_addr == INADDR_NONE)
-    {
-        /* not dotted-decimal */
-        if ((hp = gethostbyname(host)) == NULL)
-        {
-            cerr<<"cant resolve address "<<host<<endl;
-            exit(1);
-        }
-        memcpy(&sin->sin_addr, hp->h_addr, hp->h_length);
-    }
-}
-
-UFConnectionIpInfo* UFConnectionGroupInfo::removeIP(const string& ip)
-{
-    UFConnectionIpInfoList::iterator beg = _ipInfoList.begin();
-    for(; beg != _ipInfoList.end(); ++beg)
-    {
-        if((*beg)->_ip == ip)
-        {
-            UFConnectionIpInfo* info = *beg;
-            _ipInfoList.erase(beg);
-            return info;
-        }
-    }
-    return NULL;
+    _sin.sin_addr.s_addr = inet_addr(_ip.c_str());
+    _sin.sin_port = htons(_port);
+    _currentlyUsedCount = 0;
+    _lastUsed = 0;
 }
 
-bool UFConnectionGroupInfo::addIP(UFConnectionIpInfo* stIpInfo)
-{
-    if(!stIpInfo)
-    {
-        cerr<<"empty/invalid stIpInfo obj passed in "<<endl;
-        return false;
-    }
-
-    read_address(stIpInfo->_ip.c_str(), &(stIpInfo->_sin));
-    if(stIpInfo->_sin.sin_addr.s_addr == INADDR_ANY)   
-    {
-        cerr<<"couldnt resolve address:port = "<<stIpInfo->_ip<<endl;
-        return false;
-    }
-
-    _ipInfoList.push_back(stIpInfo);
-    return true;
-}
-
-double UFConnectionGroupInfo::getAvailability() const
-{
-    int timed_out_count = 0;
-    int total_count = 0;
-    UFConnectionIpInfoList::const_iterator itr = _ipInfoList.begin();
-    for(;itr != _ipInfoList.end(); ++itr)
-    {
-        if(!(*itr))
-            continue;
-
-        total_count++;
-        if((*itr)->_timedOut)
-        {
-            if ( ((*itr)->_timedOut + UFConnectionPoolImpl::_timeoutIP) < time(0) )
-                timed_out_count++;
-            else
-                (*itr)->_timedOut = 0;
-        }
-    }
-
-    return ((total_count > 0 ) ? ((total_count-timed_out_count)*100)/total_count : 0);
-}
-
-UFConnectionGroupInfo::UFConnectionGroupInfo(const std::string& name)
+UFConnGroupInfo::UFConnGroupInfo(const std::string& name)
 {
     _name = name;
+    _timeToExpireAt = 0;
 }
 
-UFConnectionGroupInfo::~UFConnectionGroupInfo()
-{
-    unsigned int ipInfoListSize = _ipInfoList.size();
-    for(unsigned int i = 0; i < ipInfoListSize; ++i)
-        delete _ipInfoList[i];
-}
 
-time_t UFConnectionPoolImpl::_timeoutIP = DEFAULT_TIMEOUT_OF_IP_ON_FAILURE;
-bool UFConnectionPoolImpl::addGroup(UFConnectionGroupInfo* groupInfo)
+bool UFConnectionPoolImpl::addGroup(UFConnGroupInfo* groupInfo)
 {
     if(!groupInfo)
     {
@@ -163,89 +65,157 @@ bool UFConnectionPoolImpl::addGroup(UFCo
         return false;
     }
 
-    if(!groupInfo->_name.length())
+    if(!groupInfo->getName().length())
     {
         cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"empty group name passed"<<endl;
         return false;
     }
 
-    if(_groupIpMap.find(groupInfo->_name) != _groupIpMap.end())
+    if(_groupIpMap.find(groupInfo->getName()) != _groupIpMap.end())
     {
-        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"group with name "<<groupInfo->_name <<" already exists"<<endl;
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"group with name "<<groupInfo->getName() <<" already exists"<<endl;
         return false;
     }
 
-    _groupIpMap[groupInfo->_name] = groupInfo;
+    _groupIpMap[groupInfo->getName()] = groupInfo;
     return true;
 }
 
 //TODO: figure out whether we want to delete the group object on the removeGroup and the destructor fxn calls
-UFConnectionGroupInfo* UFConnectionPoolImpl::removeGroup(const std::string& name)
+void UFConnectionPoolImpl::removeGroup(const std::string& name)
 {
     GroupIPMap::iterator foundItr = _groupIpMap.find(name);
     if(foundItr == _groupIpMap.end())
-        return NULL;
+        return;
 
-    UFConnectionGroupInfo* removedObj = (*foundItr).second;
+    UFConnGroupInfo* removedObj = (*foundItr).second;
+    delete removedObj;
     _groupIpMap.erase(foundItr);
-    return removedObj;
-}
-
-UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName)
-{
-    return getConnection(groupName, false);
+    return;
 }
 
-UFConnectionGroupInfo* UFConnectionPoolImpl::addGroupImplicit(const std::string& groupName)
+UFConnGroupInfo* UFConnectionPoolImpl::addGroupImplicit(const std::string& groupName)
 {
-    UFConnectionGroupInfo* group = new UFConnectionGroupInfo(groupName);
+    UFConnGroupInfo* group = new UFConnGroupInfo(groupName);
     if(!group)
     {
         cerr<<getpid()<<" "<<time(NULL)<<" couldnt allocate memory to create group obj"<<endl;
         return NULL;
     }
     
-    UFConnectionIpInfo* ip = new UFConnectionIpInfo(groupName,
+    UFConnIPInfo* ip = new UFConnIPInfo(groupName,
                                                     true, 
-                                                    UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST,
-                                                    UFConnectionPool::TIMEOUT_PER_REQUEST);
+                                                    _maxSimulConnsPerHost,
+                                                    0,
+                                                    _timeToTimeoutIPAfterFailure);
     if(!ip)
     {
         cerr<<getpid()<<" "<<time(NULL)<<" couldnt create the ip obj"<<endl;
         delete group;
         return NULL;
     }
-    group->addIP(ip);
+    //group->addIP(ip);
     addGroup(group);
     return group;
 }
 
+bool UFConnectionPoolImpl::createIPInfo(const string& groupName, UFConnGroupInfo* groupInfo)
+{
+    UFConnIPInfoList& ipInfoList = groupInfo->getIpInfoList();
+
+    size_t indexOfColon = groupName.find(':');
+    string hostName = groupName;
+    unsigned int port = 0;
+    if(indexOfColon != string::npos)
+    {
+        hostName = groupName.substr(0, indexOfColon);
+        port = atoi(groupName.substr(indexOfColon+1).c_str());
+    }
+
+    //have to figure out the hosts listed w/ 
+    //
+    int lowestTTL = 0;
+    UFConnIPInfo* ipInfo = 0;
+    string ipString;
+
+#ifdef USE_CARES
+    UFAres ufares;
+    UFHostEnt* ufhe = ufares.GetHostByName(hostName.c_str());
+    unsigned int numIpsFound = 0;
+    if(!ufhe || !(numIpsFound = ufhe->get_nttl()))
+        return false;
+
+    ares_addrttl* results = ufhe->get_aresttl();
+    for(unsigned int i = 0; i < numIpsFound; ++i)
+    {
+        ipString = inet_ntoa(results[i].ipaddr);
+#else
+    struct hostent* h = gethostbyname(hostName.c_str());
+    if(!h)
+        return false;
+
+    for(unsigned int i = 0; 1; i++)
+    {
+        if(!h->h_addr_list[i])
+            break;
+        ipString = inet_ntoa(*((in_addr*)h->h_addr_list[i]));
+#endif
+        //check if the ip already exists in the system
+        IPInfoStore::iterator index = _ipInfoStore.find(ipString);
+        if(index == _ipInfoStore.end())
+        {
+            ipInfo = new UFConnIPInfo(ipString, port, true, 0, 0, 30);
+            _ipInfoStore[ipString] = ipInfo;
+            index = _ipInfoStore.find(ipString);
+        }
+        ipInfo = index->second;
+
+#ifdef USE_CARES
+        if(lowestTTL > results[i].ttl || !lowestTTL)
+            lowestTTL = results[i].ttl;
+#else
+        lowestTTL = 300; //default to 60 secs
+#endif
+        ipInfoList.push_back(ipString);
+    }
+
+    time_t timeToExpireAt = lowestTTL + time(0);
+    if(lowestTTL) 
+        groupInfo->setTimeToExpireAt(timeToExpireAt);
+
+    return true;
+}
+
+//TODO: return a ResultStructure which includes the UFConnIPInfo* along w/ the UFIO* so that the map doesnt have to be looked up on every return back of the structure
 UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection)
 {
     if(!groupName.length())
-        return NULL;
+        return 0;
 
     GroupIPMap::iterator foundItr = _groupIpMap.find(groupName);
-    UFConnectionGroupInfo* groupInfo = NULL;
+    UFConnGroupInfo* groupInfo = NULL;
     if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
-    {
         groupInfo = addGroupImplicit(groupName);
-        if(!groupInfo) 
-            return NULL;
-    }
     else
-    {
         groupInfo = (*foundItr).second;
-    }
+    if(!groupInfo)
+        return 0;
+
 
-    UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
-    UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
+    UFConnIPInfoList& ipInfoList = groupInfo->getIpInfoList();
+    time_t currTime = time(0);
+    if((groupInfo->getTimeToExpireAt() && (groupInfo->getTimeToExpireAt() < currTime)) || ipInfoList.empty())
+    {
+        if(!ipInfoList.empty()) //clear the existing set since the ttl expired
+            ipInfoList.clear();
 
+        if(!createIPInfo(groupName, groupInfo))
+            return 0;
+    }
 
 
-    UFIO* returnConn = NULL;
     map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
-    unsigned int groupIpSize = groupInfo->_ipInfoList.size();
+    unsigned int groupIpSize = ipInfoList.size();
     while(alreadySeenIPList.size() < groupIpSize) //bail out if we've seen all the ips already
     {
         //1a. first try to find a connection that already might exist - after that we'll try randomly picking an ip
@@ -255,8 +225,8 @@ UFIO* UFConnectionPoolImpl::getConnectio
             if(alreadySeenIPList.find(i) != alreadySeenIPList.end()) //already seen this IP
                 continue;
 
-            UFConnectionIpInfo* ipInfo = groupInfo->_ipInfoList[i]; 
-            if(ipInfo && ipInfo->_currentlyAvailableConnections.size())
+            UFConnIPInfo* ipInfo = getIPInfo(ipInfoList[i]);
+            if(ipInfo && !ipInfo->_currentlyAvailableConnections.empty())
             {
                 elementNum = i;
                 alreadySeenIPList[elementNum] = 1;
@@ -273,85 +243,106 @@ UFIO* UFConnectionPoolImpl::getConnectio
             alreadySeenIPList[elementNum] = 1;
         }
 
-        UFConnectionIpInfo* ipInfo = groupInfo->_ipInfoList[elementNum]; 
-        if(!ipInfo)
-            //TODO: remove this empty ipInfo obj
+        UFConnIPInfo* ipInfo = getIPInfo(ipInfoList[elementNum]);
+        if(!ipInfo) //TODO: remove this empty ipInfo obj
             continue;
 
-        //2. while the host is timedout - pick another one (put into the list of already seen ips)
-        if(ipInfo->_timedOut && ((ipInfo->_timedOut + _timeoutIP) > time(NULL)) )
-            continue;
-        ipInfo->_timedOut = 0;
+        UFIO* conn = ipInfo->getConnection(_ufConnIPInfoMap);
+        if(conn)
+            return conn;
+    }
+    
+    return NULL;
+}
+
+UFConnIPInfo* UFConnectionPoolImpl::getIPInfo(const std::string& name)
+{
+    IPInfoStore::iterator index = _ipInfoStore.find(name);
+    return ((index != _ipInfoStore.end()) ? index->second : 0);
+}
 
+UFIO* UFConnIPInfo::getConnection(UFConnIPInfoMap& _ufConnIPInfoMap)
+{
+    //2. while the host is timedout - pick another one (put into the list of already seen ips)
+    time_t currTime = time(0);
+    _lastUsed = currTime;
+    if(getTimedOut() && ((unsigned int)(getTimedOut() + getTimeToFailOutIPAfterFailureInSecs()) > (unsigned int) currTime) )
+        return 0;
+    setTimedOut(0);
 
-        while(1) {
-            if(ipInfo->_currentlyAvailableConnections.size()) {
-                //3. pick a connection from the currently available conns
-
-                UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
-                for(; beg != ipInfo->_currentlyAvailableConnections.end(); 
-                    beg = ipInfo->_currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
-                        // the case of two threads ending up getting 
-                        // the same connection
-                    )
+    UFIO* returnConn = NULL;
+    while(1) 
+    {
+        if(!_currentlyAvailableConnections.empty())
+        {
+            //3. pick a connection from the currently available conns
+            UFIOIntMap::iterator beg = _currentlyAvailableConnections.begin();
+            for(; beg != _currentlyAvailableConnections.end(); 
+                beg = _currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
+                    // the case of two threads ending up getting 
+                    // the same connection
+                )
+            {
+                returnConn = beg->first;
+                _currentlyAvailableConnections.erase(beg);
+                if(!returnConn)
                 {
-                    returnConn = NULL;
-                    if(!((*beg).first))
-                    {
-                        ipInfo->_currentlyAvailableConnections.erase(beg);
-                        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
-                        continue;
-                    }
-                    returnConn = (*beg).first;
-                    //take the found connection away from the curentlyAvaliableConnections list
-                    //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
-                    //it may be that the thread gets switched out and some other thread comes into this section
-                    ipInfo->_currentlyAvailableConnections.erase(beg);
-                    ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
-                    return returnConn;
+                    cerr<<time(0)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
+                    continue;
                 }
-            }
-            else {
-                // if _maxSimultaneousConns is hit, wait for a connection to become available
-                if(ipInfo->_maxSimultaneousConns && 
-                   (ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)) {
-                    // wait for a connection to be released
-                    ipInfo->_someConnectionAvailable.lock(this_user_fiber);
-                    ipInfo->_someConnectionAvailable.condWait(this_user_fiber);
-                    ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
+
+                if(returnConn->_markedActive) //this indicates that the conn. had some activity while sleeping - thats no good
+                {
+                    UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(returnConn);
+                    _ufConnIPInfoMap.erase(index);
+                    delete returnConn;
                     continue;
                 }
-                else {
-                    // Create new connection
-                    ipInfo->_inProcessCount++;
-                    returnConn = createConnection(ipInfo);
-                    ipInfo->_inProcessCount--;
-                    if(returnConn)
-                    {
-                        time_t currTime = time(NULL);
-                        ipInfo->_currentlyUsedConnections[returnConn] = currTime;
-                        _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
-                    }
-                    else
-                    {
-                        if((random() % 100) < PERCENT_LOGGING_SAMPLING)
-                            cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
-                    }
-                    
-                    return returnConn;
+                returnConn->_active = true;
+                _currentlyUsedCount++;
+                return returnConn;
+            }
+        }
+        else 
+        {
+            // if _maxSimultaneousConns is hit, wait for a connection to become available
+            if(getMaxSimultaneousConns() && 
+               (_currentlyUsedCount + getInProcessCount() >= (unsigned int) getMaxSimultaneousConns())) 
+            {
+                // wait for a connection to be released
+                UF* this_user_fiber = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+                getMutexToCheckSomeConnection()->lock(this_user_fiber);
+                getMutexToCheckSomeConnection()->condWait(this_user_fiber);
+                getMutexToCheckSomeConnection()->unlock(this_user_fiber);
+                continue;
+            }
+            else 
+            {
+                // Create a new connection
+                incInProcessCount(1);
+                returnConn = createConnection();
+                incInProcessCount(-1);
+                if(returnConn)
+                {
+                    _currentlyUsedCount++;
+                    _ufConnIPInfoMap[returnConn] = this;
+                }
+                else
+                {
+                    if((random() % 100) < PERCENT_LOGGING_SAMPLING)
+                        cerr<<time(0)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<getIP()<<" "<<strerror(errno)<<endl;
                 }
+                
+                return returnConn;
             }
         }
     }
-    
-    return NULL;
+
+    return 0;
 }
 
-UFIO* UFConnectionPoolImpl::createConnection(UFConnectionIpInfo* ipInfo)
+UFIO* UFConnIPInfo::createConnection()
 {
-    if(!ipInfo)
-        return NULL;
-    
     UFIO* ufio = new UFIO(UFScheduler::getUF());
     if(!ufio)
     {
@@ -359,40 +350,73 @@ UFIO* UFConnectionPoolImpl::createConnec
         return NULL;
     }
 
-    int rc = ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 1600000);
+    int rc = ufio->connect((struct sockaddr*) &_sin, sizeof(_sin), getConnectTimeout());
+    if(rc)
+        return ufio;
 
-    if(rc != 0) {
-        delete ufio;
-        return NULL;
-    }
-    return ufio;
+    cerr<<"couldnt connect to "<<getIP()<<" due to "<<strerror(ufio->getErrno())<<endl;
+    delete ufio;
+    return NULL;
 }
 
-//This fxn helps remove conns. that may have been invalidated while being in the waiting to be used state
-const unsigned int LAST_USED_TIME_DIFF = 300;
-void UFConnectionPoolImpl::clearBadConnections()
+const unsigned int DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP = 600;
+//This fxn helps remove conns. that havent been used for a while
+void UFConnectionPoolImpl::clearUnusedConnections(TIME_IN_US lastUsedTimeDiff, unsigned long long int coverListTime)
 {
-    time_t currTime = time(NULL);
-    UFConnectionIpInfoMap::iterator beg = _UFConnectionIpInfoMap.begin();
-    for(; beg != _UFConnectionIpInfoMap.end(); )
+    if(!lastUsedTimeDiff || _ipInfoStore.empty())
+        return;
+
+    unsigned long long int sleepBetweenListElements = coverListTime / _ipInfoStore.size();
+    if(sleepBetweenListElements < 1000) //atleast wait 1ms between elements in the list
+        sleepBetweenListElements = 1000;
+
+    UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    time_t currTime = time(0);
+    IPInfoStore::iterator beg = _ipInfoStore.begin();
+    //walk all the ipinfo structures and then walk their available conns.
+    for(; beg != _ipInfoStore.end(); )
     {
-        UFConnectionIpInfo* ip = beg->second.first;
-        bool currentlyUsed = beg->second.second.first;
-        if(currentlyUsed) //we dont remove conns that are in use
+        this_uf->usleep(sleepBetweenListElements);
+
+        UFConnIPInfo* ipInfo = beg->second;
+        if(!ipInfo)
         {
-            ++beg;
+            _ipInfoStore.erase(beg++);
             continue;
         }
 
-        time_t lastUsed = beg->second.second.second;
-        UFIO* ufio = beg->first;
-        if(!ip || 
-           !ufio || 
-           ((lastUsed + LAST_USED_TIME_DIFF) < (unsigned int) currTime))
+        //walk the available connection list to see if any conn. hasnt been used for a while
+        UFIOIntMap::iterator conBeg = ipInfo->_currentlyAvailableConnections.begin();
+        for(; conBeg != ipInfo->_currentlyAvailableConnections.end(); )
         {
-            ++beg;
-            releaseConnection(ufio, false);
-            continue;
+            UFIO* conn = conBeg->first;
+            if(!conn) //TODO
+            {
+                ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+                continue;
+            }
+
+            if((int)(conBeg->second + lastUsedTimeDiff) < (int)currTime /*the conn. has expired*/ ||
+               (conBeg->first->_markedActive)/*somehow this conn. had some update*/)
+            {
+                //remove the conn from the _UFConnIPInfoMap list
+                UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(conBeg->first);
+                if(index != _ufConnIPInfoMap.end())
+                    _ufConnIPInfoMap.erase(index);
+                delete conBeg->first; //delete the connection
+                ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+                continue;
+            }
+            ++conBeg;
+        }
+        if(ipInfo->_currentlyAvailableConnections.empty())
+        {
+            //check the last time this ipinfo was ever used - if its > than 300s remove it
+            if(ipInfo->getLastUsed() + DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP < (unsigned int) currTime)
+            {
+                _ipInfoStore.erase(beg++);
+                continue;
+            }
         }
         ++beg;
     }
@@ -404,150 +428,46 @@ void UFConnectionPoolImpl::releaseConnec
         return;
 
     //find the ipinfo associated w/ this connection
-    UFConnectionIpInfoMap::iterator ufIOIpInfoLocItr = _UFConnectionIpInfoMap.find(ufIO);
-    if((ufIOIpInfoLocItr == _UFConnectionIpInfoMap.end()) || !(*ufIOIpInfoLocItr).second.first)
+    UFConnIPInfoMap::iterator ufIOIpInfoLocItr = _ufConnIPInfoMap.find(ufIO);
+    if((ufIOIpInfoLocItr == _ufConnIPInfoMap.end()) || !ufIOIpInfoLocItr->second)
     {
         cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt find the associated ipinfo object or the object was empty - not good"<<endl;
-        if(ufIOIpInfoLocItr != _UFConnectionIpInfoMap.end())
-            _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
-        ufIO = NULL;
-        return;
-    }
-
-    UFConnectionIpInfo* ipInfo = (*ufIOIpInfoLocItr).second.first;
-    //remove the conn from the ipInfo->_currentlyUsedConnections list 
-    UFIOIntMap::iterator currUsedConnItr = ipInfo->_currentlyUsedConnections.find(ufIO);
-    if(currUsedConnItr != ipInfo->_currentlyUsedConnections.end())
-        ipInfo->_currentlyUsedConnections.erase(currUsedConnItr);
-    else
-    {
-        //see if the conn is in the available connection list
-        currUsedConnItr = ipInfo->_currentlyAvailableConnections.find(ufIO);
-        if(currUsedConnItr != ipInfo->_currentlyAvailableConnections.end())
-            ipInfo->_currentlyAvailableConnections.erase(currUsedConnItr);
-        else
-            cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt find the release connection in either the used or available list - not good"<<endl;
-
+        if(ufIOIpInfoLocItr != _ufConnIPInfoMap.end())
+            _ufConnIPInfoMap.erase(ufIOIpInfoLocItr);
         delete ufIO;
-        ufIO = NULL;
-        _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
         return;
     }
 
+    UFConnIPInfo* ipInfo = ufIOIpInfoLocItr->second;
+    ipInfo->_currentlyUsedCount--;
 
-    if(connOk && ipInfo->_persistent)
+    //add to the available list
+    if(connOk && ipInfo->getPersistent())
     {
-        time_t currTime = time(NULL);
-        (*ufIOIpInfoLocItr).second.second.first = false;
-        (*ufIOIpInfoLocItr).second.second.second = currTime;
+        time_t currTime = time(0);
         ipInfo->_currentlyAvailableConnections[ufIO] = currTime;
+        ufIO->_markedActive = false;
+        ufIO->_active = false;
     }
     else
     {
+        _ufConnIPInfoMap.erase(ufIOIpInfoLocItr);
         delete ufIO;
-        ufIO = NULL;
-        _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
-    }
-
-    //signal to all the first waiting threads that there might be a connection available
-    UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
-    UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
-                        
-    ipInfo->_someConnectionAvailable.lock(this_user_fiber);
-    ipInfo->_someConnectionAvailable.signal();
-    ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
-}
-
-const unsigned int PRINT_BUFFER_LENGTH = 256*1024;
-string UFConnectionPoolImpl::fillInfo(string& data, bool detailed) const
-{
-    char* printBuffer = new char[256*1024];
-    if(!printBuffer)
-        return data;
-
-    if(!detailed)
-        snprintf(printBuffer, 1024, "ConnectionInfo:\n%15s%10s%10s\n", "GroupName", "IpCount", "IP Avail.");
-    else
-        snprintf(printBuffer, 1024,  
-            "ConnectionInfo:\n%15s%10s%10s%35s%10s%10s%10s%10s%10s\n", 
-            "GroupName", 
-            "IpCount", 
-            "IP Avail.", 
-            "IP name", 
-            "isPersis", 
-            "TimedOut", 
-            "#Run", 
-            "#InProc.", 
-            "#Avail.");
-
-
-    //1. list out the current groups 
-    GroupIPMap::const_iterator groupIpMapItr = _groupIpMap.begin();
-    for(; groupIpMapItr != _groupIpMap.end() ; ++groupIpMapItr)
-    {
-        UFConnectionGroupInfo* tmpGroup = (*groupIpMapItr).second;
-        if(!tmpGroup)
-            continue;
-
-        unsigned int amtCopied = strlen(printBuffer);
-        if((amtCopied + 1024) >= PRINT_BUFFER_LENGTH) //we can't add anymore
-            break;
-
-        snprintf(printBuffer+strlen(printBuffer), 
-            1024, 
-            "%15s%10d%10d\n", 
-            (*groupIpMapItr).first.c_str(), 
-            (int) tmpGroup->_ipInfoList.size(), 
-            (int) tmpGroup->getAvailability());
-
-
-        if(detailed)
-        {
-            UFConnectionIpInfoList::const_iterator _ipInfoListItr = tmpGroup->_ipInfoList.begin();
-            for(; _ipInfoListItr != tmpGroup->_ipInfoList.end(); ++_ipInfoListItr)
-            {
-                UFConnectionIpInfo* ipInfo = (*_ipInfoListItr);
-                if(!ipInfo)
-                    continue;
-
-                snprintf(printBuffer+strlen(printBuffer), 
-                         1024, 
-                         "%70s%10d%10d%10d%10d%10d\n", 
-                         ipInfo->_ip.c_str(), 
-                         (int) ipInfo->_persistent, 
-                         (int) ipInfo->_timedOut, 
-                         (int) ipInfo->_currentlyUsedConnections.size(), 
-                         (int) ipInfo->_inProcessCount, 
-                         (int) ipInfo->_currentlyAvailableConnections.size());
-            }
-        }
+        return;
     }
 
-    data.append(printBuffer);
-    delete printBuffer;
-    return data;
-}
-
-double UFConnectionPoolImpl::getGroupAvailability(const std::string& name) const
-{
-    double result = 0;
-    if(!name.length())
-        return result;
-
-    GroupIPMap::const_iterator foundItr = _groupIpMap.find(name);
-    if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
-        return result;
-
-    return (*foundItr).second->getAvailability();
+    //signal to all the waiting threads that there might be a connection available
+    UF* this_user_fiber = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    ipInfo->getMutexToCheckSomeConnection()->lock(this_user_fiber);
+    ipInfo->getMutexToCheckSomeConnection()->broadcast();
+    ipInfo->getMutexToCheckSomeConnection()->unlock(this_user_fiber);
 }
 
-UFConnectionPoolImpl::~UFConnectionPoolImpl()
+UFConnectionPoolImpl::~UFConnectionPoolImpl() 
 { 
-    for(GroupIPMap::iterator beg = _groupIpMap.begin(); beg != _groupIpMap.end(); ++beg)
-        delete beg->second;
-    _groupIpMap.clear();
 }
 
+const unsigned int DEFAULT_COVER_LIST_TIME_IN_US = 60*1000*1000;
 void UFConnectionPoolCleaner::run()
 {
     UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
@@ -562,14 +482,21 @@ void UFConnectionPoolCleaner::run()
 
     while(1)
     {
-        this_uf->usleep(300*1000*1000);
-        ufcp->clearBadConnections();
+        this_uf->usleep(1000000);
+        ufcp->clearUnusedConnections(300*1000*1000, DEFAULT_COVER_LIST_TIME_IN_US);
     }
 }
 
+const int MAX_SIMUL_CONNS_PER_HOST = 0;
+const int DEFAULT_TIMEOUT_IN_SEC_ON_FAILURE = 10;
 UFConnectionPoolImpl::UFConnectionPoolImpl()
 {
+    _maxSimulConnsPerHost = MAX_SIMUL_CONNS_PER_HOST;
+    _timeToTimeoutIPAfterFailure = DEFAULT_TIMEOUT_IN_SEC_ON_FAILURE;
+}
 
+UFConnGroupInfo::~UFConnGroupInfo() 
+{
 }
 
 void UFConnectionPoolImpl::init()
@@ -582,182 +509,56 @@ void UFConnectionPoolImpl::init()
     }
 }
 
-int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 0;
-int UFConnectionPool::TIMEOUT_PER_REQUEST = 10;
-
 UFConnectionPool::UFConnectionPool() 
 { 
-    impl = new UFConnectionPoolImpl(); 
+    _impl = new UFConnectionPoolImpl(); 
+    if(_impl)
+        _impl->init();
 }
 
-void UFConnectionPool::init()
+bool UFConnectionPool::addGroup(UFConnGroupInfo* groupInfo)
 {
-    UFConnectionPoolImpl::init();
-}
-
-UFConnectionGroupInfo* UFConnectionPool::removeGroup(const string& name)
-{
-    if(!impl)
-        return NULL;
-    return impl->removeGroup(name);
-}
-
-bool UFConnectionPool::addGroup(UFConnectionGroupInfo* stGroupInfo)
-{
-    if(!impl)
-        return false;
-    return impl->addGroup(stGroupInfo);
+    return (_impl ? _impl->addGroup(groupInfo) : false);
 }
 
 UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection)
 {
-    if(!impl)
-        return NULL;
-    return impl->getConnection(groupName, waitForConnection);
-}
-
-UFIO* UFConnectionPool::getConnection(const string& groupName)
-{
-    if(!impl)
-        return NULL;
-    return impl->getConnection(groupName);
+    return (_impl ? _impl->getConnection(groupName, waitForConnection) : 0);
 }
 
 void UFConnectionPool::releaseConnection(UFIO* ufIO, bool connOk)
 {
-    if(!impl)
-        return;
-    return impl->releaseConnection(ufIO, connOk);
+    if(_impl && ufIO)
+        return _impl->releaseConnection(ufIO, connOk);
+    else if(ufIO)
+        delete ufIO;
 }
 
-void UFConnectionPool::setTimeoutIP(int timeout) 
+void UFConnectionPool::setTimeToTimeoutIPAfterFailure(TIME_IN_US timeout) 
 { 
-    if(!impl)
-        return;
-    if(timeout > -1)
-        impl->_timeoutIP = timeout; 
-    else
-        impl->_timeoutIP = 60;
-}
-
-string UFConnectionPool::fillInfo(string& data, bool detailed) const
-{
-    if(!impl)
-        return string("");
-    return impl->fillInfo(data, detailed);
-}
-
-double UFConnectionPool::getGroupAvailability(const std::string& name) const
-{
-    if(!impl)
-        return 0;
-    return impl->getGroupAvailability(name);
+    if(_impl)
+        return _impl->setTimeToTimeoutIPAfterFailure(timeout);
 }
 
-void UFConnectionPool::clearBadConnections()
+void UFConnectionPool::clearUnusedConnections(TIME_IN_US lastUsedTimeDiff, unsigned long long int coverListTime)
 {
-    if(!impl)
-        return;
-    return impl->clearBadConnections();
+    if(_impl)
+        return _impl->clearUnusedConnections(lastUsedTimeDiff, coverListTime);
 }
 
-string StringUtil::trim_ws(const string& input)
-{
-    if(!input.length())
-        return input;
-
-    size_t beg_position = input.find_first_not_of(" \n\r\t\r");
-    size_t end_position = input.find_last_not_of(" \n\t\r");
-
-    if(beg_position == string::npos)
-        beg_position = 0;
-    if(end_position == string::npos)
-        end_position = input.length();
-
-    return (input.substr(beg_position, (end_position-beg_position+1)));
-}
 
-unsigned int StringUtil::split(const string& input, const string& splitOn, StringVector& output)
+TIME_IN_US UFConnectionPool::getTimeToTimeoutIPAfterFailure()
 {
-    unsigned int copyStringBegin = 0;
-    output.clear();
-
-    while(copyStringBegin < input.length())
-    {
-        string::size_type findLoc = input.find(splitOn, copyStringBegin);
-        if(copyStringBegin != findLoc)      
-        {
-            string subStr = input.substr(copyStringBegin, (findLoc == string::npos) ? input.length()-copyStringBegin : findLoc - copyStringBegin);
-            if(subStr.length())
-                output.push_back(subStr);
-            if(findLoc == string::npos)
-                break;
-            copyStringBegin += subStr.length();
-        }
-        else
-            copyStringBegin += splitOn.length();
-    }
-
-    return output.size();
+    return (_impl ? _impl->getTimeToTimeoutIPAfterFailure() : 0);
 }
 
-unsigned int UFConnectionPool::loadConfigFile(const string& fileName)
+void UFConnectionPool::setMaxSimulConnsPerHost(int input)
 {
-    return loadConfigFile(fileName, -1);
+    if(_impl)
+        _impl->setMaxSimulConnsPerHost(input);
 }
 
-unsigned int UFConnectionPool::loadConfigFile(const string& fileName, int maxSimultaneousConns)
+int UFConnectionPool::getMaxSimulConnsPerHost()
 {
-    ifstream infile(fileName.c_str());
-    if(!infile)
-        return false;
-
-    int num_groups_added = 0;
-    string line;
-    while(getline(infile, line))
-    {
-        if(line.find('#') != string::npos) //bail if we see # in the line
-            continue;
-
-        //split on ':'
-        StringUtil::StringVector compVec;
-        int numFound = StringUtil::split(line, ":", compVec);
-        if(numFound < 3)
-            continue;
-        string farmId = StringUtil::trim_ws(compVec[0]);
-        string timeOut = StringUtil::trim_ws(compVec[1]);
-        string ipList = StringUtil::trim_ws(compVec[2]);
-
-        if(!farmId.length() || !ipList.length() || (!timeOut.length()) || (timeOut == "*"))
-            continue;
-
-        StringUtil::StringVector ipVec;
-        numFound = StringUtil::split(ipList, ",", ipVec);
-        if(!numFound)
-        {
-            cerr<<"couldnt add "<<farmId<<" because the number of ips found was 0"<<endl;
-            continue;
-        }
-
-        //create the group
-        UFConnectionGroupInfo* tmpGroupInfo = new UFConnectionGroupInfo(farmId);
-        //create the ips and add them to the group
-        for(unsigned int i = 0; i < ipVec.size(); i++)
-        {
-            string ip = StringUtil::trim_ws(ipVec[i]);
-            ip.append(":1971");
-            UFConnectionIpInfo* tmpIpInfo = new UFConnectionIpInfo(ip, ((timeOut == "-1") ? true : false), maxSimultaneousConns);
-
-            tmpGroupInfo->addIP(tmpIpInfo);
-        }
-
-        //add the group to the connection pool
-        if(!addGroup(tmpGroupInfo))
-            cerr<<"couldnt add group "<<farmId<<" to the connection pool"<<endl;
-
-        num_groups_added++;
-    }
-
-    return num_groups_added;
+    return (_impl ? _impl->getMaxSimulConnsPerHost() : 0);
 }
-



Mime
View raw message