metron-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] justinleet commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write Functionality Should be Shared
Date Mon, 10 Dec 2018 23:07:29 GMT
justinleet commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write
Functionality Should be Shared
URL: https://github.com/apache/metron/pull/1254#discussion_r240410367
 
 

 ##########
 File path: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
 ##########
 @@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Writes documents to an Elasticsearch index in bulk.
+ *
+ * @param <D> The type of document to write.
+ */
+public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
+
+    /**
+     * Pairs a {@link Document} with the name of the index it will be written to.
+     *
+     * <p>Both fields are assigned exactly once, in the constructor, so they are
+     * declared {@code final} to make the pairing immutable.
+     */
+    private class Indexable {
+        final D document;
+        final String index;
+
+        /**
+         * @param document The document to write.
+         * @param index The name of the index the document will be written to.
+         */
+        public Indexable(D document, String index) {
+            this.document = document;
+            this.index = index;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private ElasticsearchClient client;
+    private List<Indexable> documents;
+    private WriteRequest.RefreshPolicy refreshPolicy;
+
+    /**
+     * Creates a writer that starts with an empty batch and a refresh policy
+     * of {@link WriteRequest.RefreshPolicy#NONE}.
+     *
+     * @param client The client used to submit bulk requests.
+     */
+    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
+        this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+        this.documents = new ArrayList<>();
+        this.client = client;
+    }
+
+    @Override
+    public void addDocument(D document, String indexName) {
+        documents.add(new Indexable(document, indexName));
+        LOG.debug("Adding document to batch; document={}, index={}", document, indexName);
+    }
+
+    /**
+     * Submits every document in the current batch as a single bulk request.
+     *
+     * <p>If submitting the request fails with an {@link IOException}, every document
+     * in the batch is reported as a failure. The batch is always emptied before this
+     * method exits, whether it returns normally or throws.
+     *
+     * @return The results of the write, as populated from the bulk response or, on an
+     *         {@link IOException}, with one failure per document.
+     * @throws IllegalArgumentException If a document in the batch has no timestamp;
+     *         this is raised while building the request and is not caught here.
+     */
+    @Override
+    public BulkDocumentWriterResults<D> write() {
+        BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>();
+
+        // capture the batch size up front; the batch is cleared in the finally block,
+        // so reading documents.size() afterwards would always report zero
+        final int batchSize = documents.size();
+        try {
+            // create an index request for each document
+            BulkRequest bulkRequest = new BulkRequest();
+            bulkRequest.setRefreshPolicy(refreshPolicy);
+            for(Indexable doc: documents) {
+                DocWriteRequest request = createRequest(doc.document, doc.index);
+                bulkRequest.add(request);
+            }
+
+            // submit the request and handle the response
+            BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
+            handleBulkResponse(bulkResponse, documents, results);
+
+        } catch(IOException e) {
+            // assume all documents have failed. notify the failure listeners
+            for(Indexable indexable: documents) {
+                D failed = indexable.document;
+                results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e));
+            }
+            LOG.error("Failed to submit bulk request; all documents failed", e);
+
+        } finally {
+            // flush all documents no matter which ones succeeded or failed
+            documents.clear();
+        }
+
+        LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
+                batchSize, results.getSuccesses().size(), results.getFailures().size());
+        return results;
+    }
+
+    /**
+     * @return The number of documents currently held in the pending batch.
+     */
+    @Override
+    public int size() {
+        final int pending = documents.size();
+        return pending;
+    }
+
+    /**
+     * Sets the refresh policy that is applied to each bulk request built by this writer.
+     *
+     * @param refreshPolicy The refresh policy to set on subsequent bulk requests.
+     * @return This writer, so that calls can be chained.
+     */
+    public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
+        this.refreshPolicy = refreshPolicy;
+        return this;
+    }
+
+    private IndexRequest createRequest(D document, String index) {
+        if(document.getTimestamp() == null) {
+            throw new IllegalArgumentException("Document must contain the timestamp");
+        }
+        return new IndexRequest()
+                .source(document.getDocument())
+                .type(document.getSensorType() + "_doc")
+                .id(document.getGuid())
+                .index(index)
+                .timestamp(document.getTimestamp().toString());
+    }
+
+    /**
+     * Handles the {@link BulkResponse} received from Elasticsearch.
+     * @param bulkResponse The response received from Elasticsearch.
+     * @param documents The documents included in the bulk request.
+     * @param results The writer results.
+     * @return The documents that were successfully written. Failed documents are excluded.
+     */
+    private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents,
BulkDocumentWriterResults<D> results) {
+        if (bulkResponse.hasFailures()) {
+
+            // interrogate the response to distinguish between those that succeeded and those that failed
+            Iterator<BulkItemResponse> iterator = bulkResponse.iterator();
 
 Review comment:
   This could just be a foreach, couldn't it? It's a style nit, so don't feel obligated to
fix it if you don't want to.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message