hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From millec...@apache.org
Subject svn commit: r1544764 - /hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
Date Sat, 23 Nov 2013 09:49:41 GMT
Author: millecker
Date: Sat Nov 23 09:49:41 2013
New Revision: 1544764

URL: http://svn.apache.org/r1544764
Log:
HAMA-815: Hama Pipes uses C++ templates (fixed glibc detection)

Modified:
    hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc

Modified: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1544764&r1=1544763&r2=1544764&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Sat Nov 23 09:49:41 2013
@@ -173,7 +173,7 @@ public:
     cmd = deserializeInt(*in_stream_);
     
     switch (cmd) {
-        
+      
       case START_MESSAGE: {
         int32_t protocol_version;
         protocol_version = deserialize<int32_t>(*in_stream_);
@@ -184,7 +184,7 @@ public:
         handler_->start(protocol_version);
         break;
       }
-        // setup BSP Job Configuration
+      // setup BSP Job Configuration
       case SET_BSPJOB_CONF: {
         int32_t entries;
         entries = deserialize<int32_t>(*in_stream_);
@@ -310,7 +310,7 @@ public:
     if (expected_response_cmd == cmd) {
       
       switch (cmd) {
-          
+        
         case GET_MSG_COUNT: {
           T msg_count;
           msg_count = deserialize<T>(*in_stream_);
@@ -362,7 +362,7 @@ public:
           }
           return superstep_count;
         }
-          
+        
         case SEQFILE_OPEN: {
           T file_id = deserialize<T>(*in_stream_);
           if(logging) {
@@ -474,7 +474,7 @@ public:
     if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
       
       switch (cmd) {
-          
+        
         case READ_KEYVALUE: {
           K key = deserialize<K>(*in_stream_);
           V value = deserialize<V>(*in_stream_);
@@ -563,6 +563,7 @@ public:
     reader_ = NULL;
     writer_ = NULL;
     protocol_ = NULL;
+    uplink_ = NULL;
     
     done_ = false;
     has_task_ = false;
@@ -930,24 +931,6 @@ public:
     
     uplink_->sendCommand(READ_KEYVALUE);
     
-    // TODO
-    // check if value is array [0, 1, 2, ...], and remove brackets
-    /*
-     int len = current_value_.length();
-     if ( (current_value_[0]=='[') &&
-     (current_value_[len-1]==']') ) {
-     value = current_value_.substr(1,len-2);
-     } else {
-     value = current_value_;
-     }
-     
-     if (logging && key.empty() && value.empty()) {
-     fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
-     }
-     
-     return (!key.empty() && !value.empty());
-     */
-    
     KeyValuePair<K1,V1> key_value_pair;
     key_value_pair = protocol_->template getKeyValueResult<K1,V1>(READ_KEYVALUE);
     
@@ -1034,18 +1017,6 @@ public:
     // send request
     uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
     
-    // TODO
-    /*
-     // check if value is array [0, 1, 2, ...], and remove brackets
-     int len = current_value_.length();
-     if ( (current_value_[0]=='[') &&
-     (current_value_[len-1]==']') ) {
-     value = current_value_(1,len-2);
-     } else {
-     value = current_value_;
-     }
-     */
-    
     // get response
     KeyValuePair<K,V> key_value_pair;
     key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
@@ -1163,14 +1134,15 @@ public:
   }
   
   virtual ~BSPContextImpl() {
+    delete factory_;
     delete job_;
-    //delete inputSplit_;
-    //if (reader) {
-    //  delete value;
-    //}
-    delete reader_;
     delete bsp_;
+    delete partitioner_;
+    delete reader_;
     delete writer_;
+    delete protocol_;
+    delete uplink_;
+    //delete inputSplit_;
     pthread_mutex_destroy(&mutex_done_);
   }
 };
@@ -1307,19 +1279,11 @@ bool runTask(const Factory<K1, V1, K2, V
     
     context->waitForTask();
     
-    //while (!context->isDone()) {
-    //context->nextKey();
-    //}
-    
     context->closeAll();
     protocol->getUplink()->sendCommand(DONE);
     
     //pthread_join(pingThread,NULL);
     
-    // Cleanup
-    delete context;
-    delete protocol;
-    
     if (in_stream != NULL) {
       fflush(in_stream);
     }
@@ -1336,16 +1300,16 @@ bool runTask(const Factory<K1, V1, K2, V
       HADOOP_ASSERT(result == 0, "problem closing socket");
     }
     
-    //TODO REFACTOR
-    if (in_stream != NULL) {
-      //fclose(stream);
-    }
-    if (out_stream != NULL) {
-      //fclose(outStream);
-    }
+    // Cleanup
+    delete context;
+    delete protocol;
+    
     delete bufin;
     delete bufout;
     
+    delete in_stream;
+    delete out_stream;
+    
     return true;
     
   } catch (Error& err) {



Mime
View raw message