trafficserver-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aku...@apache.org
Subject svn commit: r985387 [3/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ protocol/ samples/
Date Fri, 13 Aug 2010 22:14:32 GMT
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Fri Aug 13 22:14:31 2010
@@ -1,16 +1,13 @@
 #include <iostream>
 #include <errno.h>
 #include <string.h>
+#include "UFServer.H"
+#include "UFServer.H"
+
 #include <sys/types.h>
 #include <sys/wait.h>
-#include <deque>
-#include <list>
-
-#include <UF.H>
-#include <UFStatSystem.H>
-#include <UFStats.H>
-#include <UFServer.H>
-#include <UFConf.H>
+#include "UFStatSystem.H"
+#include "UFStats.H"
 
 using namespace std;
 
@@ -18,15 +15,31 @@ using namespace std;
 //TODO: create monitoring port later
 //
 //
+static string getPrintableTime()
+{
+    char asctimeDate[32];
+    asctimeDate[0] = '\0';
+    time_t now = time(0);
+    asctime_r(localtime(&now), asctimeDate);
+
+    string response = asctimeDate;
+    size_t loc = response.find('\n');
+    if(loc != string::npos)
+        response.replace(loc, 1, "");
+    return response;
+}
+
 void UFServer::reset()
 {
     _addressToBindTo = "0";
-    _listenSockets.clear();
+    _listenFd = -1;
+    _port = 0;
     _creationTime = 0;
 
     MAX_THREADS_ALLOWED = 8;
     MAX_PROCESSES_ALLOWED = 1;
     MAX_ACCEPT_THREADS_ALLOWED = 1;
+    UF_STACK_SIZE = 8192;
 
     _threadChooser = 0;
 }
@@ -44,13 +57,9 @@ struct NewConnUF : public UF
             return;
 
         UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
-
+        ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
         // increment connections handled stat
         UFStatSystem::increment(UFStats::connectionsHandled);
-        // Keep track of current connections
-        UFStatSystem::increment(UFStats::currentConnections);
-        ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
-        UFStatSystem::increment(UFStats::currentConnections, -1);
 
         //clear the client connection
         delete fiberStartingArgs->ufio;
@@ -80,22 +89,21 @@ struct AcceptRunner : public UF
 
         //add the scheduler for this 
         UFIO* ufio = new UFIO(UFScheduler::getUF());
-        if (socket.fd == -1)
-        {
-            socket.fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), socket.port /*, deal w/ backlog*/);
-        }
-        if (socket.fd < 0)
+        int fd = ufserver->getListenFd();
+        if(fd == -1)
+            fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
+        if(fd < 0)
         {
             cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
             exit(1);
         }
-        if(!ufio || !ufio->setFd(socket.fd, false/*has already been made non-blocking*/))
+        if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
         {
             cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
             return;
         }
 
-        ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, socket.port, ufserver, 0, 0);
+        ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
     }
     AcceptRunner(bool registerMe = false)
     {
@@ -105,39 +113,10 @@ struct AcceptRunner : public UF
     UF* createUF() { return new AcceptRunner(); }
     static AcceptRunner* _self;
     static int _myLoc;
-    UFServer::ListenSocket socket;
 };
 int AcceptRunner::_myLoc = -1;
 AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
 
-struct PerThreadInitializer : public UF
-{
-    void run()
-        {
-            if(!_startingArgs)
-                return;
-// Add conf manager for thread
-            UFConfManager *confManager = new UFConfManager;
-            int ret = pthread_setspecific(UFConfManager::threadSpecificKey, confManager);
-            cerr << getpid() << ":::Adding thread specific UFConfManager key " << UFConfManager::threadSpecificKey << " " << confManager << " " << ret << ", tid : " << pthread_self() << endl;
-
-            UFServer *_server = (UFServer *)_startingArgs;
-            _server->postThreadCreation();
-        }
-
-    UF* createUF() { return new PerThreadInitializer(); }
-
-    PerThreadInitializer(bool registerMe = false)
-    {
-        if(registerMe)
-            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
-    }
-    static PerThreadInitializer* _self;
-    static int _myLoc;
-};
-int PerThreadInitializer::_myLoc = -1;
-PerThreadInitializer* PerThreadInitializer::_self = new PerThreadInitializer(true);
-
 void UFServer::startThreads()
 {
     preThreadCreation();
@@ -150,14 +129,7 @@ void UFServer::startThreads()
     //start the IO threads
     for(; i<MAX_THREADS_ALLOWED; i++)
     {
-        list<UF*>* ufsToAdd = new list<UF*>();
-        
-        PerThreadInitializer *pti = new PerThreadInitializer;
-        pti->_startingArgs = this;
-        ufsToAdd->push_back(pti);
-        
-        UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
-
+        UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
         cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
         usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
         //add the io threads to the thread chooser
@@ -180,19 +152,10 @@ void UFServer::startThreads()
     //start the accept thread
     for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
     {
+        AcceptRunner* ar = new AcceptRunner();
+        ar->_startingArgs = this;
         list<UF*>* ufsToAdd = new list<UF*>();
-        for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
-        {
-            AcceptRunner* ar = new AcceptRunner();
-            ar->_startingArgs = this;
-            ar->socket = *iter;
-            ufsToAdd->push_back(ar);
-        }
-            
-        PerThreadInitializer *pti = new PerThreadInitializer();
-        pti->_startingArgs = this;
-        ufsToAdd->push_back(pti);
-        
+        ufsToAdd->push_back(ar);
         UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
         usleep(5000); //TODO: let the thread finish initializing 
         addThread("ACCEPT", 0, thread[i]);
@@ -215,17 +178,14 @@ void UFServer::run()
     if(!_threadChooser)
         _threadChooser = new UFServerThreadChooser();
 
-    for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
+    //bind to the socket (before the fork
+    _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
+    if(_listenFd < 0)
     {
-        //bind to the socket (before the fork)
-        iter->fd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), iter->port); //TODO:set the backlog
-        if(iter->fd < 0)
-        {
-            cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
-            exit(1);
-        }
+        cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
+        exit(1);
     }
-    
+
     if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
     {
         preThreadRun();
@@ -254,7 +214,6 @@ void UFServer::run()
                 postForkPreRun();
                 preThreadRun();
                 startThreads();
-                postThreadRun();
                 exit(0);
             }
             cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;

Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
            ('svn:mergeinfo' removed)

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C Fri Aug 13 22:14:31 2010
@@ -1,16 +1,15 @@
 #include <string.h>
 #include <stdio.h>
-#include <UFStatSystem.H>
-#include <UF.H>
-#include <UFIO.H>
-#include <UFServer.H>
+#include "UFStatSystem.H"
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
 #include <iostream>
 #include <errno.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 
-using namespace std;
 UFServer *UFStatSystem::server;
 
 std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num;
@@ -127,8 +126,9 @@ bool UFStatSystem::get_current(uint32_t 
 bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
 {
     uint32_t stat_num;
-    if(!getStatNum(stat_name, stat_num))
+    if(!getStatNum(stat_name, stat_num)) {
         return false;
+    }
     return get_current(stat_num, stat_val);
 }
 
@@ -349,9 +349,8 @@ void StatCommandProcessing::run()
     char readbuf[1024];
     std::string readData;
     
-    while(1) 
-    {
-        if((readbytes = ufio->read(readbuf, 1023, 60000000)) <= 0)
+    while(1) {
+        if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0)
             break;
 
         readData.append(readbuf, readbytes);
@@ -359,26 +358,25 @@ void StatCommandProcessing::run()
         if(readData.find("\r\n") == string::npos)
            continue;
            
-        if(readData.find("stats_current") != string::npos) 
-        {
+        if(readData.find("stats_current") != string::npos) {
             // Force a collect before printing out the stats
             UFStatSystem::collect();
             std::stringstream printbuf;
             UFStatCollector::printStats(printbuf);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
                 //failed write, break to close connection
                 break;
+            }
         }
-        else if (readData.find("stats") != string::npos) 
-        {
+        else if (readData.find("stats") != string::npos) {
             std::stringstream printbuf;
             UFStatCollector::printStats(printbuf);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
                     //failed write, break to close connection
                 break;
+            }
         }
-        else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) 
-        {
+        else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) {
             std::vector<std::string> stats;
             char stat_name[MAX_STAT_NAME_LENGTH];
             bzero(stat_name, MAX_STAT_NAME_LENGTH);
@@ -389,49 +387,49 @@ void StatCommandProcessing::run()
             bool get_current = false;
             if(readData.find("stat ") != string::npos)
                 start += strlen("stat ");
-            else 
-            {
+            else {
                 start += strlen("stat_current ");
                 get_current = true;
             }
             
-            while(sscanf(start, "%s%n", stat_name, &next) == 1) 
-            {
+            while(sscanf(start, "%s%n", stat_name, &next) == 1) {
                 // Prefix support
                 char *prefix_end = strchr(start, '*');
-                if(prefix_end != NULL) 
-                {
+                if(prefix_end != NULL) {
                     std::string prefix;
                     prefix.assign(start, prefix_end-start);
                     // Get all stats with the prefix
                     UFStatCollector::getStatsWithPrefix(prefix, stats);
                 }
-                else
+                else {
                     stats.push_back(stat_name);
+                }
                 bzero(stat_name, MAX_STAT_NAME_LENGTH);
                 start+=next;
             }
             std::stringstream printbuf;
             
             UFStatCollector::printStats(stats, printbuf, get_current);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
                 //failed write, break to close connection
                 break;
+            }
         }
-        else if (readData.find("help") != string::npos) 
-        {
-            if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)
+        else if (readData.find("help") != string::npos) {
+            if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) {
                 //failed write, break to close connection
                 break;
+            }
         }
-        else if (readData.find("quit") != string::npos) 
+        else if (readData.find("quit") != string::npos) {
             break;
-        else 
-        {
+        }
+        else {
             if ((ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) == -1) ||
-                (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1))
+                (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) {
                 //failed write, break to close connection
                 break;
+            }
         }
         readData.clear();
     } // END while loop
@@ -446,24 +444,12 @@ int StatCommandListenerRun::_myLoc = -1;
 StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
 void StatCommandListenerRun::run()
 {
-    int fd;
-    unsigned int counter = 0;
-    do
+    int fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+    if(fd < 0)
     {
-        UFStatCollector::_statCommandPort += counter;
-        fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
-        if(fd < 0)
-        {
-            cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
-            if(counter++ == 50) //try upto 50 times
-                return;
-            continue;
-        }
-        else
-            break;
-    } while(1);
-    cerr<<"setup stat command port at "<<UFStatCollector::_statCommandPort;
-
+        cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+        return;
+    }
 
     UFIO* ufio = new UFIO(UFScheduler::getUF());
     if(!ufio)
@@ -475,7 +461,7 @@ void StatCommandListenerRun::run()
     ufio->setFd(fd, false);
 
     StatThreadChooser ufiotChooser;
-    ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, UFStatCollector::_statCommandPort, 0, 0);
+    ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, 0, 0, 0);
 }
 
 void* UFStatCollector::scheduler(void *args)
@@ -483,66 +469,70 @@ void* UFStatCollector::scheduler(void *a
     if(!args)
         return 0;
 
-    cerr<<getPrintableTime()<<" "<<getpid()<<": created stats thread (with I/O) - "<<pthread_self()<<endl;
     // add jobs to scheduler
     UFScheduler ufs;
 
-    //insertion has to be done in a LIFO (stack) manner
     // stat collector
     ufs.addFiberToScheduler(new CollectorRunner());
-    // stat command listener port
-    ufs.addFiberToScheduler(new StatCommandListenerRun());
-    ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
-    // io scheduler
-    ufs.addFiberToScheduler(new IORunner());
-    
+
     // set thread for stat command listener to run on
     StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
     
+    // io scheduler
+    ufs.addFiberToScheduler(new IORunner());
+    
+    // stat command listener
+    ufs.addFiberToScheduler(new StatCommandListenerRun());
+    ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
     ufs.runScheduler();
     return 0;
 }
 
 //----------------------------------------------------------
-void UFStatCollector::printStats(std::stringstream &printbuf) 
-{
-    printbuf <<  "Cache stats: \n"
+void UFStatCollector::printStats(std::stringstream &printbuf) {
+   printbuf <<  "Cache stats: \n"
                 "-----------------------------------------------------------------------------\n";
-    printbuf << "TIME " << _startTime <<"\n";
+   
+  printbuf << "TIME " << _startTime <<"\n";
 
-    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
-    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
-    statsMutex.lock(running_user_fiber);
+  UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+  UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+  statsMutex.lock(running_user_fiber);
   
-    for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
-        it != UFStatSystem::global_stats.end(); it++) 
-        printbuf << "STAT " << it->name << " " << it->value << "\n";
-    statsMutex.unlock(running_user_fiber);
+  for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
+      it != UFStatSystem::global_stats.end(); it++) {
+      if(it->value != 0 ) {
+          printbuf << "STAT " << it->name << " " << it->value << "\n";
+      }
+  }
+  statsMutex.unlock(running_user_fiber);
 
-    printbuf << "END\n";
+  printbuf << "END\n";
 }
 
-void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) 
-{
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
+    // Print only non zero stats
     long long stat_val = 0;
     bool stat_get_status;
-    if(current)
+    if(current) {
         stat_get_status = UFStatSystem::get_current(stat_name, &stat_val);
-    else
+    }
+    else {
         stat_get_status = UFStatSystem::get(stat_name, &stat_val);
+    }
     
-    //if(stat_get_status && stat_val != 0) {
-    if(stat_get_status)
+    if(stat_get_status && stat_val != 0) {
         printbuf << "STAT " << stat_name << " " << stat_val << "\n";
+    }
 }
 
-void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current)
-{
-    printbuf << "TIME " << _startTime <<"\n";
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
+  printbuf << "TIME " << _startTime <<"\n";
     for(std::vector<std::string>::const_iterator it = stat_names.begin();
         it != stat_names.end();
-        it++)
+        it++) {
         printStat(it->c_str(), printbuf, current);
+    }
    printbuf << "END\n";
 }
 
@@ -554,11 +544,11 @@ UFStatCollector::getStatsWithPrefix(cons
     statsMutex.lock(running_user_fiber);
     // Get all stats which start with stat_prefix
     for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
-        it != UFStatSystem::global_stats.end(); it++) 
-    {
+        it != UFStatSystem::global_stats.end(); it++) {
         size_t found = it->name.find(stat_prefix);
-        if(!found)
+        if(found == 0) {
             stat_names.push_back(it->name);
+        }
     }
     statsMutex.unlock(running_user_fiber);
 }

Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
            ('svn:mergeinfo' removed)

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Fri Aug 13 22:14:31 2010
@@ -1,7 +1,6 @@
-#include <UFStats.H>
-#include <UFStatSystem.H>
+#include "UFStats.H"
+#include "UFStatSystem.H"
 
-uint32_t UFStats::currentConnections;
 uint32_t UFStats::connectionsHandled;
 uint32_t UFStats::txnSuccess;
 uint32_t UFStats::txnFail;
@@ -13,7 +12,6 @@ namespace UFStats
 {
     void registerStats(bool lock_needed)
     {
-        UFStatSystem::registerStat("connections.current", &currentConnections, lock_needed);
         UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
         UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
         UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);

Propchange: trafficserver/traffic/branches/UserFiber/protocol/
            ('svn:mergeinfo' removed)

Modified: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Fri Aug 13 22:14:31 2010
@@ -21,15 +21,14 @@
 #include <string>
 #include <stdio.h>
 
-#include <ufcore/UF.H>
-#include <ufcore/UFIO.H>
-#include <ufcore/UFServer.H>
-#include <ufcore/UFConnectionPool.H>
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include "UFConnectionPool.H"
 #include <vector>
 
 using namespace std;
 
-unsigned int NUM_REQUESTS_TO_RUN                        = 0;
 unsigned short int NUM_THREADS                          = 1;
 unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN             = 1;
 unsigned int NUM_CONNECTIONS_PER_FIBER                  = 1;
@@ -80,10 +79,10 @@ ResponseInfoObject overallInfo;
 pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_key_t threadUpdateOverallInfo;
 
-TIME_IN_US GET_RESPONSE_TIMEOUT = -1;
+int GET_RESPONSE_TIMEOUT = 1*1000*1000;
 string DOUBLE_NEWLINE = "\r\n\r\n";
 unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
-bool readData(UFIO* ufio, bool& connClosed)
+bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
 {
     string result;
     char buf[4096];
@@ -93,7 +92,7 @@ bool readData(UFIO* ufio, bool& connClos
     bool okToExitToEnd = false;
     while(1)
     {
-        int num_bytes_read = ufio->read(buf, 4095, GET_RESPONSE_TIMEOUT);
+        int num_bytes_read = ufio->read(buf, 4095, 0);
         if(num_bytes_read <= 0)
         {
             if(okToExitToEnd && (num_bytes_read == 0))
@@ -102,13 +101,8 @@ bool readData(UFIO* ufio, bool& connClos
                 connClosed = true;
                 return true;
             }
-            cerr<<"bytes read = "<<num_bytes_read<<" with errno = "<<strerror(ufio->getErrno())<<endl;
             return false;
         }
-        /*
-        else
-            cerr<<"read"<<string(buf, num_bytes_read)<<endl;
-            */
         result.append(buf, num_bytes_read);
 
 
@@ -153,7 +147,6 @@ bool readData(UFIO* ufio, bool& connClos
            (result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet
         {
             cerr<<"read more than supposed to"<<endl;
-            cerr<<"read "<<result;
             return false;
         }
         else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength)
@@ -165,12 +158,10 @@ bool readData(UFIO* ufio, bool& connClos
 
 
 unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
-TIME_IN_US CONNECT_AND_REQUEST_TIMEOUT = -1;
+int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
 bool writeData(UFIO* ufio, const string& data)
 {
     int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
-    if(amt_written <= 0)
-        cerr<<"write failed "<<ufio->getErrno()<<endl;
     return ((amt_written == (int)data.length()) ? true : false);
 }
 
@@ -184,7 +175,7 @@ UFIO* getConn(ResponseInfoObject* rIO)
 
     struct timeval start,finish;
     gettimeofday(&start, 0);
-    UFIO* ufio = cpool->getConnection(remote_addr, true, CONNECT_AND_REQUEST_TIMEOUT);
+    UFIO* ufio = cpool->getConnection(remote_addr);
     if(!ufio)
     {
         if(random()%100 < 10)
@@ -230,7 +221,7 @@ void run_handler()
         return;
 
     //do the requests
-    unsigned int num_requests_run = 0;
+    unsigned short int num_requests_run = 0;
     while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
     {
         if(INTER_SEND_SLEEP)
@@ -267,7 +258,7 @@ void run_handler()
 
 
         bool connClosed = false;
-        if(!readData(ufio, connClosed))
+        if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
         {
             if(random()%100 < 10)
                 cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
@@ -309,7 +300,7 @@ void ClientUF::run()
     ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
     if(!rIO)
         return;
-    unsigned int num_requests_run = 0;
+    unsigned short int num_requests_run = 0;
     while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER)
     {
         //wait if told to do so
@@ -414,13 +405,16 @@ void SetupClientUF::run()
         {
             if(!rIO.num_user_fibers_running)
                 break;
+            else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) && 
+                     (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+                )
+            {
+                cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+                break;
+            }
 
             UF::gusleep(5000000);
-            unsigned short int threadCompletionPercent = (rIO.num_attempt*100)/NUM_REQUESTS_TO_RUN;
-            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<NUM_REQUESTS_TO_RUN<<" ("<<threadCompletionPercent<<"%)"<<endl;
-
-            if(threadCompletionPercent > THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
-                break;
+            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl;
         }
     }
 
@@ -557,7 +551,6 @@ void print_usage()
 
 int main(int argc, char** argv)
 {
-    unsigned long long int DELAY_BETWEEN_STARTING_THREADS_IN_US = 0;
     if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
     {
         cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
@@ -567,13 +560,10 @@ int main(int argc, char** argv)
     string rem_port = "80";
     string rem_addr = "127.0.0.1";
     char ch;
-	while ((ch = getopt(argc, argv, "M:Z:U:x:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) 
+	while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)

     {
 		switch (ch) 
         {
-            case'x':
-                DELAY_BETWEEN_STARTING_THREADS_IN_US = atoi(optarg);
-                break;
             case 'Z':
                 sleepShouldBeRandom = atoi(optarg);
                 break;
@@ -619,10 +609,14 @@ int main(int argc, char** argv)
                 HTTP_BASE_REQ_STRING = optarg;
                 break;
             case 'c':
-                CONNECT_AND_REQUEST_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
+                CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
+                if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
+                    CONNECT_AND_REQUEST_TIMEOUT = -1;
                 break;
             case 'd':
-                GET_RESPONSE_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
+                GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
+                if(GET_RESPONSE_TIMEOUT <= 0)
+                    GET_RESPONSE_TIMEOUT = -1;
                 break;
             case 's':
                 INTER_SEND_SLEEP = atoi(optarg)*1000;
@@ -648,7 +642,6 @@ int main(int argc, char** argv)
     remote_addr = rem_addr + ":" + rem_port;
     print_info();
 
-    NUM_REQUESTS_TO_RUN = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN;
 
     //create the threads
     pthread_t* thread = new pthread_t[NUM_THREADS];
@@ -658,11 +651,6 @@ int main(int argc, char** argv)
         list<UF*>* ufList = new list<UF*>();
         ufList->push_back(new SetupClientUF());
         UFIO::ufCreateThreadWithIO(&thread[i], ufList);
-        if(DELAY_BETWEEN_STARTING_THREADS_IN_US)
-        {
-            cerr<<"sleeping for "<<DELAY_BETWEEN_STARTING_THREADS_IN_US<<endl;
-            usleep(DELAY_BETWEEN_STARTING_THREADS_IN_US);
-        }
     }
 
 



Mime
View raw message