kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [kudu] branch master updated: [client] support resolve one master address to multiple addresses
Date Fri, 17 Jan 2020 20:19:42 GMT
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf7e45  [client] support resolve one master address to multiple addresses
4bf7e45 is described below

commit 4bf7e457b054d9bfb002e3971113e2152b92951a
Author: zhangyifan27 <chinazhangyifan@163.com>
AuthorDate: Tue Dec 31 16:06:43 2019 +0800

    [client] support resolve one master address to multiple addresses
    
    The master addresses of a Kudu client are fixed at runtime, so we need to
    restart the client if we want to change the master addresses.
    
    In some network settings a domain name can be mapped to multiple IP addresses.
    If we configure the client with such a domain name as a 'master address' and
    have it resolve the name and try to connect to each IP address, we can easily
    change the master addresses just by modifying the network settings.
    
    Change-Id: Ie242bc1331596902fa16f1c6d1b439d78b73977a
    Reviewed-on: http://gerrit.cloudera.org:8080/15036
    Reviewed-by: Adar Dembo <adar@cloudera.com>
    Tested-by: Adar Dembo <adar@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    |  9 ++---
 .../org/apache/kudu/client/ConnectToCluster.java   | 41 +++++++++++++++++++---
 .../main/java/org/apache/kudu/util/NetUtil.java    | 26 +++++++++++---
 .../apache/kudu/client/TestConnectToCluster.java   |  1 +
 src/kudu/client/client-internal.cc                 | 13 ++++---
 src/kudu/util/net/sockaddr.cc                      | 11 ++++++
 src/kudu/util/net/sockaddr.h                       |  5 +++
 7 files changed, 86 insertions(+), 20 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index a399366..e41e6c9 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -417,22 +417,19 @@ public class AsyncKuduClient implements AutoCloseable {
    * Get a proxy to send RPC calls to Kudu master at the specified end-point.
    *
    * @param hostPort master end-point
+   * @param inetAddress master ip-address
    * @param credentialsPolicy credentials policy to use for the connection negotiation to the target
    *                          master server
    * @return the proxy object bound to the target master
    */
-  @Nullable
+  @Nonnull
   RpcProxy newMasterRpcProxy(HostAndPort hostPort,
+                             InetAddress inetAddress,
                              Connection.CredentialsPolicy credentialsPolicy) {
     // We should have a UUID to construct ServerInfo for the master, but we have a chicken
     // and egg problem, we first need to communicate with the masters to find out about them,
     // and that's what we're trying to do. The UUID is just used for logging and cache key,
     // so instead we just use concatenation of master host and port, prefixed with "master-".
-    final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-    if (inetAddress == null) {
-      // TODO(todd): should we log the resolution failure? throw an exception?
-      return null;
-    }
     return newRpcProxy(
         new ServerInfo(getFakeMasterUuid(hostPort),
                        hostPort,
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 917a750..217b768 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -17,6 +17,7 @@
 
 package org.apache.kudu.client;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +41,7 @@ import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
 import org.apache.kudu.master.Master.ConnectToMasterResponsePB;
 import org.apache.kudu.rpc.RpcHeader.ErrorStatusPB.RpcErrorCodePB;
 import org.apache.kudu.util.NetUtil;
+import org.apache.kudu.util.Pair;
 
 /**
  * Class responsible for fanning out RPCs to all of the configured masters,
@@ -52,7 +54,6 @@ final class ConnectToCluster {
 
   private final List<HostAndPort> masterAddrs;
   private final Deferred<ConnectToClusterResponse> responseD;
-  private final int numMasters;
 
   // Used to avoid calling 'responseD' twice.
   private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
@@ -70,6 +71,8 @@ final class ConnectToCluster {
    */
   private AtomicReference<List<HostPortPB>> knownMasters = new AtomicReference<>();
 
+  private int numMasters;
+
   /**
    * Creates an object that holds the state needed to retrieve master table's location.
    * @param masterAddrs Addresses of all master replicas that we want to retrieve the
@@ -78,7 +81,6 @@ final class ConnectToCluster {
   ConnectToCluster(List<HostAndPort> masterAddrs) {
     this.masterAddrs = masterAddrs;
     this.responseD = new Deferred<>();
-    this.numMasters = masterAddrs.size();
   }
 
   @InterfaceAudience.LimitedPrivate("Test")
@@ -91,6 +93,15 @@ final class ConnectToCluster {
     return exceptionsReceived;
   }
 
+  /**
+   * Set the number of masters.
+   * Only used for testing.
+   * @param numMasters the number of masters.
+   */
+  public void setNumMasters(int numMasters) {
+    this.numMasters = numMasters;
+  }
+
   private static Deferred<ConnectToMasterResponsePB> connectToMaster(
       final KuduTable masterTable,
       final RpcProxy masterProxy,
@@ -167,11 +178,31 @@ final class ConnectToCluster {
     // waits until it gets a good response before firing the returned
     // deferred.
     List<Deferred<ConnectToMasterResponsePB>> deferreds = new ArrayList<>();
+    List<Pair<InetAddress, HostAndPort>> masterAddrsWithNames = new ArrayList<>();
     for (HostAndPort hostAndPort : masterAddrs) {
+      InetAddress[] inetAddrs = NetUtil.getAllInetAddresses(hostAndPort.getHost());
+      if (inetAddrs != null) {
+        if (inetAddrs.length > 1) {
+          LOG.info("Specified master server address {} resolved to multiple IPs {}. " +
+                   "Connecting to each one of them.", hostAndPort.getHost(), inetAddrs);
+        }
+        for (InetAddress addr : inetAddrs) {
+          masterAddrsWithNames.add(
+              new Pair<>(addr, new HostAndPort(addr.getHostAddress(), hostAndPort.getPort())));
+        }
+      } else {
+        masterAddrsWithNames.add(new Pair<>(null, hostAndPort));
+      }
+    }
+
+    this.numMasters = masterAddrsWithNames.size();
+    for (Pair<InetAddress, HostAndPort> masterPair : masterAddrsWithNames) {
+      InetAddress addr = masterPair.getFirst();
+      HostAndPort hostAndPort = masterPair.getSecond();
       Deferred<ConnectToMasterResponsePB> d;
-      AsyncKuduClient client = masterTable.getAsyncClient();
-      RpcProxy proxy = client.newMasterRpcProxy(hostAndPort, credentialsPolicy);
-      if (proxy != null) {
+      if (addr != null) {
+        AsyncKuduClient client = masterTable.getAsyncClient();
+        RpcProxy proxy = client.newMasterRpcProxy(hostAndPort, addr, credentialsPolicy);
         d = connectToMaster(masterTable, proxy, parentRpc, client.getTimer(), defaultTimeoutMs);
       } else {
         String message = "Couldn't resolve this master's address " + hostAndPort.toString();
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
index 65d9b59..dd54a4e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/NetUtil.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Functions;
@@ -99,16 +100,33 @@ public class NetUtil {
    * or {@code null} if the address couldn't be resolved
    */
   public static InetAddress getInetAddress(final String host) {
+    InetAddress[] addrs = getAllInetAddresses(host);
+    if (addrs != null && addrs.length > 0) {
+      return addrs[0];
+    }
+    return null;
+  }
+
+  /**
+   * Gets a hostname or an IP address and returns an array of InetAddresses.
+   * <p>
+   * <strong>This method can block</strong> as there is no API for
+   * asynchronous DNS resolution in the JDK.
+   * @param host the hostname to resolve
+   * @return an array of InetAddresses for the given hostname,
+   * or {@code null} if the address couldn't be resolved
+   */
+  public static InetAddress[] getAllInetAddresses(final String host) {
     final long start = System.nanoTime();
     try {
-      InetAddress ip = InetAddress.getByName(host);
+      InetAddress[] ipAddrs = InetAddress.getAllByName(host);
       long latency = System.nanoTime() - start;
       if (latency > 500000/*ns*/ && LOG.isDebugEnabled()) {
-        LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+        LOG.debug("Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency);
       } else if (latency >= 3000000/*ns*/) {
-        LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ip, latency);
+        LOG.warn("Slow DNS lookup! Resolved IP of `{}' to {} in {}ns", host, ipAddrs, latency);
       }
-      return ip;
+      return ipAddrs;
     } catch (UnknownHostException e) {
       LOG.error("Failed to resolve the IP of `{}' in {}ns", host, (System.nanoTime() - start));
       return null;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
index 256f4a9..492b5a3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -248,6 +248,7 @@ public class TestConnectToCluster {
     // add the responses. We then check for the right response.
 
     ConnectToCluster grrm = new ConnectToCluster(MASTERS);
+    grrm.setNumMasters(MASTERS.size());
 
     Callback<Void, ConnectToMasterResponsePB> cb0 = grrm.callbackForNode(MASTERS.get(0));
     Callback<Void, ConnectToMasterResponsePB> cb1 = grrm.callbackForNode(MASTERS.get(1));
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 75c7200..d74ea00 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -35,6 +35,7 @@
 #include <boost/function.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <google/protobuf/stubs/common.h>
 
 #include "kudu/client/authz_token_cache.h"
 #include "kudu/client/master_proxy_rpc.h"
@@ -682,12 +683,14 @@ void KuduClient::Data::ConnectToClusterAsync(KuduClient* client,
       return;
     }
     if (addrs.size() > 1) {
-      KLOG_EVERY_N_SECS(WARNING, 1)
-          << Substitute("Specified master server address '$0' resolved to "
-                        "multiple IPs. Using $1",
-                        master_server_addr, addrs[0].ToString());
+      KLOG_EVERY_N_SECS(INFO, 1)
+          << Substitute("Specified master server address '$0' resolved to multiple IPs $1. "
+                        "Connecting to each one of them.",
+                        master_server_addr, Sockaddr::ToCommaSeparatedString(addrs));
+    }
+    for (const Sockaddr& addr : addrs) {
+      master_addrs_with_names.emplace_back(addr, hp.host());
     }
-    master_addrs_with_names.emplace_back(addrs[0], hp.host());
   }
 
   // This ensures that no more than one ConnectToClusterRpc of each credentials
diff --git a/src/kudu/util/net/sockaddr.cc b/src/kudu/util/net/sockaddr.cc
index cc100c6..8e445f9 100644
--- a/src/kudu/util/net/sockaddr.cc
+++ b/src/kudu/util/net/sockaddr.cc
@@ -27,11 +27,13 @@
 
 #include "kudu/gutil/hash/builtin_type_hash.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/stopwatch.h"
 
 using std::string;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -130,4 +132,13 @@ Status Sockaddr::LookupHostname(string* hostname) const {
   return Status::OK();
 }
 
+string Sockaddr::ToCommaSeparatedString(const std::vector<Sockaddr>& addrs) {
+  vector<string> addrs_str;
+  addrs_str.reserve(addrs.size());
+  for (const Sockaddr& addr : addrs) {
+    addrs_str.push_back(addr.ToString());
+  }
+  return JoinStrings(addrs_str, ",");
+}
+
 } // namespace kudu
diff --git a/src/kudu/util/net/sockaddr.h b/src/kudu/util/net/sockaddr.h
index dffd151..e968212 100644
--- a/src/kudu/util/net/sockaddr.h
+++ b/src/kudu/util/net/sockaddr.h
@@ -22,6 +22,7 @@
 #include <cstdint>
 #include <functional>
 #include <string>
+#include <vector>
 
 #include "kudu/util/status.h"
 
@@ -75,6 +76,10 @@ class Sockaddr {
   // Does reverse DNS lookup of the address and stores it in hostname.
   Status LookupHostname(std::string* hostname) const;
 
+  // Takes a vector of Sockaddr objects and returns a comma separated
+  // string containing ip addresses.
+  static std::string ToCommaSeparatedString(const std::vector<Sockaddr>& addrs);
+
   // the default auto-generated copy constructor is fine here
  private:
   struct sockaddr_in addr_;


Mime
View raw message