nutch-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NUTCH-2238) Indexer for Elasticsearch 2.x
Date Mon, 11 Apr 2016 19:25:26 GMT

    [ https://issues.apache.org/jira/browse/NUTCH-2238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235788#comment-15235788 ]

ASF GitHub Bot commented on NUTCH-2238:
---------------------------------------

Github user lewismc commented on a diff in the pull request:

    https://github.com/apache/nutch/pull/96#discussion_r59265077
  
    --- Diff: src/plugin/indexer-elastic2/src/java/org/apache/nutch/indexwriter/elastic2/ElasticIndexWriter.java ---
    @@ -0,0 +1,273 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nutch.indexwriter.elastic2;
    +
    +import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.net.InetAddress;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.nutch.indexer.IndexWriter;
    +import org.apache.nutch.indexer.NutchDocument;
    +import org.elasticsearch.ElasticsearchException;
    +import org.elasticsearch.action.ListenableActionFuture;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequestBuilder;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.delete.DeleteRequestBuilder;
    +import org.elasticsearch.action.index.IndexRequestBuilder;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.settings.Settings.Builder;
    +import org.elasticsearch.common.transport.InetSocketTransportAddress;
    +import org.elasticsearch.node.Node;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Writes NutchDocuments to an Elasticsearch 2.x cluster using bulk
    + * indexing requests.
    + */
    +public class ElasticIndexWriter implements IndexWriter {
    +  public static Logger LOG = LoggerFactory.getLogger(ElasticIndexWriter.class);
    +
    +  private static final int DEFAULT_MAX_BULK_DOCS = 250;
    +  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
    +
    +  private Client client;
    +  private Node node;
    +  private String defaultIndex;
    +
    +  private Configuration config;
    +
    +  private BulkRequestBuilder bulk;
    +  private ListenableActionFuture<BulkResponse> execute;
    +  private int port = -1;
    +  private String host = null;
    +  private String clusterName = null;
    +  private int maxBulkDocs;
    +  private int maxBulkLength;
    +  private long indexedDocs = 0;
    +  private int bulkDocs = 0;
    +  private int bulkLength = 0;
    +  private boolean createNewBulk = false;
    +
    +  @Override
    +  public void open(Configuration job) throws IOException {
    +    clusterName = job.get(ElasticConstants.CLUSTER);
    +    host = job.get(ElasticConstants.HOST);
    +    port = job.getInt(ElasticConstants.PORT, 9300);
    +
    +    Builder settingsBuilder = Settings.builder();
    +
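    +    // Read additional client settings from the plugin's
    +    // elasticsearch.conf resource; each non-comment line holds one
    +    // key=value pair, e.g. cluster.name=nutch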
    +    BufferedReader reader = new BufferedReader(
    +        job.getConfResourceAsReader("elasticsearch.conf"));
    +    String line;
    +    String[] parts;
    +
    +    while ((line = reader.readLine()) != null) {
    +      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
    +        // trim() returns a new string; the result must be assigned
    +        line = line.trim();
    +        parts = line.split("=");
    +
    +        if (parts.length == 2) {
    +          settingsBuilder.put(parts[0].trim(), parts[1].trim());
    +        }
    +      }
    +    }
    +    reader.close();
    +
    +    // A cluster name given via the job configuration overrides the conf file
    +    if (StringUtils.isNotBlank(clusterName)) {
    +      settingsBuilder.put("cluster.name", clusterName);
    +    }
    +
    +    Settings settings = settingsBuilder.build();
    +
    +    // Prefer TransportClient
    +    if (host != null && port > 1) {
    +      client = TransportClient.builder().settings(settings).build()
    +          .addTransportAddress(new InetSocketTransportAddress(
    +              InetAddress.getByName(host), port));
    +    } else if (clusterName != null) {
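    +      // Fall back to an embedded client-only node that joins the
    +      // cluster identified by cluster.name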
    +      node = nodeBuilder().settings(settings).client(true).node();
    +      client = node.client();
    +    }
    +
    +    bulk = client.prepareBulk();
    +    defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
    +    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
    +        DEFAULT_MAX_BULK_DOCS);
    +    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
    +        DEFAULT_MAX_BULK_LENGTH);
    +  }
    +
    +  @Override
    +  public void write(NutchDocument doc) throws IOException {
    +    String id = (String) doc.getFieldValue("id");
    +    String type = doc.getDocumentMeta().get("type");
    +    if (type == null)
    +      type = "doc";
    +    IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
    +
    +    Map<String, Object> source = new HashMap<String, Object>();
    +
    +    // Loop through all fields of this doc
    +    for (String fieldName : doc.getFieldNames()) {
    +      if (doc.getFieldValues(fieldName).size() > 1) {
    +        // Store the full list of values for a multi-valued field
    +        source.put(fieldName, doc.getFieldValues(fieldName));
    +        // Loop through the values to keep track of the size of this document
    +        for (Object value : doc.getFieldValues(fieldName)) {
    +          bulkLength += value.toString().length();
    +        }
    +      } else {
    +        source.put(fieldName, doc.getFieldValue(fieldName));
    +        bulkLength += doc.getFieldValue(fieldName).toString().length();
    +      }
    +    }
    +    request.setSource(source);
    +
    +    // Add this indexing request to a bulk request
    +    bulk.add(request);
    +    indexedDocs++;
    +    bulkDocs++;
    +
    +    if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
    +      LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
    +          + bulkLength + ", total docs = " + indexedDocs
    +          + ", last doc in bulk = '" + id + "']");
    +      // Flush the bulk of indexing requests
    +      createNewBulk = true;
    +      commit();
    +    }
    +  }
    +
    +  @Override
    +  public void delete(String key) throws IOException {
    +    try {
    +      DeleteRequestBuilder builder = client.prepareDelete();
    +      builder.setIndex(defaultIndex);
    +      builder.setType("doc");
    +      builder.setId(key);
    +      builder.execute().actionGet();
    +    } catch (ElasticsearchException e) {
    +      throw makeIOException(e);
    +    }
    +  }
    +
    +  public static IOException makeIOException(ElasticsearchException e) {
    +    final IOException ioe = new IOException();
    +    ioe.initCause(e);
    +    return ioe;
    +  }
    +
    +  @Override
    +  public void update(NutchDocument doc) throws IOException {
    +    write(doc);
    +  }
    +
    +  @Override
    +  public void commit() throws IOException {
    +    if (execute != null) {
    +      // wait for previous to finish
    +      long beforeWait = System.currentTimeMillis();
    +      BulkResponse actionGet = execute.actionGet();
    +      if (actionGet.hasFailures()) {
    +        for (BulkItemResponse item : actionGet) {
    +          if (item.isFailed()) {
    +            throw new RuntimeException("First failure in bulk: "
    +                + item.getFailureMessage());
    +          }
    +        }
    +      }
    +      long msWaited = System.currentTimeMillis() - beforeWait;
    +      LOG.info("Previous took in ms " + actionGet.getTookInMillis()
    --- End diff --
    
    Logging could be improved here by using parameterized messages; see
    http://www.slf4j.org/faq.html#logging_performance
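    
    For example, the bulk-flush message in write() could be rewritten along
    these lines (a sketch assuming SLF4J 1.6+, whose Logger.info accepts
    varargs; the message is then only formatted when INFO is enabled):
    
        LOG.info("Processing bulk request [docs = {}, length = {}, total docs = {}, last doc in bulk = '{}']",
            bulkDocs, bulkLength, indexedDocs, id);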


> Indexer for Elasticsearch 2.x
> -----------------------------
>
>                 Key: NUTCH-2238
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2238
>             Project: Nutch
>          Issue Type: New Feature
>          Components: plugin
>    Affects Versions: 2.3.1
>            Reporter: Pablo Torres
>             Fix For: 2.4, 2.3.2
>
>
> Add an additional plugin for Elasticsearch 2.x



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
