trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aku...@apache.org
Subject svn commit: r942228 [1/2] - in /trafficserver/traffic/branches/UserFiber: Makefile UF.C UF.H UFIO.C UFIO.H UFServer.C UFServer.H UFStatSystem.C UFStatSystem.H ufHTTPServer.C
Date Fri, 07 May 2010 21:10:36 GMT
Author: akundu
Date: Fri May  7 21:10:35 2010
New Revision: 942228

URL: http://svn.apache.org/viewvc?rev=942228&view=rev
Log:
Early prototype of UserFiber code
    - support for various process models
        - support for running in single process, with 'n' accept threads and 'm' I/O threads
        - support for running in process mode (w/ & w/out threads)
        - support for multiple accept threads (note: atleast one accept thread is needed per process (if in process mode))
    - scheduler (to schedule UserFibers)
    - first go at I/O system
    - User Locking system - support for
        - lock
        - unlock
        - try_lock (altho the need for this is questioned - as the locking system is queue based)
        - signal
        - broadcast
        - conditional_wait (NO conditional timed wait yet (soon to come))
    - sample http server (more additions to follow)
    - Stats system (borrowed from the new stats system support in ATS)
        - support to provide results on a separate port + thread
        - aggregate stats across threads
        - allow to add new stats in real-time

Currently the support is only for Linux w/ epoll. Other I/O scheduling systems to follow.

Known things that have to be worked on
    - stop calling sigprocmask on every setcontext
    - makecontext doesnt allow to set a 64-bit parameter as one of the args (have to replace w/ a different scheme)
    - conditional timed wait for User Locking system
    - Producer/Consumer module
    - ipv6 support


Added:
    trafficserver/traffic/branches/UserFiber/Makefile
    trafficserver/traffic/branches/UserFiber/UF.C
    trafficserver/traffic/branches/UserFiber/UF.H
    trafficserver/traffic/branches/UserFiber/UFIO.C
    trafficserver/traffic/branches/UserFiber/UFIO.H
    trafficserver/traffic/branches/UserFiber/UFServer.C
    trafficserver/traffic/branches/UserFiber/UFServer.H
    trafficserver/traffic/branches/UserFiber/UFStatSystem.C
    trafficserver/traffic/branches/UserFiber/UFStatSystem.H
    trafficserver/traffic/branches/UserFiber/ufHTTPServer.C

Added: trafficserver/traffic/branches/UserFiber/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/Makefile?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/Makefile (added)
+++ trafficserver/traffic/branches/UserFiber/Makefile Fri May  7 21:10:35 2010
@@ -0,0 +1,26 @@
+CPP=c++
+BUILD_FLAGS=-g -O3 -Wall -DPIPE_NOT_EFD -Wno-deprecated
+ARCH=x86-64
+
+all:	ufHTTPServer
+
+UF.o:	UF.C UF.H
+	$(CPP) $(BUILD_FLAGS) -c -o UF.o UF.C -march=$(ARCH) 
+
+UFIO.o:	UFIO.C UFIO.H
+	$(CPP) $(BUILD_FLAGS) -c -o UFIO.o UFIO.C -march=$(ARCH)
+
+UFStatSystem.o: UFStatSystem.C UFStatSystem.H
+	$(CPP) $(BUILD_FLAGS) -c -o UFStatSystem.o UFStatSystem.C -march=$(ARCH)
+
+UFServer.o: UFServer.C UFServer.H
+	$(CPP) $(BUILD_FLAGS) -c -o UFServer.o UFServer.C -march=$(ARCH)
+
+ufHTTPServer.o:	ufHTTPServer.C UF.o UFIO.o UFStatSystem.o UFServer.o
+	$(CPP) $(BUILD_FLAGS) -c -o ufHTTPServer.o ufHTTPServer.C -march=$(ARCH)
+
+ufHTTPServer:	ufHTTPServer.o
+	$(CPP) $(BUILD_FLAGS) -o ufHTTPServer UF.o UFIO.o UFStatSystem.o UFServer.o ufHTTPServer.o -lpthread -march=$(ARCH)
+
+clean: 
+	rm *.o ufHTTPServer

Added: trafficserver/traffic/branches/UserFiber/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UF.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.C (added)
+++ trafficserver/traffic/branches/UserFiber/UF.C Fri May  7 21:10:35 2010
@@ -0,0 +1,425 @@
+#include "UF.H"
+#include <string.h>
+
+#include <iostream>
+#include <errno.h>
+#include <stdlib.h>
+
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <malloc.h>
+#include <sys/mman.h>
+
+using namespace std;
+
+static void runFiber(void* args)
+{
+    if(!args)
+        return;
+
+    UF* uf = (UF*)args;
+    uf->run();
+    uf->_status = COMPLETED;
+}
+
+///////////////UF/////////////////////
+UFFactory* UFFactory::_instance = 0;;
+const unsigned int DEFAULT_STACK_SIZE = 4*4096;
+UFId UF::_globalId = 0;
+UF::UF()
+{ 
+    _startingArgs = 0;
+    setup();
+}
+
+UF::~UF()
+{
+    if(_UFObjectCreatedStack && _UFContext.uc_stack.ss_sp)
+        free(_UFContext.uc_stack.ss_sp);
+}
+
+bool UF::setup(void* stackPtr, size_t stackSize)
+{
+#ifdef DEBUG
+    static int pageSize = sysconf(_SC_PAGE_SIZE);
+    if(pageSize == -1)
+    {
+        cerr<<"couldnt get sysconf for pageSize "<<strerror(errno)<<endl;
+        exit(1);
+    }
+#endif
+
+    _myId = ++_globalId;  //TODO: make atomic
+    _status = NOT_STARTED;
+
+    if(!stackPtr || !stackSize)
+    {
+#ifndef DEBUG
+        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_sp = (void*) malloc (_UFContext.uc_stack.ss_size);
+#else
+        _UFContext.uc_stack.ss_size = DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, DEFAULT_STACK_SIZE);
+        if(!_UFContext.uc_stack.ss_sp)
+        {
+            cerr<<"couldnt allocate space from memalign "<<strerror(errno)<<endl;
+            exit(1);
+        }
+        if (mprotect((char*)_UFContext.uc_stack.ss_sp+(pageSize*3), pageSize, PROT_NONE) == -1)
+        {
+            cerr<<"couldnt mprotect location "<<strerror(errno)<<endl;
+            exit(1);
+        }
+#endif
+        _UFObjectCreatedStack = true;
+    }
+    else
+    {
+        _UFContext.uc_stack.ss_size = stackSize;
+        _UFContext.uc_stack.ss_sp = stackPtr;
+        _UFObjectCreatedStack = false;
+    }
+    _UFContext.uc_stack.ss_flags = 0;
+
+    _parentScheduler = 0;
+
+    return true;
+}
+
+
+
+
+
+
+///////////////UFScheduler/////////////////////
+ThreadUFSchedulerMap UFScheduler::_threadUFSchedulerMap;
+pthread_mutex_t UFScheduler::_mutexToCheckFiberScheduerMap = PTHREAD_MUTEX_INITIALIZER;
+static pthread_key_t getThreadKey()
+{
+    if(pthread_key_create(&UFScheduler::_specific_key, 0) != 0)
+    {
+        cerr<<"couldnt create thread specific key "<<strerror(errno)<<endl;
+        exit(1);
+    }
+    return UFScheduler::_specific_key;
+}
+pthread_key_t UFScheduler::_specific_key = getThreadKey();
+UFScheduler::UFScheduler()
+{
+    _specific = 0;
+    _currentFiber = 0;
+    //_exit = false;
+    if(_inThreadedMode)
+    {
+        pthread_mutex_init(&_mutexToNominateToActiveList, NULL);
+        pthread_cond_init(&_condToNominateToActiveList, NULL);
+    }
+
+
+    //check that there are no other schedulers already running in this thread
+    if(_inThreadedMode)
+    {
+        pthread_t currThreadId = pthread_self();
+
+        pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+        if(_threadUFSchedulerMap.find(currThreadId) != _threadUFSchedulerMap.end())
+        {
+            cerr<<"cannot have more than one scheduler per thread"<<endl;
+            exit(1);
+        }
+        _threadUFSchedulerMap[currThreadId] = this;
+        pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+    }
+    else
+    {
+        if(_threadUFSchedulerMap.find(0) != _threadUFSchedulerMap.end())
+        {
+            cerr<<"cannot have more than one scheduler per thread"<<endl;
+            exit(1);
+        }
+
+        //for non-threaded mode we consider the pthread_t id to be 0
+        _threadUFSchedulerMap[0] = this;
+    }
+
+    _tid = (_inThreadedMode) ? pthread_self() : 0;
+    _notifyFunc = 0;
+    _notifyArgs = 0;
+
+    pthread_setspecific(_specific_key, this);
+}
+
+UFScheduler::~UFScheduler()
+{
+    pthread_key_delete(_specific_key);
+}
+
+
+bool UFScheduler::addFiberToScheduler(UF* uf, pthread_t tid)
+{
+    if(!uf)
+    {
+        cerr<<"returning cause there is a scheduler already"<<endl;
+        return false;
+    }
+
+    list<UF*> ufList;
+    ufList.push_front(uf);
+    return addFibersToScheduler(ufList, tid);
+}
+
+bool UFScheduler::addFibersToScheduler(const list<UF*>& ufList, pthread_t tid)
+{
+    if(ufList.empty())
+        return true;
+
+    list<UF*>::const_iterator beg = ufList.begin();
+    list<UF*>::const_iterator ending = ufList.end();
+    //adding to the same scheduler and as a result thread as the current job
+    if(!tid || (tid == pthread_self()))
+    {
+        for(; beg != ending; ++beg)
+        {
+            UF* uf = *beg;
+            uf->_status = WAITING_TO_RUN;
+            if(uf->_parentScheduler) //probably putting back an existing uf into the active list
+            {
+                if(uf->_parentScheduler != this) //cant schedule for some other thread
+                {
+                    cerr<<"uf is not part of this scheduler"<<endl;
+                    return false;
+                }
+                _activeRunningList.push_back(uf);
+                continue;
+            }
+
+            //create a new context
+            uf->_parentScheduler = this;
+            uf->_UFContext.uc_link = &_mainContext;
+
+            getcontext(&(uf->_UFContext));
+            errno = 0;
+            makecontext(&(uf->_UFContext), (void (*)(void)) runFiber, 1, (void*)uf);
+            if(errno != 0)
+            {
+                cerr<<"error while trying to run makecontext"<<endl;
+                return false;
+            }
+            _activeRunningList.push_back(uf);
+        }
+    }
+    else //adding to some other threads' scheduler
+    {
+        //find the other thread -- 
+        //TODO: have to lock before looking at this map - 
+        //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
+        ThreadUFSchedulerMap::iterator index = _threadUFSchedulerMap.find(tid);
+        if(index == _threadUFSchedulerMap.end())
+        {
+            cerr<<"couldnt find the scheduler associated with "<<tid<<" for uf = "<<*beg<<endl;
+            ThreadUFSchedulerMap::iterator beg = _threadUFSchedulerMap.begin();
+            return false;
+        }
+
+        UFScheduler* ufs = index->second;
+        pthread_mutex_lock(&(ufs->_mutexToNominateToActiveList));
+        for(; beg != ending; ++beg)
+            ufs->_nominateToAddToActiveRunningList.push_back(*beg);
+        pthread_cond_signal(&(ufs->_condToNominateToActiveList));
+        pthread_mutex_unlock(&(ufs->_mutexToNominateToActiveList));
+        ufs->notifyUF();
+    }
+
+    return true;
+}
+
+void UFScheduler::notifyUF()
+{
+    if(_notifyFunc)
+        _notifyFunc(_notifyArgs);
+}
+
+
+bool UFScheduler::_exit = false;
+const unsigned int DEFAULT_SLEEP_IN_USEC = 1000000;
+void UFScheduler::runScheduler()
+{
+    errno = 0;
+
+    unsigned long long int amtToSleep = DEFAULT_SLEEP_IN_USEC;
+    bool ranGetTimeOfDay = false;
+    bool firstRun = true;
+
+    UFList::iterator beg;
+    struct timeval now;
+    struct timeval start,finish;
+    gettimeofday(&start, 0);
+    while(!_exit)
+    {
+        UFList::iterator beg = _activeRunningList.begin();
+        for(; beg != _activeRunningList.end(); )
+        {
+            UF* uf = *beg;
+            _currentFiber = uf;
+            uf->_status = RUNNING;
+            swapcontext(&_mainContext, &(uf->_UFContext));
+            _currentFiber = 0;
+
+            if(uf->_status == RUNNING) { }
+            else if(uf->_status == BLOCKED)
+            {
+                beg = _activeRunningList.erase(beg);
+                continue;
+            }
+            else if(uf->_status == COMPLETED) 
+            {
+                delete uf;
+                beg = _activeRunningList.erase(beg);
+                continue;
+            }
+
+            uf->_status = WAITING_TO_RUN;
+            ++beg;
+        }
+
+
+        //check if some other thread has nominated some user fiber to be
+        //added to this thread's list -
+        //can happen in the foll. situations
+        //1. the main thread is adding a new user fiber
+        //2. some fiber has requested to move to another thread
+        if(!_nominateToAddToActiveRunningList.empty() /*TODO: take this out later w/ the atomic size count*/ &&
+           _inThreadedMode)
+
+        {
+            //TODO: do atomic comparison to see if there is anything in 
+            //_nominateToAddToActiveRunningList before getting the lock
+            pthread_mutex_lock(&_mutexToNominateToActiveList);
+            UFList::iterator beg = _nominateToAddToActiveRunningList.begin();
+            for(; beg != _nominateToAddToActiveRunningList.end(); )
+            {
+                UF* uf = *beg;
+                if(uf->_parentScheduler)
+                    _activeRunningList.push_back(uf);
+                else //adding a new fiber
+                    addFiberToScheduler(uf, 0);
+                beg = _nominateToAddToActiveRunningList.erase(beg);
+            }
+
+            pthread_mutex_unlock(&_mutexToNominateToActiveList);
+        }
+
+
+        ranGetTimeOfDay = false;
+        amtToSleep = DEFAULT_SLEEP_IN_USEC;
+        //pick up the fibers that may have completed sleeping
+        //look into the sleep list;
+        if(!_sleepList.empty())
+        {
+            gettimeofday(&now, 0);
+            ranGetTimeOfDay = true;
+            unsigned long long int timeNow = (now.tv_sec*1000000)+now.tv_usec;
+            firstRun = true;
+            for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+            {
+                //TODO: has to be cleaned up
+                //1. see if anyone has crossed the sleep timer - add them to the active list
+                if(beg->first <= timeNow) //sleep time is over
+                {
+                    _activeRunningList.push_back(beg->second);
+                    _sleepList.erase(beg);
+                    beg = _sleepList.begin();
+                    continue;
+                }
+                else
+                {
+                    if(firstRun)
+                        amtToSleep = beg->first-timeNow;
+                    break;
+                }
+                firstRun = false;
+                ++beg;
+            }
+        }
+
+        //see if there is anything to do or is it just sleeping time now
+        if(!_notifyFunc && _activeRunningList.empty() && !_exit)
+        {
+            if(_inThreadedMode) //go to conditional wait (in threaded mode)
+            {
+                struct timespec ts;
+                unsigned long long int nSecToIncrement = (int)(amtToSleep/1000000);
+                unsigned long long int nUSecToIncrement = (int)(amtToSleep%1000000);
+                if(!ranGetTimeOfDay)
+                    gettimeofday(&now, 0);
+                ts.tv_sec = now.tv_sec + nSecToIncrement;
+                ts.tv_nsec = (now.tv_usec + nUSecToIncrement)*1000; //put in nsec
+
+                pthread_mutex_lock(&_mutexToNominateToActiveList);
+                if(_nominateToAddToActiveRunningList.empty())
+                    pthread_cond_timedwait(&_condToNominateToActiveList, &_mutexToNominateToActiveList, &ts);
+                pthread_mutex_unlock(&_mutexToNominateToActiveList);
+            }
+            else //sleep in non-threaded mode
+                usleep(amtToSleep);
+        }
+    }
+    gettimeofday(&finish, 0);
+
+    unsigned long long int diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
+    cerr<<pthread_self()<<" time taken in this thread = "<<diff<<"us"<<endl;
+}
+
+
+bool UFScheduler::_inThreadedMode = true;
+UFScheduler* UFScheduler::getUFScheduler(pthread_t tid)
+{
+    if(!tid || tid == pthread_self())
+        return (UFScheduler*)pthread_getspecific(_specific_key);
+
+    pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+    ThreadUFSchedulerMap::const_iterator index = _threadUFSchedulerMap.find(tid);
+    if(index == _threadUFSchedulerMap.end())
+    {
+        pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+        return 0;
+    }
+    pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+
+    return const_cast<UFScheduler*>(index->second);
+}
+
+UF* UFScheduler::getUF(pthread_t tid)
+{
+    return const_cast<UF*>(getUFScheduler(tid)->getRunningFiberOnThisThread());
+}
+
+UFFactory::UFFactory()
+{
+    _size = 0;
+    _capacity = 0;
+    _objMapping = 0;
+}
+
+int UFFactory::registerFunc(UF* uf)
+{
+    //not making this code thread safe - since this should only happen at init time
+    if(_size == _capacity)
+    {
+        _capacity  = _capacity ? _capacity : 5 /*start w/ 5 slots*/;
+        _capacity *= 2; //double each time
+        UF** tmpObjMapping = (UF**) malloc (sizeof(UF*)*_capacity);
+
+        for(unsigned int i = 0; i < _size; ++i)
+            tmpObjMapping[i] = _objMapping[i];
+        if(_objMapping)
+            free(_objMapping);
+
+        _objMapping = tmpObjMapping;
+    }
+
+    _objMapping[_size] = uf;
+    return _size++;
+}
+

Added: trafficserver/traffic/branches/UserFiber/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UF.H?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.H (added)
+++ trafficserver/traffic/branches/UserFiber/UF.H Fri May  7 21:10:35 2010
@@ -0,0 +1,424 @@
+#ifndef USERTHREADS_H
+#define USERTHREADS_H
+
+#include <sys/time.h>
+
+#include <iostream>
+#include <map>
+#include <stdint.h>
+#include <ext/hash_map>
+#include <set>
+#include <vector>
+#include <list>
+#include <ucontext.h>
+#include <pthread.h>
+
+
+using namespace std;
+namespace std { using namespace __gnu_cxx; }
+
+enum UFStatus
+{
+    NOT_STARTED         = 0,
+    WAITING_TO_RUN      = 1,
+    BLOCKED             = 2,
+    RUNNING             = 3,
+    COMPLETED           = 4
+};
+
+//create the type of UF you'd like to pass into the accept handler
+typedef unsigned long long int UFId;
+struct UFScheduler;
+struct UF
+{
+    friend class UFScheduler;
+    friend class UFMutex;
+
+    ///the constructor
+    UF();
+    bool setup(void* stackPtr = 0, size_t stackSize = 0);
+    virtual ~UF();
+
+    virtual void run() = 0; 
+    virtual UF* createUF() = 0; //instructs the derived classes on how to create this object
+
+
+    UFScheduler*         getParentScheduler() const;
+    UFId                 self() const { return _myId; }
+    //must be called after the fiber is added to a scheduler
+    //otherwise behavior is unexpected
+    void                 yield();
+    ///must be called after the fiber is added to a scheduler
+    void                 usleep(unsigned long long int sleepAmtInUs);
+    ///simply block the fiber
+    void                 block();
+
+
+    UFStatus             _status;
+    void*                _startingArgs;
+
+protected:
+    static UFId          _globalId;
+    UFId                 _myId;
+    UFScheduler*         _parentScheduler;
+    ucontext_t           _UFContext;
+    bool                 _UFObjectCreatedStack;
+
+private:
+    void waitOnLock();
+};
+
+struct UFFactory
+{
+    static UFFactory* getInstance();
+    UFFactory();
+
+    UF* selectUF(unsigned int location);;
+    int registerFunc(UF* uf);
+
+protected:
+    static UFFactory*   _instance;
+    UF**                _objMapping;
+    size_t              _capacity;
+    size_t              _size;
+};
+inline UFFactory* UFFactory::getInstance()
+{
+    return (_instance ? _instance : (_instance = new UFFactory()));
+}
+inline UF* UFFactory::selectUF(unsigned int location)
+{
+    return _objMapping[location];
+}
+
+
+
+typedef std::list<UF*>          UFList;
+struct UFMutex
+{
+    UFMutex() 
+    {
+        _lockCurrentlyOwned = false;
+        _pendingLockNotification = 0;
+        _lockActive = 0;
+    }
+
+    bool lock(UF* uf);
+    bool unlock(UF* uf);
+    bool tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS = 0);
+
+    //THE CALLER MUST get the lock before calling this fxn
+    //THE CALLER MUST release the lock after this fxn is called
+    bool condWait(UF* uf);
+    //THE CALLER MUST get the lock before calling this fxn
+    //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
+    void broadcast();
+    //THE CALLER MUST get the lock before calling this fxn
+    //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
+    void signal();
+
+
+protected:
+    int             _lockActive;
+    UFList          _listOfClientsWaitingOnLock;
+    UFList          _listOfClientsWaitingOnCond;
+    bool            _lockCurrentlyOwned;
+    UF*             _pendingLockNotification;
+
+    void releaseLocalLock();
+    void getLocalLock();
+};
+
+
+//per thread scheduler
+typedef std::multimap<unsigned long long int, UF*> MapTimeUF;
+//typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
+typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
+struct UFScheduler
+{
+    friend class UF;
+    friend class UFMutex;
+
+    UFScheduler();
+    ~UFScheduler();
+    void runScheduler();
+
+
+    //call this fxn the first time you're adding a UF 
+    //(not after that - currently cant move an existing UF to a different thread)
+    bool addFiberToScheduler(UF* uf,      /* the UF to add */
+                                pthread_t tid = 0); /* the thread to add the UF to */
+    //add the fxn to add multiple ufs in one shot (if they're on one tid)
+    bool addFibersToScheduler(const std::list<UF*>& ufList, pthread_t tid = 0);
+
+
+
+    static ThreadUFSchedulerMap  _threadUFSchedulerMap;
+    static pthread_mutex_t       _mutexToCheckFiberScheduerMap;
+            
+    //returns the fiber scheduler on this thread or other threads;
+    static UFScheduler*          getUFScheduler(pthread_t tid = 0); 
+    //returns the UF running in the thread provided or the current thread
+    static UF*                   getUF(pthread_t tid = 0); 
+
+    //asks the system to work in threaded mode or not (default is yes)
+    static bool                  _inThreadedMode;
+
+    UF* getRunningFiberOnThisThread();
+    const ucontext_t& getMainContext() const;
+    void setSpecific(void* args);
+    void* getSpecific() const;
+    void setExit(bool exit = true);
+
+    ///the variable that says whether the scheduler should be handling the sleep or
+    //if its handled w/in the UserFabrics
+    void*                       (*_notifyFunc)(void*);
+    void*                       _notifyArgs;
+
+
+    //to allow to identify the thread running now
+    static pthread_key_t        _specific_key;
+
+    static bool                 _exit;
+protected:
+    UF*                         _currentFiber;
+    ucontext_t                  _mainContext;
+
+    //no lock for active running list - cause only the running
+    //thread can add to it
+    UFList                      _activeRunningList;
+
+    //nominate to add to a thread's running list
+    UFList                      _nominateToAddToActiveRunningList;
+    pthread_mutex_t             _mutexToNominateToActiveList;
+    pthread_cond_t              _condToNominateToActiveList;
+
+    //the sleep tree
+    MapTimeUF                   _sleepList;
+
+    //store thread specific content
+    void*                       _specific;
+    pthread_t                   _tid;
+
+    void notifyUF();
+
+public:
+    //stats for thread
+    std::vector<long long> _stats;
+    UFMutex _stats_lock;
+
+};
+
+inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
+inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
+inline void UFScheduler::setSpecific(void* args) { _specific = args; }
+inline void* UFScheduler::getSpecific() const { return _specific; }
+inline void UFScheduler::setExit(bool exit) { _exit = exit; }
+
+inline UFScheduler* UF::getParentScheduler() const { return _parentScheduler; }
+
+inline void UF::waitOnLock() { block(); }
+
+inline void UF::usleep(unsigned long long int sleepAmtInUs)
+{
+    if(!sleepAmtInUs)
+    {
+        //yield(); //just give up control of the CPU
+        return;
+    }
+
+    struct timeval now;
+    gettimeofday(&now, 0);
+    unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
+    _parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), this));
+    block();
+}
+
+inline void UF::block()
+{
+    _status = BLOCKED;
+    yield();
+}
+
+inline void UF::yield()
+{
+    //switch context back to the main scheduler
+    swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
+}
+
+inline void UFMutex::releaseLocalLock()
+{
+    while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { sched_yield(); }
+    //while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { }
+}
+
+inline void UFMutex::getLocalLock()
+{
+    while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { sched_yield(); }
+    //while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { }
+}
+
+inline bool UFMutex::lock(UF* uf)
+{
+    if(!uf || !uf->_parentScheduler)
+        return false;
+
+    getLocalLock();
+    if(_listOfClientsWaitingOnLock.empty()) //probably the most common case (no UF has the lock)
+    {
+        _listOfClientsWaitingOnLock.push_back(uf);
+        _lockCurrentlyOwned = true;
+        releaseLocalLock();
+        return true;
+    }
+
+    //see if any UF is holding the lock right now - if not get the lock
+    //this is the case where between the time that an UF is woken up
+    //(after another UF releases the lock)
+    //and it actually runs this requesting UF might be able to procure the lock
+    if(!_lockCurrentlyOwned)
+    {
+        _listOfClientsWaitingOnLock.push_front(uf);
+        _lockCurrentlyOwned = true;
+        releaseLocalLock();
+        return true;
+    }
+
+    //for the rest of the UFs that didnt meet the above criteria 
+    //and didnt get the lock they have to wait
+    _listOfClientsWaitingOnLock.push_back(uf);
+    releaseLocalLock();
+
+    while(1) //try to get the lock
+    {
+        //simply yield - since the uf will be woken up once it gets the lock
+        uf->waitOnLock();
+
+        //since this uf got woken up - check if it can get the lock now
+        getLocalLock();
+
+        if(_pendingLockNotification != uf) //there can only be one pending notification out at any time - since this lock was woken up - this must have gotten the pending notification
+        {
+            releaseLocalLock();
+            continue;
+        }
+        _pendingLockNotification = 0; 
+
+        //check if any other UF has gotten the lock between the time that this UF 
+        //got the notification and actually acted on it
+        if(!_lockCurrentlyOwned && (_listOfClientsWaitingOnLock.front() == uf))
+        {
+            _lockCurrentlyOwned = true;
+            releaseLocalLock();
+            return true;
+        }
+
+        releaseLocalLock();
+    }
+
+    return true;
+}
+
+inline bool UFMutex::unlock(UF* uf)
+{
+    if(!uf)
+        return false;
+
+    UFList::iterator beg;
+    getLocalLock();
+
+    beg = _listOfClientsWaitingOnLock.begin();
+    if(uf == *beg) //check if this uf is the current owner of this lock
+    {
+        _lockCurrentlyOwned = false;
+        beg = _listOfClientsWaitingOnLock.erase(beg);
+
+        //notify the next UF in line
+        if(!_listOfClientsWaitingOnLock.empty() && !_pendingLockNotification)
+        {
+            _pendingLockNotification = (*beg);
+            releaseLocalLock();
+            (*beg)->_parentScheduler->addFiberToScheduler((*beg), (*beg)->_parentScheduler->_tid);
+        }
+        else
+            releaseLocalLock();
+
+        return true;
+    }
+
+    releaseLocalLock();
+    return false;
+}
+
+inline bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
+{
+    while(1)
+    {
+        getLocalLock();
+        if(_listOfClientsWaitingOnLock.empty())
+        {
+            _listOfClientsWaitingOnLock.push_back(uf);
+            _lockCurrentlyOwned = true;
+            releaseLocalLock();
+            return true;
+        }
+
+        releaseLocalLock();
+
+        if(!autoRetryIntervalInUS)
+            break;
+
+        usleep(autoRetryIntervalInUS);
+    }
+
+    return false;
+}
+
+
+
+inline bool UFMutex::condWait(UF* uf)
+{
+    if(!uf)
+        return false;
+    _listOfClientsWaitingOnCond.push_back(uf) ;
+    unlock(uf);
+    uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal or broadcast has occurred
+    lock(uf);
+
+    return true;
+}
+
+inline void UFMutex::broadcast()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    //notify all the waiters to wake up
+    UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
+    for(; beg != _listOfClientsWaitingOnCond.end(); )
+    {
+        (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
+        beg = _listOfClientsWaitingOnCond.erase(beg);
+    }
+}
+
+inline void UFMutex::signal()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
+    (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
+    _listOfClientsWaitingOnCond.erase(beg);
+}
+
+//TODO: later
+/*
+inline bool UFMutex::condTimedWait(UF* uf)
+{
+    if(!uf)
+        return false;
+}
+*/
+
+#endif

Added: trafficserver/traffic/branches/UserFiber/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFIO.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.C (added)
+++ trafficserver/traffic/branches/UserFiber/UFIO.C Fri May  7 21:10:35 2010
@@ -0,0 +1,866 @@
+#include "UFIO.H"
+#include <netdb.h>
+#include <sys/socket.h> 
+#include <sys/time.h> 
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <iostream>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <string>
+#include <string.h>
+
+using namespace std;
+
+static int makeSocketNonBlocking(int fd)
+{
+    int flags = 1;
+    if ((flags = fcntl(fd, F_GETFL, 0)) < 0 ||
+        fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
+        return -1;
+
+    return fd;
+}
+
+void UFIO::reset()
+{
+    _fd = -1;
+    _uf = 0;
+    _errno = 0; 
+    _ufios = 0; 
+    _lastEpollFlag = 0;
+    _amtReadLastTimeEqualToAskedAmt = false;
+    _sleepInfo = 0;
+}
+
+UFIO::~UFIO()
+{
+    close();
+}
+
+UFIO::UFIO(UF* uf, int fd)
+{ 
+    reset();
+    _uf = (uf) ? uf : UFScheduler::getUF(pthread_self());
+
+    if(fd != -1)
+        setFd(fd);
+}
+
+bool UFIO::close()
+{
+    if(_ufios)
+        _ufios->closeConnection(this);
+    _ufios = 0;
+
+    if(_fd != -1)
+        ::close(_fd);
+    _fd = -1;
+
+    return true;
+}
+
+bool UFIO::setFd(int fd, bool makeNonBlocking)
+{
+    if(_fd != -1)
+        close();
+
+    _fd = fd;
+    if(makeNonBlocking)
+        return ((makeSocketNonBlocking(_fd) != -1) ? true : false);
+    return true;
+}
+
+bool UFIO::isSetup(bool makeNonBlocking)
+{
+    if(_fd != -1)
+        return true;
+
+    if ((_fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
+    {
+        cerr<<"couldnt setup socket "<<strerror(errno)<<endl;
+        return false;
+    } 
+
+    if(makeNonBlocking)
+        return ((makeSocketNonBlocking(_fd) != -1) ? true : false);
+    return true;
+}
+
+int UFIO::setupConnectionToAccept(const char* i_a, 
+                                  unsigned short int port, 
+                                  unsigned short int backlog,
+                                  bool makeSockNonBlocking)
+{
+    int fd = -1;
+    if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
+    {
+        cerr<<"couldnt setup socket "<<strerror(errno)<<endl;
+        return false;
+    } 
+
+    if(makeSockNonBlocking && (makeSocketNonBlocking(fd) == -1))
+    {
+        ::close(fd);
+        return -1;
+    }
+
+    char* interface_addr = ((i_a) && strlen(i_a)) ? const_cast<char*>(i_a) : const_cast<char*>(string("0.0.0.0").c_str());
+
+    int n = 1;
+    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) 
+    {
+        cerr<<"couldnt setup reuseaddr for accept connection"<<endl;
+        errno = EINVAL;
+        ::close(fd);
+        return -1;
+    }
+   
+    struct sockaddr_in serv_addr;
+    memset(&serv_addr, 0, sizeof(serv_addr));
+    serv_addr.sin_family = AF_INET;
+    serv_addr.sin_port = htons(port);
+    serv_addr.sin_addr.s_addr = inet_addr(interface_addr);
+    if (serv_addr.sin_addr.s_addr == INADDR_NONE) //interface given as a name
+    {
+        struct hostent *hp;
+        if ((hp = gethostbyname(interface_addr)) == NULL)
+        {
+            cerr<<"couldnt resolve name "<<strerror(errno)<<endl;
+            ::close(fd);
+            errno = EINVAL;
+            return -1;
+        }
+        memcpy(&serv_addr.sin_addr, hp->h_addr, hp->h_length);
+    }
+
+    if (bind(fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) != 0)
+    {
+        cerr<<"couldnt bind to "<<interface_addr<<" on port "<<port<<" - "<<strerror(errno)<<endl;
+
+        ::close(fd);
+        errno = EINVAL;
+        return -1;
+    }
+
+    if (listen(fd, backlog) != 0)
+    {
+        cerr<<"couldnt setup listen to "<<interface_addr<<" on port "<<port<<" - "<<strerror(errno)<<endl;
+        ::close(fd);
+        errno = EINVAL;
+        return false;
+    }
+
+    return fd;
+}
+
+void UFIO::accept(UFIOAcceptThreadChooser* ufiotChooser,
+                  unsigned short int ufLocation,
+                  void* startingArgs,
+                  void* stackPtr,
+                  unsigned int stackSize)
+{
+    if(!_uf)
+    {
+        cerr<<"no user fiber associated with accept request"<<endl;
+        return;
+    }
+    if(!ufiotChooser)
+    {
+        cerr<<"have to provide a fxn to pick the thread to assign the new task to"<<endl;
+        return;
+    }
+
+    //setup the UFIOScheduler* for this UFIO
+    UFIOScheduler* tmpUfios = _ufios;
+    if(!tmpUfios)
+    {
+        //find the ufios for this thread - this map operation should only be done once
+        ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
+        if(index != UFIOScheduler::_tfiosscheduler.end())
+            tmpUfios = index->second;
+        else
+        {
+            cerr<<"couldnt find thread io scheduler for thread "<<pthread_self()<<" - please create one first and assign to the thread - current size of that info = "<<UFIOScheduler::_tfiosscheduler.size()<<endl;
+            exit(1); //TODO: may not be necessary to exit here
+        }
+    }
+
+    int acceptFd = 0;
+    struct sockaddr_in cli_addr;
+    int sizeof_cli_addr = sizeof(cli_addr);
+    UFScheduler* ufs = 0;
+    pthread_t tToAddTo = 0;
+    pair<UFScheduler*, pthread_t> result;
+    bool breakFromMainLoop = false;
+    list<UF*> listOfUFsToAdd;
+    unsigned int listOfUFsToAddSize = 0;
+    while(!breakFromMainLoop)
+    {
+        //add to the scheduler to see if there was any read activity on it
+        //also sets the _ufios of this object 
+        if(!tmpUfios->setupForAccept(this))
+        {
+            cerr<<"couldnt setup for accept - "<<strerror(errno)<<endl;
+            exit(1);
+        }
+
+        while(1)
+        {
+            errno = 0;
+            acceptFd = ::accept(_fd, (struct sockaddr *)&cli_addr, (socklen_t*)&sizeof_cli_addr);
+            if(acceptFd == 0) //hit the timeout
+                break;
+            else if(acceptFd > 0) { } //handled below
+            else if(acceptFd < 0)
+            {
+                if(errno == EINTR)
+                    continue; //try to re-read
+                else if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+                    break; //have to go back and wait for activity
+                else
+                {
+                    _errno = errno;
+                    cerr<<"error on accept call = "<<strerror(errno)<<endl;
+                    exit(1); //TODO: re-evaluate exit (could be a breakfrommainloop)
+                }
+            }
+
+            //make the new socket non-blocking
+            if(makeSocketNonBlocking(acceptFd) < 1)
+            {
+                cerr<<"couldnt make accepted socket "<<acceptFd<<" non-blocking"<<strerror(errno)<<endl;
+                ::close(acceptFd);
+                _errno = errno;
+                continue;
+            }
+
+
+            //pass the new socket created to the UF that can deal w/ the request
+            UFIOAcceptArgs* connectedArgs = new UFIOAcceptArgs();
+            connectedArgs->args = startingArgs;
+            //create the UF to handle the new fd
+            UF* uf = UFFactory::getInstance()->selectUF(ufLocation)->createUF();
+            if(!uf)
+            {
+                cerr<<"couldnt create new user fiber after accepting conns"<<endl;
+                exit(1); //TODO: check if this is necessary
+            }
+            connectedArgs->ufio = new UFIO(uf, acceptFd);
+            if(!connectedArgs->ufio)
+            {
+                cerr<<"couldnt create UFIOAcceptArgs"<<endl;
+                exit(1);
+            }
+            connectedArgs->ufio->_remoteIP = inet_ntoa(cli_addr.sin_addr);
+            connectedArgs->ufio->_remotePort = cli_addr.sin_port;
+            uf->_startingArgs = connectedArgs;
+
+            listOfUFsToAdd.push_front(uf);
+            listOfUFsToAddSize++;
+
+            if(listOfUFsToAddSize == 100)
+            {
+                //add fiber to the thread that is recommended's scheduler
+                result = ufiotChooser->pickThread(_fd);
+                ufs = result.first;
+                tToAddTo = result.second;
+                if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+                {
+                    cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
+                    exit(1);
+                }
+
+                listOfUFsToAdd.clear();
+                listOfUFsToAddSize = 0;
+            }
+        }
+
+        //add the remaining userfibers from the last iteration
+        if(listOfUFsToAddSize)
+        {
+            //add fiber to the thread that is recommended's scheduler
+            result = ufiotChooser->pickThread(_fd);
+            ufs = result.first;
+            tToAddTo = result.second;
+            if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+            {
+                cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
+                exit(1);
+            }
+            listOfUFsToAdd.clear();
+            listOfUFsToAddSize = 0;
+        }
+    }
+}
+
+ssize_t UFIO::read(void *buf, size_t nbyte, TIME_IN_US timeout)
+{
+    UFIOScheduler* tmpUfios = _ufios;
+    if(!tmpUfios)
+    {
+        //find the ufios for this thread - this map operation should only be done once
+        ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
+        if(index != UFIOScheduler::_tfiosscheduler.end())
+            tmpUfios = index->second;
+        else
+        {
+            cerr<<"couldnt find thread io scheduler for thread "<<pthread_self()<<" - please create one first and assign to the thread - current size of that info = "<<UFIOScheduler::_tfiosscheduler.size()<<endl;
+            exit(1); //TODO: may not be necessary to exit here
+        }
+    }
+
+    //we read everything there was to be read the last time, so this time wait to read
+    if(!_amtReadLastTimeEqualToAskedAmt) 
+    {
+        //wait for something to read first
+        if(!tmpUfios->setupForRead(this, timeout))
+        {
+            _errno = errno;
+            return -1;
+        }
+        if(_errno == ETIMEDOUT) //setupForRead will return w/ success however it will set the errno to ETIMEDOUT if a timeout occurred
+            return -1;
+    }
+
+
+    _amtReadLastTimeEqualToAskedAmt = false;
+    ssize_t n = 0;;
+    while(1)
+    {
+        n = ::read(_fd, buf, nbyte);
+        if(n > 0)
+        {
+            _amtReadLastTimeEqualToAskedAmt = ((unsigned int) n != nbyte) ? false : true;
+            return n;
+        }
+        else if(n < 0)
+        {
+            if((errno == EAGAIN) || (errno == EWOULDBLOCK))
+            {
+                _errno = 0;
+                if(!tmpUfios->setupForRead(this, timeout))
+                {
+                    _errno = errno;
+                    return -1;
+                }
+            }
+            else if(errno == EINTR)
+                continue;
+            else
+            {
+                _errno = errno;
+                break;
+            }
+        }
+        else if(n == 0)
+            break;
+    }
+    return n;
+}
+
+ssize_t UFIO::write(const void *buf, size_t nbyte, TIME_IN_US timeout)
+{
+    UFIOScheduler* tmpUfios = _ufios;
+    if(!tmpUfios)
+    {
+        //find the ufios for this thread - this map operation should only be done once
+        ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
+        if(index != UFIOScheduler::_tfiosscheduler.end())
+            tmpUfios = index->second;
+        else
+        {
+            cerr<<"couldnt find thread io scheduler for thread "<<pthread_self()<<" - please create one first and assign to the thread - current size of that info = "<<UFIOScheduler::_tfiosscheduler.size()<<endl;
+            exit(1); //TODO: may not be necessary to exit here
+        }
+    }
+
+    ssize_t n = 0;;
+    unsigned int amtWritten = 0;
+    while(1)
+    {
+        n = ::write(_fd, (char*)buf+amtWritten, nbyte-amtWritten);
+        if(n > 0)
+        {
+            amtWritten += n;
+            if(amtWritten == nbyte)
+                return amtWritten;
+            else
+                continue;
+        }
+        else if(n < 0)
+        {
+            _errno = errno;
+            if((errno == EAGAIN) || (errno == EWOULDBLOCK))
+            {
+                _errno = 0;
+                if(!tmpUfios->setupForWrite(this, timeout))
+                {
+                    _errno = errno;
+                    return -1;
+                }
+            }
+            else if(errno == EINTR)
+                continue;
+            else
+                break;
+        }
+        else if(n == 0)
+            break;
+    }
+    return n;
+}
+
+int UFIO::connect(const struct sockaddr *addr, 
+               int addrlen, 
+               TIME_IN_US timeout)
+{
+    if(!isSetup()) //create the socket and make the socket non-blocking
+        return false;
+
+
+    //find the scheduler for this request
+    UFIOScheduler* tmpUfios = _ufios;
+    if(!tmpUfios)
+    {
+        //find the ufios for this thread - this map operation should only be done once
+        ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
+        if(index != UFIOScheduler::_tfiosscheduler.end())
+            tmpUfios = index->second;
+        else
+        {
+            cerr<<"couldnt find thread io scheduler for thread "<<pthread_self()<<" - please create one first and assign to the thread - current size of that info = "<<UFIOScheduler::_tfiosscheduler.size()<<endl;
+            return -1;
+        }
+    }
+
+
+    int n = 0;
+    int err = 0;
+    while(::connect(_fd, addr, addrlen) < 0)
+    {
+        _errno = errno;
+        if(errno != EINTR)
+        {
+            if((errno != EINPROGRESS || errno != EAGAIN) && 
+               (errno != EADDRINUSE || err == 0))
+                return -1;
+
+            //wait to finish the connect
+            if(!tmpUfios->setupForConnect(this, timeout))
+            {
+                cerr<<"couldnt setup for connect - "<<strerror(errno)<<endl;
+                return -1;
+            }
+
+            n = sizeof(int);
+            if (getsockopt(_fd, SOL_SOCKET, SO_ERROR, (char *)&err, (socklen_t *)&n) < 0)
+                return -1;
+            if(err)
+            {
+                _errno = err;
+                return -1;
+            }
+
+            //successful
+            break;
+        }
+    }
+
+    return 0;
+}
+
+
+ThreadFiberIOSchedulerMap UFIOScheduler::_tfiosscheduler;
+EpollUFIOScheduler::EpollUFIOScheduler(UF* uf, unsigned int maxFds)
+{
+    _uf = uf;
+    _maxFds = maxFds;
+    _epollFd = -1;
+    _epollEventStruct = 0;
+    _alreadySetup = false;
+}
+
+EpollUFIOScheduler::~EpollUFIOScheduler()
+{
+    if(_epollFd != -1)
+        close(_epollFd);
+    if(_epollEventStruct)
+        free (_epollEventStruct);
+}
+
+bool EpollUFIOScheduler::isSetup()
+{
+    if(_alreadySetup)
+        return true;
+
+    pthread_t tid = pthread_self();
+    ThreadFiberIOSchedulerMap::iterator index = _tfiosscheduler.find(tid);
+    if(index != _tfiosscheduler.end())
+    {
+        cerr<<"UFIOScheduler* "<<index->second<<" is already associated w/ thread "<<tid<<" - cannot create two schedulers w/in one thread"<<endl;
+        exit(1);
+        return false;
+    }
+    _tfiosscheduler[tid] = this;
+
+    if(_epollFd != -1 && _epollEventStruct)
+        return true;
+
+    if((_epollFd = epoll_create(_maxFds)) < 0)
+    {
+        cerr<<"couldnt create epoll object "<<strerror(errno)<<" got "<<_epollFd<<" instead"<<endl;
+        return false;
+    }
+
+    _epollEventStruct = (struct epoll_event*)malloc((sizeof (struct epoll_event))*_maxFds);
+    if(!_epollEventStruct)
+    {
+        close(_epollFd);
+        _epollFd = -1;
+    }
+
+    return (_alreadySetup = true);
+}
+
+bool EpollUFIOScheduler::addToScheduler(UFIO* ufio, void* inputInfo, TIME_IN_US to)
+{
+    if(!ufio || !inputInfo || !isSetup())
+        return false;
+
+    if(ufio->_lastEpollFlag != *((int*)inputInfo)) //dont do anything if the flags are same as last time
+    {
+        struct epoll_event ev;
+        ev.data.fd = ufio->getFd();
+        ev.events = *((int*)inputInfo);
+
+        int epollCtlOp = EPOLL_CTL_MOD;
+        if(!ufio->getUFIOScheduler()) //the first time we're running
+        {
+            //keep a record of the mapping of fd to UFIO*
+            ufio->setUFIOScheduler(this);
+            epollCtlOp = EPOLL_CTL_ADD;
+            _intUFIOMap[ufio->getFd()] = ufio;
+        }
+
+        if (epoll_ctl(_epollFd, epollCtlOp, ufio->getFd(), &ev) == -1) 
+        {
+            cerr<<"couldnt add/modify fd to epoll queue "<<strerror(errno)<<" trying to add "<<ufio->getFd()<<" to "<<_epollFd<<endl;
+            exit(1);
+            return false;
+        }
+        ufio->_lastEpollFlag = ev.events;
+    }
+
+
+    if(to) //add to the sleep queue for the epoll queue TODO
+    {
+        struct timeval now;
+        gettimeofday(&now, 0);
+        unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
+        UFSleepInfo* ufsi = getSleepInfo();
+        if(!ufsi)
+        {
+            cerr<<"couldnt create sleep info"<<endl;
+            exit(1);
+        }
+        ufsi->_ufio = ufio;
+        ufio->_sleepInfo = ufsi;
+        _sleepList.insert(std::make_pair((timeNow+to), ufsi));
+    }
+
+    ufio->getUF()->block(); //switch context till someone wakes me up
+    if(ufio->_sleepInfo)
+    {
+        ufio->_sleepInfo->_ufio = 0; //set the sleep indicator to not have a dependency w/ this ufio
+        ufio->_sleepInfo = 0; //remove the sleep indicator
+    }
+
+    return true;
+}
+
+bool EpollUFIOScheduler::setupForConnect(UFIO* ufio, TIME_IN_US to)
+{
+    int flags = EPOLLOUT|EPOLLET;
+    return addToScheduler(ufio, &flags, to);
+}
+
+bool EpollUFIOScheduler::setupForAccept(UFIO* ufio, TIME_IN_US to)
+{
+    int flags = EPOLLIN|EPOLLET;
+    return addToScheduler(ufio, &flags, to);
+}
+
+bool EpollUFIOScheduler::setupForRead(UFIO* ufio, TIME_IN_US to)
+{
+    int flags = EPOLLIN|EPOLLET|EPOLLPRI|EPOLLERR|EPOLLHUP;
+    return addToScheduler(ufio, &flags, to);
+}
+
+bool EpollUFIOScheduler::setupForWrite(UFIO* ufio, TIME_IN_US to)
+{
+    int flags = EPOLLOUT|EPOLLET|EPOLLPRI|EPOLLERR|EPOLLHUP;
+    return addToScheduler(ufio, &flags, to);
+}
+
+bool EpollUFIOScheduler::closeConnection(UFIO* ufio)
+{
+    if(!ufio)
+        return false;
+
+    //remove from _intUFIOMap
+    IntUFIOMap::iterator index = _intUFIOMap.find(ufio->getFd());
+    if(index != _intUFIOMap.end())
+        _intUFIOMap.erase(index);
+
+    return true;
+
+    /*
+    struct epoll_event ev;
+    ev.data.fd = ufio->getFd();
+    ev.events = 0;
+    return (epoll_ctl(_epollFd, EPOLL_CTL_DEL, ufio->getFd(), &ev) == 0) ? true : false;
+    */
+}
+
+
+#ifndef PIPE_NOT_EFD
+#include <sys/eventfd.h>
+#endif
+struct EpollNotifyStruct
+{
+    EpollNotifyStruct()
+    {
+        _ufios = 0;
+        _efd = -1;
+    }
+
+    EpollUFIOScheduler*  _ufios;
+    int                  _efd;
+};
+
+#ifdef PIPE_NOT_EFD
+const char eventFDChar = 'e';
+#else
+const eventfd_t efdIncrementor = 1;
+#endif
+struct ReadNotificationUF : public UF
+{
+    void run()
+    {
+        if(!_startingArgs)
+            return;
+
+        EpollNotifyStruct* ens = (EpollNotifyStruct*)_startingArgs;
+        EpollUFIOScheduler* ufios = ens->_ufios;
+        UFIO* ufio = new UFIO(UFScheduler::getUF());
+        ufio->setFd(ens->_efd, false);
+
+#ifdef PIPE_NOT_EFD
+        char readResult[128];
+#else
+        eventfd_t readEventFd;
+#endif
+        while(1)
+        {
+            ufios->setupForRead(ufio);
+#ifdef PIPE_NOT_EFD
+            while(1)
+            {
+                if(read(ufio->getFd(), &readResult, 127) == 127)
+                    continue;
+                break;
+            }
+#else
+            eventfd_read(ufio->getFd(), &readEventFd); //TODO: deal w/ error case later
+#endif
+            ufios->_interruptedByEventFd = true;
+        }
+
+        delete ens;
+        delete ufio;
+    }
+
+    ReadNotificationUF(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    UF* createUF() { return new ReadNotificationUF(); }
+    static ReadNotificationUF* _self;
+    static int _myLoc;
+};
+int ReadNotificationUF::_myLoc = -1;
+ReadNotificationUF* ReadNotificationUF::_self = new ReadNotificationUF(true);
+
+static void* notifyEpollFunc(void* args)
+{
+    if(!args)
+        return 0;
+#ifdef PIPE_NOT_EFD
+    write(*((int*)args), &eventFDChar, 1);
+#else
+    eventfd_write(*((int*)args), efdIncrementor); //TODO: deal w/ error case later
+#endif
+
+    return 0;
+}
+
+void EpollUFIOScheduler::waitForEvents(TIME_IN_US timeToWait)
+{
+    UFScheduler* myScheduler = UFScheduler::getUFScheduler();
+    if(!myScheduler)
+    {
+        cerr<<"have to be able to find my scheduler"<<endl;
+        return;
+    }
+    
+    //add the notification function
+    EpollNotifyStruct* ens = new EpollNotifyStruct();
+    ens->_ufios = this;
+#ifdef PIPE_NOT_EFD
+    int pfd[2];
+    if (pipe(pfd) == -1) 
+    { 
+        cerr<<"error in pipe creation = "<<strerror(errno)<<endl;
+        exit(1);
+    }
+    makeSocketNonBlocking(pfd[0]);
+    //makeSocketNonBlocking(pfd[1]); - dont make the write socket non-blocking
+    myScheduler->_notifyArgs = (void*)(&pfd[1]);
+    ens->_efd = pfd[0];
+#else
+    int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); //TODO: check the error code of the eventfd creation
+    myScheduler->_notifyArgs = (void*)&efd;
+    ens->_efd = efd;
+#endif
+    myScheduler->_notifyFunc = notifyEpollFunc;
+    //add the UF to handle the efds calls
+    UF* eventFdFiber = UFFactory::getInstance()->selectUF(ReadNotificationUF::_myLoc)->createUF();
+    eventFdFiber->_startingArgs = ens;
+    myScheduler->addFiberToScheduler(eventFdFiber, 0);
+
+
+
+
+    if(!_uf)
+    {
+        cerr<<"have to associate an user fiber with the scheduler"<<endl;
+        return;
+    }
+    if(!isSetup())
+    {
+        cerr<<"have to be able to setup EpollUFIOScheduler "<<strerror(errno)<<endl;
+        return;
+    }
+
+    int nfds;
+    struct timeval now;
+    IntUFIOMap::iterator index;
+    UFIO* ufio = 0;
+    UF* uf = 0;
+    unsigned int amtToSleep = timeToWait;
+    int i = 0;
+    _interruptedByEventFd = false;
+    while(1)
+    {
+        if(_interruptedByEventFd) //this is so that the last interruption gets handled right away
+        {
+            _interruptedByEventFd = false;
+            _uf->yield();
+        }
+
+        nfds = ::epoll_wait(_epollFd, 
+                            _epollEventStruct, 
+                            _maxFds, 
+                            (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1)); //sleep for atleast 1ms
+        if(nfds > 0)
+        {
+            //for each of the fds that had activity activate them
+            for (i = 0; i < nfds; ++i) 
+            {
+                index = _intUFIOMap.find(_epollEventStruct[i].data.fd);
+                if(index != _intUFIOMap.end())
+                {
+                    ufio = index->second;
+                    if(!ufio || !(uf = ufio->getUF()))
+                    {
+                        cerr<<"invalid user fiber io found for fd, "<<_epollEventStruct[i].data.fd<<endl;
+                        exit(1);
+                    }
+                    //activate the fiber
+                    uf->getParentScheduler()->addFiberToScheduler(uf, 0);
+                }
+                else
+                {
+                    cerr<<"couldnt find the associated UF* for fd, "<<_epollEventStruct[i].data.fd<<endl;
+                    exit(1);
+                }
+            }
+        }
+        else if(nfds < 0)
+        {
+            if(errno == EINTR)
+                continue;
+            cerr<<"error w/ epoll wait "<<strerror(errno)<<endl;
+            exit(1);
+        }
+
+        amtToSleep = timeToWait;
+        //pick up the fibers that may have completed sleeping
+        //look into the sleep list;
+        if(!_sleepList.empty())
+        {
+            gettimeofday(&now, 0);
+            unsigned long long int timeNow = (now.tv_sec*1000000)+now.tv_usec;
+            for( MapTimeUFIO::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+            {
+                //1. see if anyone has crossed the sleep timer - add them to the active list
+                if(beg->first <= timeNow) //sleep time is over
+                {
+                    UFSleepInfo* ufsi = beg->second;
+                    if(ufsi)
+                    {
+                        UFIO* ufio = ufsi->_ufio;
+                        if(ufio &&
+                           ufio->_sleepInfo == ufsi &&  //make sure that the ufio is not listening on another sleep counter right now
+                           ufio->_uf->_status == BLOCKED) //make sure that the uf hasnt been unblocked already
+                        {
+                            ufio->_sleepInfo = 0;
+                            ufio->_errno = ETIMEDOUT;
+                            ufio->_uf->getParentScheduler()->addFiberToScheduler(ufio->_uf, 0);
+                            //this is so that we dont have to wait to handle the conn. being woken up
+                            _interruptedByEventFd = true;
+                       }
+
+                       releaseSleepInfo(*ufsi);
+                    }
+
+                    _sleepList.erase(beg);
+                    beg = _sleepList.begin();
+                    continue;
+                }
+                else
+                {
+                    amtToSleep = (amtToSleep > beg->first-timeNow) ? beg->first-timeNow : amtToSleep;
+                    break;
+                }
+                ++beg;
+            }
+        }
+
+        //take a break - let the active conns. get a chance to run
+        _uf->yield();
+    }
+
+    myScheduler->_notifyArgs = 0;
+    myScheduler->_notifyFunc = 0;
+}

Added: trafficserver/traffic/branches/UserFiber/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFIO.H?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.H (added)
+++ trafficserver/traffic/branches/UserFiber/UFIO.H Fri May  7 21:10:35 2010
@@ -0,0 +1,206 @@
+#ifndef IO_USER_THREADS_H
+#define IO_USER_THREADS_H
+
+#include <string>
+#include <ext/hash_map>
+#include <stdint.h>
+#include "UF.H"
+
+using namespace std;
+namespace std { using namespace __gnu_cxx; }
+
+
+
+typedef unsigned long long int TIME_IN_US;
+
+
+
+struct UFIOAcceptThreadChooser
+{
+    virtual std::pair<UFScheduler*,pthread_t> pickThread(int listeningFd) = 0;
+    virtual ~UFIOAcceptThreadChooser() {}
+};
+
+
+struct UFIO;
+struct UFSleepInfo
+{
+    UFSleepInfo() { _ufio = 0; }
+    UFIO*           _ufio;
+};
+
+
+struct UFIOAcceptArgs
+{
+    UFIOAcceptArgs() { args = 0; ufio = 0; }
+    void* args;
+    UFIO* ufio;
+};
+
+
+
+
+struct UFIOScheduler;
+struct UFIO
+{
+    friend class UFIOScheduler;
+    friend class EpollUFIOScheduler;
+    UFIO(UF* uf, int fd = -1);
+    ~UFIO();
+    bool isSetup(bool makeNonBlocking = true);
+
+
+    static int setupConnectionToAccept(
+                    const char* interface_addr, 
+                    unsigned short int port, 
+                    unsigned short int backlog = 16000, 
+                    bool makeSockNonBlocking = true);
+    //the fd must have been created using socket + bind + listen 
+    //before passing into this function. It also has to be marked as non-blocking
+    //call setupConnectionToAccept first - that will prepare the socket to accept conns
+    //the fxn will not return
+    //TODO: create ipv6 accept model
+    void accept(UFIOAcceptThreadChooser* ufiotChooser,
+                unsigned short int ufLocation,
+                void* startingArgs,
+                void* stackPtr = 0,
+                unsigned int stackSize = 0);
+
+    //The fxn will call isSetup which will make the connection request non-blocking and setup the socket
+    //TODO: support ipv6 connect
+    int connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
+
+    //since this is most likely going to be run on an edge trigger system
+    //reads should be done in such a manner so that if the read call
+    //returns back w/ the same number specified, then a following read call
+    //should be made to ensure that nothing is left in the network buffer
+    //since in edge trigger no more events will be generated for data that was
+    //already seen earlier as being in the network. Therefore always make 
+    //another call to this fxn if the return was == to nbyte
+    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = 0);
+    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = 0);
+    bool close();
+
+    bool setFd(int fd, bool makeNonBlocking = true);
+
+
+    unsigned int getErrno() const;
+    int getFd() const;
+    UF* getUF() const;
+
+    const std::string& getRemoteIP() const;
+    unsigned int getRemotePort() const;
+
+    UFIOScheduler* getUFIOScheduler() const;
+    void setUFIOScheduler(UFIOScheduler* ufios);
+
+    UFSleepInfo*                _sleepInfo;
+protected:
+    int                         _fd;
+    unsigned int                _errno;
+    UF*                         _uf;
+    UFIOScheduler*              _ufios;
+
+    UFIO() { reset(); }
+    void reset();
+
+    std::string                 _remoteIP;
+    unsigned int                _remotePort;
+
+    int                         _lastEpollFlag;
+    bool                        _amtReadLastTimeEqualToAskedAmt;
+};
+inline unsigned int UFIO::getErrno() const { return _errno; }
+inline int UFIO::getFd() const { return _fd; }
+inline UF* UFIO::getUF() const { return _uf; }
+inline UFIOScheduler* UFIO::getUFIOScheduler() const { return _ufios; }
+inline void UFIO::setUFIOScheduler(UFIOScheduler* ufios) { _ufios = ufios; }
+inline const std::string& UFIO::getRemoteIP() const { return _remoteIP; };
+inline unsigned int UFIO::getRemotePort() const { return _remotePort; };
+
+
+
+struct UFIOScheduler;
+//typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
+typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
+struct UFIOScheduler
+{
+    UFIOScheduler() {}
+    virtual ~UFIOScheduler() {}
+
+    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool closeConnection(UFIO* ufio) = 0;
+
+    virtual bool isSetup() { return false; }
+    virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
+
+    static ThreadFiberIOSchedulerMap _tfiosscheduler;
+};
+
+
+#define MAX_FDS_FOR_EPOLL 64*1024-1
+//typedef map<int, UFIO*> IntUFIOMap;
+typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
+typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
+struct EpollUFIOScheduler : public UFIOScheduler
+{
+    EpollUFIOScheduler(
+           UF* uf /*the uf that the scheduler will run on to handle this UFIO*/, 
+           unsigned int maxFds = MAX_FDS_FOR_EPOLL);
+    ~EpollUFIOScheduler();
+    bool isSetup(); //call after the c'tor to verify that the structure is correctly setup
+
+    //inputInfo is the flags info (its an int*) for this addition
+    bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0);
+    bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0);
+    bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
+    bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
+    bool closeConnection(UFIO* ufio);
+
+
+    void waitForEvents(TIME_IN_US timeToWait = -1);
+
+    bool                            _interruptedByEventFd;
+
+protected:
+    UF*                 _uf;
+    int                 _epollFd;
+    unsigned int        _maxFds;
+    struct epoll_event* _epollEventStruct;
+    IntUFIOMap          _intUFIOMap;
+    bool                _alreadySetup;
+
+
+    MapTimeUFIO         _sleepList;
+
+    bool addToScheduler(UFIO* ufio, 
+                        void* inputInfo /*flags to identify how ot add*/, 
+                        TIME_IN_US to = 0);
+
+
+    list<UFSleepInfo*>  _availableSleepInfo;
+    UFSleepInfo* getSleepInfo();
+    void releaseSleepInfo(UFSleepInfo& ufsi);
+};
+
+inline UFSleepInfo* EpollUFIOScheduler::getSleepInfo()
+{
+    if(!_availableSleepInfo.empty())
+    {
+        UFSleepInfo* ufsi = _availableSleepInfo.front();
+        _availableSleepInfo.pop_front();
+        return ufsi;
+    }
+
+    return new UFSleepInfo();
+}
+
+inline void EpollUFIOScheduler::releaseSleepInfo(UFSleepInfo& ufsi)
+{
+    _availableSleepInfo.push_back(&ufsi);
+}
+
+#endif

Added: trafficserver/traffic/branches/UserFiber/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFServer.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.C (added)
+++ trafficserver/traffic/branches/UserFiber/UFServer.C Fri May  7 21:10:35 2010
@@ -0,0 +1,303 @@
+#include <iostream>
+#include <errno.h>
+#include <string.h>
+#include "UFServer.H"
+#include "UFServer.H"
+
+#include <sys/types.h>
+#include <sys/wait.h>
+#include "UFStatSystem.H"
+
+using namespace std;
+
+//TODO: handle signals later
+//TODO: create monitoring port later
+//
+
+void UFServer::reset()
+{
+    _addressToBindTo = "0";
+    _listenFd = -1;
+    _port = 0;
+    _creationTime = 0;
+
+    MAX_THREADS_ALLOWED = 8;
+    MAX_PROCESSES_ALLOWED = 1;
+    MAX_ACCEPT_THREADS_ALLOWED = 1;
+    UF_STACK_SIZE = 8192;
+
+    _threadChooser = 0;
+}
+
+UFServer::UFServer()
+{
+    reset();
+}
+
+struct NewConnUF : public UF
+{
+    void run()
+    {
+        if(!_startingArgs)
+            return;
+
+        UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
+        ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+
+        //clear the client connection
+        delete fiberStartingArgs->ufio;
+        //clear the arguments
+        delete fiberStartingArgs;
+        //the UF itself will be cleared by the scheduler
+    }
+    NewConnUF(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    UF* createUF() { return new NewConnUF(); }
+    static NewConnUF* _self;
+    static int _myLoc;
+};
+int NewConnUF::_myLoc = -1;
+NewConnUF* NewConnUF::_self = new NewConnUF(true);
+
+struct AcceptRunner : public UF
+{
+    void run()
+    {
+        if(!_startingArgs)
+            return;
+        UFServer* ufserver = (UFServer*) _startingArgs;
+
+        //add the scheduler for this 
+        UFIO* ufio = new UFIO(UFScheduler::getUF());
+        int fd = ufserver->getListenFd();
+        if(fd == -1)
+            fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
+        if(fd < 0)
+        {
+            cerr<<"couldnt setup listen socket"<<endl;
+            exit(1);
+        }
+        if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
+        {
+            cerr<<"couldnt setup accept thread"<<endl;
+            return;
+        }
+        ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
+    }
+    AcceptRunner(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    UF* createUF() { return new AcceptRunner(); }
+    static AcceptRunner* _self;
+    static int _myLoc;
+};
+int AcceptRunner::_myLoc = -1;
+AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
+
+static void* acceptThreadStart(void* args)
+{
+    if(!args)
+        return 0;
+
+    UFServer* ufserver = (UFServer*) args;
+
+    UFScheduler ufs;
+    //add the io scheduler
+    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+    //add the accept port fiber
+    UF* uf = UFFactory::getInstance()->selectUF(AcceptRunner::_myLoc)->createUF();
+    if(!uf)
+        return 0;
+    uf->_startingArgs = args;
+    ufs.addFiberToScheduler(uf);
+
+    ufserver->addThread("ACCEPT", 0);
+    ufs.runScheduler();
+    return 0;
+}
+
+static void* ioThreadStart(void* args)
+{
+    if(!args)
+        return 0;
+
+    UFScheduler ufs;
+    //add the io scheduler
+    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+
+    ((UFServer*) args)->addThread("NETIO", &ufs);
+    ufs.runScheduler();
+    return 0;
+}
+
+void UFServer::startThreads()
+{
+    preThreadCreation();
+
+    MAX_THREADS_ALLOWED = (MAX_THREADS_ALLOWED ? MAX_THREADS_ALLOWED : 1);
+    MAX_ACCEPT_THREADS_ALLOWED = (MAX_ACCEPT_THREADS_ALLOWED ? MAX_ACCEPT_THREADS_ALLOWED : 1);
+
+    pthread_t* thread = new pthread_t[MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED];
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+    //start the IO threads
+    unsigned int i = 0;
+    for(; i<MAX_THREADS_ALLOWED; i++)
+    {
+        if(pthread_create(&(thread[i]), &attr, ioThreadStart, this) != 0)
+        {
+            cerr<<"couldnt create thread "<<strerror(errno)<<endl;
+            exit(1);
+        }
+        usleep(500); //TODO: avoid the need for threadChooser to have a mutex
+    }
+
+    //start the stats thread
+    UFStatSystem::init(this);
+
+    usleep(1000); //wait before starting the accept thread //TODO: change to cond signal later
+
+    preAccept();
+
+    //start the accept thread
+    for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
+    {
+        if(pthread_create(&(thread[i]), &attr, acceptThreadStart, this) != 0)
+        {
+            cerr<<"couldnt create accept thread "<<strerror(errno)<<endl;
+            exit(1);
+        }
+    }
+
+    //wait for the threads to finish
+    void* status;
+    for(i=0; i<MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED; i++)
+        pthread_join(thread[i], &status);
+
+    delete [] thread;
+}
+
+void UFServer::run()
+{
+    preForkRun();
+
+    if(!_threadChooser)
+        _threadChooser = new UFServerThreadChooser();
+
+    //bind to the socket (before the fork
+    _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
+    if(_listenFd < 0)
+    {
+        cerr<<"couldnt setup listen socket"<<endl;
+        exit(1);
+    }
+
+    if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes
+    {
+        preThreadRun();
+        startThreads();
+        return;
+    }
+
+    //fork children
+    while(1)
+    {
+        while (getProcessCount() < MAX_PROCESSES_ALLOWED)
+        {
+            preBetweenFork();
+            unsigned int pid = fork();
+            if(pid < 0)
+            {
+                cerr<<"("<<getpid()<<")(P): couldnt create child# : "<<strerror(errno)<<endl;
+                exit(1);
+            }
+            if(!pid) //child listens to conns
+            {
+                //TODO: DAEMONIZE LATER
+                _creationTime = time(0);
+
+                //now start
+                postForkPreRun();
+                preThreadRun();
+                startThreads();
+                exit(0);
+            }
+            _childProcesses[pid] = time(0);
+            postBetweenFork(pid);
+        }
+
+
+        int child_exit_status;
+        int child_pid = waitpid(-1, &child_exit_status, WNOHANG);
+        if(child_pid == 0) { }
+        else if(child_pid < 0)
+        {
+            if(errno != ECHILD)
+                cerr<<"("<<getpid()<<")(P): waitpid error: "<<strerror(errno)<<endl;
+        }
+        else if(child_pid > 0)
+        {
+            cerr<<"("<<getpid()<<")(P): child_pid "<<child_pid<<" died "<<endl;
+            map<int, time_t>::iterator itr = _childProcesses.find(child_pid);
+            if(itr != _childProcesses.end()) 
+                _childProcesses.erase(itr);
+            //parentChildDeathHandler(child_pid); TODO
+        }
+
+        //we've been asked to bail
+        if(MAX_PROCESSES_ALLOWED && !_childProcesses.size())
+            break;
+
+        //let the parent rest
+        usleep(500000);
+    }
+}
+
+void UFServer::addThread(const std::string& type, UFScheduler* ufScheduler)
+{
+    pthread_t tid = pthread_self();
+    StringThreadMapping::iterator index = _threadList.find(type);
+    if(index == _threadList.end())
+    {
+        _threadList[type] = new std::vector<pthread_t>;
+        index = _threadList.find(type);
+        if(index == _threadList.end())
+            return;
+    }
+    index->second->push_back(tid);
+
+    if(ufScheduler)
+        _threadChooser->add(ufScheduler, tid);
+}
+
+vector<pthread_t>* UFServer::getThreadType(const string& type)
+{ 
+    StringThreadMapping::iterator index = _threadList.find(type);
+    if(index == _threadList.end())
+        return 0;
+
+    return index->second;
+}
+
+
+int IORunner::_myLoc = -1;
+IORunner* IORunner::_self = new IORunner(true);
+void IORunner::run()
+{
+    UF* uf = UFScheduler::getUF();
+    //add the scheduler for this 
+    EpollUFIOScheduler* ioRunner = new EpollUFIOScheduler(uf, 10000); //TODO: support other event scheduler mechanisms later
+    if(!ioRunner || !ioRunner->isSetup())
+    {
+        cerr<<"couldnt setup epoll io scheduler object"<<endl;
+        return;
+    }
+    ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
+}

Added: trafficserver/traffic/branches/UserFiber/UFServer.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFServer.H?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.H (added)
+++ trafficserver/traffic/branches/UserFiber/UFServer.H Fri May  7 21:10:35 2010
@@ -0,0 +1,118 @@
+#ifndef UFSERVER_H
+#define UFSERVER_H
+
+#include <pthread.h>
+#include <map>
+#include <vector>
+
+#include "UF.H"
+#include "UFIO.H"
+
+typedef std::map<std::string, std::vector<pthread_t>* > StringThreadMapping;
+struct UFServerThreadChooser;
+class UFServer
+{
+public:
+    UFServer();
+    virtual ~UFServer() {}
+
+    //handle the newly created connection
+    virtual void handleNewConnection(UFIO* acceptedConn) = 0;
+
+    //setters and getters
+    unsigned int MAX_ACCEPT_THREADS_ALLOWED;
+    unsigned int MAX_THREADS_ALLOWED;
+    unsigned int MAX_PROCESSES_ALLOWED;
+    unsigned int getProcessCount() const { return _childProcesses.size(); }
+    unsigned int UF_STACK_SIZE;
+    const char* getBindingInterface() const { return _addressToBindTo.c_str() ; }
+    unsigned int getPort() const { return _port ; }
+    unsigned int getListenFd() const { return _listenFd; }
+    unsigned int setListenFd(int fd) { return (_listenFd = fd); }
+
+
+    //functions that are allowed to be modified by the 
+    //inherited class to provide customizable functionalities 
+    virtual void preForkRun() {}
+    virtual void preBetweenFork() {}
+    virtual void postBetweenFork(int childPid) {}
+    virtual void postForkPreRun() {}
+    virtual void preThreadRun() {}
+    virtual void preThreadCreation() {}
+    virtual void preAccept() {}
+
+
+
+    /* TODO:
+    virtual void parentChildDeathHandler(int childPid) {}
+    */
+    UFServerThreadChooser*          _threadChooser;
+
+    StringThreadMapping* getThreadList() { return &_threadList; };
+    std::vector<pthread_t>* getThreadType(const std::string& type);
+    void addThread(const std::string& type, UFScheduler* ufScheduler);
+    void run();
+
+protected:
+    void reset();
+
+    std::string                     _addressToBindTo;
+    unsigned int                    _port;
+    int                             _listenFd;
+    unsigned int                    _listenBackLog;
+
+    time_t                          _creationTime;
+
+    std::map<int, time_t>           _childProcesses;
+    StringThreadMapping             _threadList;
+
+    //TODO: bool setupParentSignals();
+    //TODO: bool setupChildrenSignals();
+    //TODO: bool setupMonitoringThread();
+
+    void startThreads();
+
+private:
+    //start processing
+};
+
+struct UFServerThreadChooser : public UFIOAcceptThreadChooser
+{
+    UFServerThreadChooser() { }
+
+    std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
+    void add(UFScheduler* ufs, pthread_t tid)
+    {
+        _threadList.push_back(make_pair(ufs, tid));
+    }
+
+protected:
+    std::vector<pair<UFScheduler*, pthread_t> > _threadList;
+};
+
+inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
+{
+    static unsigned int lastLocUsed = 0;
+    if(!_threadList.size())
+    {
+        cerr<<"there has to be some fabric to hand the request to"<<endl;
+        exit(1);
+    }
+
+    return _threadList[lastLocUsed++%(_threadList.size())];
+}
+
+struct IORunner : public UF
+{
+    void run();
+    IORunner(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    UF* createUF() { return new IORunner(); }
+    static IORunner* _self;
+    static int _myLoc;
+};
+
+#endif



Mime
View raw message