nutch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NUTCH-2580) Improvements for Rabbitmq support
Date Sat, 02 Jun 2018 11:25:00 GMT

    [ https://issues.apache.org/jira/browse/NUTCH-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16499012#comment-16499012 ] 

ASF GitHub Bot commented on NUTCH-2580:
---------------------------------------

sebastian-nagel closed pull request #335: fix for NUTCH-2580 contributed by r0ann3l
URL: https://github.com/apache/nutch/pull/335
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.xml b/build.xml
index 0d915e8b1..1d680d0bd 100644
--- a/build.xml
+++ b/build.xml
@@ -195,6 +195,7 @@
       <packageset dir="${plugins.dir}/language-identifier/src/java"/>
       <packageset dir="${plugins.dir}/lib-htmlunit/src/java"/>
       <packageset dir="${plugins.dir}/lib-http/src/java"/>
+      <packageset dir="${plugins.dir}/lib-rabbitmq/src/java"/>
       <packageset dir="${plugins.dir}/lib-regex-filter/src/java"/>
       <packageset dir="${plugins.dir}/lib-selenium/src/java"/>
       <packageset dir="${plugins.dir}/microformats-reltag/src/java"/>
@@ -652,6 +653,7 @@
       <packageset dir="${plugins.dir}/language-identifier/src/java"/>
       <packageset dir="${plugins.dir}/lib-htmlunit/src/java"/>
       <packageset dir="${plugins.dir}/lib-http/src/java"/>
+      <packageset dir="${plugins.dir}/lib-rabbitmq/src/java"/>
       <packageset dir="${plugins.dir}/lib-regex-filter/src/java"/>
       <packageset dir="${plugins.dir}/lib-selenium/src/java"/>
       <packageset dir="${plugins.dir}/microformats-reltag/src/java"/>
@@ -1073,6 +1075,7 @@
         <source path="${plugins.dir}/lib-htmlunit/src/java/" />
         <source path="${plugins.dir}/lib-http/src/java/" />
         <source path="${plugins.dir}/lib-http/src/test/" />
+        <source path="${plugins.dir}/lib-rabbitmq/src/java/" />
         <source path="${plugins.dir}/lib-regex-filter/src/java/" />
         <source path="${plugins.dir}/lib-regex-filter/src/test/" />
         <source path="${plugins.dir}/lib-selenium/src/java/" />
diff --git a/conf/index-writers.xml.template b/conf/index-writers.xml.template
index 118c8bc88..02c482629 100644
--- a/conf/index-writers.xml.template
+++ b/conf/index-writers.xml.template
@@ -46,17 +46,18 @@
   </writer>
   <writer id="indexer_rabbit_1" class="org.apache.nutch.indexwriter.rabbit.RabbitIndexWriter">
     <parameters>
-      <param name="server.host" value="localhost"/>
-      <param name="server.port" value="5672"/>
-      <param name="server.virtualhost" value="nutch"/>
-      <param name="server.username" value="admin"/>
-      <param name="server.password" value="admin"/>
-      <param name="exchange.server" value="nutch.exchange"/>
-      <param name="exchange.type" value="direct"/>
+      <param name="server.uri" value="amqp://guest:guest@localhost:5672/"/>
+      <param name="binding" value="false"/>
+      <param name="binding.arguments" value=""/>
+      <param name="exchange.name" value=""/>
+      <param name="exchange.options" value="type=direct,durable=true"/>
       <param name="queue.name" value="nutch.queue"/>
-      <param name="queue.durable" value="true"/>
-      <param name="queue.routingkey" value="nutch.key"/>
+      <param name="queue.options" value="durable=true,exclusive=false,auto-delete=false"/>
+      <param name="routingkey" value=""/>
+      <param name="commit.mode" value="multiple"/>
       <param name="commit.size" value="250"/>
+      <param name="headers.static" value=""/>
+      <param name="headers.dynamic" value=""/>
     </parameters>
     <mapping>
       <copy>
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 8874fc0bc..e034a4f3c 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -2055,92 +2055,132 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   last operation.</description>
 </property>
 
+<!-- RabbitMQ indexer properties -->
+
 <property>
-  <name>rabbitmq.indexer.server.host</name>
-  <value>localhost</value>
+  <name>rabbitmq.indexer.server.uri</name>
+  <value>amqp://guest:guest@localhost:5672/</value>
   <description>
-    Host on which the RabbitMQ server is running. Default "localhost".
+    URI with connection parameters in the form
+    amqp://username:password@hostname:port/virtualHost
+    Where:
+    username is the username for RabbitMQ server.
+    password is the password for RabbitMQ server.
+    hostname is where the RabbitMQ server is running.
+    port is where the RabbitMQ server is listening.
+    virtualHost is where the exchange is and the user has access.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.server.port</name>
-  <value>5672</value>
+  <name>rabbitmq.indexer.binding</name>
+  <value>false</value>
   <description>
-    Port on which the RabbitMQ server is listening. Default "5672".
+    Whether the relationship between an exchange and a queue is created
+    automatically. Default "false".
+
+    NOTE: Binding between exchanges is not supported.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.server.virtualhost</name>
+  <name>rabbitmq.indexer.binding.arguments</name>
   <value></value>
   <description>
-    Virtual Host where the exchange is and the user has access. Default "/".
+    Arguments used in binding. It must have the form key1=value1,key2=value2.
+    This value is only used when the exchange's type is headers and
+    the value of 'rabbitmq.indexer.binding' property is true. In other cases
+    it is ignored.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.server.username</name>
-  <value>admin</value>
+  <name>rabbitmq.indexer.exchange.name</name>
+  <value></value>
   <description>
-    Username for RabbitMQ server. Default "admin"
+    Name for the exchange where the messages will be sent. Default "".
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.server.password</name>
-  <value>admin</value>
+  <name>rabbitmq.indexer.exchange.options</name>
+  <value>type=direct,durable=true</value>
   <description>
-    Password for RabbitMQ server. Default "admin"
+    Options used when the exchange is created.
+    Only used when the value of 'rabbitmq.indexer.binding' property is true.
+    Default "type=direct,durable=true".
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.exchange.server</name>
-  <value></value>
+  <name>rabbitmq.indexer.queue.name</name>
+  <value>nutch.queue</value>
   <description>
-    Name for the exchange server to use. Default "nutch.exchange"
+    Name of the queue used to create the binding. Default "nutch.queue".
+    Only used when the value of 'rabbitmq.indexer.binding' property is true.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.exchange.type</name>
-  <value></value>
+  <name>rabbitmq.indexer.queue.options</name>
+  <value>durable=true,exclusive=false,auto-delete=false</value>
   <description>
-    Type of the exchange. Default "direct".
+    Options used when the queue is created.
+    Only used when the value of 'rabbitmq.indexer.binding' property is true.
+    Default "durable=true,exclusive=false,auto-delete=false".
+
+    It must have the form
+    durable={durable},exclusive={exclusive},auto-delete={auto-delete},arguments={arguments}
+    where:
+    durable is true or false
+    exclusive is true or false
+    auto-delete is true or false
+    arguments must be of the form {key1:value1;key2:value2}
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.queue.durable</name>
+  <name>rabbitmq.indexer.routingkey</name>
   <value></value>
   <description>
-    Message durability into the queue. Default "true".
+    The routing key used to publish messages to specific queues.
+    It is only used when the exchange type is "topic" or "direct". Default
+    is the value of 'rabbitmq.indexer.queue.name' property.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.queue.name</name>
-  <value></value>
+  <name>rabbitmq.indexer.commit.mode</name>
+  <value>multiple</value>
   <description>
-    Name of the queue where the messages will be sent. Default "nutch.queue"
+    "single" if a message contains only one document. In this case a header
+    with the action (write, update or delete) will be added.
+    "multiple" if a message contains all documents. Default "multiple".
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.queue.routingkey</name>
+  <name>rabbitmq.indexer.commit.size</name>
+  <value>250</value>
+  <description>
+    Amount of documents to send into each message if the value of
+    'rabbitmq.indexer.commit.mode' property is "multiple". Default "250".
+  </description>
+</property>
+
+<property>
+  <name>rabbitmq.indexer.headers.static</name>
   <value></value>
   <description>
-    The routingKey used by indexer to publish messages to specific queues.
-    If the exchange type is "fanout", then this property is ignored. Default is "nutch.key"
+    Headers to add to each message. It must have the form key1=value1,key2=value2.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.indexer.commit.size</name>
-  <value>250</value>
+  <name>rabbitmq.indexer.headers.dynamic</name>
+  <value></value>
   <description>
-    Amount of documents to send into each message. Default "250"
+    Document's fields to add as headers to each message. It must have the form field1,field2.
   </description>
 </property>
 
@@ -2641,87 +2681,105 @@ visit https://wiki.apache.org/nutch/SimilarityScoringFilter-->
   </description>
 </property>
 
-<!-- RabbitMQ properties -->
+<!-- RabbitMQ publisher properties -->
 
 <property>
-  <name>rabbitmq.exchange.server</name>
-  <value></value>
+  <name>rabbitmq.publisher.server.uri</name>
+  <value>amqp://guest:guest@localhost:5672/</value>
   <description>
-    Name for the exchange server to use. Default - "fetcher_log"
+    URI with connection parameters in the form
+    amqp://username:password@hostname:port/virtualHost
+    where:
+    username is the username for RabbitMQ server.
+    password is the password for RabbitMQ server.
+    hostname is where the RabbitMQ server is running.
+    port is where the RabbitMQ server is listening.
+    virtualHost is where the exchange is and the user has access.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.exchange.type</name>
-  <value></value>
+  <name>rabbitmq.publisher.binding</name>
+  <value>false</value>
   <description>
-    There are a few exchange types available: direct, topic, headers and fanout. Default "fanout".
-  </description>
-</property>
+    Whether the relationship between an exchange and a queue is created
+    automatically. Default "false".
 
-<property>
-  <name>rabbitmq.host</name>
-  <value></value>
-  <description>
-    Host on which the RabbitMQ server is running. Default "localhost".
+    NOTE: Binding between exchanges is not supported.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.port</name>
+  <name>rabbitmq.publisher.binding.arguments</name>
   <value></value>
   <description>
-    Port on which the RabbitMQ server is listening. Default "5672".
+    Arguments used in binding. It must have the form key1=value1,key2=value2.
+    This value is only used when the exchange's type is headers and
+    the value of 'rabbitmq.publisher.binding' property is true. In other cases
+    it is ignored.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.virtualhost</name>
+  <name>rabbitmq.publisher.exchange.name</name>
   <value></value>
   <description>
-    Default 'null'
+    Name for the exchange where the messages will be sent. Default "".
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.username</name>
-  <value></value>
+  <name>rabbitmq.publisher.exchange.options</name>
+  <value>type=direct,durable=true</value>
   <description>
-    Default 'null'
+    Options used when the exchange is created.
+    Only used when the value of 'rabbitmq.publisher.binding' property is true.
+    Default "type=direct,durable=true".
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.password</name>
-  <value></value>
+  <name>rabbitmq.publisher.queue.name</name>
+  <value>nutch.events.queue</value>
   <description>
-    Default 'null'
+    Name of the queue used to create the binding. Default "nutch.events.queue".
+    Only used when the value of 'rabbitmq.publisher.binding' property is true.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.queue.name</name>
-  <value></value>
+  <name>rabbitmq.publisher.queue.options</name>
+  <value>durable=true,exclusive=false,auto-delete=false</value>
   <description>
-    Default 'fanout.queue'
+    Options used when the queue is created.
+    Only used when the value of 'rabbitmq.publisher.binding' property is true.
+    Default "durable=true,exclusive=false,auto-delete=false".
+
+    It must have the form
+    durable={durable},exclusive={exclusive},auto-delete={auto-delete},arguments={arguments}
+    where:
+    durable is true or false
+    exclusive is true or false
+    auto-delete is true or false
+    arguments must be of the form {key1:value1;key2:value2}
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.queue.durable</name>
+  <name>rabbitmq.publisher.routingkey</name>
   <value></value>
   <description>
-    Default 'true'
+    The routing key used to publish messages to specific queues.
+    It is only used when the exchange type is "topic" or "direct". Default
+    is the value of 'rabbitmq.publisher.queue.name' property.
   </description>
 </property>
 
 <property>
-  <name>rabbitmq.queue.routingkey</name>
+  <name>rabbitmq.publisher.headers.static</name>
   <value></value>
   <description>
-    Default is 'fanout.key'
-    The routingKey used by publisher to publish messages to specific queues. 
-    If the exchange type is "fanout", then this property is ignored.
+    Headers to add to each message. It must have the form key1=value1,key2=value2.
   </description>
 </property>
 
diff --git a/src/java/org/apache/nutch/indexer/IndexWriterParams.java b/src/java/org/apache/nutch/indexer/IndexWriterParams.java
index 9c9ee72ef..cc91ec02a 100644
--- a/src/java/org/apache/nutch/indexer/IndexWriterParams.java
+++ b/src/java/org/apache/nutch/indexer/IndexWriterParams.java
@@ -7,52 +7,60 @@
 
 public class IndexWriterParams extends HashMap<String, String> {
 
-    /**
-     * Constructs a new <tt>HashMap</tt> with the same mappings as the
-     * specified <tt>Map</tt>.  The <tt>HashMap</tt> is created with
-     * default load factor (0.75) and an initial capacity sufficient to
-     * hold the mappings in the specified <tt>Map</tt>.
-     *
-     * @param m the map whose mappings are to be placed in this map
-     * @throws NullPointerException if the specified map is null
-     */
-    public IndexWriterParams(Map<? extends String, ? extends String> m) {
-        super(m);
-    }
+  /**
+   * Constructs a new <tt>HashMap</tt> with the same mappings as the
+   * specified <tt>Map</tt>.  The <tt>HashMap</tt> is created with
+   * default load factor (0.75) and an initial capacity sufficient to
+   * hold the mappings in the specified <tt>Map</tt>.
+   *
+   * @param m the map whose mappings are to be placed in this map
+   * @throws NullPointerException if the specified map is null
+   */
+  public IndexWriterParams(Map<? extends String, ? extends String> m) {
+    super(m);
+  }
+
+  public String get(String name, String defaultValue) {
+    return this.getOrDefault(name, defaultValue);
+  }
 
-    public String get(String name, String defaultValue) {
-        return this.getOrDefault(name, defaultValue);
+  public boolean getBoolean(String name, boolean defaultValue) {
+    String value;
+    if ((value = this.get(name)) != null && !"".equals(value)) {
+      return Boolean.parseBoolean(value);
     }
 
-    public boolean getBoolean(String name, boolean defaultValue) {
-        String value;
-        if ((value = this.get(name)) != null) {
-            return Boolean.parseBoolean(value);
-        }
+    return defaultValue;
+  }
 
-        return defaultValue;
+  public long getLong(String name, long defaultValue) {
+    String value;
+    if ((value = this.get(name)) != null && !"".equals(value)) {
+      return Long.parseLong(value);
     }
 
-    public long getLong(String name, long defaultValue) {
-        String value;
-        if ((value = this.get(name)) != null) {
-            return Long.parseLong(value);
-        }
+    return defaultValue;
+  }
 
-        return defaultValue;
+  public int getInt(String name, int defaultValue) {
+    String value;
+    if ((value = this.get(name)) != null && !"".equals(value)) {
+      return Integer.parseInt(value);
     }
 
-    public int getInt(String name, int defaultValue) {
-        String value;
-        if ((value = this.get(name)) != null) {
-            return Integer.parseInt(value);
-        }
+    return defaultValue;
+  }
 
-        return defaultValue;
-    }
+  public String[] getStrings(String name) {
+    String value = this.get(name);
+    return StringUtils.getStrings(value);
+  }
 
-    public String[] getStrings(String name) {
-        String value = this.get(name);
-        return StringUtils.getStrings(value);
+  public String[] getStrings(String name, String... defaultValue) {
+    String value;
+    if ((value = this.get(name)) != null && !"".equals(value)) {
+      return StringUtils.getStrings(value);
     }
+    return defaultValue;
+  }
 }
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index 3f579e841..5a3a8c910 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -30,6 +30,7 @@
     <ant dir="lib-htmlunit" target="deploy"/>
     <ant dir="lib-http" target="deploy"/>
     <ant dir="lib-nekohtml" target="deploy"/>
+    <ant dir="lib-rabbitmq" target="deploy"/>
     <ant dir="lib-regex-filter" target="deploy"/>
     <ant dir="lib-selenium" target="deploy"/>
     <ant dir="lib-xml" target="deploy"/>
@@ -186,6 +187,7 @@
     <ant dir="lib-htmlunit" target="clean"/>
     <ant dir="lib-http" target="clean"/>
     <ant dir="lib-nekohtml" target="clean"/>
+    <ant dir="lib-rabbitmq" target="clean"/>
     <ant dir="lib-regex-filter" target="clean"/>
     <ant dir="lib-selenium" target="clean"/>
     <ant dir="lib-xml" target="clean"/>
diff --git a/src/plugin/indexer-rabbit/build.xml b/src/plugin/indexer-rabbit/build.xml
index e996becc1..1e6124b1b 100644
--- a/src/plugin/indexer-rabbit/build.xml
+++ b/src/plugin/indexer-rabbit/build.xml
@@ -19,4 +19,16 @@
 
   <import file="../build-plugin.xml" />
 
+  <!-- Build compilation dependencies -->
+  <target name="deps-jar">
+    <ant target="jar" inheritall="false" dir="../lib-rabbitmq"/>
+  </target>
+
+  <!-- Add compilation dependencies to classpath -->
+  <path id="plugin.deps">
+    <fileset dir="${nutch.root}/build">
+      <include name="**/lib-rabbitmq/*.jar" />
+    </fileset>
+  </path>
+
 </project>
diff --git a/src/plugin/indexer-rabbit/ivy.xml b/src/plugin/indexer-rabbit/ivy.xml
index 0954a3d47..b88ccd6d4 100644
--- a/src/plugin/indexer-rabbit/ivy.xml
+++ b/src/plugin/indexer-rabbit/ivy.xml
@@ -27,17 +27,12 @@
   </info>
 
   <configurations>
-    <include file="../../..//ivy/ivy-configurations.xml"/>
+    <include file="../../../ivy/ivy-configurations.xml"/>
   </configurations>
 
   <publications>
     <!--get the artifact from our module name-->
     <artifact conf="master"/>
   </publications>
-
-  <dependencies>
-    <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5"/>
-    <dependency org="com.google.code.gson" name="gson" rev="2.7"/>
-  </dependencies>
   
 </ivy-module>
diff --git a/src/plugin/indexer-rabbit/plugin.xml b/src/plugin/indexer-rabbit/plugin.xml
index 051d34b33..3848b26e8 100644
--- a/src/plugin/indexer-rabbit/plugin.xml
+++ b/src/plugin/indexer-rabbit/plugin.xml
@@ -21,12 +21,11 @@
     <library name="indexer-rabbit.jar">
       <export name="*" />
     </library>
-    <library name="amqp-client-3.6.5.jar"/>
-    <library name="gson-2.7.jar"/>
   </runtime>
 
   <requires>
-    <import plugin="nutch-extensionpoints" />
+    <import plugin="nutch-extensionpoints"/>
+    <import plugin="lib-rabbitmq"/>
   </requires>
 
   <extension id="org.apache.nutch.indexer.rabbit"
diff --git a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitDocument.java b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitDocument.java
index 238f0171a..dd0c3092d 100644
--- a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitDocument.java
+++ b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitDocument.java
@@ -16,39 +16,54 @@
  */
 package org.apache.nutch.indexwriter.rabbit;
 
+import com.google.gson.Gson;
+
 import java.util.LinkedList;
 import java.util.List;
 
 class RabbitDocument {
-    private List<RabbitDocumentField> fields;
+  private List<RabbitDocumentField> fields;
 
-    private float documentBoost;
+  private float documentBoost;
 
-    RabbitDocument() {
-        this.fields = new LinkedList<>();
-    }
+  RabbitDocument() {
+    this.fields = new LinkedList<>();
+  }
 
-    List<RabbitDocumentField> getFields() {
-        return fields;
-    }
+  List<RabbitDocumentField> getFields() {
+    return fields;
+  }
 
-    void setDocumentBoost(float documentBoost) {
-        this.documentBoost = documentBoost;
-    }
+  void setDocumentBoost(float documentBoost) {
+    this.documentBoost = documentBoost;
+  }
 
-    void addField(RabbitDocumentField field) {
-        fields.add(field);
+  void addField(RabbitDocumentField field) {
+    fields.add(field);
+  }
+
+  byte[] getBytes() {
+    Gson gson = new Gson();
+    return gson.toJson(this).getBytes();
+  }
+
+  static class RabbitDocumentField {
+    private String key;
+    private float weight;
+    private List<Object> values;
+
+    RabbitDocumentField(String key, float weight, List<Object> values) {
+      this.key = key;
+      this.weight = weight;
+      this.values = values;
     }
 
-    static class RabbitDocumentField {
-        private String key;
-        private float weight;
-        private List<Object> values;
+    public String getKey() {
+      return key;
+    }
 
-        RabbitDocumentField(String key, float weight, List<Object> values) {
-            this.key = key;
-            this.weight = weight;
-            this.values = values;
-        }
+    public List<Object> getValues() {
+      return values;
     }
+  }
 }
diff --git a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
index 66a7bdc96..301d7e9dc 100644
--- a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
+++ b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitIndexWriter.java
@@ -18,9 +18,6 @@
 
 import java.io.IOException;
 
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.indexer.IndexWriterParams;
 import org.apache.nutch.indexer.NutchDocument;
@@ -28,40 +25,34 @@
 import org.apache.nutch.indexer.IndexWriter;
 
 import org.apache.nutch.indexer.NutchField;
+import org.apache.nutch.rabbitmq.RabbitMQClient;
+import org.apache.nutch.rabbitmq.RabbitMQMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 
 public class RabbitIndexWriter implements IndexWriter {
 
-  private String serverHost;
-  private int serverPort;
-  private String serverVirtualHost;
-  private String serverUsername;
-  private String serverPassword;
+  public static final Logger LOG = LoggerFactory
+      .getLogger(RabbitIndexWriter.class);
 
-  private String exchangeServer;
-  private String exchangeType;
-
-  private String queueName;
-  private boolean queueDurable;
-  private String queueRoutingKey;
+  private String exchange;
+  private String routingKey;
 
   private int commitSize;
+  private String commitMode;
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  private String headersStatic;
+  private List<String> headersDynamic;
 
   private Configuration config;
 
   private RabbitMessage rabbitMessage = new RabbitMessage();
 
-  private Channel channel;
-  private Connection connection;
+  private RabbitMQClient client;
 
   @Override
   public Configuration getConf() {
@@ -86,51 +77,39 @@ public void open(Configuration conf, String name) throws IOException {
    */
   @Override
   public void open(IndexWriterParams parameters) throws IOException {
-    serverHost = parameters.get(RabbitMQConstants.SERVER_HOST, "localhost");
-    serverPort = parameters.getInt(RabbitMQConstants.SERVER_PORT, 5672);
-    serverVirtualHost = parameters
-        .get(RabbitMQConstants.SERVER_VIRTUAL_HOST, null);
-
-    serverUsername = parameters.get(RabbitMQConstants.SERVER_USERNAME, "admin");
-    serverPassword = parameters.get(RabbitMQConstants.SERVER_PASSWORD, "admin");
-
-    exchangeServer = parameters
-        .get(RabbitMQConstants.EXCHANGE_SERVER, "nutch.exchange");
-    exchangeType = parameters.get(RabbitMQConstants.EXCHANGE_TYPE, "direct");
-
-    queueName = parameters.get(RabbitMQConstants.QUEUE_NAME, "nutch.queue");
-    queueDurable = parameters.getBoolean(RabbitMQConstants.QUEUE_DURABLE, true);
-    queueRoutingKey = parameters
-        .get(RabbitMQConstants.QUEUE_ROUTING_KEY, "nutch.key");
+    exchange = parameters.get(RabbitMQConstants.EXCHANGE_NAME);
+    routingKey = parameters.get(RabbitMQConstants.ROUTING_KEY);
 
     commitSize = parameters.getInt(RabbitMQConstants.COMMIT_SIZE, 250);
+    commitMode = parameters.get(RabbitMQConstants.COMMIT_MODE, "multiple");
 
-    ConnectionFactory factory = new ConnectionFactory();
-    factory.setHost(serverHost);
-    factory.setPort(serverPort);
+    headersStatic = parameters.get(RabbitMQConstants.HEADERS_STATIC, "");
+    headersDynamic = Arrays
+        .asList(parameters.getStrings(RabbitMQConstants.HEADERS_DYNAMIC, ""));
 
-    if(serverVirtualHost != null) {
-      factory.setVirtualHost(serverVirtualHost);
-    }
+    String uri = parameters.get(RabbitMQConstants.SERVER_URI);
 
-    factory.setUsername(serverUsername);
-    factory.setPassword(serverPassword);
+    client = new RabbitMQClient(uri);
+    client.openChannel();
 
-    try {
-      connection = factory.newConnection(UUID.randomUUID().toString());
-      channel = connection.createChannel();
+    boolean binding = parameters.getBoolean(RabbitMQConstants.BINDING, false);
+    if (binding) {
+      String queueName = parameters.get(RabbitMQConstants.QUEUE_NAME);
+      String queueOptions = parameters.get(RabbitMQConstants.QUEUE_OPTIONS);
 
-      channel.exchangeDeclare(exchangeServer, exchangeType, true);
-      channel.queueDeclare(queueName, queueDurable, false, false, null);
-      channel.queueBind(queueName, exchangeServer, queueRoutingKey);
+      String exchangeOptions = parameters.get(RabbitMQConstants.EXCHANGE_OPTIONS);
 
-    } catch (TimeoutException | IOException ex) {
-      throw makeIOException(ex);
+      String bindingArguments = parameters
+          .get(RabbitMQConstants.BINDING_ARGUMENTS, "");
+
+      client
+          .bind(exchange, exchangeOptions, queueName, queueOptions, routingKey,
+              bindingArguments);
     }
   }
 
   @Override
-  public void update(NutchDocument doc) throws IOException {
+  public void write(NutchDocument doc) throws IOException {
     RabbitDocument rabbitDocument = new RabbitDocument();
 
     for (final Map.Entry<String, NutchField> e : doc) {
@@ -140,23 +119,24 @@ public void update(NutchDocument doc) throws IOException {
     }
     rabbitDocument.setDocumentBoost(doc.getWeight());
 
-    rabbitMessage.addDocToUpdate(rabbitDocument);
-    if(rabbitMessage.size() >= commitSize) {
+    rabbitMessage.addDocToWrite(rabbitDocument);
+
+    if (rabbitMessage.size() >= commitSize) {
       commit();
     }
   }
 
   @Override
-  public void commit() throws IOException {
-    if(!rabbitMessage.isEmpty()) {
-      channel.basicPublish(exchangeServer, queueRoutingKey, null,
-          rabbitMessage.getBytes());
+  public void delete(String url) throws IOException {
+    rabbitMessage.addDocToDelete(url);
+
+    if (rabbitMessage.size() >= commitSize) {
+      commit();
     }
-    rabbitMessage.clear();
   }
 
   @Override
-  public void write(NutchDocument doc) throws IOException {
+  public void update(NutchDocument doc) throws IOException {
     RabbitDocument rabbitDocument = new RabbitDocument();
 
     for (final Map.Entry<String, NutchField> e : doc) {
@@ -166,55 +146,98 @@ public void write(NutchDocument doc) throws IOException {
     }
     rabbitDocument.setDocumentBoost(doc.getWeight());
 
-    rabbitMessage.addDocToWrite(rabbitDocument);
-
-    if(rabbitMessage.size() >= commitSize) {
+    rabbitMessage.addDocToUpdate(rabbitDocument);
+    if (rabbitMessage.size() >= commitSize) {
       commit();
     }
   }
 
   @Override
-  public void close() throws IOException {
-    commit();//TODO: This is because indexing job never call commit method. It should be fixed.
-    try {
-      if(channel.isOpen()) {
-        channel.close();
-      }
-      if(connection.isOpen()) {
-        connection.close();
+  public void commit() throws IOException {
+    if (!rabbitMessage.isEmpty()) {
+
+      if ("single".equals(commitMode)) {
+        // The messages to delete
+        for (String s : rabbitMessage.getDocsToDelete()) {
+          RabbitMQMessage message = new RabbitMQMessage();
+          message.setBody(s.getBytes());
+          message.setHeaders(headersStatic);
+          message.addHeader("action", "delete");
+          client.publish(exchange, routingKey, message);
+        }
+
+        // The messages to update
+        for (RabbitDocument rabbitDocument : rabbitMessage.getDocsToUpdate()) {
+          RabbitMQMessage message = new RabbitMQMessage();
+          message.setBody(rabbitDocument.getBytes());
+          addHeaders(message, rabbitDocument);
+          message.addHeader("action", "update");
+          client.publish(exchange, routingKey, message);
+        }
+
+        // The messages to write
+        for (RabbitDocument rabbitDocument : rabbitMessage.getDocsToWrite()) {
+          RabbitMQMessage message = new RabbitMQMessage();
+          message.setBody(rabbitDocument.getBytes());
+          addHeaders(message, rabbitDocument);
+          message.addHeader("action", "write");
+          client.publish(exchange, routingKey, message);
+        }
+      } else {
+        RabbitMQMessage message = new RabbitMQMessage();
+        message.setBody(rabbitMessage.getBytes());
+        message.setHeaders(headersStatic);
+        client.publish(exchange, routingKey, message);
       }
-    } catch (IOException | TimeoutException e) {
-      throw makeIOException(e);
     }
+    rabbitMessage.clear();
   }
 
   @Override
-  public void delete(String url) throws IOException {
-    rabbitMessage.addDocToDelete(url);
-
-    if(rabbitMessage.size() >= commitSize) {
-      commit();
-    }
-  }
-
-  private static IOException makeIOException(Exception e) {
-    return new IOException(e);
+  public void close() throws IOException {
+    commit(); //TODO: This is because indexing job never call commit method. It should be fixed.
+    client.close();
   }
 
   public String describe() {
-    StringBuilder sb = new StringBuilder("RabbitIndexWriter\n");
-    sb.append("\t").append(RabbitMQConstants.SERVER_HOST)
-        .append(" : Host of RabbitMQ server\n");
-    sb.append("\t").append(RabbitMQConstants.SERVER_PORT)
-        .append(" : Port of RabbitMQ server\n");
-    sb.append("\t").append(RabbitMQConstants.SERVER_VIRTUAL_HOST)
-        .append(" : Virtualhost name\n");
-    sb.append("\t").append(RabbitMQConstants.SERVER_USERNAME)
-        .append(" : Username for authentication\n");
-    sb.append("\t").append(RabbitMQConstants.SERVER_PASSWORD)
-        .append(" : Password for authentication\n");
+    StringBuffer sb = new StringBuffer("RabbitIndexWriter\n");
+    sb.append("\t").append(RabbitMQConstants.SERVER_URI)
+        .append(" : URI of RabbitMQ server\n");
+    sb.append("\t").append(RabbitMQConstants.BINDING).append(
+        " : If binding is created automatically or not (default false)\n");
+    sb.append("\t").append(RabbitMQConstants.BINDING_ARGUMENTS)
+        .append(" : Arguments used in binding\n");
+    sb.append("\t").append(RabbitMQConstants.EXCHANGE_NAME)
+        .append(" : Exchange's name\n");
+    sb.append("\t").append(RabbitMQConstants.EXCHANGE_OPTIONS)
+        .append(" : Exchange's options\n");
+    sb.append("\t").append(RabbitMQConstants.QUEUE_NAME)
+        .append(" : Queue's name\n");
+    sb.append("\t").append(RabbitMQConstants.QUEUE_OPTIONS)
+        .append(" : Queue's options\n");
+    sb.append("\t").append(RabbitMQConstants.ROUTING_KEY)
+        .append(" : Routing key\n");
     sb.append("\t").append(RabbitMQConstants.COMMIT_SIZE)
         .append(" : Buffer size when sending to RabbitMQ (default 250)\n");
+    sb.append("\t").append(RabbitMQConstants.COMMIT_MODE)
+        .append(" : The mode to send the documents (default multiple)\n");
+    sb.append("\t").append(RabbitMQConstants.HEADERS_STATIC)
+        .append(" : Static headers that will be added to the messages\n");
+    sb.append("\t").append(RabbitMQConstants.HEADERS_DYNAMIC)
+        .append(" : Document's fields added as headers\n");
     return sb.toString();
   }
+
+  private void addHeaders(final RabbitMQMessage message,
+      RabbitDocument document) {
+    message.setHeaders(headersStatic);
+
+    for (RabbitDocument.RabbitDocumentField rabbitDocumentField : document
+        .getFields()) {
+      if (headersDynamic.contains(rabbitDocumentField.getKey())) {
+        message.addHeader(rabbitDocumentField.getKey(),
+            rabbitDocumentField.getValues().get(0));
+      }
+    }
+  }
 }
diff --git a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
index fa671e819..f1edf4449 100644
--- a/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
+++ b/src/plugin/indexer-rabbit/src/java/org/apache/nutch/indexwriter/rabbit/RabbitMQConstants.java
@@ -18,25 +18,30 @@
 
 interface RabbitMQConstants {
 
-    String SERVER_HOST = "server.host";
+  String SERVER_URI = "server.uri";
 
-    String SERVER_PORT = "server.port";
+  String EXCHANGE_NAME = "exchange.name";
 
-    String SERVER_VIRTUAL_HOST = "server.virtualhost";
+  String EXCHANGE_OPTIONS = "exchange.options";
 
-    String SERVER_USERNAME = "server.username";
+  String QUEUE_NAME = "queue.name";
 
-    String SERVER_PASSWORD = "server.password";
+  String QUEUE_OPTIONS = "queue.options";
 
-    String EXCHANGE_SERVER = "exchange.server";
+  String ROUTING_KEY = "routingkey";
 
-    String EXCHANGE_TYPE = "exchange.type";
 
-    String QUEUE_NAME = "queue.name";
+  String BINDING = "binding";
 
-    String QUEUE_DURABLE = "queue.durable";
+  String BINDING_ARGUMENTS = "binding.arguments";
 
-    String QUEUE_ROUTING_KEY = "queue.routingkey";
 
-    String COMMIT_SIZE = "commit.size";
+  String COMMIT_SIZE = "commit.size";
+
+  String COMMIT_MODE = "commit.mode";
+
+
+  String HEADERS_STATIC = "headers.static";
+
+  String HEADERS_DYNAMIC = "headers.dynamic";
 }
diff --git a/src/plugin/lib-rabbitmq/build-ivy.xml b/src/plugin/lib-rabbitmq/build-ivy.xml
new file mode 100644
index 000000000..eb665e92b
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/build-ivy.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="lib-rabbitmq" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+    <property name="ivy.install.version" value="2.1.0" />
+    <condition property="ivy.home" value="${env.IVY_HOME}">
+      <isset property="env.IVY_HOME" />
+    </condition>
+    <property name="ivy.home" value="${user.home}/.ant" />
+    <property name="ivy.checksums" value="" />
+    <property name="ivy.jar.dir" value="${ivy.home}/lib" />
+    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar" />
+
+    <target name="download-ivy" unless="offline">
+
+        <mkdir dir="${ivy.jar.dir}"/>
+        <!-- download Ivy from web site so that it can be used even without any special installation -->
+        <get src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar" 
+             dest="${ivy.jar.file}" usetimestamp="true"/>
+    </target>
+
+    <target name="init-ivy" depends="download-ivy">
+      <!-- try to load ivy here from ivy home, in case the user has not already dropped
+              it into ant's lib dir (note that the latter copy will always take precedence).
+              We will not fail as long as local lib dir exists (it may be empty) and
+              ivy is in at least one of ant's lib dir or the local lib dir. -->
+        <path id="ivy.lib.path">
+            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
+
+        </path>
+        <taskdef resource="org/apache/ivy/ant/antlib.xml"
+                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
+    </target>
+
+  <target name="deps-jar" depends="init-ivy">
+    <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]" sync="true"/>
+  </target>
+
+</project>
diff --git a/src/plugin/lib-rabbitmq/build.xml b/src/plugin/lib-rabbitmq/build.xml
new file mode 100644
index 000000000..24760bba1
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/build.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="lib-rabbitmq" default="jar-core">
+
+  <import file="../build-plugin.xml"/>
+
+</project>
diff --git a/src/plugin/lib-rabbitmq/ivy.xml b/src/plugin/lib-rabbitmq/ivy.xml
new file mode 100644
index 000000000..6e89be8e6
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/ivy.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../../ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+
+  <dependencies>
+    <dependency org="com.rabbitmq" name="amqp-client" rev="5.2.0"/>
+    <dependency org="com.google.code.gson" name="gson" rev="2.8.4"/>
+  </dependencies>
+  
+</ivy-module>
diff --git a/src/plugin/lib-rabbitmq/plugin.xml b/src/plugin/lib-rabbitmq/plugin.xml
new file mode 100644
index 000000000..846315c81
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/plugin.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+ ! Common RabbitMQ client library shared by the Nutch RabbitMQ plugins
+ !-->
+<plugin id="lib-rabbitmq" name="RabbitMQ" version="1.0" provider-name="org.apache.nutch">
+
+  <runtime>
+    <library name="lib-rabbitmq.jar">
+      <export name="*" />
+    </library>
+    <library name="amqp-client-5.2.0.jar">
+      <export name="*"/>
+    </library>
+    <library name="gson-2.8.4.jar">
+      <export name="*"/>
+    </library>
+  </runtime>
+
+</plugin>
diff --git a/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQClient.java b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQClient.java
new file mode 100644
index 000000000..909615831
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQClient.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.rabbitmq;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Client for RabbitMQ
+ */
+public class RabbitMQClient {
+
+  private final static String DEFAULT_EXCHANGE_NAME = "";
+  private final static String DEFAULT_EXCHANGE_TYPE = "direct";
+  private final static String DEFAULT_EXCHANGE_DURABLE = "true";
+
+  private final static String DEFAULT_QUEUE_NAME = "nutch.queue";
+  private final static String DEFAULT_QUEUE_DURABLE = "true";
+  private final static String DEFAULT_QUEUE_EXCLUSIVE = "false";
+  private final static String DEFAULT_QUEUE_AUTO_DELETE = "false";
+  private final static String DEFAULT_QUEUE_ARGUMENTS = "";
+
+  private final static String DEFAULT_ROUTING_KEY = DEFAULT_QUEUE_NAME;
+
+  private Connection connection;
+  private Channel channel;
+
+  /**
+   * Builds a new instance of {@link RabbitMQClient}
+   *
+   * @param serverHost        The server host.
+   * @param serverPort        The server port.
+   * @param serverVirtualHost The virtual host into the RabbitMQ server.
+   * @param serverUsername    The username to access the server.
+   * @param serverPassword    The password to access the server.
+   * @throws IOException It is thrown if there is some issue during the connection creation.
+   */
+  public RabbitMQClient(String serverHost, int serverPort,
+      String serverVirtualHost, String serverUsername, String serverPassword)
+      throws IOException {
+    ConnectionFactory factory = new ConnectionFactory();
+    factory.setHost(getValue(serverHost, "localhost"));
+    factory.setPort(getValue(serverPort, 5672));
+
+    factory.setVirtualHost(getValue(serverVirtualHost, "/"));
+
+    factory.setUsername(getValue(serverUsername, "guest"));
+    factory.setPassword(getValue(serverPassword, "guest"));
+
+    try {
+      connection = factory.newConnection();
+    } catch (TimeoutException e) {
+      throw makeIOException(e);
+    }
+  }
+
+  /**
+   * Builds a new instance of {@link RabbitMQClient}
+   *
+   * @param uri The connection parameters in the form amqp://userName:password@hostName:portNumber/virtualHost
+   * @throws IOException It is thrown if there is some issue during the connection creation.
+   */
+  public RabbitMQClient(String uri) throws IOException {
+    ConnectionFactory factory = new ConnectionFactory();
+
+    try {
+      factory.setUri(uri);
+
+      connection = factory.newConnection();
+    } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException | TimeoutException e) {
+      throw makeIOException(e);
+    }
+  }
+
+  /**
+   * Opens a new channel into the opened connection.
+   *
+   * @throws IOException It is thrown if there is some issue during the channel creation.
+   */
+  public void openChannel() throws IOException {
+    channel = connection.createChannel();
+  }
+
+  /**
+   * Creates a relationship between an exchange and a queue.
+   *
+   * @param exchangeName     The exchange's name.
+   * @param exchangeOptions  Options used when the exchange is created.
+   *                         <br />
+   *                         It must have the form type={type},durable={durable} where:
+   *                         <ul>
+   *                         <li>{type} is fanout, direct, headers or topic</li>
+   *                         <li>{durable} is true or false</li>
+   *                         </ul>
+   * @param queueName        The queue's name.
+   * @param queueOptions     Options used when the queue is created.
+   *                         <br />
+   *                         It must have the form durable={durable},exclusive={exclusive},auto-delete={auto-delete},arguments={arguments} where:
+   *                         <ul>
+   *                         <li>durable is true or false</li>
+   *                         <li>exclusive is true or false</li>
+   *                         <li>auto-delete is true or false</li>
+   *                         <li>arguments must have the form {key1:value1;key2:value2}</li>
+   *                         </ul>
+   * @param bindingKey       The routing key to use for the binding.
+   * @param bindingArguments This parameter is only used when the exchange's type is headers. In other cases it is ignored.
+   *                         <br />
+   *                         It must have the form key1=value1,key2=value2
+   * @throws IOException If there is some issue creating the relationship.
+   */
+  public void bind(String exchangeName, String exchangeOptions,
+      String queueName, String queueOptions, String bindingKey,
+      String bindingArguments) throws IOException {
+    String exchangeType = exchangeDeclare(exchangeName, exchangeOptions);
+    queueDeclare(queueName, queueOptions);
+
+    switch (exchangeType) {
+    case "fanout":
+      channel.queueBind(queueName, exchangeName, "");
+      break;
+    case "direct":
+      channel.queueBind(queueName, exchangeName,
+          getValue(bindingKey, DEFAULT_ROUTING_KEY));
+      break;
+    case "headers":
+      channel.queueBind(queueName, exchangeName, "",
+          RabbitMQOptionParser.parseOptionAndConvertValue(bindingArguments));
+      break;
+    case "topic":
+      channel.queueBind(queueName, exchangeName,
+          getValue(bindingKey, DEFAULT_ROUTING_KEY));
+      break;
+    default:
+      break;
+    }
+  }
+
+  /**
+   * Publishes a new message over an exchange.
+   *
+   * @param exchangeName The exchange's name where the message will be published.
+   * @param routingKey   The routing key used to route the message in the exchange.
+   * @param message      The message itself.
+   * @throws IOException If there is some issue publishing the message.
+   */
+  public void publish(String exchangeName, String routingKey,
+      RabbitMQMessage message) throws IOException {
+    channel.basicPublish(getValue(exchangeName, DEFAULT_EXCHANGE_NAME),
+        getValue(routingKey, DEFAULT_ROUTING_KEY),
+        new AMQP.BasicProperties.Builder().contentType(message.getContentType())
+            .headers(message.getHeaders()).build(), message.getBody());
+  }
+
+  /**
+   * Closes the channel and the connection with the server.
+   *
+   * @throws IOException If there is some issue trying to close the channel or connection.
+   */
+  public void close() throws IOException {
+    try {
+      channel.close();
+      connection.close();
+    } catch (TimeoutException e) {
+      throw makeIOException(e);
+    }
+  }
+
+  /**
+   * Creates a new exchange into the server with the given name and options.
+   *
+   * @param name    The exchange's name.
+   * @param options Options used when the exchange is created.
+   *                <br />
+   *                It must have the form type={type},durable={durable} where:
+   *                <ul>
+   *                <li>{type} is fanout, direct, headers or topic</li>
+   *                <li>{durable} is true or false</li>
+   *                </ul>
+   * @return The exchange's type.
+   * @throws IOException If there is some issue creating the exchange.
+   */
+  private String exchangeDeclare(String name, String options)
+      throws IOException {
+    Map<String, String> values = RabbitMQOptionParser.parseOption(options);
+
+    String type = values.getOrDefault("type", DEFAULT_EXCHANGE_TYPE);
+
+    channel.exchangeDeclare(getValue(name, DEFAULT_EXCHANGE_NAME), type, Boolean
+        .parseBoolean(
+            values.getOrDefault("durable", DEFAULT_EXCHANGE_DURABLE)));
+
+    return type;
+  }
+
+  /**
+   * Creates a queue into the server with the given name and options.
+   *
+   * @param name    The queue's name.
+   * @param options Options used when the queue is created.
+   *                <br />
+   *                It must have the form durable={durable},exclusive={exclusive},auto-delete={auto-delete},arguments={arguments} where:
+   *                <ul>
+   *                <li>durable is true or false</li>
+   *                <li>exclusive is true or false</li>
+   *                <li>auto-delete is true or false</li>
+   *                <li>arguments must have the form {key1:value1;key2:value2}</li>
+   *                </ul>
+   * @throws IOException If there is some issue creating the queue.
+   */
+  private void queueDeclare(String name, String options) throws IOException {
+    Map<String, String> values = RabbitMQOptionParser.parseOption(options);
+
+    channel.queueDeclare(getValue(name, DEFAULT_QUEUE_NAME), Boolean
+            .parseBoolean(values.getOrDefault("durable", DEFAULT_QUEUE_DURABLE)),
+        Boolean.parseBoolean(
+            values.getOrDefault("exclusive", DEFAULT_QUEUE_EXCLUSIVE)), Boolean
+            .parseBoolean(
+                values.getOrDefault("auto-delete", DEFAULT_QUEUE_AUTO_DELETE)),
+        RabbitMQOptionParser.parseSubOption(
+            values.getOrDefault("arguments", DEFAULT_QUEUE_ARGUMENTS)));
+  }
+
+  private static String getValue(String value, String defaultValue) {
+    if (value == null || value.trim().isEmpty()) {
+      return defaultValue;
+    }
+    return value;
+  }
+
+  private static Integer getValue(Integer value, Integer defaultValue) {
+    if (value == null) {
+      return defaultValue;
+    }
+    return value;
+  }
+
+  private static IOException makeIOException(Exception e) {
+    return new IOException(e);
+  }
+}
diff --git a/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQMessage.java b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQMessage.java
new file mode 100644
index 000000000..8e2a8b648
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.rabbitmq;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RabbitMQMessage {
+
+  private Map<String, Object> headers = new HashMap<>();
+  private byte[] body;
+
+  private String contentType = "application/json";
+
+  public Map<String, Object> getHeaders() {
+    return headers;
+  }
+
+  public void setHeaders(final Map<String, Object> headers) {
+    this.headers = headers;
+  }
+
+  public void setHeaders(final String headers) {
+    this.headers = RabbitMQOptionParser.parseOptionAndConvertValue(headers);
+  }
+
+  public void addHeader(final String key, final Object value) {
+    this.headers.put(key, value);
+  }
+
+  public byte[] getBody() {
+    return body;
+  }
+
+  public void setBody(final byte[] body) {
+    this.body = body;
+  }
+
+  public String getContentType() {
+    return contentType;
+  }
+
+  public void setContentType(final String contentType) {
+    this.contentType = contentType;
+  }
+}
diff --git a/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQOptionParser.java b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQOptionParser.java
new file mode 100644
index 000000000..32d6676c0
--- /dev/null
+++ b/src/plugin/lib-rabbitmq/src/java/org/apache/nutch/rabbitmq/RabbitMQOptionParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.rabbitmq;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class RabbitMQOptionParser {
+
+  static Map<String, String> parseOption(final String option) {
+    Map<String, String> values = new HashMap<>();
+
+    if (option.isEmpty()) {
+      return values;
+    }
+
+    String[] split = option.split(",");
+    for (String s : split) {
+      String[] ss = s.split("=");
+      values.put(ss[0], ss[1]);
+    }
+
+    return values;
+  }
+
+  static Map<String, Object> parseOptionAndConvertValue(final String option) {
+    Map<String, Object> values = new HashMap<>();
+
+    if (option.isEmpty()) {
+      return values;
+    }
+
+    String[] split = option.split(",");
+    for (String s : split) {
+      String[] ss = s.split("=");
+      values.put(ss[0], convert(ss[1]));
+    }
+
+    return values;
+  }
+
+  static Map<String, Object> parseSubOption(final String subOption) {
+    Map<String, Object> values = new HashMap<>();
+
+    if (subOption.isEmpty()) {
+      return values;
+    }
+
+    String[] split = subOption.replaceAll("\\{|}", "").split(";");
+    for (String s : split) {
+      String[] ss = s.split(":");
+      values.put(ss[0], convert(ss[1]));
+    }
+
+    return values;
+  }
+
+  private static Object convert(String s) {
+    try {
+      return Integer.parseInt(s);
+    } catch (Exception ex) {
+      // Do nothing
+    }
+
+    try {
+      return Float.parseFloat(s);
+    } catch (Exception ex) {
+      // Do nothing
+    }
+
+    if (s.equalsIgnoreCase("true") || s.equalsIgnoreCase("false")) {
+      return Boolean.parseBoolean(s);
+    }
+
+    return s;
+  }
+}
diff --git a/src/plugin/publish-rabbitmq/build.xml b/src/plugin/publish-rabbitmq/build.xml
index 9b48aa036..3972610f9 100644
--- a/src/plugin/publish-rabbitmq/build.xml
+++ b/src/plugin/publish-rabbitmq/build.xml
@@ -19,9 +19,16 @@
 
   <import file="../build-plugin.xml"/>
 
-  <!-- Deploy Unit test dependencies -->
-  <target name="deps-test">
-    <ant target="deploy" inheritall="false" dir="../nutch-extensionpoints"/>
+  <!-- Build compilation dependencies -->
+  <target name="deps-jar">
+    <ant target="jar" inheritall="false" dir="../lib-rabbitmq"/>
   </target>
 
+  <!-- Add compilation dependencies to classpath -->
+  <path id="plugin.deps">
+    <fileset dir="${nutch.root}/build">
+      <include name="**/lib-rabbitmq/*.jar" />
+    </fileset>
+  </path>
+
 </project>
diff --git a/src/plugin/publish-rabbitmq/ivy.xml b/src/plugin/publish-rabbitmq/ivy.xml
index 589786f3c..b88ccd6d4 100644
--- a/src/plugin/publish-rabbitmq/ivy.xml
+++ b/src/plugin/publish-rabbitmq/ivy.xml
@@ -27,16 +27,12 @@
   </info>
 
   <configurations>
-    <include file="../../..//ivy/ivy-configurations.xml"/>
+    <include file="../../../ivy/ivy-configurations.xml"/>
   </configurations>
 
   <publications>
     <!--get the artifact from our module name-->
     <artifact conf="master"/>
   </publications>
-
-  <dependencies>
-    <dependency org="com.rabbitmq" name="amqp-client" rev="3.6.5" conf="*->default" />
-  </dependencies>
   
 </ivy-module>
diff --git a/src/plugin/publish-rabbitmq/plugin.xml b/src/plugin/publish-rabbitmq/plugin.xml
index 2c6521929..d78473cc0 100644
--- a/src/plugin/publish-rabbitmq/plugin.xml
+++ b/src/plugin/publish-rabbitmq/plugin.xml
@@ -25,11 +25,11 @@
       <library name="publish-rabbitmq.jar">
          <export name="*"/>
       </library>
-      <library name="amqp-client-3.6.5.jar"/>
    </runtime>
 
    <requires>
       <import plugin="nutch-extensionpoints"/>
+      <import plugin="lib-rabbitmq"/>
    </requires>
    
    <extension id="org.apache.nutch.publisher.rabbitmq"
diff --git a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQConstants.java b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQConstants.java
new file mode 100644
index 000000000..4e6242d97
--- /dev/null
+++ b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.publisher.rabbitmq;
+
+interface RabbitMQConstants {
+  String RABBIT_PREFIX = "rabbitmq.publisher.";
+
+  String SERVER_URI = RABBIT_PREFIX + "server.uri";
+
+  String EXCHANGE_NAME = RABBIT_PREFIX + "exchange.name";
+
+  String EXCHANGE_OPTIONS = RABBIT_PREFIX + "exchange.options";
+
+  String QUEUE_NAME = RABBIT_PREFIX + "queue.name";
+
+  String QUEUE_OPTIONS = RABBIT_PREFIX + "queue.options";
+
+  String ROUTING_KEY = RABBIT_PREFIX + "routingkey";
+
+
+  String BINDING = RABBIT_PREFIX + "binding";
+
+  String BINDING_ARGUMENTS = RABBIT_PREFIX + "binding.arguments";
+
+
+  String HEADERS_STATIC = RABBIT_PREFIX + "headers.static";
+}
diff --git a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
index dc2ca32f9..e71274164 100644
--- a/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
+++ b/src/plugin/publish-rabbitmq/src/java/org/apache/nutch/publisher/rabbitmq/RabbitMQPublisherImpl.java
@@ -22,86 +22,71 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.nutch.publisher.NutchPublisher;
+import org.apache.nutch.rabbitmq.RabbitMQClient;
+import org.apache.nutch.rabbitmq.RabbitMQMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class RabbitMQPublisherImpl implements NutchPublisher{
-
-  private static String EXCHANGE_SERVER;
-  private static String EXCHANGE_TYPE;
-
-  private static String HOST;
-    
-  private static int PORT;
-  private static String VIRTUAL_HOST;
-  private static String USERNAME;
-  private static String PASSWORD;
-
-  private static String QUEUE_NAME;
-  private static boolean QUEUE_DURABLE;
-  private static String QUEUE_ROUTING_KEY;
-  
+
+public class RabbitMQPublisherImpl implements NutchPublisher {
+
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
-  
-  private static Channel channel;
+
+  private String exchange;
+  private String routingKey;
+
+  private String headersStatic;
+
+  private RabbitMQClient client;
 
   @Override
   public boolean setConfig(Configuration conf) {
     try {
-      EXCHANGE_SERVER = conf.get("rabbitmq.exchange.server", "fetcher_log");
-      EXCHANGE_TYPE = conf.get("rabbitmq.exchange.type", "fanout");
+      exchange = conf.get(RabbitMQConstants.EXCHANGE_NAME);
+      routingKey = conf.get(RabbitMQConstants.ROUTING_KEY);
+      headersStatic = conf.get(RabbitMQConstants.HEADERS_STATIC, "");
 
-      HOST = conf.get("rabbitmq.host", "localhost");
-      PORT = conf.getInt("rabbitmq.port", 5672);
-      VIRTUAL_HOST = conf.get("rabbitmq.virtualhost", null);
-      USERNAME = conf.get("rabbitmq.username", null);
-      PASSWORD = conf.get("rabbitmq.password", null);
+      String uri = conf.get(RabbitMQConstants.SERVER_URI);
+      client = new RabbitMQClient(uri);
 
-      QUEUE_NAME = conf.get("rabbitmq.queue.name", "fanout.queue");
-      QUEUE_DURABLE = conf.getBoolean("rabbitmq.queue.durable", true);
-      QUEUE_ROUTING_KEY = conf.get("rabbitmq.queue.routingkey", "fanout.key");
+      client.openChannel();
 
-      ConnectionFactory factory = new ConnectionFactory();
-      factory.setHost(HOST);
-      factory.setPort(PORT);
+      boolean binding = conf.getBoolean(RabbitMQConstants.BINDING, false);
+      if (binding) {
+        String queueName = conf.get(RabbitMQConstants.QUEUE_NAME);
+        String queueOptions = conf.get(RabbitMQConstants.QUEUE_OPTIONS);
 
-      if(VIRTUAL_HOST != null) {
-        factory.setVirtualHost(VIRTUAL_HOST);
-      }
+        String exchangeOptions = conf.get(RabbitMQConstants.EXCHANGE_OPTIONS);
 
-      if(USERNAME != null) {
-        factory.setUsername(USERNAME);
-        factory.setPassword(PASSWORD);
+        String bindingArguments = conf
+            .get(RabbitMQConstants.BINDING_ARGUMENTS, "");
+
+        client.bind(exchange, exchangeOptions, queueName, queueOptions,
+            routingKey, bindingArguments);
       }
-    
-      Connection connection = factory.newConnection();
-      channel = connection.createChannel();
-      channel.exchangeDeclare(EXCHANGE_SERVER, EXCHANGE_TYPE);
-      channel.queueDeclare(QUEUE_NAME, QUEUE_DURABLE, false, false, null);
-      channel.queueBind(QUEUE_NAME, EXCHANGE_SERVER, QUEUE_ROUTING_KEY);
 
       LOG.info("Configured RabbitMQ publisher");
       return true;
-    }catch(Exception e) {
-      LOG.error("Could not initialize RabbitMQ publisher - {}", StringUtils.stringifyException(e));
+    } catch (Exception e) {
+      LOG.error("Could not initialize RabbitMQ publisher - {}",
+          StringUtils.stringifyException(e));
       return false;
     }
-
   }
 
   @Override
   public void publish(Object event, Configuration conf) {
     try {
-      channel.basicPublish(EXCHANGE_SERVER, QUEUE_ROUTING_KEY, null, getJSONString(event).getBytes());
+      RabbitMQMessage message = new RabbitMQMessage();
+      message.setBody(getJSONString(event).getBytes());
+      message.setHeaders(headersStatic);
+      client.publish(exchange, routingKey, message);
     } catch (Exception e) {
-      LOG.error("Error occured while publishing - {}", StringUtils.stringifyException(e));
+      LOG.error("Error occured while publishing - {}",
+          StringUtils.stringifyException(e));
     }
   }
 
@@ -110,15 +95,15 @@ private String getJSONString(Object obj) {
     try {
       return mapper.writeValueAsString(obj);
     } catch (JsonProcessingException e) {
-      LOG.error("Error converting event object to JSON String - {}", StringUtils.stringifyException(e));
+      LOG.error("Error converting event object to JSON String - {}",
+          StringUtils.stringifyException(e));
     }
     return null;
   }
 
-
   @Override
   public void setConf(Configuration arg0) {
-    
+
   }
 
   @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improvements for Rabbitmq support
> ---------------------------------
>
>                 Key: NUTCH-2580
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2580
>             Project: Nutch
>          Issue Type: Improvement
>          Components: indexer, plugin
>    Affects Versions: 1.14
>            Reporter: Roannel Fernández Hernández
>            Priority: Minor
>             Fix For: 1.15
>
>
> This one includes:
>  # Creation of lib-rabbitmq for common functionalities (publish-rabbitmq and indexer-rabbit).
>  # Update of the RabbitMQ's library version.
>  # Headers selection from NutchDocument's fields (for indexer-rabbit).
>  # Optional binding.
>  # A single or multiple documents into each message.
>  # Options for the creation of exchange, queue and binding.
>  # Simplify the configuration options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message