nutch-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NUTCH-1480) SolrIndexer to write to multiple servers.
Date Fri, 01 Jun 2018 17:48:01 GMT

    [ https://issues.apache.org/jira/browse/NUTCH-1480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498332#comment-16498332 ] 

ASF GitHub Bot commented on NUTCH-1480:
---------------------------------------

sebastian-nagel closed pull request #218: fix for NUTCH-1480 contributed by r0ann3l
URL: https://github.com/apache/nutch/pull/218

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
new file mode 100644
index 000000000..118c8bc88
--- /dev/null
+++ b/conf/index-writers.xml.template
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<writers xmlns="http://lucene.apache.org/nutch"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://lucene.apache.org/nutch index-writers.xsd">
+
+  <writer id="indexer_solr_1" class="org.apache.nutch.indexwriter.solr.SolrIndexWriter">
+    <parameters>
+      <param name="type" value="http"/>
+      <!-- Solr URL (default core name is "nutch" but you may change it): -->
+      <param name="url" value="http://localhost:8983/solr/nutch"/>
+      <param name="commitSize" value="250"/>
+      <param name="commitIndex" value="true"/>
+      <param name="auth" value="false"/>
+      <param name="username" value="username"/>
+      <param name="password" value="password"/>
+    </parameters>
+    <mapping>
+      <copy>
+        <!-- <field source="content" dest="search"/> -->
+        <!-- <field source="title" dest="title,search"/> -->
+      </copy>
+      <rename>
+        <field source="metatag.description" dest="description"/>
+        <field source="metatag.keywords" dest="keywords"/>
+      </rename>
+      <remove>
+        <field source="segment"/>
+      </remove>
+    </mapping>
+  </writer>
+  <writer id="indexer_rabbit_1" class="org.apache.nutch.indexwriter.rabbit.RabbitIndexWriter">
+    <parameters>
+      <param name="server.host" value="localhost"/>
+      <param name="server.port" value="5672"/>
+      <param name="server.virtualhost" value="nutch"/>
+      <param name="server.username" value="admin"/>
+      <param name="server.password" value="admin"/>
+      <param name="exchange.server" value="nutch.exchange"/>
+      <param name="exchange.type" value="direct"/>
+      <param name="queue.name" value="nutch.queue"/>
+      <param name="queue.durable" value="true"/>
+      <param name="queue.routingkey" value="nutch.key"/>
+      <param name="commit.size" value="250"/>
+    </parameters>
+    <mapping>
+      <copy>
+        <field source="title" dest="title,search"/>
+      </copy>
+      <rename>
+        <field source="metatag.description" dest="description"/>
+        <field source="metatag.keywords" dest="keywords"/>
+      </rename>
+      <remove>
+        <field source="content"/>
+        <field source="segment"/>
+        <field source="boost"/>
+      </remove>
+    </mapping>
+  </writer>
+  <writer id="indexer_dummy_1" class="org.apache.nutch.indexwriter.dummy.DummyIndexWriter">
+    <parameters>
+      <param name="delete" value="false"/>
+      <param name="path" value="./dummy-index.txt"/>
+    </parameters>
+    <mapping>
+      <copy />
+      <rename />
+      <remove />
+    </mapping>
+  </writer>
+  <writer id="indexer_elastic_1" class="org.apache.nutch.indexwriter.elastic.ElasticIndexWriter">
+    <parameters>
+      <param name="host" value=""/>
+      <param name="port" value="9300"/>
+      <param name="cluster" value=""/>
+      <param name="index" value="nutch"/>
+      <param name="max.bulk.docs" value="250"/>
+      <param name="max.bulk.size" value="2500500"/>
+      <param name="exponential.backoff.millis" value="100"/>
+      <param name="exponential.backoff.retries" value="10"/>
+      <param name="bulk.close.timeout" value="600"/>
+    </parameters>
+    <mapping>
+      <copy>
+        <field source="title" dest="title,search"/>
+      </copy>
+      <rename />
+      <remove />
+    </mapping>
+  </writer>
+  <writer id="indexer_elastic_rest_1" class="org.apache.nutch.indexwriter.elasticrest.ElasticRestIndexWriter">
+    <parameters>
+      <param name="host" value=""/>
+      <param name="port" value="9200"/>
+      <param name="index" value="nutch"/>
+      <param name="max.bulk.docs" value="250"/>
+      <param name="max.bulk.size" value="2500500"/>
+      <param name="user" value="user"/>
+      <param name="password" value="password"/>
+      <param name="type" value="doc"/>
+      <param name="https" value="false"/>
+      <param name="trustallhostnames" value="false"/>
+      <param name="languages" value=""/>
+      <param name="separator" value="_"/>
+      <param name="sink" value="others"/>
+    </parameters>
+    <mapping>
+      <copy>
+        <field source="title" dest="search"/>
+      </copy>
+      <rename />
+      <remove />
+    </mapping>
+  </writer>
+  <writer id="indexer_cloud_search_1" class="org.apache.nutch.indexwriter.cloudsearch.CloudSearchIndexWriter">
+    <parameters>
+      <param name="endpoint" value=""/>
+      <param name="region" value=""/>
+      <param name="batch.dump" value="false"/>
+      <param name="batch.maxSize" value="-1"/>
+    </parameters>
+    <mapping>
+      <copy />
+      <rename />
+      <remove />
+    </mapping>
+  </writer>
+</writers>
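
For orientation: each <writer> entry above binds an id to an implementation
class, a parameters block, and a mapping block. A minimal standalone sketch of
reading such a file with the JDK DOM API (the same approach used by
IndexWriters.loadWritersConfiguration() further down in this patch) could look
like the following; the class name and file path are illustrative assumptions:

import java.io.File;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class ListIndexWriters {
  public static void main(String[] args) throws Exception {
    DocumentBuilder builder =
        DocumentBuilderFactory.newInstance().newDocumentBuilder();
    // Hypothetical path; point this at your Nutch conf directory.
    Document document = builder.parse(new File("conf/index-writers.xml"));
    NodeList writers = document.getDocumentElement()
        .getElementsByTagName("writer");
    for (int i = 0; i < writers.getLength(); i++) {
      Element writer = (Element) writers.item(i);
      System.out.println(writer.getAttribute("id") + " -> "
          + writer.getAttribute("class"));
    }
  }
}
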
diff --git a/conf/index-writers.xsd b/conf/index-writers.xsd
new file mode 100644
index 000000000..50ab1f313
--- /dev/null
+++ b/conf/index-writers.xsd
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+           targetNamespace="http://lucene.apache.org/nutch"
+           xmlns="http://lucene.apache.org/nutch"
+           elementFormDefault="qualified">
+  <xs:element name="writers">
+    <xs:annotation>
+      <xs:documentation>
+        Root tag of the index-writers.xml document. It's a wrapper for all the index writers.
+      </xs:documentation>
+    </xs:annotation>
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element name="writer" type="writerType" maxOccurs="unbounded" minOccurs="1">
+          <xs:annotation>
+            <xs:documentation>
+              Contains all the configuration of a particular index writer.
+            </xs:documentation>
+          </xs:annotation>
+        </xs:element>
+      </xs:sequence>
+    </xs:complexType>
+  </xs:element>
+  <xs:complexType name="writerType">
+    <xs:sequence>
+      <xs:element type="parametersType" name="parameters" maxOccurs="1" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            This tag contains all the parameters that will be passed to the index writer implementation.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+      <xs:element type="mappingType" name="mapping" maxOccurs="1" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            It's a wrapper for the actions allowed on a document before it is indexed.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+    </xs:sequence>
+    <xs:attribute type="xs:string" name="id" use="required">
+      <xs:annotation>
+        <xs:documentation>
+          Writer's ID.
+        </xs:documentation>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute type="xs:string" name="class" use="required">
+      <xs:annotation>
+        <xs:documentation>
+          The class of the index writer implementation which will be used.
+        </xs:documentation>
+      </xs:annotation>
+    </xs:attribute>
+  </xs:complexType>
+  <xs:complexType name="parametersType">
+    <xs:sequence>
+      <xs:element type="parameterType" name="param" maxOccurs="unbounded" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            A single parameter that will be passed to the index writer implementation.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+    </xs:sequence>
+  </xs:complexType>
+  <xs:complexType name="parameterType">
+    <xs:attribute type="xs:string" name="name" use="required">
+      <xs:annotation>
+        <xs:documentation>
+          Parameter's name. It is used to identify the parameter.
+        </xs:documentation>
+      </xs:annotation>
+    </xs:attribute>
+    <xs:attribute type="xs:string" name="value" use="required">
+      <xs:annotation>
+        <xs:documentation>
+          Parameter's value.
+        </xs:documentation>
+      </xs:annotation>
+    </xs:attribute>
+  </xs:complexType>
+  <xs:complexType name="mappingType">
+    <xs:sequence>
+      <xs:element type="fieldsType" name="copy" maxOccurs="1" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            Action to copy fields. Multiple comma-separated targets can be specified.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+      <xs:element type="fieldsType" name="rename" maxOccurs="1" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            Action to rename fields.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+      <xs:element name="remove" maxOccurs="1" minOccurs="1">
+        <xs:annotation>
+          <xs:documentation>
+            Action to remove fields.
+          </xs:documentation>
+        </xs:annotation>
+        <xs:complexType>
+          <xs:sequence>
+            <xs:element name="field" maxOccurs="unbounded" minOccurs="0">
+              <xs:annotation>
+                <xs:documentation>
+                  A single field that will be mapped.
+                </xs:documentation>
+              </xs:annotation>
+              <xs:complexType>
+                <xs:simpleContent>
+                  <xs:extension base="xs:string">
+                    <xs:attribute type="xs:string" name="source" use="required">
+                      <xs:annotation>
+                        <xs:documentation>
+                          Field's name before it's mapped.
+                        </xs:documentation>
+                      </xs:annotation>
+                    </xs:attribute>
+                  </xs:extension>
+                </xs:simpleContent>
+              </xs:complexType>
+            </xs:element>
+          </xs:sequence>
+        </xs:complexType>
+      </xs:element>
+    </xs:sequence>
+  </xs:complexType>
+  <xs:complexType name="fieldsType">
+    <xs:sequence>
+      <xs:element type="fieldType" name="field" maxOccurs="unbounded" minOccurs="0">
+        <xs:annotation>
+          <xs:documentation>
+            A single field that will be mapped.
+          </xs:documentation>
+        </xs:annotation>
+      </xs:element>
+    </xs:sequence>
+  </xs:complexType>
+  <xs:complexType name="fieldType">
+    <xs:simpleContent>
+      <xs:extension base="xs:string">
+        <xs:attribute type="xs:string" name="source" use="required">
+          <xs:annotation>
+            <xs:documentation>
+              Field's name before it's mapped.
+            </xs:documentation>
+          </xs:annotation>
+        </xs:attribute>
+        <xs:attribute type="xs:string" name="dest" use="required">
+          <xs:annotation>
+            <xs:documentation>
+              Field's name after the action is applied.
+            </xs:documentation>
+          </xs:annotation>
+        </xs:attribute>
+      </xs:extension>
+    </xs:simpleContent>
+  </xs:complexType>
+</xs:schema>
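
The schema above can also be used to validate a configuration before running a
crawl. A hedged sketch using the JDK's javax.xml.validation API (file paths
and the class name are assumptions):

import java.io.File;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;

public class ValidateIndexWriters {
  public static void main(String[] args) throws Exception {
    SchemaFactory factory =
        SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
    Schema schema = factory.newSchema(new File("conf/index-writers.xsd"));
    Validator validator = schema.newValidator();
    // Throws SAXException with a descriptive message if the file is invalid.
    validator.validate(new StreamSource(new File("conf/index-writers.xml")));
    System.out.println("conf/index-writers.xml is valid");
  }
}
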
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 6fad2b5d3..1939014dc 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -44,6 +44,7 @@ log4j.logger.org.apache.nutch.hostdb.UpdateHostDb=INFO,cmdstdout
 log4j.logger.org.apache.nutch.hostdb.ReadHostDb=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.IndexingFiltersChecker=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.IndexingJob=INFO,cmdstdout
+log4j.logger.org.apache.nutch.indexer.IndexerOutputFormat=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexwriter.solr.SolrIndexWriter=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexwriter.solr.SolrUtils=INFO,cmdstdout
 log4j.logger.org.apache.nutch.parse.ParserChecker=INFO,cmdstdout
diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java b/src/java/org/apache/nutch/indexer/CleaningJob.java
index fd5dff913..e8dab4031 100644
--- a/src/java/org/apache/nutch/indexer/CleaningJob.java
+++ b/src/java/org/apache/nutch/indexer/CleaningJob.java
@@ -93,7 +93,7 @@ public void map(Text key, CrawlDatum value,
 
     public void setup(Reducer<ByteWritable, Text, Text, ByteWritable>.Context context) {
       Configuration conf = context.getConfiguration();
-      writers = new IndexWriters(conf);
+      writers = IndexWriters.get(conf);
       try {
         writers.open(conf, "Deletion");
       } catch (IOException e) {
@@ -186,7 +186,7 @@ public int run(String[] args) throws IOException {
       String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
       LOG.error("Missing crawldb. " + usage);
       System.err.println(usage);
-      IndexWriters writers = new IndexWriters(getConf());
+      IndexWriters writers = IndexWriters.get(getConf());
       System.err.println(writers.describe());
       return 1;
     }
diff --git a/src/java/org/apache/nutch/indexer/IndexWriter.java b/src/java/org/apache/nutch/indexer/IndexWriter.java
index 7cc2d1569..4413699d5 100644
--- a/src/java/org/apache/nutch/indexer/IndexWriter.java
+++ b/src/java/org/apache/nutch/indexer/IndexWriter.java
@@ -16,19 +16,29 @@
  */
 package org.apache.nutch.indexer;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.plugin.Pluggable;
 
+import java.io.IOException;
+
 public interface IndexWriter extends Pluggable, Configurable {
-  /** The name of the extension point. */
+  /**
+   * The name of the extension point.
+   */
   final static String X_POINT_ID = IndexWriter.class.getName();
 
+  @Deprecated
   public void open(Configuration conf, String name) throws IOException;
 
+  /**
+   * Initializes the internal variables from a given index writer configuration.
+   *
+   * @param parameters Params from the index writer configuration.
+   * @throws IOException Any exception thrown by the writer.
+   */
+  void open(IndexWriterParams parameters) throws IOException;
+
   public void write(NutchDocument doc) throws IOException;
 
   public void delete(String key) throws IOException;
@@ -40,8 +50,9 @@
   public void close() throws IOException;
 
   /**
-   * Returns a String describing the IndexWriter instance and the specific
-   * parameters it can take
+   * Returns a String describing the IndexWriter instance and the specific parameters it can take.
+   *
+   * @return The full description.
    */
   public String describe();
 }
diff --git a/src/java/org/apache/nutch/indexer/IndexWriterConfig.java b/src/java/org/apache/nutch/indexer/IndexWriterConfig.java
new file mode 100644
index 000000000..9a38676b6
--- /dev/null
+++ b/src/java/org/apache/nutch/indexer/IndexWriterConfig.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IndexWriterConfig {
+
+  private String id;
+
+  private String clazz;
+
+  private IndexWriterParams params;
+
+  private Map<MappingReader.Actions, Map<String, List<String>>> mapping;
+
+  private IndexWriterConfig(String id, String clazz, Map<String, String> params, Map<MappingReader.Actions, Map<String, List<String>>> mapping) {
+    this.id = id;
+    this.clazz = clazz;
+    this.params = new IndexWriterParams(params);
+    this.mapping = mapping;
+  }
+
+  static IndexWriterConfig getInstanceFromElement(Element rootElement) {
+    String id = rootElement.getAttribute("id");
+    String clazz = rootElement.getAttribute("class");
+
+    NodeList parametersList = rootElement.getElementsByTagName("param");
+    Map<String, String> parameters = new HashMap<>();
+
+    for (int i = 0; i < parametersList.getLength(); i++) {
+      Element parameterNode = (Element) parametersList.item(i);
+      parameters.put(parameterNode.getAttribute("name"), parameterNode.getAttribute("value"));
+    }
+
+    return new IndexWriterConfig(id, clazz, parameters,
+            MappingReader.parseMapping((Element) rootElement.getElementsByTagName("mapping").item(0)));
+  }
+
+  String getId() {
+    return id;
+  }
+
+  String getClazz() {
+    return clazz;
+  }
+
+  IndexWriterParams getParams() {
+    return params;
+  }
+
+  Map<MappingReader.Actions, Map<String, List<String>>> getMapping() {
+    return mapping;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ID: ");
+    sb.append(id);
+    sb.append("\n");
+
+    sb.append("Class: ");
+    sb.append(clazz);
+    sb.append("\n");
+
+    sb.append("Params {\n");
+    for (Map.Entry<String, String> entry : params.entrySet()) {
+      sb.append("\t");
+      sb.append(entry.getKey());
+      sb.append(":\t");
+      sb.append(entry.getValue());
+      sb.append("\n");
+    }
+    sb.append("}\n");
+
+    sb.append("Mapping {\n");
+    for (Map.Entry<MappingReader.Actions, Map<String, List<String>>> entry : mapping.entrySet()) {
+      sb.append("\t");
+      sb.append(entry.getKey());
+      sb.append(" {\n");
+      for (Map.Entry<String, List<String>> entry1 : entry.getValue().entrySet()) {
+        sb.append("\t\t");
+        sb.append(entry1.getKey());
+        sb.append(":\t");
+        sb.append(String.join(",", entry1.getValue()));
+        sb.append("\n");
+      }
+      sb.append("\t}\n");
+    }
+    sb.append("}\n");
+    return sb.toString();
+  }
+}
diff --git a/src/java/org/apache/nutch/indexer/IndexWriterParams.java b/src/java/org/apache/nutch/indexer/IndexWriterParams.java
new file mode 100644
index 000000000..9c9ee72ef
--- /dev/null
+++ b/src/java/org/apache/nutch/indexer/IndexWriterParams.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class IndexWriterParams extends HashMap<String, String> {
+
+    /**
+     * Constructs a new <tt>HashMap</tt> with the same mappings as the
+     * specified <tt>Map</tt>.  The <tt>HashMap</tt> is created with
+     * default load factor (0.75) and an initial capacity sufficient to
+     * hold the mappings in the specified <tt>Map</tt>.
+     *
+     * @param m the map whose mappings are to be placed in this map
+     * @throws NullPointerException if the specified map is null
+     */
+    public IndexWriterParams(Map<? extends String, ? extends String> m) {
+        super(m);
+    }
+
+    public String get(String name, String defaultValue) {
+        return this.getOrDefault(name, defaultValue);
+    }
+
+    public boolean getBoolean(String name, boolean defaultValue) {
+        String value;
+        if ((value = this.get(name)) != null) {
+            return Boolean.parseBoolean(value);
+        }
+
+        return defaultValue;
+    }
+
+    public long getLong(String name, long defaultValue) {
+        String value;
+        if ((value = this.get(name)) != null) {
+            return Long.parseLong(value);
+        }
+
+        return defaultValue;
+    }
+
+    public int getInt(String name, int defaultValue) {
+        String value;
+        if ((value = this.get(name)) != null) {
+            return Integer.parseInt(value);
+        }
+
+        return defaultValue;
+    }
+
+    public String[] getStrings(String name) {
+        String value = this.get(name);
+        return StringUtils.getStrings(value);
+    }
+}
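
IndexWriterParams is simply a String-to-String HashMap with typed accessors,
so each writer can read its parameters with a fallback default. A short usage
sketch (the keys mirror the Solr writer parameters in the template above; the
class name is an assumption):

import java.util.HashMap;
import java.util.Map;
import org.apache.nutch.indexer.IndexWriterParams;

public class ParamsExample {
  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("commitSize", "250");
    raw.put("auth", "false");

    IndexWriterParams params = new IndexWriterParams(raw);
    int commitSize = params.getInt("commitSize", 1000);  // parsed: 250
    boolean auth = params.getBoolean("auth", true);      // parsed: false
    // Key absent, so the default value is returned.
    String url = params.get("url", "http://localhost:8983/solr/nutch");

    System.out.println(commitSize + " " + auth + " " + url);
  }
}
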
diff --git a/src/java/org/apache/nutch/indexer/IndexWriters.java b/src/java/org/apache/nutch/indexer/IndexWriters.java
index 63ddaffa5..085a01fa4 100644
--- a/src/java/org/apache/nutch/indexer/IndexWriters.java
+++ b/src/java/org/apache/nutch/indexer/IndexWriters.java
@@ -16,131 +16,264 @@
  */
 package org.apache.nutch.indexer;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
 import org.apache.nutch.plugin.PluginRepository;
 import org.apache.nutch.plugin.PluginRuntimeException;
-import org.apache.nutch.util.ObjectCache;
+import org.apache.nutch.util.NutchConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
-/** Creates and caches {@link IndexWriter} implementing plugins. */
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ * Creates and caches {@link IndexWriter} implementing plugins.
+ */
 public class IndexWriters {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-  private IndexWriter[] indexWriters;
-
-  public IndexWriters(Configuration conf) {
-    ObjectCache objectCache = ObjectCache.get(conf);
-    synchronized (objectCache) {
-      this.indexWriters = (IndexWriter[]) objectCache
-          .getObject(IndexWriter.class.getName());
-      if (this.indexWriters == null) {
-        try {
-          ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
-              IndexWriter.X_POINT_ID);
-          if (point == null)
-            throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
-          Extension[] extensions = point.getExtensions();
-          HashMap<String, IndexWriter> indexerMap = new HashMap<>();
-          for (int i = 0; i < extensions.length; i++) {
-            Extension extension = extensions[i];
-            IndexWriter writer = (IndexWriter) extension.getExtensionInstance();
-            LOG.info("Adding " + writer.getClass().getName());
-            if (!indexerMap.containsKey(writer.getClass().getName())) {
-              indexerMap.put(writer.getClass().getName(), writer);
-            }
+  private static final WeakHashMap<String, IndexWriters> CACHE = new WeakHashMap<>();
+
+  public static synchronized IndexWriters get(Configuration conf) {
+    String uuid = NutchConfiguration.getUUID(conf);
+    if (uuid == null) {
+      uuid = "nonNutchConf@" + conf.hashCode(); // fallback
+    }
+    return CACHE.computeIfAbsent(uuid, k -> new IndexWriters(conf));
+  }
+
+  private HashMap<String, IndexWriterWrapper> indexWriters;
+
+  private IndexWriters(Configuration conf) {
+    //It's not cached yet
+    if (this.indexWriters == null) {
+      try {
+        ExtensionPoint point = PluginRepository.get(conf)
+            .getExtensionPoint(IndexWriter.X_POINT_ID);
+
+        if (point == null) {
+          throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
+        }
+
+        Extension[] extensions = point.getExtensions();
+
+        HashMap<String, Extension> extensionMap = new HashMap<>();
+        for (Extension extension : extensions) {
+          LOG.info("Index writer {} identified.", extension.getClazz());
+          extensionMap.putIfAbsent(extension.getClazz(), extension);
+        }
+
+        IndexWriterConfig[] indexWriterConfigs = loadWritersConfiguration(
+            conf);
+        this.indexWriters = new HashMap<>();
+
+        for (IndexWriterConfig indexWriterConfig : indexWriterConfigs) {
+          final String clazz = indexWriterConfig.getClazz();
+
+          //If it was enabled in the plugin.includes property
+          if (extensionMap.containsKey(clazz)) {
+            IndexWriterWrapper writerWrapper = new IndexWriterWrapper();
+            writerWrapper.setIndexWriterConfig(indexWriterConfig);
+            writerWrapper.setIndexWriter(
+                (IndexWriter) extensionMap.get(clazz).getExtensionInstance());
+
+            indexWriters.put(indexWriterConfig.getId(), writerWrapper);
           }
-          objectCache.setObject(IndexWriter.class.getName(), indexerMap
-              .values().toArray(new IndexWriter[0]));
-        } catch (PluginRuntimeException e) {
-          throw new RuntimeException(e);
         }
-        this.indexWriters = (IndexWriter[]) objectCache
-            .getObject(IndexWriter.class.getName());
+      } catch (PluginRuntimeException e) {
+        throw new RuntimeException(e);
       }
     }
   }
 
-  public void open(Configuration conf, String name) throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].open(conf, name);
-      } catch (IOException ioe) {
-        throw ioe;
+  /**
+   * Loads the configuration of index writers.
+   *
+   * @param conf Nutch configuration instance.
+   * @return The loaded index writer configurations, or an empty array if the
+   *         configuration file cannot be read or parsed.
+   */
+  private IndexWriterConfig[] loadWritersConfiguration(Configuration conf) {
+    InputStream ssInputStream = conf
+        .getConfResourceAsInputStream("index-writers.xml");
+    InputSource inputSource = new InputSource(ssInputStream);
+
+    try {
+      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder builder = factory.newDocumentBuilder();
+      Document document = builder.parse(inputSource);
+      Element rootElement = document.getDocumentElement();
+      NodeList writerList = rootElement.getElementsByTagName("writer");
+
+      IndexWriterConfig[] indexWriterConfigs = new IndexWriterConfig[writerList
+          .getLength()];
+
+      for (int i = 0; i < writerList.getLength(); i++) {
+        indexWriterConfigs[i] = IndexWriterConfig
+            .getInstanceFromElement((Element) writerList.item(i));
       }
+
+      return indexWriterConfigs;
+    } catch (SAXException | IOException | ParserConfigurationException e) {
+      LOG.warn(e.toString());
+      return new IndexWriterConfig[0];
+    }
+  }
+
+  /**
+   * Maps the fields of a given document.
+   *
+   * @param document The document to map.
+   * @param mapping  The mapping to apply.
+   * @return The mapped document.
+   */
+  private NutchDocument mapDocument(final NutchDocument document,
+      final Map<MappingReader.Actions, Map<String, List<String>>> mapping) {
+    try {
+      NutchDocument mappedDocument = document.clone();
+
+      mapping.get(MappingReader.Actions.COPY).forEach((key, value) -> {
+        //Checking whether the field to copy exists or not
+        if (mappedDocument.getField(key) != null) {
+          for (String field : value) {
+            //To avoid duplicating the values
+            if (!key.equals(field)) {
+              for (Object val : mappedDocument.getField(key).getValues()) {
+                mappedDocument.add(field, val);
+              }
+            }
+          }
+        }
+      });
+
+      mapping.get(MappingReader.Actions.RENAME).forEach((key, value) -> {
+        //Checking whether the field to rename exists or not
+        if (mappedDocument.getField(key) != null) {
+          NutchField field = mappedDocument.removeField(key);
+          mappedDocument.add(value.get(0), field.getValues());
+          mappedDocument.getField(value.get(0)).setWeight(field.getWeight());
+        }
+      });
+
+      mapping.get(MappingReader.Actions.REMOVE)
+          .forEach((key, value) -> mappedDocument.removeField(key));
+
+      return mappedDocument;
+    } catch (CloneNotSupportedException e) {
+      LOG.warn("An instance of class {} can't be cloned.",
+          document.getClass().getName());
+      return document;
+    }
+  }
+
+  /**
+   * Initializes the internal variables of index writers.
+   *
+   * @param conf Nutch configuration.
+   * @param name Name passed on to each writer's deprecated
+   *             {@link IndexWriter#open(Configuration, String)} method.
+   * @throws IOException Any exception thrown by a writer.
+   */
+  public void open(Configuration conf, String name) throws IOException {
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      entry.getValue().getIndexWriter().open(conf, name);
+      entry.getValue().getIndexWriter()
+          .open(entry.getValue().getIndexWriterConfig().getParams());
     }
   }
 
   public void write(NutchDocument doc) throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].write(doc);
-      } catch (IOException ioe) {
-        throw ioe;
-      }
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      NutchDocument mappedDocument = mapDocument(doc,
+          entry.getValue().getIndexWriterConfig().getMapping());
+      entry.getValue().getIndexWriter().write(mappedDocument);
     }
   }
 
   public void update(NutchDocument doc) throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].update(doc);
-      } catch (IOException ioe) {
-        throw ioe;
-      }
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      entry.getValue().getIndexWriter().update(mapDocument(doc,
+          entry.getValue().getIndexWriterConfig().getMapping()));
     }
   }
 
   public void delete(String key) throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].delete(key);
-      } catch (IOException ioe) {
-        throw ioe;
-      }
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      entry.getValue().getIndexWriter().delete(key);
     }
   }
 
   public void close() throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].close();
-      } catch (IOException ioe) {
-        throw ioe;
-      }
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      entry.getValue().getIndexWriter().close();
     }
   }
 
   public void commit() throws IOException {
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      try {
-        this.indexWriters[i].commit();
-      } catch (IOException ioe) {
-        throw ioe;
-      }
+    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
+        .entrySet()) {
+      entry.getValue().getIndexWriter().commit();
     }
   }
 
-  // lists the active IndexWriters and their configuration
-  public String describe() throws IOException {
-    StringBuffer buffer = new StringBuffer();
-    if (this.indexWriters.length == 0)
-      buffer.append("No IndexWriters activated - check your configuration\n");
+  /**
+   * Lists the active IndexWriters and their configuration.
+   *
+   * @return The full description.
+   */
+  public String describe() {
+    StringBuilder builder = new StringBuilder();
+    if (this.indexWriters.size() == 0)
+      builder.append("No IndexWriters activated - check your configuration\n");
     else
-      buffer.append("Active IndexWriters :\n");
-    for (int i = 0; i < this.indexWriters.length; i++) {
-      buffer.append(this.indexWriters[i].describe()).append("\n");
+      builder.append("Active IndexWriters :\n");
+
+    for (IndexWriterWrapper indexWriterWrapper : this.indexWriters.values()) {
+      builder.append(indexWriterWrapper.getIndexWriter().describe())
+          .append("\n");
     }
-    return buffer.toString();
+
+    return builder.toString();
   }
 
+  public class IndexWriterWrapper {
+    private IndexWriterConfig indexWriterConfig;
+
+    private IndexWriter indexWriter;
+
+    IndexWriterConfig getIndexWriterConfig() {
+      return indexWriterConfig;
+    }
+
+    void setIndexWriterConfig(IndexWriterConfig indexWriterConfig) {
+      this.indexWriterConfig = indexWriterConfig;
+    }
+
+    IndexWriter getIndexWriter() {
+      return indexWriter;
+    }
+
+    void setIndexWriter(IndexWriter indexWriter) {
+      this.indexWriter = indexWriter;
+    }
+  }
 }
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index bc1c82a16..6fa203271 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -56,6 +56,7 @@
 
   public static final String INDEXER_PARAMS = "indexer.additional.params";
   public static final String INDEXER_DELETE = "indexer.delete";
+  public static final String INDEXER_NO_COMMIT = "indexer.nocommit";
   public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
   public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
   public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
index 359f9d1fc..3ce4f8061 100644
--- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
+++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -24,15 +24,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-public class IndexerOutputFormat extends
-    FileOutputFormat<Text, NutchIndexAction> {
+public class IndexerOutputFormat
+    extends FileOutputFormat<Text, NutchIndexAction> {
 
   @Override
   public RecordWriter<Text, NutchIndexAction> getRecordWriter(
       TaskAttemptContext context) throws IOException {
 
     Configuration conf = context.getConfiguration();
-    final IndexWriters writers = new IndexWriters(conf);
+    final IndexWriters writers = IndexWriters.get(conf);
 
     String name = getUniqueFile(context, "part", "");
     writers.open(conf, name);
@@ -40,6 +40,12 @@
     return new RecordWriter<Text, NutchIndexAction>() {
 
       public void close(TaskAttemptContext context) throws IOException {
+        // do the commits once and for all the reducers in one go
+        boolean noCommit = conf
+            .getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false);
+        if (!noCommit) {
+          writers.commit();
+        }
         writers.close();
       }
 
diff --git a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
index 284d4adc7..afc6aaf3c 100644
--- a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
+++ b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
@@ -269,7 +269,7 @@ protected int process(String url, StringBuilder output) throws Exception {
     output.append("\n"); // For readability if keepClientCnxOpen
 
     if (getConf().getBoolean("doIndex", false) && doc != null) {
-      IndexWriters writers = new IndexWriters(getConf());
+      IndexWriters writers = IndexWriters.get(getConf());
       writers.open(getConf(), "IndexingFilterChecker");
       writers.write(doc);
       writers.close();
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java b/src/java/org/apache/nutch/indexer/IndexingJob.java
index 67572915b..ae77da4a0 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -71,13 +71,13 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
   }
 
   public void index(Path crawlDb, Path linkDb, List<Path> segments,
-      boolean noCommit, boolean deleteGone) 
+      boolean noCommit, boolean deleteGone)
       throws IOException, InterruptedException, ClassNotFoundException {
     index(crawlDb, linkDb, segments, noCommit, deleteGone, null);
   }
 
   public void index(Path crawlDb, Path linkDb, List<Path> segments,
-      boolean noCommit, boolean deleteGone, String params) 
+      boolean noCommit, boolean deleteGone, String params)
       throws IOException, InterruptedException, ClassNotFoundException {
     index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false, false);
   }
@@ -91,7 +91,7 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
 
   public void index(Path crawlDb, Path linkDb, List<Path> segments,
       boolean noCommit, boolean deleteGone, String params,
-      boolean filter, boolean normalize, boolean addBinaryContent) 
+      boolean filter, boolean normalize, boolean addBinaryContent)
       throws IOException, InterruptedException, ClassNotFoundException {
     index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
         false, false, false);
@@ -102,7 +102,6 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
       boolean filter, boolean normalize, boolean addBinaryContent,
       boolean base64) throws IOException, InterruptedException, ClassNotFoundException {
 
-
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("Indexer: starting at {}", sdf.format(start));
@@ -120,8 +119,8 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
       } else {
         LOG.info("Indexer: adding binary content");
       }
-    }        
-    IndexWriters writers = new IndexWriters(getConf());
+    }
+    IndexWriters writers = IndexWriters.get(conf);
     LOG.info(writers.describe());
 
     IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, addBinaryContent);
@@ -133,6 +132,7 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
     conf.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
     conf.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
     conf.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, base64);
+    conf.setBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, noCommit);
 
     if (params != null) {
       conf.set(IndexerMapReduce.INDEXER_PARAMS, params);
@@ -158,11 +158,6 @@ public void index(Path crawlDb, Path linkDb, List<Path> segments,
         LOG.error(StringUtils.stringifyException(e));
         throw e;
       }
-      // do the commits once and for all the reducers in one go
-      if (!noCommit) {
-        writers.open(conf, "commit");
-        writers.commit();
-      }
       LOG.info("Indexer: number of documents indexed, deleted, or skipped:");
       for (Counter counter : job.getCounters().getGroup("IndexerStatus")) {
         LOG.info("Indexer: {}  {}",
@@ -182,7 +177,7 @@ public int run(String[] args) throws Exception {
       System.err
       //.println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
       .println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize] [-addBinaryContent] [-base64]");
-      IndexWriters writers = new IndexWriters(getConf());
+      IndexWriters writers = IndexWriters.get(getConf());
       System.err.println(writers.describe());
       return -1;
     }
@@ -318,15 +313,15 @@ public static void main(String[] args) throws Exception {
 
     if(args.containsKey(Nutch.ARG_SEGMENTS)) {
       Object segmentsFromArg = args.get(Nutch.ARG_SEGMENTS);
-      ArrayList<String> segmentList = new ArrayList<String>(); 
+      ArrayList<String> segmentList = new ArrayList<String>();
       if(segmentsFromArg instanceof ArrayList) {
-    	segmentList = (ArrayList<String>)segmentsFromArg; }
+    	  segmentList = (ArrayList<String>)segmentsFromArg; }
       else if(segmentsFromArg instanceof Path){
         segmentList.add(segmentsFromArg.toString());
       }
-    	      
+
       for(String segment: segmentList) {
-    	segments.add(new Path(segment));
+    	  segments.add(new Path(segment));
       }
     }
 
diff --git a/src/java/org/apache/nutch/indexer/MappingReader.java b/src/java/org/apache/nutch/indexer/MappingReader.java
new file mode 100644
index 000000000..65cc9d2bf
--- /dev/null
+++ b/src/java/org/apache/nutch/indexer/MappingReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class MappingReader {
+
+  /**
+   * Converts the tag "mapping" to a {@link Map} instance.
+   *
+   * @param mappingElement The tag "mapping" wrapped into an {@link Element} instance.
+   * @return The {@link Map} instance with the actions for mapping the fields.
+   */
+  static Map<Actions, Map<String, List<String>>> parseMapping(Element mappingElement) {
+    Map<Actions, Map<String, List<String>>> parsedMapping = new HashMap<>();
+
+    //Getting rename action
+    Node node = mappingElement.getElementsByTagName("rename").item(0);
+
+    if (node != null) {
+      NodeList fieldList = ((Element) node).getElementsByTagName("field");
+
+      Map<String, List<String>> fieldsMap = new HashMap<>();
+
+      for (int j = 0; j < fieldList.getLength(); j++) {
+        Element field = (Element) fieldList.item(j);
+        fieldsMap.put(field.getAttribute("source"), Collections.singletonList(field.getAttribute("dest")));
+      }
+
+      parsedMapping.put(Actions.RENAME, fieldsMap);
+    }
+
+    //Getting copy action
+    node = mappingElement.getElementsByTagName("copy").item(0);
+
+    if (node != null) {
+      NodeList fieldList = ((Element) node).getElementsByTagName("field");
+
+      Map<String, List<String>> fieldsMap = new HashMap<>();
+
+      for (int j = 0; j < fieldList.getLength(); j++) {
+        Element field = (Element) fieldList.item(j);
+        fieldsMap.put(field.getAttribute("source"), Arrays.asList(field.getAttribute("dest").split(",")));
+      }
+
+      parsedMapping.put(Actions.COPY, fieldsMap);
+    }
+
+    //Getting remove action
+    node = mappingElement.getElementsByTagName("remove").item(0);
+
+    if (node != null) {
+      NodeList fieldList = ((Element) node).getElementsByTagName("field");
+
+      Map<String, List<String>> fieldsMap = new HashMap<>();
+
+      for (int j = 0; j < fieldList.getLength(); j++) {
+        Element field = (Element) fieldList.item(j);
+        fieldsMap.put(field.getAttribute("source"), null);
+      }
+
+      parsedMapping.put(Actions.REMOVE, fieldsMap);
+    }
+
+    return parsedMapping;
+  }
+
+  /**
+   * Available actions for mapping fields.
+   */
+  enum Actions {
+    RENAME, COPY, REMOVE
+  }
+}
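
To make the parsed structure concrete: for the Solr writer's <mapping> block
in the template above, parseMapping() returns a map equivalent to the one
built by hand below. MappingShape is a hypothetical helper, placed in the same
package because MappingReader and its Actions enum are package-private:

package org.apache.nutch.indexer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MappingShape {
  static Map<MappingReader.Actions, Map<String, List<String>>> solrMapping() {
    Map<MappingReader.Actions, Map<String, List<String>>> mapping =
        new HashMap<>();

    Map<String, List<String>> rename = new HashMap<>();
    rename.put("metatag.description", Collections.singletonList("description"));
    rename.put("metatag.keywords", Collections.singletonList("keywords"));
    mapping.put(MappingReader.Actions.RENAME, rename);

    Map<String, List<String>> remove = new HashMap<>();
    remove.put("segment", null); // remove entries carry no destination
    mapping.put(MappingReader.Actions.REMOVE, remove);

    // The template's <copy> block only contains commented-out fields.
    mapping.put(MappingReader.Actions.COPY, new HashMap<>());
    return mapping;
  }
}
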
diff --git a/src/java/org/apache/nutch/indexer/NutchDocument.java b/src/java/org/apache/nutch/indexer/NutchDocument.java
index 9807ebf47..bbe622195 100644
--- a/src/java/org/apache/nutch/indexer/NutchDocument.java
+++ b/src/java/org/apache/nutch/indexer/NutchDocument.java
@@ -33,11 +33,11 @@
 
 /** A {@link NutchDocument} is the unit of indexing. */
 public class NutchDocument implements Writable,
-    Iterable<Entry<String, NutchField>> {
+    Iterable<Entry<String, NutchField>>, Cloneable {
 
   public static final byte VERSION = 2;
 
-  private Map<String, NutchField> fields;
+  private HashMap<String, NutchField> fields;
 
   private Metadata documentMeta;
 
@@ -141,4 +141,17 @@ public String toString() {
     sb.append("}\n");
     return sb.toString();
   }
+
+  @Override
+  public NutchDocument clone() throws CloneNotSupportedException {
+    NutchDocument clonedDocument = (NutchDocument) super.clone();
+
+    clonedDocument.fields = new HashMap<>();
+
+    for (Entry<String, NutchField> field : this.fields.entrySet()) {
+      clonedDocument.fields.put(field.getKey(), field.getValue().clone());
+    }
+
+    return clonedDocument;
+  }
 }
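
The clone() override above deep-copies the field map (and, through
NutchField.clone(), each value list), so a mapped copy can be modified without
leaking changes into the original document; this is what lets
IndexWriters.mapDocument() hand every writer its own view. A small
illustrative check (the class name is an assumption):

import org.apache.nutch.indexer.NutchDocument;

public class CloneCheck {
  public static void main(String[] args) throws CloneNotSupportedException {
    NutchDocument original = new NutchDocument();
    original.add("title", "Apache Nutch");

    NutchDocument copy = original.clone();
    copy.add("title", "search"); // second value lands on the copy only

    System.out.println(original.getField("title").getValues().size()); // 1
    System.out.println(copy.getField("title").getValues().size());     // 2
  }
}
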
diff --git a/src/java/org/apache/nutch/indexer/NutchField.java b/src/java/org/apache/nutch/indexer/NutchField.java
index d6b2b3b2c..de76e230b 100644
--- a/src/java/org/apache/nutch/indexer/NutchField.java
+++ b/src/java/org/apache/nutch/indexer/NutchField.java
@@ -32,9 +32,9 @@
  * This class represents a multi-valued field with a weight. Values are
  * arbitrary objects.
  */
-public class NutchField implements Writable {
+public class NutchField implements Writable, Cloneable {
   private float weight;
-  private List<Object> values = new ArrayList<>();
+  private ArrayList<Object> values = new ArrayList<>();
 
   public NutchField() {
     //default constructor
@@ -75,10 +75,10 @@ public void reset() {
   }
 
   @Override
-  public Object clone() throws CloneNotSupportedException {
+  public NutchField clone() throws CloneNotSupportedException {
     NutchField result = (NutchField) super.clone();
     result.weight = weight;
-    result.values = values;
+    result.values = (ArrayList<Object>) values.clone();
 
     return result;
   }
diff --git a/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java b/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java
index 8bfb16146..064a4f6ed 100644
--- a/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java
+++ b/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchConstants.java
@@ -18,10 +18,8 @@
 package org.apache.nutch.indexwriter.cloudsearch;
 
 public interface CloudSearchConstants {
-  public static final String CLOUDSEARCH_PREFIX = "cloudsearch.";
-  public static final String ENDPOINT = CLOUDSEARCH_PREFIX + "endpoint";
-  public static final String REGION = CLOUDSEARCH_PREFIX + "region";
-  public static final String BATCH_DUMP = CLOUDSEARCH_PREFIX + "batch.dump";
-  public static final String MAX_DOCS_BATCH = CLOUDSEARCH_PREFIX
-      + "batch.maxSize";
+  public static final String ENDPOINT = "endpoint";
+  public static final String REGION = "region";
+  public static final String BATCH_DUMP = "batch.dump";
+  public static final String MAX_DOCS_BATCH = "batch.maxSize";
 }
diff --git a/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java b/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java
index 29cfad3ae..3973485b0 100644
--- a/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java
+++ b/src/plugin/indexer-cloudsearch/src/java/org/apache/nutch/indexwriter/cloudsearch/CloudSearchIndexWriter.java
@@ -35,6 +35,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.slf4j.Logger;
@@ -86,14 +87,29 @@
 
   @Override
   public void open(Configuration conf, String name) throws IOException {
-    LOG.debug("CloudSearchIndexWriter.open() name={} ", name);
-
-    maxDocsInBatch = conf.getInt(CloudSearchConstants.MAX_DOCS_BATCH, -1);
+    //Implementation not required
+  }
 
-    buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('[');
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
 
-    dumpBatchFilesToTemp = conf.getBoolean(CloudSearchConstants.BATCH_DUMP,
+    String endpoint = parameters.get(CloudSearchConstants.ENDPOINT);
+    dumpBatchFilesToTemp = parameters.getBoolean(CloudSearchConstants.BATCH_DUMP,
         false);
+    this.regionName = parameters.get(CloudSearchConstants.REGION);
+
+    if (StringUtils.isBlank(endpoint) && !dumpBatchFilesToTemp) {
+      String message = "Missing CloudSearch endpoint. It should be set via -D "
+          + CloudSearchConstants.ENDPOINT + " or in nutch-site.xml";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+
+    maxDocsInBatch = parameters.getInt(CloudSearchConstants.MAX_DOCS_BATCH, -1);
+
+    buffer = new StringBuffer(MAX_SIZE_BATCH_BYTES).append('[');
 
     if (dumpBatchFilesToTemp) {
       // only dumping to local file
@@ -101,8 +117,6 @@ public void open(Configuration conf, String name) throws IOException {
       return;
     }
 
-    String endpoint = conf.get(CloudSearchConstants.ENDPOINT);
-
     if (StringUtils.isBlank(endpoint)) {
       throw new RuntimeException("endpoint not set for CloudSearch");
     }
@@ -144,7 +158,6 @@ public void open(Configuration conf, String name) throws IOException {
 
     client = new AmazonCloudSearchDomainClient();
     client.setEndpoint(endpoint);
-
   }
 
   @Override
@@ -326,18 +339,6 @@ public Configuration getConf() {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    String endpoint = getConf().get(CloudSearchConstants.ENDPOINT);
-    boolean dumpBatchFilesToTemp = getConf()
-        .getBoolean(CloudSearchConstants.BATCH_DUMP, false);
-    this.regionName = getConf().get(CloudSearchConstants.REGION);
-
-    if (StringUtils.isBlank(endpoint) && !dumpBatchFilesToTemp) {
-      String message = "Missing CloudSearch endpoint. Should set it set via -D "
-          + CloudSearchConstants.ENDPOINT + " or in nutch-site.xml";
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
   }
 
   public String describe() {
diff --git a/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyConstants.java b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyConstants.java
new file mode 100644
index 000000000..7dea970b4
--- /dev/null
+++ b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexwriter.dummy;
+
+public interface DummyConstants {
+    String DELETE = "delete";
+
+    String PATH = "path";
+}
diff --git a/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
index 071cbb1f1..064d8f627 100644
--- a/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
+++ b/src/plugin/indexer-dummy/src/java/org/apache/nutch/indexwriter/dummy/DummyIndexWriter.java
@@ -21,9 +21,13 @@
 import java.io.IOException;
 import java.io.FileWriter;
 import java.io.Writer;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.IndexerMapReduce;
 import org.apache.nutch.indexer.NutchDocument;
 import org.slf4j.Logger;
@@ -42,9 +46,41 @@
   private boolean delete = false;
 
   public void open(Configuration conf, String name) throws IOException {
-    delete = conf.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
+      //Implementation not required
   }
 
+    /**
+     * Initializes the internal variables from a given index writer configuration.
+     *
+     * @param parameters Params from the index writer configuration.
+     * @throws IOException Any exception thrown by the writer.
+     */
+    @Override
+    public void open(IndexWriterParams parameters) throws IOException {
+        delete = parameters.getBoolean(DummyConstants.DELETE, false);
+
+        String path = parameters.get(DummyConstants.PATH);
+        if (path == null) {
+            String message = "Missing path.";
+            message += "\n" + describe();
+            LOG.error(message);
+            throw new RuntimeException(message);
+        }
+
+        if (writer != null) {
+            LOG.warn("Dummy index file already open for writing");
+            return;
+        }
+
+        try {
+            LOG.debug("Opening dummy index file {}", path);
+            writer = new BufferedWriter(new FileWriter(path));
+        } catch (IOException ex) {
+            LOG.error("Failed to open index file {}: {}", path,
+                    StringUtils.stringifyException(ex));
+        }
+    }
+
   @Override
   public void delete(String key) throws IOException {
     if (delete) {
@@ -63,6 +99,7 @@ public void write(NutchDocument doc) throws IOException {
   }
 
   public void close() throws IOException {
+    LOG.debug("Closing dummy index file");
     writer.flush();
     writer.close();
   }
@@ -80,18 +117,6 @@ public Configuration getConf() {
   @Override
   public void setConf(Configuration conf) {
     config = conf;
-    String path = conf.get("dummy.path");
-    if (path == null) {
-      String message = "Missing path. Should be set via -Ddummy.path";
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
-
-    try {
-      writer = new BufferedWriter(new FileWriter(conf.get("dummy.path")));
-    } catch (IOException e) {
-    }
   }
 
   public String describe() {
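
For illustration, a minimal sketch (not part of the patch) of how the reworked
dummy writer is now driven: parameters come from a per-writer map rather than
from nutch-site.xml. It uses the map-backed IndexWriterParams constructor that
the updated TestElasticIndexWriter further down exercises; the class name and
output path are made up.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nutch.indexer.IndexWriterParams;
    import org.apache.nutch.indexwriter.dummy.DummyConstants;
    import org.apache.nutch.indexwriter.dummy.DummyIndexWriter;
    import org.apache.nutch.util.NutchConfiguration;

    public class DummyWriterSmokeTest {
      public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put(DummyConstants.PATH, "/tmp/dummy-index.txt"); // illustrative path
        params.put(DummyConstants.DELETE, "false");

        DummyIndexWriter writer = new DummyIndexWriter();
        writer.setConf(NutchConfiguration.create()); // conf still backs getConf()
        writer.open(new IndexWriterParams(params));  // per-writer parameters
        writer.close();                              // flushes the dummy index file
      }
    }
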
diff --git a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java
index b36f0271c..cbbc29757 100644
--- a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java
+++ b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestConstants.java
@@ -17,21 +17,19 @@
 package org.apache.nutch.indexwriter.elasticrest;
 
 public interface ElasticRestConstants {
-  public static final String ELASTIC_PREFIX = "elastic.rest.";
+  public static final String HOST = "host";
+  public static final String PORT = "port";
+  public static final String INDEX = "index";
+  public static final String MAX_BULK_DOCS = "max.bulk.docs";
+  public static final String MAX_BULK_LENGTH = "max.bulk.size";
 
-  public static final String HOST = ELASTIC_PREFIX + "host";
-  public static final String PORT = ELASTIC_PREFIX + "port";
-  public static final String INDEX = ELASTIC_PREFIX + "index";
-  public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
-  public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
-
-  public static final String USER = ELASTIC_PREFIX + "user";
-  public static final String PASSWORD = ELASTIC_PREFIX + "password";
-  public static final String TYPE = ELASTIC_PREFIX + "type";
-  public static final String HTTPS = ELASTIC_PREFIX + "https";
-  public static final String HOSTNAME_TRUST = ELASTIC_PREFIX + "trustallhostnames";
+  public static final String USER = "user";
+  public static final String PASSWORD = "password";
+  public static final String TYPE = "type";
+  public static final String HTTPS = "https";
+  public static final String HOSTNAME_TRUST = "trustallhostnames";
   
-  public static final String LANGUAGES = INDEX + ".languages";
-  public static final String SEPARATOR = INDEX + ".separator";
-  public static final String SINK = INDEX + ".sink";
+  public static final String LANGUAGES = "languages";
+  public static final String SEPARATOR = "separator";
+  public static final String SINK = "sink";
 }
diff --git a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
index 02600618f..3bd9d41d6 100644
--- a/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
+++ b/src/plugin/indexer-elastic-rest/src/java/org/apache/nutch/indexwriter/elasticrest/ElasticRestIndexWriter.java
@@ -40,6 +40,7 @@
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +70,7 @@
       .getLogger(ElasticRestIndexWriter.class);
 
   private static final int DEFAULT_MAX_BULK_DOCS = 250;
-  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;  
+  private static final int DEFAULT_MAX_BULK_LENGTH = 2500500;
   private static final String DEFAULT_SEPARATOR = "_";
   private static final String DEFAULT_SINK = "others";
 
@@ -81,7 +82,7 @@
 
   private Bulk.Builder bulkBuilder;
   private int port = -1;
-  private String[] hosts = null;
+  private String host = null;
   private Boolean https = null;
   private String user = null;
   private String password = null;
@@ -95,30 +96,45 @@
   private boolean createNewBulk = false;
   private long millis;
   private BasicFuture<JestResult> basicFuture = null;
-  
+
   private String[] languages = null;
   private String separator = null;
   private String sink = null;
 
   @Override
   public void open(Configuration conf, String name) throws IOException {
+    //Implementation not required
+  }
+
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
+    host = parameters.get(ElasticRestConstants.HOST);
+    if (StringUtils.isBlank(host)) {
+      String message = "Missing host. It should be set in index-writers.xml";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+
+    port = parameters.getInt(ElasticRestConstants.PORT, 9200);
+    user = parameters.get(ElasticRestConstants.USER);
+    password = parameters.get(ElasticRestConstants.PASSWORD);
+    https = parameters.getBoolean(ElasticRestConstants.HTTPS, false);
+    trustAllHostnames = parameters
+        .getBoolean(ElasticRestConstants.HOSTNAME_TRUST, false);
 
-    hosts = conf.getStrings(ElasticRestConstants.HOST);
-    port = conf.getInt(ElasticRestConstants.PORT, 9200);
-    user = conf.get(ElasticRestConstants.USER);
-    password = conf.get(ElasticRestConstants.PASSWORD);
-    https = conf.getBoolean(ElasticRestConstants.HTTPS, false);
-    trustAllHostnames = conf.getBoolean(ElasticRestConstants.HOSTNAME_TRUST, false);
-    languages = conf.getStrings(ElasticRestConstants.LANGUAGES);
-    separator = conf.get(ElasticRestConstants.SEPARATOR, DEFAULT_SEPARATOR);
-    sink = conf.get(ElasticRestConstants.SINK, DEFAULT_SINK);
+    languages = parameters.getStrings(ElasticRestConstants.LANGUAGES);
+    separator = parameters
+        .get(ElasticRestConstants.SEPARATOR, DEFAULT_SEPARATOR);
+    sink = parameters.get(ElasticRestConstants.SINK, DEFAULT_SINK);
 
     // trust ALL certificates
     SSLContext sslContext = null;
     try {
       sslContext = new SSLContextBuilder()
           .loadTrustMaterial(new TrustStrategy() {
-            public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+            public boolean isTrusted(X509Certificate[] arg0, String arg1)
+                throws CertificateException {
               return true;
             }
           }).build();
@@ -136,49 +152,53 @@ public boolean isTrusted(X509Certificate[] arg0, String arg1) throws Certificate
       hostnameVerifier = new DefaultHostnameVerifier();
     }
 
-    SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(sslContext);
-    SchemeIOSessionStrategy httpsIOSessionStrategy = new SSLIOSessionStrategy(sslContext, hostnameVerifier);
+    SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
+        sslContext);
+    SchemeIOSessionStrategy httpsIOSessionStrategy = new SSLIOSessionStrategy(
+        sslContext, hostnameVerifier);
 
     JestClientFactory jestClientFactory = new JestClientFactory();
-
-    if (hosts == null || hosts.length == 0 || port <= 1) {
-      throw new IllegalStateException("No hosts or port specified. Please set the host and port in nutch-site.xml");
-    }
-
-    List<String> urlsOfElasticsearchNodes = new ArrayList<String>();
-    for (String host : hosts) {
-      urlsOfElasticsearchNodes.add(new URL(https ? "https" : "http", host, port, "").toString());
-    }
-    HttpClientConfig.Builder builder = new HttpClientConfig.Builder(
-            urlsOfElasticsearchNodes).multiThreaded(true)
-            .connTimeout(300000).readTimeout(300000);
-    if (https) {
-      if (user != null && password != null) {
-        builder.defaultCredentials(user, password);
+    URL urlOfElasticsearchNode = new URL(https ? "https" : "http", host, port,
+        "");
+
+    if (host != null && port > 1) {
+      HttpClientConfig.Builder builder = new HttpClientConfig.Builder(
+          urlOfElasticsearchNode.toString()).multiThreaded(true)
+          .connTimeout(300000).readTimeout(300000);
+      if (https) {
+        if (user != null && password != null) {
+          builder.defaultCredentials(user, password);
+        }
+        builder.defaultSchemeForDiscoveredNodes("https")
+            .sslSocketFactory(sslSocketFactory) // this only affects sync calls
+            .httpsIOSessionStrategy(
+                httpsIOSessionStrategy); // this only affects async calls
       }
-      builder.defaultSchemeForDiscoveredNodes("https")
-          .sslSocketFactory(sslSocketFactory) // this only affects sync calls
-          .httpsIOSessionStrategy(httpsIOSessionStrategy); // this only affects async calls
+      jestClientFactory.setHttpClientConfig(builder.build());
+    } else {
+      throw new IllegalStateException(
+          "No host or port specified. Please set the host and port in nutch-site.xml");
     }
-    jestClientFactory.setHttpClientConfig(builder.build());
 
     client = jestClientFactory.getObject();
 
-    defaultIndex = conf.get(ElasticRestConstants.INDEX, "nutch");
-    defaultType = conf.get(ElasticRestConstants.TYPE, "doc");
-
-    maxBulkDocs = conf.getInt(ElasticRestConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = conf.getInt(ElasticRestConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+    defaultIndex = parameters.get(ElasticRestConstants.INDEX, "nutch");
+    defaultType = parameters.get(ElasticRestConstants.TYPE, "doc");
 
-    bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex).defaultType(defaultType);
+    maxBulkDocs = parameters
+        .getInt(ElasticRestConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = parameters
+        .getInt(ElasticRestConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
 
+    bulkBuilder = new Bulk.Builder().defaultIndex(defaultIndex)
+        .defaultType(defaultType);
   }
-  
+
   private static Object normalizeValue(Object value) {
     if (value == null) {
       return null;
     }
-    
+
     if (value instanceof Map || value instanceof Date) {
       return value;
     }
@@ -198,10 +218,12 @@ public void write(NutchDocument doc) throws IOException {
 
     // Loop through all fields of this doc
     for (String fieldName : doc.getFieldNames()) {
-      Set<Object> allFieldValues = new LinkedHashSet<>(doc.getField(fieldName).getValues());
-      
+      Set<Object> allFieldValues = new LinkedHashSet<>(
+          doc.getField(fieldName).getValues());
+
       if (allFieldValues.size() > 1) {
-        Object[] normalizedFieldValues = allFieldValues.stream().map(ElasticRestIndexWriter::normalizeValue).toArray();
+        Object[] normalizedFieldValues = allFieldValues.stream()
+            .map(ElasticRestIndexWriter::normalizeValue).toArray();
 
         // Loop through the values to keep track of the size of this document
         for (Object value : normalizedFieldValues) {
@@ -209,13 +231,14 @@ public void write(NutchDocument doc) throws IOException {
         }
 
         source.put(fieldName, normalizedFieldValues);
-      } else if(allFieldValues.size() == 1) {
-        Object normalizedFieldValue = normalizeValue(allFieldValues.iterator().next());
+      } else if (allFieldValues.size() == 1) {
+        Object normalizedFieldValue = normalizeValue(
+            allFieldValues.iterator().next());
         source.put(fieldName, normalizedFieldValue);
         bulkLength += normalizedFieldValue.toString().length();
       }
     }
-    
+
     String index;
     if (languages != null && languages.length > 0) {
       String language = (String) doc.getFieldValue("lang");
@@ -234,8 +257,8 @@ public void write(NutchDocument doc) throws IOException {
     } else {
       index = defaultIndex;
     }
-    Index indexRequest = new Index.Builder(source).index(index)
-        .type(type).id(id).build();
+    Index indexRequest = new Index.Builder(source).index(index).type(type)
+        .id(id).build();
 
     // Add this indexing request to a bulk request
     bulkBuilder.addAction(indexRequest);
@@ -258,14 +281,19 @@ public void delete(String key) throws IOException {
     try {
       if (languages != null && languages.length > 0) {
         Bulk.Builder bulkBuilder = new Bulk.Builder().defaultType(defaultType);
-        for (String lang : languages) {          
-          bulkBuilder.addAction(new Delete.Builder(key).index(getLanguageIndexName(lang)).type(defaultType).build());
+        for (String lang : languages) {
+          bulkBuilder.addAction(
+              new Delete.Builder(key).index(getLanguageIndexName(lang))
+                  .type(defaultType).build());
         }
-        bulkBuilder.addAction(new Delete.Builder(key).index(getSinkIndexName()).type(defaultType).build());
+        bulkBuilder.addAction(
+            new Delete.Builder(key).index(getSinkIndexName()).type(defaultType)
+                .build());
         client.execute(bulkBuilder.build());
       } else {
-        client.execute(new Delete.Builder(key).index(defaultIndex)
-          .type(defaultType).build());
+        client.execute(
+            new Delete.Builder(key).index(defaultIndex).type(defaultType)
+                .build());
       }
     } catch (IOException e) {
       LOG.error(ExceptionUtils.getStackTrace(e));
@@ -366,15 +394,6 @@ public String describe() {
   @Override
   public void setConf(Configuration conf) {
     config = conf;
-    String[] hosts = conf.getStrings(ElasticRestConstants.HOST);
-    String port = conf.get(ElasticRestConstants.PORT);
-
-    if (hosts == null || hosts.length == 0 || StringUtils.isBlank(port)) {
-      String message = "No hosts or port specified. Please set the host and port in nutch-site.xml";
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
   }
 
   @Override
@@ -385,11 +404,11 @@ public Configuration getConf() {
   private String getLanguageIndexName(String lang) {
     return getComposedIndexName(defaultIndex, lang);
   }
-  
+
   private String getSinkIndexName() {
     return getComposedIndexName(defaultIndex, sink);
   }
-  
+
   private String getComposedIndexName(String prefix, String postfix) {
     return prefix + separator + postfix;
   }
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
index dd7063910..a6465109c 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticConstants.java
@@ -17,15 +17,13 @@
 package org.apache.nutch.indexwriter.elastic;
 
 public interface ElasticConstants {
-  public static final String ELASTIC_PREFIX = "elastic.";
-
-  public static final String HOSTS = ELASTIC_PREFIX + "host";
-  public static final String PORT = ELASTIC_PREFIX + "port";
-  public static final String CLUSTER = ELASTIC_PREFIX + "cluster";
-  public static final String INDEX = ELASTIC_PREFIX + "index";
-  public static final String MAX_BULK_DOCS = ELASTIC_PREFIX + "max.bulk.docs";
-  public static final String MAX_BULK_LENGTH = ELASTIC_PREFIX + "max.bulk.size";
-  public static final String EXPONENTIAL_BACKOFF_MILLIS = ELASTIC_PREFIX + "exponential.backoff.millis";
-  public static final String EXPONENTIAL_BACKOFF_RETRIES = ELASTIC_PREFIX + "exponential.backoff.retries";
-  public static final String BULK_CLOSE_TIMEOUT = ELASTIC_PREFIX + "bulk.close.timeout";
+  public static final String HOSTS = "host";
+  public static final String PORT = "port";
+  public static final String CLUSTER = "cluster";
+  public static final String INDEX = "index";
+  public static final String MAX_BULK_DOCS = "max.bulk.docs";
+  public static final String MAX_BULK_LENGTH = "max.bulk.size";
+  public static final String EXPONENTIAL_BACKOFF_MILLIS = "exponential.backoff.millis";
+  public static final String EXPONENTIAL_BACKOFF_RETRIES = "exponential.backoff.retries";
+  public static final String BULK_CLOSE_TIMEOUT = "bulk.close.timeout";
 }
diff --git a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
index a328b7fbd..1540241b0 100644
--- a/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
@@ -29,6 +29,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -75,47 +76,71 @@
 
   @Override
   public void open(Configuration conf, String name) throws IOException {
-    bulkCloseTimeout = conf.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
-        DEFAULT_BULK_CLOSE_TIMEOUT);
-    defaultIndex = conf.get(ElasticConstants.INDEX, DEFAULT_INDEX);
+    //Implementation not required
+  }
 
-    int maxBulkDocs = conf.getInt(ElasticConstants.MAX_BULK_DOCS,
-        DEFAULT_MAX_BULK_DOCS);
-    int maxBulkLength = conf.getInt(ElasticConstants.MAX_BULK_LENGTH,
-        DEFAULT_MAX_BULK_LENGTH);
-    int expBackoffMillis = conf.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
-        DEFAULT_EXP_BACKOFF_MILLIS);
-    int expBackoffRetries = conf.getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
-        DEFAULT_EXP_BACKOFF_RETRIES);
+  /**
+   * Initializes the internal variables from a given index writer configuration.
+   *
+   * @param parameters Params from the index writer configuration.
+   * @throws IOException Some exception thrown by writer.
+   */
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
+    String cluster = parameters.get(ElasticConstants.CLUSTER);
+    String hosts = parameters.get(ElasticConstants.HOSTS);
 
-    client = makeClient(conf);
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
+      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in index-writers.xml ";
+      message += "\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
 
-    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", maxBulkDocs, maxBulkLength);
+    bulkCloseTimeout = parameters.getLong(ElasticConstants.BULK_CLOSE_TIMEOUT,
+        DEFAULT_BULK_CLOSE_TIMEOUT);
+    defaultIndex = parameters.get(ElasticConstants.INDEX, DEFAULT_INDEX);
+
+    int maxBulkDocs = parameters
+        .getInt(ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
+    int maxBulkLength = parameters
+        .getInt(ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+    int expBackoffMillis = parameters
+        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS,
+            DEFAULT_EXP_BACKOFF_MILLIS);
+    int expBackoffRetries = parameters
+        .getInt(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES,
+            DEFAULT_EXP_BACKOFF_RETRIES);
+
+    client = makeClient(parameters);
+
+    LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}",
+        maxBulkDocs, maxBulkLength);
     bulkProcessor = BulkProcessor.builder(client, bulkProcessorListener())
-      .setBulkActions(maxBulkDocs)
-      .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
-      .setConcurrentRequests(1)
-      .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
-          TimeValue.timeValueMillis(expBackoffMillis), expBackoffRetries))
-      .build();
+        .setBulkActions(maxBulkDocs)
+        .setBulkSize(new ByteSizeValue(maxBulkLength, ByteSizeUnit.BYTES))
+        .setConcurrentRequests(1).setBackoffPolicy(BackoffPolicy
+            .exponentialBackoff(TimeValue.timeValueMillis(expBackoffMillis),
+                expBackoffRetries)).build();
   }
 
-  /** Generates a TransportClient or NodeClient */
-  protected Client makeClient(Configuration conf) throws IOException {
-    String clusterName = conf.get(ElasticConstants.CLUSTER);
-    String[] hosts = conf.getStrings(ElasticConstants.HOSTS);
-    int port = conf.getInt(ElasticConstants.PORT, DEFAULT_PORT);
+  /**
+   * Generates a TransportClient or NodeClient
+   */
+  protected Client makeClient(IndexWriterParams parameters) throws IOException {
+    String clusterName = parameters.get(ElasticConstants.CLUSTER);
+    String[] hosts = parameters.getStrings(ElasticConstants.HOSTS);
+    int port = parameters.getInt(ElasticConstants.PORT, DEFAULT_PORT);
 
     Settings.Builder settingsBuilder = Settings.builder();
 
     BufferedReader reader = new BufferedReader(
-        conf.getConfResourceAsReader("elasticsearch.conf"));
+        config.getConfResourceAsReader("elasticsearch.conf"));
     String line;
-    String parts[];
+    String[] parts;
     while ((line = reader.readLine()) != null) {
       if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
-        line = line.trim();
-        parts = line.split("=");
+        parts = line.trim().split("=");
 
         if (parts.length == 2) {
           settingsBuilder.put(parts[0].trim(), parts[1].trim());
@@ -124,8 +149,9 @@ protected Client makeClient(Configuration conf) throws IOException {
     }
 
     // Set the cluster name and build the settings
-    if (StringUtils.isNotBlank(clusterName))
+    if (StringUtils.isNotBlank(clusterName)) {
       settingsBuilder.put("cluster.name", clusterName);
+    }
 
     Settings settings = settingsBuilder.build();
 
@@ -135,8 +161,9 @@ protected Client makeClient(Configuration conf) throws IOException {
     if (hosts != null && port > 1) {
       TransportClient transportClient = new PreBuiltTransportClient(settings);
 
-      for (String host: hosts)
-        transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
+      for (String host : hosts)
+        transportClient.addTransportAddress(
+            new InetSocketTransportAddress(InetAddress.getByName(host), port));
       client = transportClient;
     } else if (clusterName != null) {
       node = new Node(settings);
@@ -146,19 +173,24 @@ protected Client makeClient(Configuration conf) throws IOException {
     return client;
   }
 
-  /** Generates a default BulkProcessor.Listener */
+  /**
+   * Generates a default BulkProcessor.Listener
+   */
   protected BulkProcessor.Listener bulkProcessorListener() {
     return new BulkProcessor.Listener() {
       @Override
-      public void beforeBulk(long executionId, BulkRequest request) { }
+      public void beforeBulk(long executionId, BulkRequest request) {
+      }
 
       @Override
-      public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+      public void afterBulk(long executionId, BulkRequest request,
+          Throwable failure) {
         throw new RuntimeException(failure);
       }
 
       @Override
-      public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+      public void afterBulk(long executionId, BulkRequest request,
+          BulkResponse response) {
         if (response.hasFailures()) {
           LOG.warn("Failures occurred during bulk request");
         }
@@ -185,7 +217,8 @@ public void write(NutchDocument doc) throws IOException {
       }
     }
 
-    IndexRequest request = new IndexRequest(defaultIndex, type, id).source(source);
+    IndexRequest request = new IndexRequest(defaultIndex, type, id)
+        .source(source);
     bulkProcessor.add(request);
   }
 
@@ -211,7 +244,8 @@ public void close() throws IOException {
     try {
       bulkProcessor.awaitClose(bulkCloseTimeout, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
-      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})", e.getMessage());
+      LOG.warn("interrupted while waiting for BulkProcessor to complete ({})",
+          e.getMessage());
     }
 
     client.close();
@@ -235,8 +269,8 @@ public String describe() {
     sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
         .append(" : elastic bulk index length in bytes. (default ")
         .append(DEFAULT_MAX_BULK_LENGTH).append(")\n");
-    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS)
-        .append(" : elastic bulk exponential backoff initial delay in milliseconds. (default ")
+    sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_MILLIS).append(
+        " : elastic bulk exponential backoff initial delay in milliseconds. (default ")
         .append(DEFAULT_EXP_BACKOFF_MILLIS).append(")\n");
     sb.append("\t").append(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES)
         .append(" : elastic bulk exponential backoff max retries. (default ")
@@ -250,15 +284,6 @@ public String describe() {
   @Override
   public void setConf(Configuration conf) {
     config = conf;
-    String cluster = conf.get(ElasticConstants.CLUSTER);
-    String hosts = conf.get(ElasticConstants.HOSTS);
-
-    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(hosts)) {
-      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
   }
 
   @Override
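
As with the other writers, open(IndexWriterParams) now performs the validation
that setConf() used to do. A minimal sketch of opening the transport-client
writer (not part of the patch; it assumes elasticsearch.conf is on the
classpath as in a standard Nutch runtime, and the cluster/host values are
illustrative):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nutch.indexer.IndexWriterParams;
    import org.apache.nutch.indexwriter.elastic.ElasticConstants;
    import org.apache.nutch.indexwriter.elastic.ElasticIndexWriter;
    import org.apache.nutch.util.NutchConfiguration;

    public class ElasticOpenSketch {
      public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put(ElasticConstants.CLUSTER, "nutch");   // at least one of
        params.put(ElasticConstants.HOSTS, "localhost"); // cluster/host is required
        params.put(ElasticConstants.MAX_BULK_DOCS, "250");

        ElasticIndexWriter writer = new ElasticIndexWriter();
        writer.setConf(NutchConfiguration.create()); // read for elasticsearch.conf
        writer.open(new IndexWriterParams(params));  // builds client + BulkProcessor
      }
    }
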
diff --git a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
index 58ac8b08e..dc59cd46f 100644
--- a/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
+++ b/src/plugin/indexer-elastic/src/test/org/apache/nutch/indexwriter/elastic/TestElasticIndexWriter.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.util.NutchConfiguration;
 import org.elasticsearch.action.Action;
@@ -43,6 +44,9 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 public class TestElasticIndexWriter {
@@ -103,7 +107,7 @@ public void close() { }
     // customize the plugin to signal successful bulk operations
     testIndexWriter = new ElasticIndexWriter() {
       @Override
-      protected Client makeClient(Configuration conf) {
+      protected Client makeClient(IndexWriterParams parameters) {
         return client;
       }
 
@@ -134,8 +138,12 @@ public void testBulkMaxDocs() throws IOException {
     conf.setInt(ElasticConstants.MAX_BULK_DOCS, numDocs);
     Job job = Job.getInstance(conf);
 
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(ElasticConstants.CLUSTER, "nutch");
+    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
+
     testIndexWriter.setConf(conf);
-    testIndexWriter.open(conf, "name");
+    testIndexWriter.open(new IndexWriterParams(parameters));
 
     NutchDocument doc = new NutchDocument();
     doc.add("id", "http://www.example.com");
@@ -169,8 +177,12 @@ public void testBulkMaxLength() throws IOException {
     conf.setInt(ElasticConstants.MAX_BULK_LENGTH, testMaxBulkLength);
     Job job = Job.getInstance(conf);
 
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(ElasticConstants.CLUSTER, "nutch");
+    parameters.put(ElasticConstants.MAX_BULK_LENGTH, String.valueOf(testMaxBulkLength));
+
     testIndexWriter.setConf(conf);
-    testIndexWriter.open(conf, "name");
+    testIndexWriter.open(new IndexWriterParams(parameters));
 
     NutchDocument doc = new NutchDocument();
     doc.add(key, value);
@@ -197,8 +209,13 @@ public void testBackoffPolicy() throws IOException {
 
     Job job = Job.getInstance(conf);
 
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(ElasticConstants.CLUSTER, "nutch");
+    parameters.put(ElasticConstants.EXPONENTIAL_BACKOFF_RETRIES, String.valueOf(maxNumFailures));
+    parameters.put(ElasticConstants.MAX_BULK_DOCS, String.valueOf(numDocs));
+
     testIndexWriter.setConf(conf);
-    testIndexWriter.open(conf, "name");
+    testIndexWriter.open(new IndexWriterParams(parameters));
 
     NutchDocument doc = new NutchDocument();
     doc.add("id", "http://www.example.com");
diff --git a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
index 86547b890..66a7bdc96 100644
--- a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
+++ b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
@@ -22,6 +22,7 @@
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
 
 import org.apache.nutch.indexer.IndexWriter;
@@ -30,7 +31,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
 public class RabbitIndexWriter implements IndexWriter {
@@ -50,7 +53,8 @@
 
   private int commitSize;
 
-  public static final Logger LOG = LoggerFactory.getLogger(RabbitIndexWriter.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
 
   private Configuration config;
 
@@ -67,26 +71,40 @@ public Configuration getConf() {
   @Override
   public void setConf(Configuration conf) {
     config = conf;
+  }
 
-    serverHost = conf.get(RabbitMQConstants.SERVER_HOST, "localhost");
-    serverPort = conf.getInt(RabbitMQConstants.SERVER_PORT, 15672);
-    serverVirtualHost = conf.get(RabbitMQConstants.SERVER_VIRTUAL_HOST, null);
+  @Override
+  public void open(Configuration conf, String name) throws IOException {
+    //Implementation not required
+  }
 
-    serverUsername = conf.get(RabbitMQConstants.SERVER_USERNAME, "admin");
-    serverPassword = conf.get(RabbitMQConstants.SERVER_PASSWORD, "admin");
+  /**
+   * Initializes the internal variables from a given index writer configuration.
+   *
+   * @param parameters Params from the index writer configuration.
+   * @throws IOException Some exception thrown by writer.
+   */
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
+    serverHost = parameters.get(RabbitMQConstants.SERVER_HOST, "localhost");
+    serverPort = parameters.getInt(RabbitMQConstants.SERVER_PORT, 5672);
+    serverVirtualHost = parameters
+        .get(RabbitMQConstants.SERVER_VIRTUAL_HOST, null);
 
-    exchangeServer = conf.get(RabbitMQConstants.EXCHANGE_SERVER, "nutch.exchange");
-    exchangeType = conf.get(RabbitMQConstants.EXCHANGE_TYPE, "direct");
+    serverUsername = parameters.get(RabbitMQConstants.SERVER_USERNAME, "admin");
+    serverPassword = parameters.get(RabbitMQConstants.SERVER_PASSWORD, "admin");
 
-    queueName = conf.get(RabbitMQConstants.QUEUE_NAME, "nutch.queue");
-    queueDurable = conf.getBoolean(RabbitMQConstants.QUEUE_DURABLE, true);
-    queueRoutingKey = conf.get(RabbitMQConstants.QUEUE_ROUTING_KEY, "nutch.key");
+    exchangeServer = parameters
+        .get(RabbitMQConstants.EXCHANGE_SERVER, "nutch.exchange");
+    exchangeType = parameters.get(RabbitMQConstants.EXCHANGE_TYPE, "direct");
 
-    commitSize = conf.getInt(RabbitMQConstants.COMMIT_SIZE, 250);
-  }
+    queueName = parameters.get(RabbitMQConstants.QUEUE_NAME, "nutch.queue");
+    queueDurable = parameters.getBoolean(RabbitMQConstants.QUEUE_DURABLE, true);
+    queueRoutingKey = parameters
+        .get(RabbitMQConstants.QUEUE_ROUTING_KEY, "nutch.key");
+
+    commitSize = parameters.getInt(RabbitMQConstants.COMMIT_SIZE, 250);
 
-  @Override
-  public void open(Configuration conf, String name) throws IOException {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost(serverHost);
     factory.setPort(serverPort);
@@ -99,7 +117,7 @@ public void open(Configuration conf, String name) throws IOException {
     factory.setPassword(serverPassword);
 
     try {
-      connection = factory.newConnection();
+      connection = factory.newConnection(UUID.randomUUID().toString());
       channel = connection.createChannel();
 
       channel.exchangeDeclare(exchangeServer, exchangeType, true);
@@ -117,9 +135,7 @@ public void update(NutchDocument doc) throws IOException {
 
     for (final Map.Entry<String, NutchField> e : doc) {
       RabbitDocument.RabbitDocumentField field = new RabbitDocument.RabbitDocumentField(
-              e.getKey(),
-              e.getValue().getWeight(),
-              e.getValue().getValues());
+          e.getKey(), e.getValue().getWeight(), e.getValue().getValues());
       rabbitDocument.addField(field);
     }
     rabbitDocument.setDocumentBoost(doc.getWeight());
@@ -132,8 +148,9 @@ public void update(NutchDocument doc) throws IOException {
 
   @Override
   public void commit() throws IOException {
-    if (!rabbitMessage.isEmpty()) {
-      channel.basicPublish(exchangeServer, queueRoutingKey, null, rabbitMessage.getBytes());
+    if (!rabbitMessage.isEmpty()) {
+      channel.basicPublish(exchangeServer, queueRoutingKey, null,
+          rabbitMessage.getBytes());
     }
     rabbitMessage.clear();
   }
@@ -144,9 +161,7 @@ public void write(NutchDocument doc) throws IOException {
 
     for (final Map.Entry<String, NutchField> e : doc) {
       RabbitDocument.RabbitDocumentField field = new RabbitDocument.RabbitDocumentField(
-              e.getKey(),
-              e.getValue().getWeight(),
-              e.getValue().getValues());
+          e.getKey(), e.getValue().getWeight(), e.getValue().getValues());
       rabbitDocument.addField(field);
     }
     rabbitDocument.setDocumentBoost(doc.getWeight());
@@ -162,8 +177,12 @@ public void write(NutchDocument doc) throws IOException {
   public void close() throws IOException {
    commit(); //TODO: This is because the indexing job never calls the commit method. It should be fixed.
     try {
-      channel.close();
-      connection.close();
+      if(channel.isOpen()) {
+        channel.close();
+      }
+      if(connection.isOpen()) {
+        connection.close();
+      }
     } catch (IOException | TimeoutException e) {
       throw makeIOException(e);
     }
@@ -183,11 +202,19 @@ private static IOException makeIOException(Exception e) {
   }
 
   public String describe() {
-    return "RabbitIndexWriter\n" +
-            "\t" + serverHost +  ":" + serverPort + " : URL of RabbitMQ server\n" +
-            "\t" + RabbitMQConstants.SERVER_VIRTUAL_HOST + " : Virtualhost name\n" +
-            "\t" + RabbitMQConstants.SERVER_USERNAME + " : Username for authentication\n" +
-            "\t" + RabbitMQConstants.SERVER_PASSWORD + " : Password for authentication\n" +
-            "\t" + RabbitMQConstants.COMMIT_SIZE + " : Buffer size when sending to RabbitMQ (default 250)\n";
+    StringBuilder sb = new StringBuilder("RabbitIndexWriter\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_HOST)
+        .append(" : Host of RabbitMQ server\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_PORT)
+        .append(" : Port of RabbitMQ server\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_VIRTUAL_HOST)
+        .append(" : Virtualhost name\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_USERNAME)
+        .append(" : Username for authentication\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_PASSWORD)
+        .append(" : Password for authentication\n");
+    sb.append("\t").append(RabbitMQConstants.COMMIT_SIZE)
+        .append(" : Buffer size when sending to RabbitMQ (default 250)\n");
+    return sb.toString();
   }
 }
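
A minimal sketch of opening the RabbitMQ writer under the new scheme (not part
of the patch; it needs a running broker, the raw key names are used because
RabbitMQConstants is package-private, and all values are illustrative). Note
the corrected default port: 5672 is the AMQP port, while the previous default,
15672, is the management UI port.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nutch.indexer.IndexWriterParams;
    import org.apache.nutch.indexwriter.rabbit.RabbitIndexWriter;
    import org.apache.nutch.util.NutchConfiguration;

    public class RabbitOpenSketch {
      public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put("server.host", "localhost");
        params.put("server.port", "5672"); // AMQP port, not the 15672 management UI port
        params.put("exchange.server", "nutch.exchange");
        params.put("queue.name", "nutch.queue");

        RabbitIndexWriter writer = new RabbitIndexWriter();
        writer.setConf(NutchConfiguration.create());
        writer.open(new IndexWriterParams(params)); // declares and binds exchange/queue
      }
    }
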
diff --git a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
index 86d88d01d..fa671e819 100644
--- a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
+++ b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
@@ -17,28 +17,26 @@
 package org.apache.nutch.indexwriter.rabbit;
 
 interface RabbitMQConstants {
-  String RABBIT_PREFIX = "rabbitmq.indexer";
 
-  String SERVER_HOST = RABBIT_PREFIX + "server.host";
+  String SERVER_HOST = "server.host";
 
-  String SERVER_PORT = RABBIT_PREFIX + "server.port";
+  String SERVER_PORT = "server.port";
 
-  String SERVER_VIRTUAL_HOST = RABBIT_PREFIX + "server.virtualhost";
+  String SERVER_VIRTUAL_HOST = "server.virtualhost";
 
-  String SERVER_USERNAME = RABBIT_PREFIX + "server.username";
+  String SERVER_USERNAME = "server.username";
 
-  String SERVER_PASSWORD = RABBIT_PREFIX + "server.password";
+  String SERVER_PASSWORD = "server.password";
 
-  String EXCHANGE_SERVER = RABBIT_PREFIX + "exchange.server";
+  String EXCHANGE_SERVER = "exchange.server";
 
-  String EXCHANGE_TYPE = RABBIT_PREFIX + "exchange.type";
+  String EXCHANGE_TYPE = "exchange.type";
 
-  String QUEUE_NAME = RABBIT_PREFIX + "queue.name";
+  String QUEUE_NAME = "queue.name";
 
-  String QUEUE_DURABLE = RABBIT_PREFIX + "queue.durable";
+  String QUEUE_DURABLE = "queue.durable";
 
-  String QUEUE_ROUTING_KEY = RABBIT_PREFIX + "queue.routingkey";
+  String QUEUE_ROUTING_KEY = "queue.routingkey";
 
-
-  String COMMIT_SIZE = RABBIT_PREFIX + "commit.size";
+  String COMMIT_SIZE = "commit.size";
 }
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
index 44a382eab..5c793110e 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java
@@ -19,19 +19,23 @@
 public interface SolrConstants {
   public static final String SOLR_PREFIX = "solr.";
 
-  public static final String SERVER_URL = SOLR_PREFIX + "server.url";
+  public static final String SERVER_TYPE = "type";
 
-  public static final String COMMIT_SIZE = SOLR_PREFIX + "commit.size";
+  public static final String SERVER_URL = "url";
+
+  public static final String COMMIT_SIZE = "commitSize";
 
   public static final String MAPPING_FILE = SOLR_PREFIX + "mapping.file";
 
-  public static final String USE_AUTH = SOLR_PREFIX + "auth";
+  public static final String USE_AUTH = "auth";
+
+  public static final String USERNAME = "username";
 
-  public static final String USERNAME = SOLR_PREFIX + "auth.username";
+  public static final String PASSWORD = "password";
 
-  public static final String PASSWORD = SOLR_PREFIX + "auth.password";
+  public static final String LOAD_BALANCE_URL = "loadbalanceURL";
 
-  public static final String COLLECTION = SOLR_PREFIX + "collection";
+  public static final String COLLECTION = "collection";
 
   public static final String ZOOKEEPER_HOSTS = SOLR_PREFIX + "zookeeper.hosts";
 
@@ -48,9 +52,8 @@
 
 
   @Deprecated
-  public static final String COMMIT_INDEX = SOLR_PREFIX + "commit.index";
+  public static final String COMMIT_INDEX = "commitIndex";
 
   @Deprecated
   public static final String PARAMS = SOLR_PREFIX + "params";
-
 }
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
index 10000a78e..09a096a84 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java
@@ -19,6 +19,7 @@
 import java.lang.invoke.MethodHandles;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -26,13 +27,15 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.indexer.IndexWriter;
+import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.IndexerMapReduce;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.DateUtil;
@@ -41,7 +44,6 @@
 import org.slf4j.LoggerFactory;
 
 // WORK AROUND FOR NOT REMOVING URL ENCODED URLS!!!
-import java.net.URLDecoder;
 
 public class SolrIndexWriter implements IndexWriter {
 
@@ -49,7 +51,6 @@
       .getLogger(MethodHandles.lookup().lookupClass());
 
   private List<SolrClient> solrClients;
-  private SolrMappingReader solrMapping;
   private ModifiableSolrParams params;
 
   private Configuration config;
@@ -57,7 +58,7 @@
   private final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>();
 
   private final List<SolrInputDocument> updateDocs = new ArrayList<SolrInputDocument>();
-    
+
   private final List<String> deleteIds = new ArrayList<String>();
 
   private int batchSize;
@@ -67,19 +68,67 @@
   private int totalUpdates = 0;
   private boolean delete = false;
 
+  @Override
   public void open(Configuration conf, String name) throws IOException {
-    solrClients = SolrUtils.getSolrClients(conf);
-    init(solrClients, conf);
+    //Implementation not required
   }
 
-  // package protected for tests
-  void init(List<SolrClient> solrClients, Configuration conf) throws IOException {
-    batchSize = conf.getInt(SolrConstants.COMMIT_SIZE, 1000);
-    solrMapping = SolrMappingReader.getInstance(conf);
-    delete = conf.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
+  /**
+   * Initializes the internal variables from a given index writer configuration.
+   *
+   * @param parameters Params from the index writer configuration.
+   * @throws IOException Some exception thrown by writer.
+   */
+  @Override
+  public void open(IndexWriterParams parameters) throws IOException {
+    String type = parameters.get("type", "http");
+
+    String[] urls = parameters.getStrings("url");
+
+    if (urls == null) {
+      String message = "Missing SOLR URL.\n" + describe();
+      LOG.error(message);
+      throw new RuntimeException(message);
+    }
+
+    this.solrClients = new ArrayList<>();
+
+    switch (type) {
+    case "http":
+      for (String url : urls) {
+        solrClients.add(SolrUtils.getHttpSolrClient(url));
+      }
+      break;
+    case "cloud":
+      for (String url : urls) {
+        CloudSolrClient sc = SolrUtils.getCloudSolrClient(url);
+        sc.setDefaultCollection(parameters.get(SolrConstants.COLLECTION));
+        solrClients.add(sc);
+      }
+      break;
+    case "concurrent":
+      // TODO: 1/08/17 Implement this
+      throw new UnsupportedOperationException(
+          "The type \"concurrent\" is not yet supported.");
+    case "lb":
+      // TODO: 1/08/17 Implement this
+      throw new UnsupportedOperationException(
+          "The type \"lb\" is not yet supported.");
+    default:
+      throw new IllegalArgumentException(
+          "The type \"" + type + "\" is not supported.");
+    }
+
+    init(parameters);
+  }
+
+  private void init(IndexWriterParams properties) {
+    batchSize = properties
+        .getInt(SolrConstants.COMMIT_SIZE, 1000);
+    delete = config.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
     // parse optional params
     params = new ModifiableSolrParams();
-    String paramString = conf.get(IndexerMapReduce.INDEXER_PARAMS);
+    String paramString = config.get(IndexerMapReduce.INDEXER_PARAMS);
     if (paramString != null) {
       String[] values = paramString.split("&");
       for (String v : values) {
@@ -99,17 +148,18 @@ public void delete(String key) throws IOException {
       LOG.error("Error decoding: " + key);
       throw new IOException("UnsupportedEncodingException for " + key);
     } catch (IllegalArgumentException e) {
-      LOG.warn("Could not decode: " + key + ", it probably wasn't encoded in the first place..");
+      LOG.warn("Could not decode: " + key
+          + ", it probably wasn't encoded in the first place..");
     }
-    
+
     // escape solr hash separator
     key = key.replaceAll("!", "\\!");
-    
+
     if (delete) {
       deleteIds.add(key);
       totalDeletes++;
     }
-    
+
     if (deleteIds.size() >= batchSize) {
       push();
     }
@@ -149,12 +199,7 @@ public void write(NutchDocument doc) throws IOException {
           val2 = SolrUtils.stripNonCharCodepoints((String) val);
         }
 
-        inputDoc.addField(solrMapping.mapKey(e.getKey()), val2, e.getValue()
-            .getWeight());
-        String sCopy = solrMapping.mapCopyKey(e.getKey());
-        if (sCopy != e.getKey()) {
-          inputDoc.addField(sCopy, val);
-        }
+        inputDoc.addField(e.getKey(), val2, e.getValue().getWeight());
       }
     }
 
@@ -186,12 +231,13 @@ public void commit() throws IOException {
       LOG.error("Failed to commit solr connection: " + e.getMessage()); // FIXME
     }
   }
-    
+
   public void push() throws IOException {
     if (inputDocs.size() > 0) {
       try {
-        LOG.info("Indexing " + Integer.toString(inputDocs.size())
-            + "/" + Integer.toString(totalAdds) + " documents");
+        LOG.info(
+            "Indexing " + Integer.toString(inputDocs.size()) + "/" + Integer
+                .toString(totalAdds) + " documents");
         LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
         numDeletes = 0;
         UpdateRequest req = new UpdateRequest();
@@ -209,8 +255,9 @@ public void push() throws IOException {
 
     if (deleteIds.size() > 0) {
       try {
-        LOG.info("SolrIndexer: deleting " + Integer.toString(deleteIds.size()) 
-            + "/" + Integer.toString(totalDeletes) + " documents");
+        LOG.info(
+            "SolrIndexer: deleting " + Integer.toString(deleteIds.size()) + "/"
+                + Integer.toString(totalDeletes) + " documents");
         for (SolrClient solrClient : solrClients) {
           solrClient.deleteById(deleteIds);
         }
@@ -236,29 +283,22 @@ public Configuration getConf() {
   @Override
   public void setConf(Configuration conf) {
     config = conf;
-    String serverURL = conf.get(SolrConstants.SERVER_URL);
-    String zkHosts = conf.get(SolrConstants.ZOOKEEPER_HOSTS);
-    if (serverURL == null && zkHosts == null) {
-      String message = "Missing SOLR URL and Zookeeper URL. Either on should be set via -D "
-          + SolrConstants.SERVER_URL + " or -D " + SolrConstants.ZOOKEEPER_HOSTS;
-      message += "\n" + describe();
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
   }
 
+  /**
+   * Returns a String describing the IndexWriter instance and the specific parameters it can take.
+   *
+   * @return The full description.
+   */
+  @Override
   public String describe() {
     StringBuffer sb = new StringBuffer("SOLRIndexWriter\n");
+    sb.append("\t").append(SolrConstants.SERVER_TYPE).append(
+        " : Type of the server. Can be: \"cloud\", \"concurrent\", \"http\" or \"lb\"\n");
     sb.append("\t").append(SolrConstants.SERVER_URL)
-        .append(" : URL of the SOLR instance\n");
-    sb.append("\t").append(SolrConstants.ZOOKEEPER_HOSTS)
-        .append(" : URL of the Zookeeper quorum\n");
+        .append(" : URL of the SOLR instance or URL of the Zookeeper quorum\n");
     sb.append("\t").append(SolrConstants.COMMIT_SIZE)
         .append(" : buffer size when sending to SOLR (default 1000)\n");
-    sb.append("\t")
-        .append(SolrConstants.MAPPING_FILE)
-        .append(
-            " : name of the mapping file for fields (default solrindex-mapping.xml)\n");
     sb.append("\t").append(SolrConstants.USE_AUTH)
         .append(" : use authentication (default false)\n");
     sb.append("\t").append(SolrConstants.USERNAME)
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrMappingReader.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrMappingReader.java
deleted file mode 100644
index 9b9ec4250..000000000
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrMappingReader.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nutch.indexwriter.solr;
-
-import java.lang.invoke.MethodHandles;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.ObjectCache;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
-public class SolrMappingReader {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
-
-  private Configuration conf;
-
-  private Map<String, String> keyMap = new HashMap<String, String>();
-  private Map<String, String> copyMap = new HashMap<String, String>();
-  private String uniqueKey = "id";
-
-  public static synchronized SolrMappingReader getInstance(Configuration conf) {
-    ObjectCache cache = ObjectCache.get(conf);
-    SolrMappingReader instance = (SolrMappingReader) cache
-        .getObject(SolrMappingReader.class.getName());
-    if (instance == null) {
-      instance = new SolrMappingReader(conf);
-      cache.setObject(SolrMappingReader.class.getName(), instance);
-    }
-    return instance;
-  }
-
-  protected SolrMappingReader(Configuration conf) {
-    this.conf = conf;
-    parseMapping();
-  }
-
-  private void parseMapping() {
-    InputStream ssInputStream = null;
-    ssInputStream = conf.getConfResourceAsInputStream(conf.get(
-        SolrConstants.MAPPING_FILE, "solrindex-mapping.xml"));
-
-    InputSource inputSource = new InputSource(ssInputStream);
-    try {
-      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-      DocumentBuilder builder = factory.newDocumentBuilder();
-      Document document = builder.parse(inputSource);
-      Element rootElement = document.getDocumentElement();
-      NodeList fieldList = rootElement.getElementsByTagName("field");
-      if (fieldList.getLength() > 0) {
-        for (int i = 0; i < fieldList.getLength(); i++) {
-          Element element = (Element) fieldList.item(i);
-          LOG.info("source: " + element.getAttribute("source") + " dest: "
-              + element.getAttribute("dest"));
-          keyMap.put(element.getAttribute("source"),
-              element.getAttribute("dest"));
-        }
-      }
-      NodeList copyFieldList = rootElement.getElementsByTagName("copyField");
-      if (copyFieldList.getLength() > 0) {
-        for (int i = 0; i < copyFieldList.getLength(); i++) {
-          Element element = (Element) copyFieldList.item(i);
-          LOG.info("source: " + element.getAttribute("source") + " dest: "
-              + element.getAttribute("dest"));
-          copyMap.put(element.getAttribute("source"),
-              element.getAttribute("dest"));
-        }
-      }
-      NodeList uniqueKeyItem = rootElement.getElementsByTagName("uniqueKey");
-      if (uniqueKeyItem.getLength() > 1) {
-        LOG.warn("More than one unique key definitions found in solr index mapping, using default 'id'");
-        uniqueKey = "id";
-      } else if (uniqueKeyItem.getLength() == 0) {
-        LOG.warn("No unique key definition found in solr index mapping using, default 'id'");
-      } else {
-        uniqueKey = uniqueKeyItem.item(0).getFirstChild().getNodeValue();
-      }
-    } catch (MalformedURLException e) {
-      LOG.warn(e.toString());
-    } catch (SAXException e) {
-      LOG.warn(e.toString());
-    } catch (IOException e) {
-      LOG.warn(e.toString());
-    } catch (ParserConfigurationException e) {
-      LOG.warn(e.toString());
-    }
-  }
-
-  public Map<String, String> getKeyMap() {
-    return keyMap;
-  }
-
-  public Map<String, String> getCopyMap() {
-    return copyMap;
-  }
-
-  public String getUniqueKey() {
-    return uniqueKey;
-  }
-
-  public String hasCopy(String key) {
-    if (copyMap.containsKey(key)) {
-      key = (String) copyMap.get(key);
-    }
-    return key;
-  }
-
-  public String mapKey(String key) throws IOException {
-    if (keyMap.containsKey(key)) {
-      key = (String) keyMap.get(key);
-    }
-    return key;
-  }
-
-  public String mapCopyKey(String key) throws IOException {
-    if (copyMap.containsKey(key)) {
-      key = (String) copyMap.get(key);
-    }
-    return key;
-  }
-}
diff --git a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
index c8ad54b38..74b405394 100644
--- a/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
+++ b/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java
@@ -23,8 +23,8 @@
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 
 import java.net.MalformedURLException;
 
@@ -34,16 +34,15 @@
       .getLogger(MethodHandles.lookup().lookupClass());
 
   /**
-   *
-   *
-   * @param job
+   * @param conf Configuration holding the Solr server URLs or Zookeeper hosts
    * @return SolrClient
    */
-  public static ArrayList<SolrClient> getSolrClients(Configuration conf) throws MalformedURLException {
+  public static ArrayList<SolrClient> getSolrClients(Configuration conf)
+      throws MalformedURLException {
     String[] urls = conf.getStrings(SolrConstants.SERVER_URL);
     String[] zkHostString = conf.getStrings(SolrConstants.ZOOKEEPER_HOSTS);
     ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>();
-    
+
     if (zkHostString != null && zkHostString.length > 0) {
       for (int i = 0; i < zkHostString.length; i++) {
         CloudSolrClient sc = getCloudSolrClient(zkHostString[i]);
@@ -60,18 +59,20 @@
     return solrClients;
   }
 
-  public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException {
+  public static CloudSolrClient getCloudSolrClient(String url)
+      throws MalformedURLException {
     CloudSolrClient sc = new CloudSolrClient(url.replace('|', ','));
     sc.setParallelUpdates(true);
     sc.connect();
     return sc;
   }
 
-  public static SolrClient getHttpSolrClient(String url) throws MalformedURLException {
-    SolrClient sc =new HttpSolrClient(url);
+  public static SolrClient getHttpSolrClient(String url)
+      throws MalformedURLException {
+    SolrClient sc = new HttpSolrClient(url);
     return sc;
   }
-  
+
   public static String stripNonCharCodepoints(String input) {
     StringBuilder retval = new StringBuilder();
     char ch;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> SolrIndexer to write to multiple servers.
> -----------------------------------------
>
>                 Key: NUTCH-1480
>                 URL: https://issues.apache.org/jira/browse/NUTCH-1480
>             Project: Nutch
>          Issue Type: Improvement
>          Components: indexer
>            Reporter: Markus Jelsma
>            Assignee: Markus Jelsma
>            Priority: Minor
>         Attachments: NUTCH-1480-1.6.1.patch, adding-support-for-sharding-indexer-for-solr.patch
>
>
> SolrUtils should return an array of SolrServers and read the Solr URL as a comma-delimited list of URLs using Configuration.getStrings(). SolrWriter should be able to handle this list of SolrServers.
> This is useful if you want to send documents to multiple servers when no replication is available, or if you want to send documents to multiple NOCs.
> edit:
> This does not replace NUTCH-1377 but complements it. Together with NUTCH-1377, this issue allows you to index to multiple SolrCloud clusters at the same time.
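
The merged patch realizes this through the "url" parameter of the Solr writer:
a comma-delimited value yields one HttpSolrClient per server, and push() and
commit() loop over all of them. A minimal sketch (not from the patch; it
assumes IndexWriterParams.getStrings() splits on commas the way Hadoop's
Configuration.getStrings() does, and the hostnames are made up):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nutch.indexer.IndexWriterParams;
    import org.apache.nutch.indexwriter.solr.SolrConstants;
    import org.apache.nutch.indexwriter.solr.SolrIndexWriter;
    import org.apache.nutch.util.NutchConfiguration;

    public class MultiSolrSketch {
      public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put(SolrConstants.SERVER_TYPE, "http");
        // Two NOCs, one HttpSolrClient each; hostnames are illustrative.
        params.put(SolrConstants.SERVER_URL,
            "http://noc1.example.com:8983/solr/nutch,http://noc2.example.com:8983/solr/nutch");

        SolrIndexWriter writer = new SolrIndexWriter();
        writer.setConf(NutchConfiguration.create());
        writer.open(new IndexWriterParams(params));
        // Every written document is now indexed to both servers.
      }
    }
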



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
