trafodion-codereview mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [trafodion] narendragoyal commented on a change in pull request #1869: [TRAFODION-3334] Refactored and re-implemented monitor communication.
Date Wed, 12 Feb 2020 21:44:34 GMT
narendragoyal commented on a change in pull request #1869: [TRAFODION-3334] Refactored and
re-implemented monitor communication.
URL: https://github.com/apache/trafodion/pull/1869#discussion_r377995049
 
 

 ##########
 File path: core/sqf/monitor/linux/comm.cxx
 ##########
 @@ -0,0 +1,1757 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include <iostream>
+
+using namespace std;
+
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include "monlogging.h"
+#include "montrace.h"
+#include "comm.h"
+
+const char *EpollEventString( __uint32_t events ); // forward declaration (body not in this excerpt); presumably names an epoll event mask -- TODO confirm
+const char *EpollOpString( int op ); // forward declaration (body not in this excerpt); presumably names an epoll_ctl operation -- TODO confirm
+
+// Constructor.
+// Stamps the object with its eyecatcher and creates the epoll instance
+// (close-on-exec). When the epoll instance cannot be created the error is
+// logged as critical and mon_failure_exit() is called.
+CComm::CComm( void )
+      :epollFd_(-1)
+{
+    const char method_name[] = "CComm::CComm";
+    TRACE_ENTRY;
+
+    // Eyecatcher makes a live object easy to spot when debugging
+    memcpy(&eyecatcher_, "COMM", 4);
+
+    epollFd_ = epoll_create1( EPOLL_CLOEXEC );
+    const bool epollCreated = ( epollFd_ >= 0 );
+    if ( !epollCreated )
+    {
+        char errText[256];                 // scratch area for strerror_r()
+        char logLine[MON_STRING_BUF_SIZE]; // formatted log record
+        snprintf( logLine, sizeof(logLine), "[%s@%d] epoll_create1(sendrecv) error: %s\n",
+            method_name, __LINE__, strerror_r( errno, errText, sizeof(errText) ) );
+        mon_log_write( COMM_COMM_1, SQ_LOG_CRIT, logLine );
+
+        mon_failure_exit();
+    }
+
+    TRACE_EXIT;
+}
+
+// Destructor.
+// Closes the epoll descriptor when one is held (anything other than -1) and
+// then rewrites the eyecatcher in lower case.
+CComm::~CComm( void )
+{
+    const char method_name[] = "CComm::~CComm";
+    TRACE_ENTRY;
+
+    const bool epollOpen = ( -1 != epollFd_ );
+    if ( epollOpen )
+    {
+        close( epollFd_ );
+    }
+
+    // Lower-case eyecatcher identifies a deleted object when debugging
+    memcpy(&eyecatcher_, "comm", 4);
+
+    TRACE_EXIT;
+}
+
+// Accepts one pending connection on a listening socket.
+//
+// The bound address of listenSock is looked up first (it is only used for
+// tracing), then accept() is called and retried for as long as it is
+// interrupted by a signal (EINTR). On a successfully accepted socket
+// TCP_NODELAY and SO_REUSEADDR are enabled.
+//
+// Parameters:
+//   listenSock - descriptor of the listening socket
+//
+// Returns:
+//   >= 0  the connected socket descriptor
+//   -1    getsockname() failed (logged), or accept() failed (not logged
+//         here; errno is left as set by accept())
+//   -2    a socket option could not be set (logged); the accepted socket
+//         has been closed
+int CComm::Accept( int listenSock )
+{
+    const char method_name[] = "CComm::Accept";
+    TRACE_ENTRY;
+
+    int csock; // connected socket
+    struct sockaddr_in  sockinfo;   // socket address info
+    // getsockname() must be told the capacity of the address buffer,
+    // not the size of a pointer, or the returned address may be truncated
+    socklen_t size = sizeof(sockinfo);
+
+    if ( getsockname( listenSock, (struct sockaddr *) &sockinfo, &size ) )
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d (%s).\n",
+                 method_name, err, strerror(err));
+        mon_log_write(COMM_ACCEPT_1, SQ_LOG_ERR, buf);
+        return ( -1 );
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+        trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d,  port=%d\n"
+                    , method_name, __LINE__
+                    , addrp[0]
+                    , addrp[1]
+                    , addrp[2]
+                    , addrp[3]
+                    , (int) ntohs( sockinfo.sin_port ) );
+    }
+
+    // Retry for as long as accept() is interrupted by a signal
+    while ( ((csock = accept( listenSock
+                            , (struct sockaddr *) 0
+                            , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) );
+
+    // Descriptor 0 is a valid socket, so only negative values mean failure
+    if ( csock >= 0 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            // The address traced here is the listening socket's own address
+            unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+            trace_printf( "%s@%d - Accepted socket on addr=%d.%d.%d.%d, "
+                          "port=%d, listenSock=%d, csock=%d\n"
+                        , method_name, __LINE__
+                        , addrp[0]
+                        , addrp[1]
+                        , addrp[2]
+                        , addrp[3]
+                        , (int) ntohs( sockinfo.sin_port )
+                        , listenSock
+                        , csock );
+        }
+
+        int nodelay = 1;
+        if ( setsockopt( csock
+                       , IPPROTO_TCP
+                       , TCP_NODELAY
+                       , (char *) &nodelay
+                       , sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_2, SQ_LOG_ERR, buf);
+            // The caller only receives -2, so release the socket here
+            close( csock );
+            return ( -2 );
+        }
+
+        int reuse = 1;
+        if ( setsockopt( csock
+                       , SOL_SOCKET
+                       , SO_REUSEADDR
+                       , (char *) &reuse
+                       , sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_3, SQ_LOG_ERR, buf);
+            // The caller only receives -2, so release the socket here
+            close( csock );
+            return ( -2 );
+        }
+    }
+
+    TRACE_EXIT;
+    return ( csock );
+}
+
+// Opens a TCP connection to the given port on "localhost" and closes it
+// again straight away.
+//
+// A failure of socket(), gethostbyname() or connect() (other than an
+// interruption by a signal) is logged as critical and followed by
+// mon_failure_exit(). A connect() interrupted by a signal (EINTR) is
+// retried; after more than 10 interruptions the function gives up silently.
+//
+// NOTE(review): the retry limit is the literal 10. An earlier revision read
+// the HPMP_CONNECT_RETRIES environment variable into a static counter that
+// was never consulted; that dead code has been removed. Confirm whether the
+// limit was meant to be configurable.
+//
+// Parameters:
+//   port - TCP port number on the local host (host byte order)
+void CComm::ConnectLocal( int port )
+{
+    const char method_name[] = "CComm::ConnectLocal";
+    TRACE_ENTRY;
+
+    int  sock;     // socket
+    int  ret;      // returned value
+    int  connect_failures = 0; // # connects interrupted by a signal
+    struct sockaddr_in  sockinfo; // socket address info
+    struct hostent *he;
+    const socklen_t size = sizeof(sockinfo); // size of socket address
+
+    sock = socket( AF_INET, SOCK_STREAM, 0 );
+    if ( sock < 0 )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf( la_buf, sizeof(la_buf), "[%s], socket() failed! errno=%d (%s)\n"
+                , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECTLOCAL_1, SQ_LOG_CRIT, la_buf );
+
+        mon_failure_exit();
+    }
+
+    he = gethostbyname( "localhost" );
+    if ( !he )
+    {
+        // gethostbyname() reports failures through h_errno, whose values are
+        // not errno codes; hstrerror() is the matching message lookup
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
+            method_name, __LINE__, "localhost", hstrerror( h_errno ) );
+        mon_log_write( COMM_CONNECTLOCAL_2, SQ_LOG_CRIT, buf );
+
+        mon_failure_exit();
+    }
+
+    // Connect socket.
+    memset( (char *) &sockinfo, 0, size );
+    memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
+    sockinfo.sin_family = AF_INET;
+    sockinfo.sin_port = htons( (unsigned short) port );
+
+    ret = 1;
+    while ( ret != 0 && connect_failures <= 10 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)he->h_addr)[0]
+                        , (int)((unsigned char *)he->h_addr)[1]
+                        , (int)((unsigned char *)he->h_addr)[2]
+                        , (int)((unsigned char *)he->h_addr)[3]
+                        , port
+                        , connect_failures );
+        }
+
+        ret = connect( sock, (struct sockaddr *) &sockinfo, size );
+        if ( ret == 0 ) break;
+        if ( errno == EINTR )
+        {
+            ++connect_failures;
+        }
+        else if ( errno == EISCONN && connect_failures > 0 )
+        {
+            // An interrupted connect() keeps going in the background, so a
+            // retry can find the socket already connected: that is success
+            ret = 0;
+        }
+        else
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf( la_buf, sizeof(la_buf), "[%s], connect() failed! errno=%d (%s)\n"
+                    , method_name, err, strerror( err ));
+            mon_log_write(COMM_CONNECTLOCAL_3, SQ_LOG_CRIT, la_buf);
+
+            mon_failure_exit();
+        }
+    }
+
+    close( sock );
+
+    TRACE_EXIT;
+}
+
+int CComm::Connect( const char *portName, bool doRetries )
+{
+    const char method_name[] = "CComm::Connect";
+    TRACE_ENTRY;
+
+    int  sock;      // socket
+    int  ret;       // returned value
+    int  nodelay = 1; // sockopt reuse option
 
 Review comment:
   typo in the comment - not a 'reuse' option :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message