trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aku...@apache.org
Subject svn commit: r985334 [1/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ samples/
Date Fri, 13 Aug 2010 19:43:10 GMT
Author: akundu
Date: Fri Aug 13 19:43:09 2010
New Revision: 985334

URL: http://svn.apache.org/viewvc?rev=985334&view=rev
Log:
Steve Jiang
    - UFIO
        - support for readline
        - change TIME_IN_US to a signed type (long long int) so that timeouts can default to -1 (no timeout)
        - update IO fxns to use TIME_IN_US

Bryan Call    
    - add UFSwapContext.H - expose the internal swap context fxn
    - add setter+getter fxns in UFIO + UF

Raghav Jeyaraman
    - UFStats
        - support for current connections count
    - UFConf
        - take the value as an input type into templated getValue fxns
    - UFServer
        - per thread initializer support

Anirban Kundu
    - UFHTTPLoader
        - increased default options to support exiting early
        - put an interval between the threads starting to do work
        - use new timeout mode of -1 meaning no timeout
    - UF.H
        - use a deque to track the lock instead of a list
        - support UF objects being created and destroyed using a factory
        - ability to set the stack size for future created UFs
        - run all active (and newly created) UFs first before sleeping
    - UFPC.H
        - support for joinable and non-joinable producers/consumers
        - support to not use a lock in managing the producer and consumer
    - UFIO.H
        - support for writev (similar to state threads writev implementation)
    - UFConnectionPool
        - support to remove unused connections after a little while

Manjesh Nilange
        - support for listening to multiple ports from the same server


Added:
    trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H   (props changed)
      - copied unchanged from r985331, trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
Removed:
    trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
Modified:
    trafficserver/traffic/branches/UserFiber/core/include/UF.H
    trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
    trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
    trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
    trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
    trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
    trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
    trafficserver/traffic/branches/UserFiber/core/src/Makefile
    trafficserver/traffic/branches/UserFiber/core/src/UF.C
    trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
    trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
    trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
    trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
    trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
    trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
    trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C

Modified: trafficserver/traffic/branches/UserFiber/core/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Fri Aug 13 19:43:09 2010
@@ -2,7 +2,6 @@
 #define USERTHREADS_H
 
 #include <sys/time.h>
-
 #include <iostream>
 #include <map>
 #include <stdint.h>
@@ -10,12 +9,16 @@
 #include <set>
 #include <vector>
 #include <list>
+#include <deque>
 #include <ucontext.h>
 #include <pthread.h>
 #include <errno.h>
 
-using namespace std;
+#include <UFSwapContext.H>
+//#include <ufutil/Factory.H>
+
 namespace std { using namespace __gnu_cxx; }
+std::string getPrintableTime();
 
 enum UFStatus
 {
@@ -23,17 +26,20 @@ enum UFStatus
     WAITING_TO_RUN      = 1,
     BLOCKED             = 2,
     RUNNING             = 3,
-    COMPLETED           = 4
+    COMPLETED           = 4,
+    YIELDED             = 5
 };
 
-typedef unsigned long long int TIME_IN_US;
+typedef long long int TIME_IN_US;
 
 //create the type of UF you'd like to pass into the accept handler
 typedef unsigned long long int UFId;
 struct UFScheduler;
-struct UFMutex;
-struct UF
+
+struct UFFact;
+class UF
 {
+public:
     friend class UFScheduler;
     friend class UFMutex;
 
@@ -52,34 +58,72 @@ struct UF
     //otherwise behavior is unexpected
     void                 yield();
     ///must be called after the fiber is added to a scheduler
-    void                 usleep(unsigned long long int sleepAmtInUs);
-    static void          gusleep(unsigned long long int sleepAmtInUs);
+    void                 usleep(TIME_IN_US sleepAmtInUs);
+    static void          gusleep(TIME_IN_US sleepAmtInUs);
     ///simply block the fiber
     void                 block();
     UFStatus             getStatus() const;
+    unsigned long long int getLastRun() const;
 
 
     UFStatus             _status;
     void*                _startingArgs;
+    static unsigned int  DEFAULT_STACK_SIZE;
+    UFFact*              _myFactory;
+    void reset();
 
-protected:
+private:
     static UFId          _globalId;
-    UFId                 _myId;
     UFScheduler*         _parentScheduler;
+    UFId                 _myId;
     ucontext_t           _UFContext;
     bool                 _UFObjectCreatedStack;
+    unsigned long long int _lastRun;
 
-private:
     void waitOnLock();
 };
+inline void UF::reset() 
+{
+    _startingArgs = 0;
+    _myFactory = 0; 
+    _parentScheduler = 0; 
+    _lastRun = 0; 
+    _status = NOT_STARTED; 
+}
 inline UFStatus UF::getStatus() const { return _status; }
+inline unsigned long long int UF::getLastRun() const { return _lastRun; }
+
+class UFFact
+{
+public:
+    virtual ~UFFact() {}
+    virtual UF* getUF();
+    virtual void releaseUF(UF* uf);
+
+protected:
+    virtual UF* createUF() = 0;
+    virtual void destroyUF(UF* uf) = 0;
+};
+inline UF* UFFact::getUF()
+{
+    UF* uf = createUF();
+    if(!uf)
+        return 0;
+    uf->reset();
+    uf->_myFactory = this;
+    return uf;
+}
+inline void UFFact::releaseUF(UF* uf)
+{
+    destroyUF(uf);
+}
 
 struct UFFactory
 {
     static UFFactory* getInstance();
     UFFactory();
 
-    UF* selectUF(unsigned int location);;
+    UF* selectUF(unsigned int location);
     int registerFunc(UF* uf);
 
 protected:
@@ -92,9 +136,9 @@ inline UFFactory* UFFactory::getInstance
 inline UF* UFFactory::selectUF(unsigned int location) { return _objMapping[location]; }
 
 struct UFWaitInfo;
-typedef std::list<UFWaitInfo*> UFWaitList;
-typedef map<UF*, UFWaitInfo*> UFWLHash;
-typedef std::list<UF*>          UFList;
+typedef std::map<UF*, UFWaitInfo*>  UFWLHash;
+typedef std::list<UF*>              UFList;
+typedef std::deque<UF*>             UFDeque;
 struct UFMutex
 {
     UFMutex() 
@@ -106,7 +150,7 @@ struct UFMutex
 
     bool lock(UF* uf);
     bool unlock(UF* uf);
-    bool tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS = 0);
+    bool tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS = 0);
 
     //THE CALLER MUST get the lock before calling this fxn
     //THE CALLER MUST release the lock after this fxn is called
@@ -119,7 +163,7 @@ struct UFMutex
     void signal();
     //THE CALLER MUST get the lock before calling this fxn
     //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
-    int condTimedWait(UF *uf, unsigned long long int sleepAmtInUs);
+    bool condTimedWait(UF *uf, TIME_IN_US sleepAmtInUs);
 
     void releaseSpinLock(bool spinCPU = false);
     void getSpinLock(bool spinCPU = false);
@@ -127,7 +171,6 @@ struct UFMutex
 protected:
     int                 _lockActive;
     UFList              _listOfClientsWaitingOnLock;
-    //UFWaitList          _listOfClientsWaitingOnCond;
     UFWLHash            _listOfClientsWaitingOnCond;
     bool                _lockCurrentlyOwned;
     UF*                 _mustRunUF;
@@ -144,10 +187,10 @@ struct UFWaitInfo
 inline void UFWaitInfo::reset() { _uf = 0; _sleeping = false; _waiting = false; }
 
 
-typedef std::multimap<unsigned long long int, UFWaitInfo*> MapTimeUF;
+typedef std::multimap<TIME_IN_US, UFWaitInfo*> MapTimeUF;
 //typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
 //per thread scheduler
-typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
+typedef std::hash_map<pthread_t, UFScheduler*, std::hash<uintptr_t> > ThreadUFSchedulerMap;
 
 struct UFScheduler
 {
@@ -164,7 +207,7 @@ struct UFScheduler
     bool addFiberToScheduler(UF* uf,      /* the UF to add */
                              pthread_t tid = 0); /* the thread to add the UF to */
     //add the fxn to add multiple ufs in one shot (if they're on one tid)
-    bool addFiberToScheduler(const std::list<UF*>& ufList, 
+    bool addFiberToScheduler(const UFList& ufList, 
                               pthread_t tid = 0);
 
 
@@ -181,10 +224,10 @@ struct UFScheduler
     static bool                  _inThreadedMode;
 
     UF* getRunningFiberOnThisThread();
-    const ucontext_t& getMainContext() const;
+    ucontext_t* getMainContext();
     void setSpecific(void* args);
     void* getSpecific() const;
-    unsigned long long int getAmtToSleep() const;
+    TIME_IN_US getAmtToSleep() const;
     static void setExit(bool exit = true);
     bool shouldExit() const;
     void setExitJustMe(bool exit = true);
@@ -203,29 +246,31 @@ struct UFScheduler
     //to allow to identify the thread running now
     static pthread_key_t        _specific_key;
 
-    static void ufCreateThread(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+    static void ufCreateThread(pthread_t* tid, UFList* ufsToStartWith);
 
     static bool                 _exit;
     bool                        _exitJustMe;
+    unsigned long long int      getRunCounter() const;
+
 protected:
+    unsigned long long int      _runCounter;
     UF*                         _currentFiber;
     ucontext_t                  _mainContext;
 
     //no lock for active running list - cause only the running
     //thread can add to it
-    UFList                      _activeRunningList;
-    size_t                      _activeRunningListSize;
+    UFDeque                     _activeRunningList;
 
     //nominate to add to a thread's running list
-    UFList                      _nominateToAddToActiveRunningList;
+    UFDeque                     _nominateToAddToActiveRunningList;
     pthread_mutex_t             _mutexToNominateToActiveList;
     pthread_cond_t              _condToNominateToActiveList;
     
     //the sleep tree
     MapTimeUF                   _sleepList;
-    unsigned long long int      _earliestWakeUpFromSleep;
+    TIME_IN_US                  _earliestWakeUpFromSleep;
     //store the shortest sleep interval
-    unsigned long long int      _amtToSleep;
+    TIME_IN_US                  _amtToSleep;
 
 
     //store thread specific content
@@ -234,20 +279,18 @@ protected:
 
     void notifyUF();
     
-    list<UFWaitInfo*>  _availableWaitInfo;
+    std::deque<UFWaitInfo*>     _availableWaitInfo;
     UFWaitInfo* getWaitInfo();
     void releaseWaitInfo(UFWaitInfo& ufsi);
     bool addFiberToSelf(UF* uf);
-    bool addFiberToAnotherThread(const std::list<UF*>& ufList, pthread_t tid);
-
-public:
-    UFMutex testingCondTimedWait;
+    bool addFiberToAnotherThread(const UFList& ufList, pthread_t tid);
 };
-inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
+inline unsigned long long int UFScheduler::getRunCounter() const { return _runCounter; }
+inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningList.size(); }
 inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
-inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }
+inline TIME_IN_US UFScheduler::getAmtToSleep() const { return _amtToSleep; }
 inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
-inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
+inline ucontext_t* UFScheduler::getMainContext() { return &_mainContext; }
 inline void UFScheduler::setSpecific(void* args) { _specific = args; }
 inline void* UFScheduler::getSpecific() const { return _specific; }
 inline void UFScheduler::setExit(bool exit) { _exit = exit; }
@@ -275,17 +318,17 @@ inline UFScheduler* UF::getParentSchedul
 
 inline void UF::waitOnLock() { block(); }
 
-inline void UF::gusleep(unsigned long long int sleepAmtInUs)
+inline void UF::gusleep(TIME_IN_US sleepAmtInUs)
 {
     UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
 }
 
-inline unsigned long long int timeInUS(timeval& t)
+inline TIME_IN_US timeInUS(timeval& t)
 {
-    return ((unsigned long long int)(((unsigned long long int) t.tv_sec)*1000000)+(unsigned long long int) t.tv_usec);
+    return ((TIME_IN_US)(((TIME_IN_US) t.tv_sec)*1000000)+(TIME_IN_US) t.tv_usec);
 }
 
-inline void UF::usleep(unsigned long long int sleepAmtInUs)
+inline void UF::usleep(TIME_IN_US sleepAmtInUs)
 {
     if(!sleepAmtInUs)
     {
@@ -296,7 +339,7 @@ inline void UF::usleep(unsigned long lon
     struct timeval now;
     gettimeofday(&now, 0);
     
-    unsigned long long int timeToWakeUp = timeInUS(now) + sleepAmtInUs;
+    TIME_IN_US timeToWakeUp = timeInUS(now) + sleepAmtInUs;
     if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
        !_parentScheduler->_earliestWakeUpFromSleep)
         _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
@@ -318,7 +361,11 @@ inline void UF::block()
 inline void UF::yield()
 {
     //switch context back to the main scheduler
-    swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
+#if __WORDSIZE == 64
+    uf_swapcontext(&_UFContext, _parentScheduler->getMainContext());
+#else
+    swapcontext(&_UFContext, _parentScheduler->getMainContext());
+#endif
 }
 
 inline void UFMutex::releaseSpinLock(bool spinCPU)

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Fri Aug 13 19:43:09 2010
@@ -9,6 +9,9 @@
 #include <ares.h>
 #include "UFDNS.H"
 #include "UFHostEnt.H"
+#include <iostream>
+
+using namespace std;
 
 // for ares less then version 1.7.0
 //#ifndef ares_addrttl

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConf.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFConf.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFConf.H Fri Aug 13 19:43:09 2010
@@ -3,6 +3,10 @@
 
 #include <string>
 #include <vector>
+#include <list>
+#include <ostream>
+
+#include <iostream>
 #include <ext/hash_map>
 
 using namespace std;
@@ -13,9 +17,9 @@ namespace __gnu_cxx
     template<> struct hash< std::string >
     {
         size_t operator()( const std::string& x ) const
-            {
-                return hash< const char* >()( x.c_str() );
-            }
+        {
+            return hash< const char* >()( x.c_str() );
+        }
     };
 }
 
@@ -25,10 +29,37 @@ namespace __gnu_cxx
 class ConfValueBase
 {
 public:
-    virtual void dump(ostream &output)=0;
+    virtual void dump(std::ostream &output)=0;
     virtual ~ConfValueBase() { }
+    std::string type;
 };
 
+/** Helper for printing out vector
+ */   
+template <typename T>
+std::ostream& operator <<  (std::ostream& output, const std::vector<T> &value)
+{
+    if(!value.size())
+        return output;
+    
+    for(typename std::vector<T>::const_iterator it = value.begin(); it != value.end(); it++)
+        output << *it << " ";
+    return output;
+}
+
+/** Helper for printing out list
+ */   
+template <typename T>
+std::ostream& operator <<  (std::ostream& output, const std::list<T> &value)
+{
+    if(!value.size())
+        return output;
+    
+    for(typename std::list<T>::const_iterator it = value.begin(); it != value.end(); it++)
+        output << *it << " ";
+    return output;
+}
+
 /** Template conf value 
  *  This covers all built in types.
  *  This is used by UFConf for string, int, bool and double
@@ -38,56 +69,154 @@ template <class T>
 class ConfValue : public ConfValueBase {
 public:
     T mElement;
-    void dump(ostream& output) { output << mElement; }
-    friend ostream& operator <<  (ostream& output, const ConfValue<T>& value)
+
+    void dump(std::ostream& output) { output << mElement; }
+    friend std::ostream& operator <<  (std::ostream& output, const ConfValue<T>& value)
     {
         output << value.mElement;
         return output;
     }
 };
 
+/** Get conf value from ConfValueBase
+ *  This is a helper function which makes it easier to get conf values
+ *  _testBool = &(((ConfValue<bool> *)conf_val)->mElement) will now become _testBool = confValueGet<bool>(conf_val)
+ */
+template <class T>
+T* confValueGet(ConfValueBase *conf_val)
+{
+    return &(((ConfValue<T> *)conf_val)->mElement);
+}
+
+/// Forward declaration
+class UFConfManager;
+
 /** Holds config data for a given file
  *  The class has a parent conf variable.
  *  the get* functions look at _data. If the requested key is not found, they lookup the parent
  */
 class UFConf
 {
+    friend class UFConfManager;
 public:
-    UFConf() : _parent(NULL) { }
+    UFConf(const std::string &conf_file);
+    void init();
     ConfValueBase *get(const std::string &key);
     int *getInt(const std::string &key);
     double *getDouble(const std::string &key);
-    string *getString(const std::string &key);
+    std::string *getString(const std::string &key);
     bool *getBool(const std::string &key);
+    std::vector<std::string> *getStringVector(const std::string &key);
 
-    void setInt(const std::string &key, int value);
-    void setDouble(const std::string &key, double value);
-    void setString(const std::string &key, const std::string &value);
-    void setBool(const std::string &key, bool value);
+    void setInt(const std::string &key, const std::string &key_type, int value);
+    void setDouble(const std::string &key, const std::string &key_type, double value);
+    void setString(const std::string &key, const std::string &key_type, const std::string &value);
+    void setBool(const std::string &key, const std::string &key_type, bool value);
+    void setStringVector(const std::string &key, const std::string &key_type, const std::vector<std::string> &value);
     
-    void setParent(UFConf *parent) { _parent = parent; }
     bool parse(const std::string &conf_file);
-    ~UFConf();
+    virtual bool parseLine(const std::string &line);
+    virtual void clear();
+    virtual void cache(const std::string &conf_key, ConfValueBase *conf_val) { }
+    virtual void afterParse() { }
+    virtual ~UFConf() { }
+
+    std::string conf_file;
 
-    friend ostream& operator<<(ostream& output, const UFConf &conf);
+    friend std::ostream& operator<<(std::ostream& output, const UFConf &conf);
+    
+protected:
+    template<typename T>
+    void _set(const std::string &conf_key, const std::string &conf_key_type, const T &value)
+        {
+            // Check if a value already exists for the key
+            ConfValueBase *existingValue = get(conf_key);
+            if(existingValue != NULL) {
+                delete existingValue;
+            }
+            
+            // Create new value. Set type. Copy value.
+            ConfValue<T> *confValue = new ConfValue<T>;
+            confValue->type = conf_key_type;
+            confValue->mElement = value;
+            _data[conf_key] = confValue;
+
+            // call cache so that derived classes can cache whatever value they need to
+            cache(conf_key, confValue);
+        }
+
+    template<typename T>
+    T* _get(const std::string &key)
+        {
+            ConfValue<T> *confValue = (ConfValue<T> *)get(key);
+            if(confValue != NULL) {
+                return &confValue->mElement;
+            }
+            if(_parent == NULL)
+                return NULL;
+            return _parent->_get<T>(key);
+        }
+        
+    void _setParent(UFConf *parent);
     
-private:
     UFConf *_parent;
     std::hash_map<std::string, ConfValueBase *> _data;
 };
 
+/** Factory to create conf objects
+ *  This is needed because the cache method is virtual and so cannot be dispatched to a derived class from the constructor; the factory constructs the object first and then calls init()
+ */   
+template< class T >
+T* UFConfFactory(const std::string &conf_file)
+{
+    T* confObject = new T(conf_file);
+    confObject->init();
+
+    return confObject;
+}
+
 /** Manages config objects in the system 
  * 
  */
 class UFConfManager
 {
 public:
-    static UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
-    static UFConf* addConf(const std::string &conf_file);
-    static UFConf* getConf(const std::string &conf_file);
-    static void dump();
+    UFConfManager();
+    void reload();
+    UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
+    UFConf* addConf(const std::string &conf_file);
+    bool addConf(UFConf *conf, const string &conf_file);
+    bool addChildConf(UFConf *child_conf, const string &conf_file, const string &parent_conf_file="/home/y/conf/UF/uf.conf");
+    UFConf* getConf(const std::string &conf_file);
+    void dump();
+
+    static long getRefreshTime();
+    static void setRefreshTime(long);
+    static UFConfManager* getConfManager();
+    
+    static pthread_key_t threadSpecificKey;
+    
 private:
-    static std::hash_map<std::string, UFConf *> _configs;
+    /// force children to pickup changes from a parent
+    void _reloadChildren(UFConf *conf);
+
+    /// add conf file to list of files being monitored for changes
+    void _addWatch(const std::string &conf);
+
+    /// conf object associated with a conf file
+    std::hash_map<std::string, UFConf *> _configs;
+
+    /// list of children for a given conf
+    std::hash_map<std::string, vector<std::string> > _child_map;
+
+    /// map of fds being monitored for changes to the filename
+    std::hash_map<int, std::string> _watch_fd_map;
+    int _notify_fd;
+
+    /// Time between checking for conf file changes
+    static long _refreshTime;
+
+    static pthread_key_t _createThreadKey();
 };
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H Fri Aug 13 19:43:09 2010
@@ -11,7 +11,7 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
-#include "UF.H"
+#include <UF.H>
 
 class UFConnGroupInfo;
 class UFConnIPInfo;
@@ -23,7 +23,7 @@ struct UFConnectionPool
 
     bool addGroup(UFConnGroupInfo* groupInfo);
     UFConnGroupInfo* removeGroup(const std::string& groupName);
-    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true, TIME_IN_US connectTimeout = -1);
     void releaseConnection(UFIO* ufIO, bool connOk = true);
     void clearUnusedConnections(TIME_IN_US lastUsedTimeDiff = 300000000 /*300 secs*/, unsigned long long int coverListTime = 60*1000*1000);
 
@@ -40,5 +40,13 @@ protected:
     UFConnectionPoolImpl*           _impl;
 };
 
+struct UFConnectionPoolCleaner : public UF
+{
+    void run();
+    UFConnectionPoolCleaner (bool registerMe = false);
+    UF* createUF() { return new UFConnectionPoolCleaner(); }
+};
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
+
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Fri Aug 13 19:43:09 2010
@@ -1,8 +1,8 @@
 #ifndef UF_DNS_H
 #define UF_DNS_H
 
-#include "UF.H"
-#include "UFIO.H"
+#include <UF.H>
+#include <UFIO.H>
 class UFHostEnt;
 class UFDNS : public UFIO
 {

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Fri Aug 13 19:43:09 2010
@@ -4,15 +4,13 @@
 #include <string>
 #include <ext/hash_map>
 #include <stdint.h>
-#include "UF.H"
+#include <UF.H>
 
-using namespace std;
 namespace std { using namespace __gnu_cxx; }
 
 
 
 
-
 struct UFIOAcceptThreadChooser
 {
     virtual std::pair<UFScheduler*,pthread_t> pickThread(int listeningFd) = 0;
@@ -60,40 +58,66 @@ struct UFIO
     //TODO: create ipv6 accept model
     void accept(UFIOAcceptThreadChooser* ufiotChooser,
                 unsigned short int ufLocation,
+                unsigned short int port,
                 void* startingArgs,
                 void* stackPtr = 0,
                 unsigned int stackSize = 0);
 
     //The fxn will call isSetup which will make the connection request non-blocking and setup the socket
     //TODO: support ipv6 connect
-    bool connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
+    bool connect(const struct sockaddr *addr, socklen_t addrlen, TIME_IN_US timeout);
 
-    //since this is most likely going to be run on an edge trigger system
-    //reads should be done in such a manner so that if the read call
-    //returns back w/ the same number specified, then a following read call
-    //should be made to ensure that nothing is left in the network buffer
-    //since in edge trigger no more events will be generated for data that was
-    //already seen earlier as being in the network. Therefore always make 
-    //another call to this fxn if the return was == to nbyte
-    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = 0);
-    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = 0);
+    /**
+     * @brief read until delimiter is encountered or n-1 bytes have been read
+     * @param buf the output buffer. The buffer is null terminated if readLine was successful.
+     * @param n the size of the buffer pointed to by buf in bytes
+     * @return number of bytes read, not including the delimiter on success. -1 on error, 
+     *         getErrno() will return the underlying error.
+     */
+    ssize_t readLine(char* buf, size_t n, char delim='\n');
+    /**
+     * @brief read until delimiter is encountered or n-1 bytes have been read
+     * @param out the output string
+     * @param n readLine will attempt to read at most n-1 bytes
+     * @return number of bytes read, not including the delimiter on success or -1 on error. 
+     *         getErrno() will return the underlying error.
+     */
+    ssize_t readLine(std::string &out, size_t n, char delim='\n');
+
+    /**
+     * @brief attempt to read nbytes from the underlying fd
+     * @param buf the output buffer
+     * @param timeout the timeout in microseconds. 0 indicates a non-blocking call, -1 is no timeout
+     * @returns number of bytes read, or -1 on error. getErrno() will return the underlying error
+     * 
+     * since this is most likely going to be run on an edge trigger system
+     * reads should be done in such a manner so that if the read call
+     * returns back nbyte, then a following read call
+     * should be made to ensure that nothing is left in the network buffer
+     * since in edge trigger no more events will be generated for data that was
+     * already seen earlier as being in the network. Therefore always make 
+     * another call to this fxn if the return was == to nbyte
+     */
+    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = -1);
+    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = -1);
+    ssize_t writev(const struct iovec *iov, int iov_size, TIME_IN_US timeout = -1);
     int sendto(const char *msg, 
-                  int len,
-	              const struct sockaddr *to, 
-                  int tolen, 
-                  TIME_IN_US timeout);
+               size_t len,
+               const struct sockaddr *to, 
+               socklen_t tolen, 
+               TIME_IN_US timeout);
     int sendmsg(const struct msghdr *msg, 
-                   int flags,
-	               TIME_IN_US timeout);
+                int flags,
+                TIME_IN_US timeout);
     int recvfrom(char *buf, 
-                    int len, 
-                    struct sockaddr *from,
-		            int *fromlen, 
-                    TIME_IN_US timeout);
+                 size_t len, 
+                 struct sockaddr *from,
+                 socklen_t *fromlen, 
+                 TIME_IN_US timeout);
     int recvmsg(struct msghdr *msg, 
-                   int flags,
-	               TIME_IN_US timeout);
-
+                int flags,
+                TIME_IN_US timeout);
+    
     bool close();
 
     bool setFd(int fd, bool makeNonBlocking = true);
@@ -107,17 +131,23 @@ struct UFIO
     unsigned int getRemotePort() const;
     UFIOScheduler* getUFIOScheduler() const;
 
-    static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+    static void ufCreateThreadWithIO(pthread_t* tid, UFList* ufsToStartWith);
 
     UFSleepInfo*                _sleepInfo;
     bool                        _markedActive;
     bool                        _active;
 
+    static int                  RECV_SOCK_BUF;
+    static int                  SEND_SOCK_BUF;
+
 protected:
     int                         _fd;
     unsigned int                _errno;
     UF*                         _uf;
     UFIOScheduler*              _ufios;
+    char*                       _readLineBuf;
+    size_t                      _readLineBufPos;
+    size_t                      _readLineBufSize;
 
     UFIO() { reset(); }
     void reset();
@@ -139,8 +169,8 @@ inline unsigned int UFIO::getRemotePort(
 
 
 struct UFIOScheduler;
-//typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
-typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
+//typedef std::map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
+typedef std::hash_map<pthread_t, UFIOScheduler*, std::hash<uintptr_t> > ThreadFiberIOSchedulerMap;
 
 struct UFConnectionPool;
 struct UFIOScheduler
@@ -148,13 +178,13 @@ struct UFIOScheduler
     UFIOScheduler();
     virtual ~UFIOScheduler();
 
-    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1) = 0;
     virtual bool closeConnection(UFIO* ufio) = 0;
     //TODO: support regular poll behavior
-    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0) = 0;
+    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1) = 0;
 
     virtual bool isSetup() { return false; }
     virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
@@ -174,7 +204,7 @@ inline UFConnectionPool* UFIOScheduler::
 
 #define MAX_FDS_FOR_EPOLL 128*1024-1
 //typedef map<int, UFIO*> IntUFIOMap;
-typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
+typedef std::hash_map<int, UFIO*, std::hash<int> > IntUFIOMap;
 typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
 struct EpollUFIOScheduler : public UFIOScheduler
 {
@@ -185,15 +215,15 @@ struct EpollUFIOScheduler : public UFIOS
     bool isSetup(); //call after the c'tor to verify that the structure is correctly setup
 
     //inputInfo is the flags info (its an int*) for this addition
-    bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
+    bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForRead(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1);
     bool closeConnection(UFIO* ufio);
-    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0);
+    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1);
 
 
-    void waitForEvents(TIME_IN_US timeToWait = -1);
+    void waitForEvents(TIME_IN_US timeToWait = 0);
 
     bool                            _interruptedByEventFd;
 
@@ -208,16 +238,16 @@ protected:
 
 
     MapTimeUFIO                     _sleepList;
-    unsigned long long int          _earliestWakeUpFromSleep;
+    TIME_IN_US          _earliestWakeUpFromSleep;
 
     bool addToScheduler(UFIO* ufio, 
                         void* inputInfo /*flags to identify how ot add*/, 
-                        TIME_IN_US to = 0,
+                        TIME_IN_US to = -1,
                         bool wait = true, 
                         bool runEpollCtl = false);
 
 
-    list<UFSleepInfo*>  _availableSleepInfo;
+    std::deque<UFSleepInfo*>        _availableSleepInfo;
     UFSleepInfo* getSleepInfo();
     void releaseSleepInfo(UFSleepInfo& ufsi);
 };

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Fri Aug 13 19:43:09 2010
@@ -3,67 +3,200 @@
 
 #include <set>
 #include <stack>
-#include "UF.H"
+#include <deque>
+#include <UF.H>
+//#include <Factory.H>
 
 struct UFMutex;
 struct UFProducerData;
 struct UFConsumer;
 struct UFProducer;
 
+
 struct UFConsumer
 {
     friend class UFProducer;
-    UFConsumer(bool notifyOnExitOnly = false);
-    virtual ~UFConsumer();
+    friend class UFJoinableProducer;
+    friend class UFNonJoinableProducer;
+
     //user needs to call input->releaseMe after consuming the data
-    UFProducerData* waitForData(UF* uf = 0);
+    UFProducerData* waitForData(UF* uf = 0, size_t* numRemaining = 0, TIME_IN_US timeToWait=0);
     bool hasData(UF* uf = 0);
 
-    bool joinProducer(UFProducer* ufp);
-    bool removeProducer(UFProducer* ufp);
-    void reset();
-
-    std::list<UFProducerData*>      _queueOfDataToConsume;
-    UFMutex                         _queueOfDataToConsumeLock;
     bool getNotifyOnExitOnly() const;
-
     UF* getUF() const;
+
     bool                            _requireLockToWaitForUpdate; //if the developer is aware that both the producer and all the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
-protected:
-    std::set<UFProducer*>           _consumersProducerSet;
+
+    UFConsumer();
+    virtual ~UFConsumer() { reset(); }
+    void reset();
+
+    virtual std::string getMyType() = 0;
+
     UFMutex                         _consumersProducerSetLock;
     bool                            _notifyOnExitOnly;
+
+
+protected:
     UF*                             _currUF;
+    std::string                     _myType;
+    std::deque<UFProducerData*>     _queueOfDataToConsume;
+    UFMutex                         _queueOfDataToConsumeLock;
+
+    void clearDataToConsume();
 };
 inline UF* UFConsumer::getUF() const { return _currUF; }
 inline bool UFConsumer::getNotifyOnExitOnly() const { return _notifyOnExitOnly; }
 
+
+struct UFJoinableProducer;
+// Consumer that is attached to, and detached from, UFJoinableProducer objects
+// through joinProducer()/removeProducer().
+struct UFJoinableConsumer : public UFConsumer
+{
+    friend class UFProducer;
+    friend class UFJoinableProducer;
+
+    UFJoinableConsumer(bool notifyOnExitOnly = false);
+    ~UFJoinableConsumer()
+    {
+        resetMe();
+    }
+
+    bool joinProducer(UFJoinableProducer* ufp);
+    bool removeProducer(UFJoinableProducer* ufp);
+
+    // Name of the concrete consumer class.
+    virtual std::string getMyType()
+    {
+        return "UFJoinableConsumer";
+    }
+
+protected:
+    // NOTE(review): presumably the producers this consumer has joined - confirm in UFPC.C
+    std::deque<UFJoinableProducer*>         _consumersProducerSet;
+    void resetMe();
+};
+
+
+struct UFNonJoinableProducer;
+struct UFNonJoinableConsumer : public UFConsumer // consumer constructed together with the one producer it works with
+{
+    friend class UFNonJoinableProducer;
+    friend class UFProducer;
+    UFNonJoinableConsumer(UFNonJoinableProducer* ufp, bool notifyOnExitOnly = false); // ufp: the producer handed in at construction
+    ~UFNonJoinableConsumer() { resetMe(); } // destruction runs the same cleanup as an explicit resetMe()
+    void resetMe();
+
+    virtual std::string getMyType() { return "UFNonJoinableConsumer"; } // name of the concrete consumer class
+
+protected:
+    bool removeProducer();
+    UFNonJoinableProducer*         _ufp; // NOTE(review): presumably the producer passed to the constructor - confirm in UFPC.C
+};
+
+
+
+
+
+
+struct UFDataObject;
 struct UFProducer
 {
+    friend class UFProducerConsumerPair;
     friend class UFConsumer;
     UFProducer();
-    virtual ~UFProducer();
-    bool removeConsumer(UFConsumer* ufc);
-    bool produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
+    virtual ~UFProducer() {}
+    size_t produceData(UFDataObject* data, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
+    size_t produceData(UFProducerData* ufpd, UF* uf = 0);
+    virtual size_t getConsumerCount() = 0;
     void reset();
     void init();
+    bool                            _sendEOFAtEnd;
     bool                            _requireLockToUpdateConsumers;//if the developer is aware that both the producer and the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
 
 protected:
-    std::set<UFConsumer*>           _producersConsumerSet;
-    size_t                          _producersConsumerSetSize;
     UFConsumer*                     _mostRecentConsumerAdded;
     UFMutex                         _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
     bool                            _acceptNewConsumers;
 
-    bool addConsumer(UFConsumer* ufc);
+    virtual bool addConsumer(UFConsumer* ufc) = 0;
+    virtual bool removeConsumer(UFConsumer* ufc) = 0;
+    virtual size_t updateConsumers(UFProducerData* ufpd, UF* uf) = 0;
+    virtual void removeAllConsumers() = 0;
+
     UF*                             _uf;
 };
+// A newly constructed producer starts from the defaults assigned by init().
+inline UFProducer::UFProducer()
+{
+    init();
+}
+// Puts the producer's bookkeeping members into their default state:
+// no consumer recorded yet, new consumers accepted, locking enabled,
+// and the end-of-stream notification switched on.
+inline void UFProducer::init()
+{
+    _mostRecentConsumerAdded = 0;
+
+    _acceptNewConsumers = true;
+    _requireLockToUpdateConsumers = true;
+    _sendEOFAtEnd = true;
+}
+
+// Producer that keeps its consumers in a deque, so any number of them can be
+// added and removed over its lifetime.
+struct UFJoinableProducer : public UFProducer
+{
+    UFJoinableProducer()
+    {
+    }
+    ~UFJoinableProducer()
+    {
+        reset();
+    }
+
+    bool addConsumer(UFConsumer* ufc);
+    bool removeConsumer(UFConsumer* ufc);
+    size_t getConsumerCount();
+
+protected:
+    // Consumers currently registered; getConsumerCount() reports its size.
+    std::deque<UFConsumer*>         _producersConsumerSet;
+    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
+    void removeAllConsumers();
+};
+// Returns how many consumers are registered with this producer.
+inline size_t UFJoinableProducer::getConsumerCount()
+{
+    if(_requireLockToUpdateConsumers)
+    {
+        // Read the size while holding the consumer-set lock on behalf of the running fiber.
+        UF* const runningUF = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+        _producersConsumerSetLock.lock(runningUF);
+        const size_t numConsumers = _producersConsumerSet.size();
+        _producersConsumerSetLock.unlock(runningUF);
+        return numConsumers;
+    }
+
+    // Locking has been switched off for this producer: read the size directly.
+    return _producersConsumerSet.size();
+}
+
+// Producer that works with at most one consumer (getConsumerCount() reports 0 or 1).
+struct UFNonJoinableProducer : public UFProducer
+{
+    friend class UFNonJoinableConsumer;
+    // ufc may be null; a non-null consumer is recorded as the most recently added one.
+    // _ufc is zeroed so that it never holds an indeterminate value.
+    UFNonJoinableProducer(UFNonJoinableConsumer* ufc) : _ufc(0) { if(ufc) _mostRecentConsumerAdded = ufc; }
+    ~UFNonJoinableProducer() { reset(); }
+    size_t getConsumerCount();
+
+protected:
+    UFConsumer*     _ufc; // always starts out null; not assigned anywhere in this header
+    UFNonJoinableProducer() : _ufc(0) {}
+    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
+    void removeAllConsumers();
+    bool addConsumer(UFConsumer* ufc);
+    bool removeConsumer(UFConsumer* ufc);
+};
+// Returns 1 when a consumer has been recorded for this producer, 0 otherwise.
+inline size_t UFNonJoinableProducer::getConsumerCount()
+{
+    if(_requireLockToUpdateConsumers)
+    {
+        // Inspect the consumer pointer while holding the lock on behalf of the running fiber.
+        UF* const runningUF = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+        _producersConsumerSetLock.lock(runningUF);
+        const size_t numConsumers = _mostRecentConsumerAdded ? 1 : 0;
+        _producersConsumerSetLock.unlock(runningUF);
+        return numConsumers;
+    }
+
+    // Locking has been switched off for this producer: inspect the pointer directly.
+    return _mostRecentConsumerAdded ? 1 : 0;
+}
+
+
+
+
+
+
+struct UFDataObject // base type of the payload pointer stored in UFProducerData::_data
+{
+    UFDataObject();
+    virtual ~UFDataObject(); // virtual, so deleting through a UFDataObject* runs the derived destructor
+};
+inline UFDataObject::UFDataObject() {}
+inline UFDataObject::~UFDataObject() {}
 
 struct UFProducerData
 {
-    void*                           _data;
-    unsigned int                    _size;
+    UFDataObject*                   _data;
     UFProducer*                     _producerWhichInserted;
     int                             _ufpcCode;
     bool                            _freeDataOnExit;
@@ -76,10 +209,10 @@ struct UFProducerData
     static UFProducerData* getObj();
     static void releaseObj(UFProducerData* obj);
 
-
-protected:
     ~UFProducerData();
     UFProducerData() { reset(); }
+
+protected:
     UFMutex                         _controlReferenceCount; //control the ref. count of this data
     size_t                          _referenceCount;
     static std::stack<UFProducerData*> _objList;
@@ -92,67 +225,34 @@ inline void UFProducerData::reset()
     _lockToUpdate = true;
 }
 
-inline bool UFProducer::addConsumer(UFConsumer* ufc)
+inline void UFProducerData::addRef(size_t numToAdd)
 {
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    if(!_acceptNewConsumers)
+    if(!_lockToUpdate)
     {
-        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-        return false;
+        _referenceCount += numToAdd;
+        return;
     }
-    _producersConsumerSet.insert(ufc); //check insertion
-    _producersConsumerSetSize++;
-    _mostRecentConsumerAdded = ufc;
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
+
+    _controlReferenceCount.getSpinLock();
+    _referenceCount += numToAdd;
+    _controlReferenceCount.releaseSpinLock();
 }
 
-inline bool UFProducer::removeConsumer(UFConsumer* ufc)
+inline void UFProducerData::reduceRef()
 {
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    _producersConsumerSet.erase(ufc);
-    _producersConsumerSetSize--;
-    if(_requireLockToUpdateConsumers) 
-        _producersConsumerSetLock.signal();
-    else
+    if(!_lockToUpdate)
     {
-        if(_uf)
-            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
+        --_referenceCount;
+        return;
     }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
-}
-
-inline void UFProducerData::addRef(size_t numToAdd)
-{
-    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
-    _referenceCount = numToAdd;
-    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
-}
 
-inline void UFProducerData::reduceRef()
-{
-    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
+    _controlReferenceCount.getSpinLock();
     --_referenceCount;
-    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
+    _controlReferenceCount.releaseSpinLock();
 }
 
 inline UFProducerData* UFProducerData::getObj()
 {
-    /*
-    _objListMutex.getSpinLock();
-    if(!_objList.empty())
-    {
-        UFProducerData* retVal = _objList.top();
-        _objList.pop();
-        _objListMutex.releaseSpinLock();
-        retVal->reset();
-        return retVal;
-    }
-    _objListMutex.releaseSpinLock();
-    */
     return new UFProducerData();
 }
 
@@ -164,11 +264,6 @@ inline void UFProducerData::releaseObj(U
     if(obj->_lockToUpdate) obj->_controlReferenceCount.getSpinLock();
     if(!--obj->_referenceCount)
     {
-        /*
-        _objListMutex.getSpinLock();
-        _objList.push(obj);
-        _objListMutex.releaseSpinLock();
-        */
         delete obj;
         return;
     }
@@ -178,19 +273,40 @@ inline void UFProducerData::releaseObj(U
 inline UFProducerData::~UFProducerData()
 {
     if(_freeDataOnExit && _data)
-        free (_data);
+        delete (_data);
 }
 
-inline UFConsumer::~UFConsumer() { reset(); } 
 
-inline UFProducer::~UFProducer() { reset(); }
-inline void UFProducer::init() 
-{ 
-    _acceptNewConsumers = true; 
-    _requireLockToUpdateConsumers = true; 
-    _producersConsumerSetSize = 0;
-    _mostRecentConsumerAdded = 0;
+
+// Owns one non-joinable producer together with its consumer; both are created
+// by the constructor and deleted by the destructor.
+struct UFProducerConsumerPair
+{
+    UFProducerConsumerPair();
+    ~UFProducerConsumerPair();
+    // The accessors hand out non-owning pointers; the pair keeps ownership.
+    UFNonJoinableConsumer* getConsumer() const { return _c; }
+    UFNonJoinableProducer* getProducer() const { return _p; }
+
+protected:
+    UFNonJoinableConsumer*      _c;
+    UFNonJoinableProducer*      _p;
+
+private:
+    // Not copyable: a copy would share _c/_p with the original and the
+    // destructor would then delete both objects twice. Declared, never defined.
+    UFProducerConsumerPair(const UFProducerConsumerPair&);
+    UFProducerConsumerPair& operator=(const UFProducerConsumerPair&);
+};
+// Creates the producer first (with no initial consumer), then a consumer that
+// is constructed with that producer.
+inline UFProducerConsumerPair::UFProducerConsumerPair()
+{
+    UFNonJoinableProducer* const producer = new UFNonJoinableProducer(0);
+    _p = producer;
+    _c = new UFNonJoinableConsumer(producer);
+}
+// Tears the pair down: the consumer is destroyed first, the producer last.
+inline UFProducerConsumerPair::~UFProducerConsumerPair()
+{
+    if(_p)
+    {
+        // No point in locking during teardown, and the producer must no
+        // longer be associated with any fiber.
+        _p->_requireLockToUpdateConsumers = false;
+        _p->_uf = 0;
+    }
+    if(_c)
+        _c->_requireLockToWaitForUpdate = false; // likewise skip locking
+    delete _c; // deleting a null pointer is a no-op
+    // The producer goes after the consumer so that it never has to wait for
+    // the consumer to go away.
+    delete _p;
 }
-inline UFProducer::UFProducer() { init(); }
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFServer.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFServer.H Fri Aug 13 19:43:09 2010
@@ -4,9 +4,10 @@
 #include <pthread.h>
 #include <map>
 #include <vector>
+#include <list>
 
-#include "UF.H"
-#include "UFIO.H"
+#include <UF.H>
+#include <UFIO.H>
 
 typedef std::map<std::string, std::vector<pthread_t>* > StringThreadMapping;
 struct UFServerThreadChooser;
@@ -26,10 +27,16 @@ public:
     unsigned int getProcessCount() const { return _childProcesses.size(); }
     unsigned int UF_STACK_SIZE;
     const char* getBindingInterface() const { return _addressToBindTo.c_str() ; }
-    unsigned int getPort() const { return _port ; }
-    unsigned int getListenFd() const { return _listenFd; }
-    unsigned int setListenFd(int fd) { return (_listenFd = fd); }
 
+    // A port number paired with a file descriptor; the defaults are port 0 and fd -1.
+    struct ListenSocket
+    {
+        unsigned short int port;
+        int fd;
+
+        ListenSocket(unsigned short int p = 0, int f = -1)
+            : port(p), fd(f)
+        {
+        }
+    };
+    typedef std::list<ListenSocket> ListenSocketList;
+
+    const ListenSocketList &getListenSockets() const { return _listenSockets; }
 
     //functions that are allowed to be modified by the 
     //inherited class to provide customizable functionalities 
@@ -38,7 +45,9 @@ public:
     virtual void postBetweenFork(int childPid) {}
     virtual void postForkPreRun() {}
     virtual void preThreadRun() {}
+    virtual void postThreadRun() {}
     virtual void preThreadCreation() {}
+    virtual void postThreadCreation() {}
     virtual void preAccept() {}
 
 
@@ -50,15 +59,14 @@ public:
 
     StringThreadMapping* getThreadList() { return &_threadList; };
     std::vector<pthread_t>* getThreadType(const std::string& type);
-    void addThread(const std::string& type, UFScheduler* ufScheduler);
+    void addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid=0);
     void run();
 
 protected:
     void reset();
 
     std::string                     _addressToBindTo;
-    unsigned int                    _port;
-    int                             _listenFd;
+    ListenSocketList                _listenSockets;
     unsigned int                    _listenBackLog;
 
     time_t                          _creationTime;
@@ -72,30 +80,39 @@ protected:
 
     void startThreads();
 
+    // Appends an entry for the given port to the list of listen sockets;
+    // the entry's fd keeps the ListenSocket default of -1.
+    void _addListenPort(unsigned short int port)
+    {
+        const ListenSocket pending(port);
+        _listenSockets.push_back(pending);
+    }
+
 private:
     //start processing
 };
 
+
+
 struct UFServerThreadChooser : public UFIOAcceptThreadChooser
 {
     UFServerThreadChooser() { }
 
     std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
-    void add(UFScheduler* ufs, pthread_t tid)
-    {
-        _threadList.push_back(make_pair(ufs, tid));
-    }
+    void add(UFScheduler* ufs, pthread_t tid);
 
 protected:
-    std::vector<pair<UFScheduler*, pthread_t> > _threadList;
+    std::vector<std::pair<UFScheduler*, pthread_t> > _threadList;
 };
 
+// Appends a (scheduler, thread id) pair to the list of threads this chooser knows about.
+inline void UFServerThreadChooser::add(UFScheduler* ufs, pthread_t tid)
+{
+    const std::pair<UFScheduler*, pthread_t> entry(ufs, tid);
+    _threadList.push_back(entry);
+}
+
 inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
 {
     static unsigned int lastLocUsed = 0;
     if(!_threadList.size())
     {
-        cerr<<"there has to be some fabric to hand the request to"<<endl;
+        std::cerr<<"there has to be some fabric to hand the request to"<<std::endl;
         exit(1);
     }
 

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H Fri Aug 13 19:43:09 2010
@@ -11,7 +11,7 @@
 #include <vector>
 #include <utility>
 #include <sstream>
-#include "UF.H"
+#include <UF.H>
 
 struct Stat
 {

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStats.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFStats.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStats.H Fri Aug 13 19:43:09 2010
@@ -6,6 +6,7 @@
 namespace UFStats
 {
     void registerStats(bool lock_needed = false);
+    extern uint32_t currentConnections;
     extern uint32_t connectionsHandled;
     extern uint32_t txnSuccess;
     extern uint32_t txnFail;

Propchange: trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Fri Aug 13 19:43:09 2010
@@ -33,20 +33,20 @@ $(ARES_SRC):
 	   tar xzf ./$(ARES_SRC_FILE); \
 	fi
 
-$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H UFSwapContext.H
+$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
 
 $(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
 
-$(LIB_DIR)/UFConnectionPoolImpl.o: $(LIB_DIR)/UFAres.o UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
+$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
 
 $(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFIO.o UFIO.C
 
-$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
-	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
+#$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
+#	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
 
 $(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
@@ -63,8 +63,9 @@ $(LIB_DIR)/UFServer.o: UFServer.C $(INCL
 $(LIB_DIR)/UFSwapContext.o: UFSwapContext.S
 	$(CC) -c -o $@ $^
 
-$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
-	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARESDIR)/$(ARES)/.libs/*.o $^
+#$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
+$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o
+	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $^
 	$(RANLIB) $(LIB_DIR)/libUF.a
 
 clean: 

Modified: trafficserver/traffic/branches/UserFiber/core/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Fri Aug 13 19:43:09 2010
@@ -1,5 +1,4 @@
-#include "UF.H"
-#include "UFConnectionPool.H"
+#include <UF.H>
 
 #include <string.h>
 #include <iostream>
@@ -10,7 +9,7 @@
 #include <stdio.h>
 #include <malloc.h>
 #include <sys/mman.h>
-#include "UFSwapContext.H"
+#include <UFSwapContext.H>
 
 using namespace std;
 
@@ -33,12 +32,13 @@ static void runFiber(void* args)
 
 ///////////////UF/////////////////////
 UFFactory* UFFactory::_instance = 0;
-const unsigned int DEFAULT_STACK_SIZE = 4*4096;
+unsigned int UF::DEFAULT_STACK_SIZE = 4*4096;
 UFId UF::_globalId = 0;
 
 UF::UF()
 { 
-    _startingArgs = 0;
+    reset();
+    _myId = ++_globalId;  //TODO: make atomic
     setup();
 }
 
@@ -59,17 +59,14 @@ bool UF::setup(void* stackPtr, size_t st
     }
 #endif
 
-    _myId = ++_globalId;  //TODO: make atomic
-    _status = NOT_STARTED;
-
     if(!stackPtr || !stackSize)
     {
 #ifndef DEBUG
-        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : UF::DEFAULT_STACK_SIZE;
         _UFContext.uc_stack.ss_sp = (void*) malloc (_UFContext.uc_stack.ss_size);
 #else
-        _UFContext.uc_stack.ss_size = DEFAULT_STACK_SIZE;
-        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, DEFAULT_STACK_SIZE);
+        _UFContext.uc_stack.ss_size = UF::DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, UF::DEFAULT_STACK_SIZE);
         if(!_UFContext.uc_stack.ss_sp)
         {
             cerr<<"couldnt allocate space from memalign "<<strerror(errno)<<endl;
@@ -91,8 +88,6 @@ bool UF::setup(void* stackPtr, size_t st
     }
     _UFContext.uc_stack.ss_flags = 0;
 
-    _parentScheduler = 0;
-
     return true;
 }
 
@@ -116,7 +111,6 @@ static pthread_key_t getThreadKey()
 pthread_key_t UFScheduler::_specific_key = getThreadKey();
 UFScheduler::UFScheduler()
 {
-    _activeRunningListSize = 0;
     _earliestWakeUpFromSleep = 0;
     _exitJustMe = false;
     _specific = 0;
@@ -161,25 +155,39 @@ UFScheduler::UFScheduler()
 
     pthread_setspecific(_specific_key, this);
     _amtToSleep = 0;
+    _runCounter = 1;
 }
 
-UFScheduler::~UFScheduler() { /*pthread_key_delete(_specific_key);*/ }
+// Drops the calling thread's entry from the thread -> scheduler map.
+UFScheduler::~UFScheduler()
+{
+    // The map is shared between threads, so it is only touched under its mutex.
+    pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
+    const pthread_t self = pthread_self();
+    ThreadUFSchedulerMap::iterator mine = _threadUFSchedulerMap.find(self);
+    if(mine != _threadUFSchedulerMap.end())
+    {
+        _threadUFSchedulerMap.erase(mine);
+    }
+    pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
+
+    // The thread-specific key is intentionally left alone:
+    /*pthread_key_delete(_specific_key);*/
+}
 
 bool UFScheduler::addFiberToSelf(UF* uf)
 {
-    if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+    if(!uf)
+        return false;
+    if(uf->_status == WAITING_TO_RUN || 
+       uf->_status == YIELDED) //UF is already in the queue
         return true;
     uf->_status = WAITING_TO_RUN;
-    if(uf->_parentScheduler) //probably putting back an existing uf into the active list
+    if(uf->getParentScheduler()) //probably putting back an existing uf into the active list
     {
-        if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
+        if(uf->getParentScheduler() == this) //check that we're scheduling for the same thread
         {
-            _activeRunningList.push_back(uf); ++_activeRunningListSize;
+            _activeRunningList.push_front(uf);
             return true;
         }
         else
         {
-            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
+            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->getParentScheduler()<<endl;
             abort(); //TODO: remove the abort
             return false;
         }
@@ -202,12 +210,15 @@ bool UFScheduler::addFiberToSelf(UF* uf)
         cerr<<"error while trying to run makecontext"<<endl;
         return false;
     }
-    _activeRunningList.push_back(uf); ++_activeRunningListSize;
+    _activeRunningList.push_front(uf);
     return true;
 }
 
 bool UFScheduler::addFiberToAnotherThread(const list<UF*>& ufList, pthread_t tid)
 {
+    if(ufList.empty())
+        return false;
+
     //find the other thread -- 
     //TODO: have to lock before looking at this map - 
     //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
@@ -260,15 +271,13 @@ bool UFScheduler::addFiberToScheduler(co
         return true;
 
     //adding to the same scheduler and as a result thread as the current job
-    UF* uf = 0;
-    list<UF*>::const_iterator beg = ufList.begin();
-    list<UF*>::const_iterator ending = ufList.end();
     if(!tid || (tid == pthread_self()))
     {
+        list<UF*>::const_iterator beg = ufList.begin();
+        list<UF*>::const_iterator ending = ufList.end();
         for(; beg != ending; ++beg)
         {
-            uf = *beg;
-            if(addFiberToSelf(uf))
+            if(addFiberToSelf(*beg))
                 continue;
             else
                 return false;
@@ -295,23 +304,31 @@ void UFScheduler::runScheduler()
     _amtToSleep = DEFAULT_SLEEP_IN_USEC;
     bool ranGetTimeOfDay = false;
 
-    UFList::iterator beg;
     struct timeval now;
     struct timeval start,finish;
     gettimeofday(&start, 0);
-    unsigned long long int timeNow = 0;
+    TIME_IN_US timeNow = 0;
 
-    UFList::iterator ufBeg;
-    UFList::iterator nBeg;
     MapTimeUF::iterator slBeg;
     bool waiting = false;
+    //unsigned long long int runCounter = 1;
     while(!shouldExit())
     {
-        for(ufBeg = _activeRunningList.begin(); ufBeg != _activeRunningList.end(); )
+        ++_runCounter;
+        while(!_activeRunningList.empty())
         {
-            UF* uf = *ufBeg;
-            _currentFiber = uf;
+            if(shouldExit())
+                break;
+
+            UF* uf = _activeRunningList.front();
+            if(uf->_status == YIELDED &&
+               uf->_lastRun == _runCounter) //we have looped back
+                break;
+            //printf("%lu - running uf %lu on iter %llu\n", pthread_self(), (uintptr_t)uf, _runCounter);
+            _activeRunningList.pop_front();
+            uf->_lastRun = _runCounter;
             uf->_status = RUNNING;
+            _currentFiber = uf;
 #if __WORDSIZE == 64
             uf_swapcontext(&_mainContext, &(uf->_UFContext));
 #else
@@ -320,20 +337,18 @@ void UFScheduler::runScheduler()
             _currentFiber = 0;
 
             if(uf->_status == BLOCKED)
-            {
-                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
                 continue;
-            }
             else if(uf->_status == COMPLETED) 
             {
-                delete uf;
-                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
+                if(uf->_myFactory)
+                    uf->_myFactory->releaseUF(uf);
+                else
+                    delete uf;
                 continue;
             }
-
             //else uf->_status == RUNNING
-            uf->_status = WAITING_TO_RUN;
-            ++ufBeg;
+            uf->_status = YIELDED;
+            _activeRunningList.push_back(uf);
         }
 
 
@@ -354,20 +369,18 @@ void UFScheduler::runScheduler()
             //TODO: do atomic comparison to see if there is anything in 
             //_nominateToAddToActiveRunningList before getting the lock
             pthread_mutex_lock(&_mutexToNominateToActiveList);
-            for(nBeg = _nominateToAddToActiveRunningList.begin();
-                nBeg != _nominateToAddToActiveRunningList.end(); )
+            do
             {
-                UF* uf = *nBeg;
-                if(uf->_parentScheduler)
+                UF* uf = _nominateToAddToActiveRunningList.front();
+                if(uf->getParentScheduler())
                 {
                     uf->_status = WAITING_TO_RUN;
-                    _activeRunningList.push_front(uf); ++_activeRunningListSize;
+                    _activeRunningList.push_front(uf);
                 }
                 else //adding a new fiber
                     addFiberToScheduler(uf, 0);
-                nBeg = _nominateToAddToActiveRunningList.erase(nBeg);
-            }
-
+                _nominateToAddToActiveRunningList.pop_front();
+            }while(!_nominateToAddToActiveRunningList.empty());
             pthread_mutex_unlock(&_mutexToNominateToActiveList);
         }
 
@@ -393,7 +406,7 @@ void UFScheduler::runScheduler()
                         if(ufwi->_uf)
                         {
                             ufwi->_uf->_status = WAITING_TO_RUN;
-                            _activeRunningList.push_front(ufwi->_uf); ++_activeRunningListSize;
+                            _activeRunningList.push_front(ufwi->_uf);
                             ufwi->_uf = NULL;
                         }
                         waiting = ufwi->_waiting;
@@ -419,13 +432,13 @@ void UFScheduler::runScheduler()
         }
 
         //see if there is anything to do or is it just sleeping time now
-        if(!_notifyFunc && !_activeRunningListSize && !shouldExit())
+        if(!_notifyFunc && _activeRunningList.empty() && !shouldExit())
         {
             if(_inThreadedMode) //go to conditional wait (in threaded mode)
             {
                 struct timespec ts;
-                unsigned long long int nSecToIncrement = (int)(_amtToSleep/1000000);
-                unsigned long long int nUSecToIncrement = (int)(_amtToSleep%1000000);
+                int nSecToIncrement = (int)(_amtToSleep/1000000);
+                TIME_IN_US nUSecToIncrement = (TIME_IN_US)(_amtToSleep%1000000);
                 if(!ranGetTimeOfDay)
                     gettimeofday(&now, 0);
                 ts.tv_sec = now.tv_sec + nSecToIncrement;
@@ -442,7 +455,7 @@ void UFScheduler::runScheduler()
     }
     gettimeofday(&finish, 0);
 
-    unsigned long long int diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
+    TIME_IN_US diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
     cerr<<pthread_self()<<" time taken in this thread = "<<diff<<"us"<<endl;
 }
 
@@ -498,10 +511,10 @@ int UFFactory::registerFunc(UF* uf)
     return _size++;
 }
 
-const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 3;
+const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 15;
 bool UFMutex::lock(UF* uf)
 {
-    if(!uf || !uf->_parentScheduler)
+    if(!uf || !uf->getParentScheduler())
         return false;
 
     getSpinLock();
@@ -575,38 +588,28 @@ bool UFMutex::unlock(UF* uf)
     if(!uf)
         return false;
 
-    UFList::iterator beg;
     getSpinLock();
 
-    beg = _listOfClientsWaitingOnLock.begin();
-    if(uf == *beg) //check if this uf is the current owner of this lock
+    if(uf == _listOfClientsWaitingOnLock.front()) //check if this uf is the current owner of this lock
     {
+        _listOfClientsWaitingOnLock.pop_front();
         _lockCurrentlyOwned = false;
-        beg = _listOfClientsWaitingOnLock.erase(beg);
 #ifdef LOCK_DEBUG
         printf("%lu u %d\n", (unsigned long int) ((uintptr_t)(void*)uf), _listOfClientsWaitingOnLock.size());
 #endif
 
-        bool releasedLock = false;
         //notify the next UF in line
         while(!_listOfClientsWaitingOnLock.empty())
         {
-            UF* tmpUf = *beg;
-            if(!tmpUf || !tmpUf->_parentScheduler) //invalid tmpuf - cant wake it up
+            UF* tmpUf = _listOfClientsWaitingOnLock.front();
+            if(!tmpUf || !tmpUf->getParentScheduler()) //invalid tmpuf - cant wake it up
             {
 #ifdef LOCK_DEBUG
                 printf("%lu nf1\n", (unsigned long int) ((uintptr_t)(void*)uf));
 #endif
-                beg = _listOfClientsWaitingOnLock.erase(beg);
-                if(beg == _listOfClientsWaitingOnLock.end())
-                    break;
+                _listOfClientsWaitingOnLock.pop_front();
                 continue;
             }
-            /*
-            if(tmpUf->getStatus() == WAITING_TO_RUN) //this uf has already been put into the waiting to run list
-                break;
-                */
-
 
 #ifdef LOCK_DEBUG
             printf("%lu wk %lu\n", 
@@ -615,14 +618,11 @@ bool UFMutex::unlock(UF* uf)
 #endif
 
             releaseSpinLock();
-            releasedLock = true;
-            uf->_parentScheduler->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
-            break;
+            uf->getParentScheduler()->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
+            return true;
         }
 
-        if(!releasedLock)
-            releaseSpinLock();
-
+        releaseSpinLock();
         return true;
     }
     else
@@ -635,7 +635,7 @@ bool UFMutex::unlock(UF* uf)
     return false;
 }
 
-bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
+bool UFMutex::tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS)
 {
     while(1)
     {
@@ -668,7 +668,7 @@ bool UFMutex::condWait(UF* uf)
     //the object is already in the hash
     if(_listOfClientsWaitingOnCond.find(uf) == _listOfClientsWaitingOnCond.end())
     {
-        UFWaitInfo *ufwi = uf->_parentScheduler->getWaitInfo();
+        UFWaitInfo *ufwi = uf->getParentScheduler()->getWaitInfo();
         ufwi->_uf = uf;
         ufwi->_waiting = true;
     
@@ -708,7 +708,7 @@ void UFMutex::broadcast()
         // If uf is not NULL, schedule it and make sure no one else can schedule it again
         if(ufwi->_uf) 
         {
-            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->_parentScheduler->_tid);
+            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->getParentScheduler()->_tid);
             ufwi->_uf = NULL;
         }
         
@@ -760,10 +760,10 @@ void UFMutex::signal()
     }
 
     if(uf_to_signal)
-        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->_parentScheduler->_tid);
+        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->getParentScheduler()->_tid);
 }
 
-int UFMutex::condTimedWait(UF* uf, unsigned long long int sleepAmtInUs)
+bool UFMutex::condTimedWait(UF* uf, TIME_IN_US sleepAmtInUs)
 {
     bool result = false;
     if(!uf)
@@ -783,9 +783,9 @@ int UFMutex::condTimedWait(UF* uf, unsig
     // Add to sleep queue
     struct timeval now;
     gettimeofday(&now, 0);
-    unsigned long long int timeNow = timeInUS(now);
+    TIME_IN_US timeNow = timeInUS(now);
     ufwi->_sleeping = true;
-    uf->_parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
+    uf->getParentScheduler()->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
     
     uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal, broadcast or timeout has occurred
 
@@ -802,11 +802,11 @@ void* setupThread(void* args)
     if(!args)
         return 0;
 
-    list<UF*>* ufsToStartWith = (list<UF*>*) args;
+    UFList* ufsToStartWith = (UFList*) args;
     UFScheduler ufs;
     ufs.addFiberToScheduler(*ufsToStartWith, 0);
     delete ufsToStartWith;
-
+    
     //run the scheduler
     ufs.runScheduler();
 
@@ -819,10 +819,24 @@ void UFScheduler::ufCreateThread(pthread
     pthread_attr_t attr;
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
+    
     if(pthread_create(tid, &attr, setupThread, (void*)ufsToStartWith) != 0)
     {
         cerr<<"couldnt create thread "<<strerror(errno)<<endl;
         exit(1);
     }
 }
+
+string getPrintableTime() //local time in asctime format without the trailing '\n'; "" on failure
+{
+    char asctimeDate[32] = ""; //asctime_r needs a buffer of at least 26 bytes
+    time_t now = time(0);
+    struct tm localNow; //own buffer: localtime() hands back shared static storage (not thread-safe)
+    if(!localtime_r(&now, &localNow) || !asctime_r(&localNow, asctimeDate))
+        return string(); //conversion failed: nothing printable
+    string response = asctimeDate;
+    size_t loc = response.find('\n');
+    if(loc != string::npos)
+        response.replace(loc, 1, "");
+    return response;
+}

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Fri Aug 13 19:43:09 2010
@@ -1,6 +1,8 @@
-#include "UFAres.H"
+#include <UFAres.H>
 #include <stdio.h>
 
+using namespace std;
+
 static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
 {
   //  return;



Mime
View raw message