helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hu...@apache.org
Subject [helix] 27/44: implementation of CustomRestClient (post request and get health checks)
Date Sat, 25 May 2019 01:20:01 GMT
This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit a4f5faae717571468afc191cd836e678a0567f3a
Author: Yi Wang <ywang4@linkedin.com>
AuthorDate: Fri Apr 19 17:27:19 2019 -0700

    implementation of CustomRestClient (post request and get health checks)
    
    RB=1638858
    G=helix-reviewers
    R=cjerian
    A=jxue
    
    Signed-off-by: Hunter Lee <hulee@linkedin.com>
---
 helix-rest/pom.xml                                 |   5 +
 .../apache/helix/rest/client/CustomRestClient.java |  27 ++--
 .../helix/rest/client/CustomRestClientFactory.java |  26 +++-
 .../helix/rest/client/CustomRestClientImpl.java    | 120 ++++++++++++++++-
 .../helix/rest/server/service/InstanceService.java |   3 +-
 .../rest/server/service/InstanceServiceImpl.java   |  15 ++-
 .../helix/rest/client/TestCustomRestClient.java    | 149 +++++++++++++++++++++
 7 files changed, 320 insertions(+), 25 deletions(-)

diff --git a/helix-rest/pom.xml b/helix-rest/pom.xml
index 390d578..baaf1f5 100644
--- a/helix-rest/pom.xml
+++ b/helix-rest/pom.xml
@@ -67,6 +67,11 @@ under the License.
       <version>3.8.1</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.8</version>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
       <version>9.1.0.RC0</version>
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
index f8ce0e5..afa4ef3 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClient.java
@@ -19,25 +19,34 @@ package org.apache.helix.rest.client;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
-
 /**
- * Interface for interacting with client side rest endpoints
+ * Interacting with participant side to query for its health checks
  */
 public interface CustomRestClient {
   /**
    * Get stoppable check result on instance
-   *
+   * @param baseUrl the base url of the participant
    * @param customPayloads generic payloads required from client side and helix only works
as proxy
-   * @return a map where key is custom stoppable check name and boolean value indicates if
the check succeeds
+   * @return a map where key is custom stoppable check name and boolean value indicates if
the check
+   *         succeeds
+   * @throws IOException
    */
-  Map<String, Boolean> getInstanceStoppableCheck(Map<String, String> customPayloads);
+  Map<String, Boolean> getInstanceStoppableCheck(String baseUrl, Map<String, String>
customPayloads)
+      throws IOException;
+
   /**
    * Get stoppable check result on partition
-   *
+   * @param baseUrl the base url of the participant
+   * @param partitions a list of partitions maintained by the participant
    * @param customPayloads generic payloads required from client side and helix only works
as proxy
-   * @return a map where key is custom stoppable check name and boolean value indicates if
the check succeeds
-  */
-  Map<String, Boolean> getPartitionStoppableCheck(Map<String, String> customPayloads);
+   * @return a map where key is partition name and boolean value indicates if the partition
is
+   *         healthy
+   * @throws IOException
+   */
+  Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String>
partitions,
+      Map<String, String> customPayloads) throws IOException;
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
index acbfef7..362818c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientFactory.java
@@ -19,17 +19,33 @@ package org.apache.helix.rest.client;
  * under the License.
  */
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The memory efficient factory to create instances for {@link CustomRestClient}
  */
 public class CustomRestClientFactory {
-  private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus";
-  private static final String PARTITION_HEALTH_STATUS = "/partitionHealthStatus";
+  private static final Logger LOG = LoggerFactory.getLogger(CustomRestClientFactory.class);
+
+  private static CustomRestClient INSTANCE = null;
 
-  private CustomRestClientFactory() {}
+  private CustomRestClientFactory() {
+  }
 
   public static CustomRestClient get(String jsonContent) {
-    //TODO: add implementation
-    return new CustomRestClientImpl();
+    if (INSTANCE == null) {
+      synchronized (CustomRestClientFactory.class) {
+        if (INSTANCE == null) {
+          try {
+            INSTANCE = new CustomRestClientImpl();
+          } catch (Exception e) {
+            LOG.error("Exception when initializing CustomRestClient", e);
+          }
+        }
+      }
+    }
+
+    return INSTANCE;
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
index 133a338..0db5a9b 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java
@@ -19,20 +19,130 @@ package org.apache.helix.rest.client;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 
-//TODO: add implementation details
 class CustomRestClientImpl implements CustomRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(CustomRestClient.class);
+
+  // postfix used to append at the end of base url
+  private static final String INSTANCE_HEALTH_STATUS = "/instanceHealthStatus";
+  private static final String PARTITION_HEALTH_STATUS = "/partitionHealthStatus";
+
+  private static final String IS_HEALTHY_FIELD = "IS_HEALTHY";
+  private static final String PARTITIONS = "partitions";
+  private static final String ACCEPT_CONTENT_TYPE = "application/json";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private HttpClient _httpClient;
+
+  private interface JsonConverter {
+    Map<String, Boolean> convert(JsonNode jsonNode);
+  }
+
+  /**
+   * TODO: create Config to initialize SSLContext for Https endpoint
+   * Override the constructor if https endpoint is expected
+   */
+  public CustomRestClientImpl() {
+    _httpClient = HttpClients.createDefault();
+  }
+
+  public CustomRestClientImpl(HttpClient httpClient) {
+    _httpClient = httpClient;
+  }
 
   @Override
-  public Map<String, Boolean> getInstanceStoppableCheck(Map<String, String> customPayloads)
{
-    return new HashMap<>();
+  public Map<String, Boolean> getInstanceStoppableCheck(String baseUrl,
+      Map<String, String> customPayloads) throws IOException {
+    // example url: http://<baseUrl>/instanceHealthStatus, assuming the base url already
directly
+    // queries at the instance
+    String url = baseUrl + INSTANCE_HEALTH_STATUS;
+    JsonConverter jsonConverter = jsonNode -> {
+      Map<String, Boolean> result = new HashMap<>();
+      jsonNode.fields()
+          .forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().asBoolean()));
+      return result;
+    };
+    return handleResponse(post(url, customPayloads), jsonConverter);
   }
 
   @Override
-  public Map<String, Boolean> getPartitionStoppableCheck(Map<String, String>
customPayloads) {
-    return new HashMap<>();
+  public Map<String, Boolean> getPartitionStoppableCheck(String baseUrl, List<String>
partitions,
+      Map<String, String> customPayloads) throws IOException {
+    /*
+     * example url: http://<baseUrl>/partitionHealthStatus -d {
+     * "partitions" : ["p1", "p3", "p9"],
+     * "<key>": "<value>",
+     * ...
+     * }
+     */
+    String url = baseUrl + PARTITION_HEALTH_STATUS;
+    // To avoid ImmutableMap as parameter
+    Map<String, String> payLoads = new HashMap<>(customPayloads);
+    // Add the entry: "partitions" : ["p1", "p3", "p9"]
+    payLoads.put(PARTITIONS, partitions.toString());
+    JsonConverter jsonConverter = jsonNode -> {
+      Map<String, Boolean> result = new HashMap<>();
+      jsonNode.fields()
+          .forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue().get(IS_HEALTHY_FIELD).asBoolean()));
+      return result;
+    };
+    return handleResponse(post(url, payLoads), jsonConverter);
+  }
+
+  @VisibleForTesting
+  protected JsonNode getJsonObject(HttpResponse httpResponse) throws IOException {
+    HttpEntity httpEntity = httpResponse.getEntity();
+    String str = EntityUtils.toString(httpEntity);
+    return OBJECT_MAPPER.readTree(str);
+  }
+
+  private Map<String, Boolean> handleResponse(HttpResponse httpResponse,
+      JsonConverter jsonConverter) throws IOException {
+    int status = httpResponse.getStatusLine().getStatusCode();
+    if (status == 200) {
+      return jsonConverter.convert(getJsonObject(httpResponse));
+    } else {
+      throw new ClientProtocolException("Unexpected response status: " + status + ", reason:
"
+          + httpResponse.getStatusLine().getReasonPhrase());
+    }
+  }
+
+  private HttpResponse post(String url, Map<String, String> payloads) throws IOException
{
+    List<NameValuePair> params = payloads.entrySet().stream()
+        .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toList());
+    try {
+      HttpPost postRequest = new HttpPost(url);
+      postRequest.setHeader("Accept", ACCEPT_CONTENT_TYPE);
+      postRequest.setEntity(new UrlEncodedFormEntity(params, "UTF-8"));
+      LOG.info("Executing request {}", postRequest.getRequestLine());
+      return _httpClient.execute(postRequest);
+    } catch (IOException e) {
+      LOG.error("Failed to perform customized health check. Is participant endpoint {} available?",
+          url, e);
+      throw e;
+    }
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
index 471a4ec..3e642cb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceService.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server.service;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -97,5 +98,5 @@ public interface InstanceService {
       List<HealthCheck> healthChecks);
 
   StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
-      String jsonContent);
+      String jsonContent) throws IOException;
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
index 8db4d42..a01912a 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.helix.rest.server.service;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -151,15 +152,19 @@ public class InstanceServiceImpl implements InstanceService {
 
   @Override
   public StoppableCheck checkSingleInstanceStoppable(String clusterId, String instanceName,
-      String jsonContent) {
+      String jsonContent) throws IOException {
     // TODO reduce GC by dependency injection
     Map<String, Boolean> helixStoppableCheck = getInstanceHealthStatus(clusterId,
         instanceName, InstanceService.HealthCheck.STOPPABLE_CHECK_LIST);
     CustomRestClient customClient = CustomRestClientFactory.get(jsonContent);
-    // TODO add the json content parse logic
-    Map<String, Boolean> customStoppableCheck =
-        customClient.getInstanceStoppableCheck(Collections.emptyMap());
-    return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
+    try {
+      Map<String, Boolean> customStoppableCheck =
+        customClient.getInstanceStoppableCheck("", Collections.emptyMap());
+      return StoppableCheck.mergeStoppableChecks(helixStoppableCheck, customStoppableCheck);
+    } catch (IOException e) {
+      LOG.error("Failed to perform customized health check for {}/{}", clusterId, instanceName,
e);
+      throw e;
+    }
   }
 
   public PartitionHealth generatePartitionHealthMapFromZK() {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
new file mode 100644
index 0000000..c43578b
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java
@@ -0,0 +1,149 @@
+package org.apache.helix.rest.client;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.junit.Assert;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+
+public class TestCustomRestClient {
+  private static final String HTTP_LOCALHOST = "http://localhost:1000";
+  @Mock
+  HttpClient _httpClient;
+
+  @BeforeMethod
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testGetInstanceStoppableCheck() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+    String jsonResponse = "{\n" + "   \"check1\": \"false\",\n" + "   \"check2\": \"true\"\n"
+ "}";
+
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    customRestClient.setJsonResponse(jsonResponse);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    Map<String, Boolean> healthCheck =
+        customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap());
+    Assert.assertFalse(healthCheck.get("check1"));
+    Assert.assertTrue(healthCheck.get("check2"));
+  }
+
+  @Test(expectedExceptions = ClientProtocolException.class)
+  public void testGetInstanceStoppableCheck_when_url_404() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap());
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testGetInstanceStoppableCheck_when_response_empty() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+    customRestClient.setJsonResponse("");
+
+    customRestClient.getInstanceStoppableCheck(HTTP_LOCALHOST, Collections.emptyMap());
+  }
+
+  @Test
+  public void testGetPartitionStoppableCheck() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+    String jsonResponse = "\n" + "{\n" + "   \"db1\": {\n" + "      \"IS_HEALTHY\": \"false\"\n"
+        + "   },\n" + "   \"db0\": {\n" + "      \"IS_HEALTHY\": \"true\"\n" + "   }\n" +
"}";
+
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    customRestClient.setJsonResponse(jsonResponse);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    Map<String, Boolean> partitionHealth = customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST,
+        ImmutableList.of("db0", "db1"), Collections.emptyMap());
+
+    Assert.assertTrue(partitionHealth.get("db0"));
+    Assert.assertFalse(partitionHealth.get("db1"));
+  }
+
+  @Test(expectedExceptions = ClientProtocolException.class)
+  public void testGetPartitionStoppableCheck_when_url_404() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+
+    customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST,
+        ImmutableList.of("db0", "db1"), Collections.emptyMap());
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testGetPartitionStoppableCheck_when_response_empty() throws IOException {
+    MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
+    HttpResponse httpResponse = mock(HttpResponse.class);
+    StatusLine statusLine = mock(StatusLine.class);
+
+    when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_NOT_FOUND);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);
+    customRestClient.setJsonResponse("");
+
+    customRestClient.getPartitionStoppableCheck(HTTP_LOCALHOST,
+        ImmutableList.of("db0", "db1"), Collections.emptyMap());
+  }
+
+  private class MockCustomRestClient extends CustomRestClientImpl {
+    private String _jsonResponse = "";
+
+    MockCustomRestClient(HttpClient mockHttpClient) {
+      super(mockHttpClient);
+    }
+
+    void setJsonResponse(String response) {
+      _jsonResponse = response;
+    }
+
+    @Override
+    protected JsonNode getJsonObject(HttpResponse httpResponse) throws IOException {
+      return new ObjectMapper().readTree(_jsonResponse);
+    }
+  }
+}


Mime
View raw message