metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] justinleet commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write Functionality Should be Shared
Date Mon, 10 Dec 2018 23:07:29 GMT
justinleet commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write
Functionality Should be Shared
URL: https://github.com/apache/metron/pull/1254#discussion_r240408309
 
 

 ##########
 File path: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 ##########
 @@ -17,103 +17,90 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
 public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
-  private WriteRequest.RefreshPolicy refreshPolicy;
+  private ElasticsearchBulkDocumentWriter<Document> documentWriter;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
-    this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
-    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
   }
 
   @Override
   public Document update(Document update, Optional<String> index) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
-    try {
-      IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    return update;
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+
+    Map<Document, Optional<String>> results = batchUpdate(updates);
+    return results.keySet().iterator().next();
   }
 
   @Override
   public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-
-    BulkRequest bulkRequestBuilder = new BulkRequest();
-    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
+    Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+    String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+    for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+      Document document = entry.getKey();
+      Optional<String> optionalIndex = entry.getValue();
+      String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix));
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage());
+    // write the documents. if any document fails, raise an exception.
+    BulkDocumentWriterResults<Document> results = documentWriter.write();
+    int failures = results.getFailures().size();
+    if(failures > 0) {
+      int successes = results.getSuccesses().size();
+      String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures);
+      LOG.error(msg);
+
+      // log each individual failure
+      for(WriteFailure<Document> failure: results.getFailures()) {
+        LOG.error(failure.getMessage(), failure.getCause());
+      }
+
+      // raise an exception
+      Throwable cause = results.getFailures().get(0).getCause();
 
 Review comment:
   Why just get(0)? Is it just because the message already contains the failures?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message