rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifplu...@apache.org
Subject [rocketmq-client-cpp] 03/04: feat: support ipv6
Date Wed, 24 Mar 2021 09:06:11 GMT
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 40882dc35e8f6bc62edd2fb03864ef031e72c8b3
Author: James Yin <ywhjames@hotmail.com>
AuthorDate: Thu Mar 11 15:59:04 2021 +0800

    feat: support ipv6
---
 src/ClientRemotingProcessor.cpp            |   6 +-
 src/MQClientConfigImpl.hpp                 |   3 +-
 src/common/UtilAll.cpp                     |  53 +------
 src/common/UtilAll.h                       |   8 --
 src/consumer/DefaultMQPushConsumerImpl.cpp |   4 +-
 src/message/MessageBatch.cpp               |   3 +-
 src/message/MessageClientIDSetter.cpp      |  40 ++++--
 src/message/MessageClientIDSetter.h        |   2 +-
 src/message/MessageDecoder.cpp             |  42 +++---
 src/message/MessageDecoder.h               |   7 +-
 src/message/MessageExtImpl.cpp             |  26 ++--
 src/message/MessageExtImpl.h               |   4 +-
 src/message/MessageId.h                    |  16 +--
 src/transport/EventLoop.cpp                |   6 +-
 src/transport/SocketUtil.cpp               | 217 ++++++++++++++++++-----------
 src/transport/SocketUtil.h                 |  45 ++++--
 16 files changed, 258 insertions(+), 224 deletions(-)

diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index cf97306..32366d6 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -16,9 +16,9 @@
  */
 #include "ClientRemotingProcessor.h"
 
-#include "MessageDecoder.h"
 #include "MQProtos.h"
 #include "MessageAccessor.hpp"
+#include "MessageDecoder.h"
 #include "MessageSysFlag.h"
 #include "RequestFutureTable.h"
 #include "SocketUtil.h"
@@ -153,11 +153,11 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand*
r
     msg->set_store_timestamp(requestHeader->store_timestamp());
 
     if (!requestHeader->born_host().empty()) {
-      msg->set_born_host(string2SocketAddress(requestHeader->born_host()));
+      msg->set_born_host(StringToSockaddr(requestHeader->born_host()));
     }
 
     if (!requestHeader->store_host().empty()) {
-      msg->set_store_host(string2SocketAddress(requestHeader->store_host()));
+      msg->set_store_host(StringToSockaddr(requestHeader->store_host()));
     }
 
     auto body = request->body();
diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp
index 59b4601..b9ad503 100644
--- a/src/MQClientConfigImpl.hpp
+++ b/src/MQClientConfigImpl.hpp
@@ -22,6 +22,7 @@
 
 #include "MQClientConfig.h"
 #include "NamespaceUtil.h"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -45,7 +46,7 @@ class MQClientConfigImpl : virtual public MQClientConfig {
 
   std::string buildMQClientId() const override {
     std::string clientId;
-    clientId.append(UtilAll::getLocalAddress());  // clientIP
+    clientId.append(GetLocalAddress());  // clientIP
     clientId.append("@");
     clientId.append(instance_name_);  // instanceName
     if (!unit_name_.empty()) {
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index 2cbe00a..731d8aa 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -40,9 +40,6 @@
 
 namespace rocketmq {
 
-std::string UtilAll::sLocalHostName;
-std::string UtilAll::sLocalIpAddress;
-
 bool UtilAll::try_lock_for(std::timed_mutex& mutex, long timeout) {
   auto now = std::chrono::steady_clock::now();
   auto deadline = now + std::chrono::milliseconds(timeout);
@@ -157,7 +154,7 @@ bool UtilAll::isBlank(const std::string& str) {
 }
 
 bool UtilAll::SplitURL(const std::string& serverURL, std::string& addr, short&
nPort) {
-  auto pos = serverURL.find(':');
+  auto pos = serverURL.find_last_of(':');
   if (pos == std::string::npos) {
     return false;
   }
@@ -228,54 +225,6 @@ int UtilAll::Split(std::vector<std::string>& ret_, const std::string&
strIn, con
   return ret_.size();
 }
 
-std::string UtilAll::getLocalHostName() {
-  if (sLocalHostName.empty()) {
-    char name[1024];
-    if (::gethostname(name, sizeof(name)) != 0) {
-      return null;
-    }
-    sLocalHostName.append(name, strlen(name));
-  }
-  return sLocalHostName;
-}
-
-std::string UtilAll::getLocalAddress() {
-  if (sLocalIpAddress.empty()) {
-    auto hostname = getLocalHostName();
-    if (!hostname.empty()) {
-      try {
-        sLocalIpAddress = socketAddress2String(lookupNameServers(hostname));
-      } catch (std::exception& e) {
-        LOG_WARN(e.what());
-        sLocalIpAddress = "127.0.0.1";
-      }
-    }
-  }
-  return sLocalIpAddress;
-}
-
-uint32_t UtilAll::getIP() {
-  std::string ip = UtilAll::getLocalAddress();
-  if (ip.empty()) {
-    return 0;
-  }
-
-  char* ip_str = new char[ip.length() + 1];
-  std::strncpy(ip_str, ip.c_str(), ip.length());
-  ip_str[ip.length()] = '\0';
-
-  int i = 3;
-  uint32_t nResult = 0;
-  for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token
= std::strtok(nullptr, ".")) {
-    uint32_t n = std::atoi(token);
-    nResult |= n << (8 * i--);
-  }
-
-  delete[] ip_str;
-
-  return nResult;
-}
-
 std::string UtilAll::getHomeDirectory() {
 #ifndef WIN32
   char* home_env = std::getenv("HOME");
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 8ad49b0..63989f7 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -112,10 +112,6 @@ class UtilAll {
   static int Split(std::vector<std::string>& ret_, const std::string& strIn,
const char sep);
   static int Split(std::vector<std::string>& ret_, const std::string& strIn,
const std::string& sep);
 
-  static std::string getLocalHostName();
-  static std::string getLocalAddress();
-  static uint32_t getIP();
-
   static std::string getHomeDirectory();
   static void createDirectory(std::string const& dir);
   static bool existDirectory(std::string const& dir);
@@ -138,10 +134,6 @@ class UtilAll {
   // Returns true on success.
   // Returns false on failure..
   static bool ReplaceFile(const std::string& from_path, const std::string& to_path);
-
- private:
-  static std::string sLocalHostName;
-  static std::string sLocalIpAddress;
 };
 
 template <typename T>
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 11a12a1..bf00cc2 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -536,8 +536,8 @@ bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int
delay_lev
   try {
     msg->set_topic(NamespaceUtil::wrapNamespace(client_config_->name_space(), msg->topic()));
 
-    std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host())
-                                                : client_instance_->findBrokerAddressInPublish(brokerName);
+    std::string brokerAddr =
+        brokerName.empty() ? msg->store_host_string() : client_instance_->findBrokerAddressInPublish(brokerName);
 
     client_instance_->getMQClientAPIImpl()->consumerSendMessageBack(
         brokerAddr, msg, getDefaultMQPushConsumerConfig()->group_name(), delay_level,
5000,
diff --git a/src/message/MessageBatch.cpp b/src/message/MessageBatch.cpp
index 4030fd6..2ba0c18 100644
--- a/src/message/MessageBatch.cpp
+++ b/src/message/MessageBatch.cpp
@@ -16,8 +16,9 @@
  */
 #include "MessageBatch.h"
 
-#include "MessageDecoder.h"
+#include "MQException.h"
 #include "MessageClientIDSetter.h"
+#include "MessageDecoder.h"
 
 namespace rocketmq {
 
diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp
index dda45df..5560ba1 100644
--- a/src/message/MessageClientIDSetter.cpp
+++ b/src/message/MessageClientIDSetter.cpp
@@ -25,7 +25,8 @@
 #include <unistd.h>
 #endif
 
-#include "ByteOrder.h"
+#include "ByteBuffer.hpp"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -33,16 +34,28 @@ namespace rocketmq {
 MessageClientIDSetter::MessageClientIDSetter() {
   std::srand((uint32_t)std::time(NULL));
 
-  uint32_t pid = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(UtilAll::getProcessId()));
-  uint32_t ip = ByteOrderUtil::NorminalBigEndian(UtilAll::getIP());
-  uint32_t random_num = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(std::rand()));
-
-  char bin_buf[10];
-  std::memcpy(bin_buf + 2, &pid, 4);
-  std::memcpy(bin_buf, &ip, 4);
-  std::memcpy(bin_buf + 6, &random_num, 4);
+  std::unique_ptr<ByteBuffer> buffer;
+  sockaddr* addr = GetSelfIP();
+  if (addr != nullptr) {
+    buffer.reset(ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4));
+    if (addr->sa_family == AF_INET) {
+      auto* sin = (struct sockaddr_in*)addr;
+      buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+    } else if (addr->sa_family == AF_INET6) {
+      auto* sin6 = (struct sockaddr_in6*)addr;
+      buffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize));
+    } else {
+      (void)buffer.release();
+    }
+  }
+  if (buffer == nullptr) {
+    buffer.reset(ByteBuffer::allocate(4 + 2 + 4));
+    buffer->putInt(UtilAll::currentTimeMillis());
+  }
+  buffer->putShort(UtilAll::getProcessId());
+  buffer->putInt(std::rand());
 
-  fix_string_ = UtilAll::bytes2string(bin_buf, 10);
+  fixed_string_ = UtilAll::bytes2string(buffer->array(), buffer->position());
 
   setStartTime(UtilAll::currentTimeMillis());
 
@@ -93,11 +106,8 @@ std::string MessageClientIDSetter::createUniqueID() {
   uint32_t period = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(current
- start_time_));
   uint16_t seqid = ByteOrderUtil::NorminalBigEndian(counter_++);
 
-  char bin_buf[6];
-  std::memcpy(bin_buf, &period, 4);
-  std::memcpy(bin_buf + 4, &seqid, 2);
-
-  return fix_string_ + UtilAll::bytes2string(bin_buf, 6);
+  return fixed_string_ + UtilAll::bytes2string(reinterpret_cast<char*>(&period),
sizeof(period)) +
+         UtilAll::bytes2string(reinterpret_cast<char*>(&seqid), sizeof(seqid));
 }
 
 }  // namespace rocketmq
diff --git a/src/message/MessageClientIDSetter.h b/src/message/MessageClientIDSetter.h
index 222115e..e688786 100644
--- a/src/message/MessageClientIDSetter.h
+++ b/src/message/MessageClientIDSetter.h
@@ -68,7 +68,7 @@ class MessageClientIDSetter {
   uint64_t next_start_time_;
   std::atomic<uint16_t> counter_;
 
-  std::string fix_string_;
+  std::string fixed_string_;
 };
 
 }  // namespace rocketmq
diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp
index a36d3bf..08df407 100644
--- a/src/message/MessageDecoder.cpp
+++ b/src/message/MessageDecoder.cpp
@@ -20,14 +20,18 @@
 #include <sstream>    // std::stringstream
 
 #ifndef WIN32
-#include <netinet/in.h>  // struct sockaddr, sockaddr_in, sockaddr_in6
+#include <arpa/inet.h>   // htons
+#include <netinet/in.h>  // sockaddr_in, sockaddr_in6
+#else
+#include "Winsock2.h"
 #endif
 
 #include "ByteOrder.h"
 #include "Logging.h"
-#include "MessageExtImpl.h"
 #include "MessageAccessor.hpp"
+#include "MessageExtImpl.h"
 #include "MessageSysFlag.h"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 static const char NAME_VALUE_SEPARATOR = 1;
@@ -36,37 +40,35 @@ static const char PROPERTY_SEPARATOR = 2;
 namespace rocketmq {
 
 std::string MessageDecoder::createMessageId(const struct sockaddr* sa, int64_t offset) {
-  int msgIDLength = sa->sa_family == AF_INET ? 16 : 28;
-  std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIDLength));
+  int msgIdLength = IpaddrSize(sa) + /* port field size */ 4 + sizeof(offset);
+  std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIdLength));
   if (sa->sa_family == AF_INET) {
     struct sockaddr_in* sin = (struct sockaddr_in*)sa;
-    byteBuffer->put(ByteArray((char*)&sin->sin_addr, 4));
-    byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin->sin_port));
+    byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+    byteBuffer->putInt(ntohs(sin->sin_port));
   } else {
     struct sockaddr_in6* sin6 = (struct sockaddr_in6*)sa;
-    byteBuffer->put(ByteArray((char*)&sin6->sin6_addr, 16));
-    byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin6->sin6_port));
+    byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr),
kIPv6AddrSize));
+    byteBuffer->putInt(ntohs(sin6->sin6_port));
   }
   byteBuffer->putLong(offset);
   byteBuffer->flip();
-  return UtilAll::bytes2string(byteBuffer->array(), msgIDLength);
+  return UtilAll::bytes2string(byteBuffer->array(), msgIdLength);
 }
 
 MessageId MessageDecoder::decodeMessageId(const std::string& msgId) {
-  size_t ip_length = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+  size_t ip_length = msgId.length() == 32 ? kIPv4AddrSize * 2 : kIPv6AddrSize * 2;
 
   ByteArray byteArray(ip_length / 2);
   std::string ip = msgId.substr(0, ip_length);
   UtilAll::string2bytes(byteArray.array(), ip);
 
   std::string port = msgId.substr(ip_length, 8);
-  // uint32_t portInt = ByteOrderUtil::NorminalBigEndian<uint32_t>(std::stoul(port,
nullptr, 16));
   uint32_t portInt = std::stoul(port, nullptr, 16);
 
-  auto* sin = ipPort2SocketAddress(byteArray, portInt);
+  auto* sin = IPPortToSockaddr(byteArray, portInt);
 
   std::string offset = msgId.substr(ip_length + 8);
-  // uint64_t offsetInt = ByteOrderUtil::NorminalBigEndian<uint64_t>(std::stoull(offset,
nullptr, 16));
   uint64_t offsetInt = std::stoull(offset, nullptr, 16);
 
   return MessageId(sin, offsetInt);
@@ -123,22 +125,22 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool
readBody, bool
   msgExt->set_born_timestamp(bornTimeStamp);
 
   // 10 BORNHOST
-  int bornhostIPLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? 4 : 16;
-  ByteArray bornHost(bornhostIPLength);
-  byteBuffer.get(bornHost, 0, bornhostIPLength);
+  int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize
: kIPv6AddrSize;
+  ByteArray bornHost(bornHostLength);
+  byteBuffer.get(bornHost, 0, bornHostLength);
   int32_t bornPort = byteBuffer.getInt();
-  msgExt->set_born_host(ipPort2SocketAddress(bornHost, bornPort));
+  msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
 
   // 11 STORETIMESTAMP
   int64_t storeTimestamp = byteBuffer.getLong();
   msgExt->set_store_timestamp(storeTimestamp);
 
   // 12 STOREHOST
-  int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? 4 : 16;
-  ByteArray storeHost(bornhostIPLength);
+  int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize
: kIPv6AddrSize;
+  ByteArray storeHost(bornHostLength);
   byteBuffer.get(storeHost, 0, storehostIPLength);
   int32_t storePort = byteBuffer.getInt();
-  msgExt->set_store_host(ipPort2SocketAddress(storeHost, storePort));
+  msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
 
   // 13 RECONSUMETIMES
   int32_t reconsumeTimes = byteBuffer.getInt();
diff --git a/src/message/MessageDecoder.h b/src/message/MessageDecoder.h
index ad0b82d..5c45d8e 100644
--- a/src/message/MessageDecoder.h
+++ b/src/message/MessageDecoder.h
@@ -17,8 +17,13 @@
 #ifndef ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
 #define ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
 
+#ifndef WIN32
+#include <sys/socket.h>  // sockaddr
+#else
+#include <Winsock2.h>
+#endif
+
 #include "ByteBuffer.hpp"
-#include "MQException.h"
 #include "MQMessageExt.h"
 #include "MessageId.h"
 
diff --git a/src/message/MessageExtImpl.cpp b/src/message/MessageExtImpl.cpp
index f43ac35..cfd6b99 100644
--- a/src/message/MessageExtImpl.cpp
+++ b/src/message/MessageExtImpl.cpp
@@ -44,20 +44,14 @@ MessageExtImpl::MessageExtImpl(int queueId,
       commit_log_offset_(0),
       sys_flag_(0),
       born_timestamp_(bornTimestamp),
-      born_host_(nullptr),
+      born_host_(SockaddrToStorage(bornHost)),
       store_timestamp_(storeTimestamp),
-      store_host_(nullptr),
+      store_host_(SockaddrToStorage(storeHost)),
       reconsume_times_(3),
       prepared_transaction_offset_(0),
-      msg_id_(msgId) {
-  born_host_ = copySocketAddress(born_host_, bornHost);
-  store_host_ = copySocketAddress(store_host_, storeHost);
-}
+      msg_id_(msgId) {}
 
-MessageExtImpl::~MessageExtImpl() {
-  free(born_host_);
-  free(store_host_);
-}
+MessageExtImpl::~MessageExtImpl() = default;
 
 TopicFilterType MessageExtImpl::parseTopicFilterType(int32_t sysFlag) {
   if ((sysFlag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG)
{
@@ -123,15 +117,15 @@ void MessageExtImpl::set_born_timestamp(int64_t bornTimestamp) {
 }
 
 const struct sockaddr* MessageExtImpl::born_host() const {
-  return born_host_;
+  return reinterpret_cast<sockaddr*>(born_host_.get());
 }
 
 std::string MessageExtImpl::born_host_string() const {
-  return socketAddress2String(born_host_);
+  return SockaddrToString(born_host());
 }
 
 void MessageExtImpl::set_born_host(const struct sockaddr* bornHost) {
-  born_host_ = copySocketAddress(born_host_, bornHost);
+  born_host_ = SockaddrToStorage(bornHost);
 }
 
 int64_t MessageExtImpl::store_timestamp() const {
@@ -143,15 +137,15 @@ void MessageExtImpl::set_store_timestamp(int64_t storeTimestamp) {
 }
 
 const struct sockaddr* MessageExtImpl::store_host() const {
-  return store_host_;
+  return reinterpret_cast<sockaddr*>(store_host_.get());
 }
 
 std::string MessageExtImpl::store_host_string() const {
-  return socketAddress2String(store_host_);
+  return SockaddrToString(store_host());
 }
 
 void MessageExtImpl::set_store_host(const struct sockaddr* storeHost) {
-  store_host_ = copySocketAddress(store_host_, storeHost);
+  store_host_ = SockaddrToStorage(storeHost);
 }
 
 const std::string& MessageExtImpl::msg_id() const {
diff --git a/src/message/MessageExtImpl.h b/src/message/MessageExtImpl.h
index 5d85cea..fc44471 100644
--- a/src/message/MessageExtImpl.h
+++ b/src/message/MessageExtImpl.h
@@ -94,9 +94,9 @@ class MessageExtImpl : public MessageImpl,        // base
   int64_t commit_log_offset_;
   int32_t sys_flag_;
   int64_t born_timestamp_;
-  struct sockaddr* born_host_;
+  std::unique_ptr<sockaddr_storage> born_host_;
   int64_t store_timestamp_;
-  struct sockaddr* store_host_;
+  std::unique_ptr<sockaddr_storage> store_host_;
   int32_t reconsume_times_;
   int64_t prepared_transaction_offset_;
   std::string msg_id_;
diff --git a/src/message/MessageId.h b/src/message/MessageId.h
index 64e4603..e9a501b 100644
--- a/src/message/MessageId.h
+++ b/src/message/MessageId.h
@@ -27,29 +27,29 @@ namespace rocketmq {
 class MessageId {
  public:
   MessageId() : MessageId(nullptr, 0) {}
-  MessageId(struct sockaddr* address, int64_t offset) : address_(nullptr), offset_(offset)
{ setAddress(address); }
+  MessageId(const struct sockaddr* address, int64_t offset) : address_(SockaddrToStorage(address)),
offset_(offset) {}
 
-  MessageId(const MessageId& other) : MessageId(other.address_, other.offset_) {}
-  MessageId(MessageId&& other) : address_(other.address_), offset_(other.offset_)
{ other.address_ = nullptr; }
+  MessageId(const MessageId& other) : MessageId(other.getAddress(), other.offset_) {}
+  MessageId(MessageId&& other) : address_(std::move(other.address_)), offset_(other.offset_)
{}
 
-  virtual ~MessageId() { std::free(address_); }
+  virtual ~MessageId() = default;
 
   MessageId& operator=(const MessageId& other) {
     if (&other != this) {
-      setAddress(other.address_);
+      setAddress(other.getAddress());
       this->offset_ = other.offset_;
     }
     return *this;
   }
 
-  const struct sockaddr* getAddress() const { return address_; }
-  void setAddress(struct sockaddr* address) { address_ = copySocketAddress(address_, address);
}
+  const struct sockaddr* getAddress() const { return reinterpret_cast<sockaddr*>(address_.get());
}
+  void setAddress(const struct sockaddr* address) { address_ = SockaddrToStorage(address);
}
 
   int64_t getOffset() const { return offset_; }
   void setOffset(int64_t offset) { offset_ = offset; }
 
  private:
-  struct sockaddr* address_;
+  std::unique_ptr<sockaddr_storage> address_;
   int64_t offset_;
 };
 
diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp
index 85ea113..5dce2d1 100644
--- a/src/transport/EventLoop.cpp
+++ b/src/transport/EventLoop.cpp
@@ -207,9 +207,9 @@ int BufferEvent::connect(const std::string& addr) {
   }
 
   try {
-    auto* sa = string2SocketAddress(addr);  // resolve domain
-    peer_addr_port_ = socketAddress2String(sa);
-    return bufferevent_socket_connect(buffer_event_, sa, sockaddr_size(sa));
+    auto* sa = StringToSockaddr(addr);  // resolve domain
+    peer_addr_port_ = SockaddrToString(sa);
+    return bufferevent_socket_connect(buffer_event_, sa, SockaddrSize(sa));
   } catch (const std::exception& e) {
     LOG_ERROR_NEW("can not connect to {}, {}", addr, e.what());
     return -1;
diff --git a/src/transport/SocketUtil.cpp b/src/transport/SocketUtil.cpp
index 0470088..f207263 100644
--- a/src/transport/SocketUtil.cpp
+++ b/src/transport/SocketUtil.cpp
@@ -16,73 +16,115 @@
  */
 #include "SocketUtil.h"
 
-#include <cstdlib>  // std::realloc
-#include <cstring>  // std::memset, std::memcpy
+#include <cstdlib>  // std::abort
+#include <cstring>  // std::memcpy, std::memset
 
-#include <sstream>
-#include <stdexcept>
-
-#include <event2/event.h>
+#include <iostream>
+#include <stdexcept>  // std::invalid_argument, std::runtime_error
+#include <string>
 
 #ifndef WIN32
-#include <netdb.h>
-#include <unistd.h>
+#include <arpa/inet.h>  // htons
+#include <unistd.h>     // gethostname
+#else
+#include <Winsock2.h>
 #endif
 
-#include "ByteOrder.h"
+#include <event2/event.h>
+
 #include "MQException.h"
-#include "UtilAll.h"
 
 namespace rocketmq {
 
-union sockaddr_union {
-  struct sockaddr_in sin;
-  struct sockaddr_in6 sin6;
-};
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src) {
+  if (src == nullptr) {
+    return nullptr;
+  }
+  std::unique_ptr<sockaddr_storage> ss(new sockaddr_storage);
+  std::memcpy(ss.get(), src, SockaddrSize(src));
+  return ss;
+}
 
-thread_local static sockaddr_union sin_buf;
+thread_local sockaddr_storage ss_buffer;
 
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port) {
-  if (ip.size() == 4) {
-    struct sockaddr_in* sin = &sin_buf.sin;
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port) {
+  sockaddr_storage* ss = &ss_buffer;
+  if (ip.size() == kIPv4AddrSize) {
+    auto* sin = reinterpret_cast<sockaddr_in*>(ss);
     sin->sin_family = AF_INET;
-    sin->sin_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
-    ByteOrderUtil::Read<decltype(sin->sin_addr)>(&sin->sin_addr, ip.array());
-    return (struct sockaddr*)sin;
-  } else if (ip.size() == 16) {
-    struct sockaddr_in6* sin6 = &sin_buf.sin6;
+    sin->sin_port = htons(port);
+    std::memcpy(&sin->sin_addr, ip.array(), kIPv4AddrSize);
+  } else if (ip.size() == kIPv6AddrSize) {
+    auto* sin6 = reinterpret_cast<sockaddr_in6*>(&ss);
     sin6->sin6_family = AF_INET6;
-    sin6->sin6_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
-    ByteOrderUtil::Read<decltype(sin6->sin6_addr)>(&sin6->sin6_addr, ip.array());
-    return (struct sockaddr*)sin6;
+    sin6->sin6_port = htons(port);
+    std::memcpy(&sin6->sin6_addr, ip.array(), kIPv6AddrSize);
+  } else {
+    throw std::invalid_argument("invalid ip size");
   }
-  return nullptr;
+  return reinterpret_cast<sockaddr*>(ss);
 }
 
-struct sockaddr* string2SocketAddress(const std::string& addr) {
+sockaddr* StringToSockaddr(const std::string& addr) {
+  if (addr.empty()) {
+    throw std::invalid_argument("invalid address");
+  }
+
   std::string::size_type start_pos = addr[0] == '/' ? 1 : 0;
-  auto colon_pos = addr.find_last_of(":");
-  std::string host = addr.substr(start_pos, colon_pos - start_pos);
-  std::string port = addr.substr(colon_pos + 1, addr.length() - colon_pos);
-  auto* sa = lookupNameServers(host);
-  if (sa != nullptr) {
-    if (sa->sa_family == AF_INET) {
-      auto* sin = (struct sockaddr_in*)sa;
-      sin->sin_port = htons((uint16_t)std::stoi(port));
-    } else {
-      auto* sin6 = (struct sockaddr_in6*)sa;
-      sin6->sin6_port = htons((uint16_t)std::stoi(port));
+  auto colon_pos = addr.find_last_of(':');
+  auto bracket_pos = addr.find_last_of(']');
+  if (bracket_pos != std::string::npos) {
+    // ipv6 address
+    if (addr.at(start_pos) != '[') {
+      throw std::invalid_argument("invalid address");
+    }
+    if (colon_pos == std::string::npos) {
+      throw std::invalid_argument("invalid address");
     }
+    if (colon_pos < bracket_pos) {
+      // have not port
+      if (bracket_pos != addr.size() - 1) {
+        throw std::invalid_argument("invalid address");
+      }
+      colon_pos = addr.size();
+    } else if (colon_pos != bracket_pos + 1) {
+      throw std::invalid_argument("invalid address");
+    }
+  } else if (colon_pos == std::string::npos) {
+    // have not port
+    colon_pos = addr.size();
   }
+
+  decltype(bracket_pos) fix_bracket = bracket_pos == std::string::npos ? 0 : 1;
+  std::string host = addr.substr(start_pos + fix_bracket, colon_pos - start_pos - fix_bracket
* 2);
+  auto* sa = LookupNameServers(host);
+
+  std::string port = colon_pos >= addr.size() ? "0" : addr.substr(colon_pos + 1);
+  uint32_t n = std::stoul(port);
+  if (n > std::numeric_limits<uint16_t>::max()) {
+    throw std::out_of_range("port is to large");
+  }
+  uint16_t port_num = htons(static_cast<uint16_t>(n));
+
+  if (sa->sa_family == AF_INET) {
+    auto* sin = reinterpret_cast<sockaddr_in*>(sa);
+    sin->sin_port = port_num;
+  } else if (sa->sa_family == AF_INET6) {
+    auto* sin6 = reinterpret_cast<sockaddr_in6*>(sa);
+    sin6->sin6_port = port_num;
+  } else {
+    throw std::runtime_error("don't support non-inet address families");
+  }
+
   return sa;
 }
 
 /**
- * converts an address from network format to presentation format (a.b.c.d)
+ * converts an address from network format to presentation format
  */
-std::string socketAddress2String(const struct sockaddr* addr) {
+std::string SockaddrToString(const sockaddr* addr) {
   if (nullptr == addr) {
-    return "127.0.0.1";
+    return std::string();
   }
 
   char buf[128];
@@ -90,38 +132,40 @@ std::string socketAddress2String(const struct sockaddr* addr) {
   uint16_t port = 0;
 
   if (addr->sa_family == AF_INET) {
-    auto* sin = (struct sockaddr_in*)addr;
-    if (nullptr != evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
-      address = buf;
+    const auto* sin = reinterpret_cast<const sockaddr_in*>(addr);
+    if (nullptr == evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
+      throw std::runtime_error("can not convert AF_INET address to text form");
     }
+    address = buf;
     port = ntohs(sin->sin_port);
   } else if (addr->sa_family == AF_INET6) {
-    auto* sin6 = (struct sockaddr_in6*)addr;
-    if (nullptr != evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf)))
{
-      address = buf;
+    const auto* sin6 = reinterpret_cast<const sockaddr_in6*>(addr);
+    if (nullptr == evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf)))
{
+      throw std::runtime_error("can not convert AF_INET6 address to text form");
     }
+    address = buf;
     port = ntohs(sin6->sin6_port);
   } else {
-    throw std::runtime_error("don't support non-inet Address families.");
+    throw std::runtime_error("don't support non-inet address families");
   }
 
-  if (!address.empty() && port != 0) {
-    if (addr->sa_family == AF_INET6) {
-      address = "[" + address + "]";
-    }
-    address += ":" + UtilAll::to_string(port);
+  if (addr->sa_family == AF_INET6) {
+    address = "[" + address + "]";
+  }
+  if (port != 0) {
+    address += ":" + std::to_string(port);
   }
 
   return address;
 }
 
-struct sockaddr* lookupNameServers(const std::string& hostname) {
+sockaddr* LookupNameServers(const std::string& hostname) {
   if (hostname.empty()) {
-    return nullptr;
+    throw std::invalid_argument("invalid hostname");
   }
 
-  struct evutil_addrinfo hints;
-  struct evutil_addrinfo* answer = NULL;
+  evutil_addrinfo hints;
+  evutil_addrinfo* answer = nullptr;
 
   /* Build the hints to tell getaddrinfo how to act. */
   std::memset(&hints, 0, sizeof(hints));
@@ -131,48 +175,63 @@ struct sockaddr* lookupNameServers(const std::string& hostname)
{
   hints.ai_flags = EVUTIL_AI_ADDRCONFIG; /* Only return addresses we can use. */
 
   // Look up the hostname.
-  int err = evutil_getaddrinfo(hostname.c_str(), NULL, &hints, &answer);
+  int err = evutil_getaddrinfo(hostname.c_str(), nullptr, &hints, &answer);
   if (err != 0) {
-    std::string info = "Failed to resolve host name(" + hostname + "): " + evutil_gai_strerror(err);
+    std::string info = "Failed to resolve hostname(" + hostname + "): " + evutil_gai_strerror(err);
     THROW_MQEXCEPTION(UnknownHostException, info, -1);
   }
 
-  struct sockaddr* sin = nullptr;
+  sockaddr_storage* ss = &ss_buffer;
 
-  for (struct evutil_addrinfo* ai = answer; ai != NULL; ai = ai->ai_next) {
+  bool hit = false;
+  for (struct evutil_addrinfo* ai = answer; ai != nullptr; ai = ai->ai_next) {
     auto* ai_addr = ai->ai_addr;
     if (ai_addr->sa_family != AF_INET && ai_addr->sa_family != AF_INET6) {
       continue;
     }
-    sin = (struct sockaddr*)&sin_buf;
-    std::memcpy(sin, ai_addr, sockaddr_size(ai_addr));
+    std::memcpy(ss, ai_addr, SockaddrSize(ai_addr));
+    hit = true;
     break;
   }
 
   evutil_freeaddrinfo(answer);
 
-  return sin;
+  if (!hit) {
+    throw std::runtime_error("hostname is non-inet address family");
+  }
+
+  return reinterpret_cast<sockaddr*>(ss);
 }
 
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src) {
-  if (src != nullptr) {
-    if (dst == nullptr || dst->sa_family != src->sa_family) {
-      dst = (struct sockaddr*)std::realloc(dst, sizeof(union sockaddr_union));
-    }
-    std::memcpy(dst, src, sockaddr_size(src));
-  } else {
-    free(dst);
-    dst = nullptr;
+sockaddr* GetSelfIP() {
+  try {
+    return LookupNameServers(GetLocalHostname());
+  } catch (const UnknownHostException& e) {
+    return LookupNameServers("localhost");
   }
-  return dst;
 }
 
-uint64_t h2nll(uint64_t v) {
-  return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalHostname() {
+  static std::string local_hostname = []() {
+    char name[1024];
+    if (::gethostname(name, sizeof(name)) != 0) {
+      return std::string();
+    }
+    return std::string(name);
+  }();
+  return local_hostname;
 }
 
-uint64_t n2hll(uint64_t v) {
-  return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalAddress() {
+  static std::string local_address = []() {
+    try {
+      return SockaddrToString(GetSelfIP());
+    } catch (std::exception& e) {
+      std::cerr << e.what() << std::endl;
+      std::abort();
+    }
+  }();
+  return local_address;
 }
 
 }  // namespace rocketmq
diff --git a/src/transport/SocketUtil.h b/src/transport/SocketUtil.h
index be18a38..6bfe535 100644
--- a/src/transport/SocketUtil.h
+++ b/src/transport/SocketUtil.h
@@ -17,13 +17,14 @@
 #ifndef ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
 #define ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
 
-#include <cstdint>
+#include <cstddef>  // size_t
+#include <cstdint>  // uint16_t
 
 #include <string>
 
 #ifndef WIN32
-#include <arpa/inet.h>
-#include <sys/socket.h>
+#include <netinet/in.h>  // sockaddr_in, AF_INET, sockaddr_in6, AF_INET6
+#include <sys/socket.h>  // sockaddr, sockaddr_storage
 #else
 #include <Winsock2.h>
 #pragma comment(lib, "ws2_32.lib")
@@ -33,21 +34,41 @@
 
 namespace rocketmq {
 
-static inline size_t sockaddr_size(const struct sockaddr* sa) {
-  return sa->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
+const size_t kIPv4AddrSize = 4;
+const size_t kIPv6AddrSize = 16;
+
+static inline size_t IpaddrSize(const sockaddr* sa) {
+  assert(sa != nullptr);
+  assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+  return sa->sa_family == AF_INET6 ? kIPv6AddrSize : kIPv4AddrSize;
+}
+
+static inline size_t IpaddrSize(const sockaddr_storage* ss) {
+  return IpaddrSize(reinterpret_cast<const sockaddr*>(ss));
+}
+
+static inline size_t SockaddrSize(const sockaddr* sa) {
+  assert(sa != nullptr);
+  assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+  return sa->sa_family == AF_INET6 ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
+}
+
+static inline size_t SockaddrSize(const sockaddr_storage* ss) {
+  return SockaddrSize(reinterpret_cast<const sockaddr*>(ss));
 }
 
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port);
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src);
 
-struct sockaddr* string2SocketAddress(const std::string& addr);
-std::string socketAddress2String(const struct sockaddr* addr);
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port);
 
-struct sockaddr* lookupNameServers(const std::string& hostname);
+sockaddr* StringToSockaddr(const std::string& addr);
+std::string SockaddrToString(const sockaddr* addr);
 
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src);
+sockaddr* LookupNameServers(const std::string& hostname);
 
-uint64_t h2nll(uint64_t v);
-uint64_t n2hll(uint64_t v);
+sockaddr* GetSelfIP();
+const std::string& GetLocalHostname();
+const std::string& GetLocalAddress();
 
 }  // namespace rocketmq
 

Mime
View raw message