metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] nickwallen commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write Functionality Should be Shared
Date Tue, 11 Dec 2018 13:17:46 GMT
nickwallen commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write
Functionality Should be Shared
URL: https://github.com/apache/metron/pull/1254#discussion_r240605171
 
 

 ##########
 File path: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 ##########
 @@ -17,103 +17,90 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
 public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
-  private WriteRequest.RefreshPolicy refreshPolicy;
+  private ElasticsearchBulkDocumentWriter<Document> documentWriter;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
-    this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
-    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
   }
 
   @Override
   public Document update(Document update, Optional<String> index) throws IOException
{
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
-    try {
-      IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    return update;
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+
+    Map<Document, Optional<String>> results = batchUpdate(updates);
+    return results.keySet().iterator().next();
   }
 
   @Override
   public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>>
updates) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-
-    BulkRequest bulkRequestBuilder = new BulkRequest();
-    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet())
{
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
+    Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+    String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+    for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+      Document document = entry.getKey();
+      Optional<String> optionalIndex = entry.getValue();
+      String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix));
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage());
+    // write the documents. if any document fails, raise an exception.
+    BulkDocumentWriterResults<Document> results = documentWriter.write();
+    int failures = results.getFailures().size();
+    if(failures > 0) {
+      int successes = results.getSuccesses().size();
+      String msg = format("Failed to update all documents; %d successes, %d failures", successes,
failures);
+      LOG.error(msg);
+
+      // log each individual failure
+      for(WriteFailure<Document> failure: results.getFailures()) {
+        LOG.error(failure.getMessage(), failure.getCause());
+      }
+
+      // raise an exception
+      Throwable cause = results.getFailures().get(0).getCause();
 
 Review comment:
   The current behavior is for the `ElasticsearchUpdateDao` to throw an exception if any of
the writes fail.  Inside this if statement, we know there is at least 1 exception and possibly
many more.  I need to choose one of those exceptions as the root cause, so I just use the
first. Make sense?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message