nutch-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NUTCH-2412) Exchange component for indexing job
Date Thu, 28 Jun 2018 07:48:00 GMT

    [ https://issues.apache.org/jira/browse/NUTCH-2412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16526049#comment-16526049 ]

ASF GitHub Bot commented on NUTCH-2412:
---------------------------------------

sebastian-nagel closed pull request #340: Fixes for NUTCH-2412 contributed by r0ann3l
URL: https://github.com/apache/nutch/pull/340
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/build.xml b/build.xml
index d4836a4f2..b112c5027 100644
--- a/build.xml
+++ b/build.xml
@@ -177,6 +177,7 @@
       <packageset dir="${plugins.dir}/creativecommons/src/java"/>
       <packageset dir="${plugins.dir}/feed/src/java"/>
       <packageset dir="${plugins.dir}/headings/src/java"/>
+      <packageset dir="${plugins.dir}/exchange-jexl/src/java"/>
       <packageset dir="${plugins.dir}/index-anchor/src/java"/>
       <packageset dir="${plugins.dir}/index-basic/src/java"/>
       <packageset dir="${plugins.dir}/index-geoip/src/java"/>
@@ -265,6 +266,7 @@
       <group title="Parse Plugins" packages="${plugins.parse}"/>
       <group title="Parse Filter Plugins" packages="${plugins.parsefilter}"/>
       <group title="Publisher Plugins" packages="${plugins.publisher}"/>
+      <group title="Exchange Plugins" packages="${plugins.exchange}"/>
       <group title="Indexing Filter Plugins" packages="${plugins.index}"/>
       <group title="Indexer Plugins" packages="${plugins.indexer}"/>
       <group title="Misc. Plugins" packages="${plugins.misc}"/>
@@ -636,6 +638,7 @@
       <packageset dir="${plugins.dir}/creativecommons/src/java"/>
       <packageset dir="${plugins.dir}/feed/src/java"/>
       <packageset dir="${plugins.dir}/headings/src/java"/>
+      <packageset dir="${plugins.dir}/exchange-jexl/src/java"/>
       <packageset dir="${plugins.dir}/index-anchor/src/java"/>
       <packageset dir="${plugins.dir}/index-basic/src/java"/>
       <packageset dir="${plugins.dir}/index-geoip/src/java"/>
@@ -724,6 +727,7 @@
       <group title="Parse Plugins" packages="${plugins.parse}"/>
       <group title="Parse Filter Plugins" packages="${plugins.parsefilter}"/>
       <group title="Publisher Plugins" packages="${plugins.publisher}"/>
+      <group title="Exchange Plugins" packages="${plugins.exchange}"/>
       <group title="Indexing Filter Plugins" packages="${plugins.index}"/>
       <group title="Indexer Plugins" packages="${plugins.indexer}"/>
       <group title="Misc. Plugins" packages="${plugins.misc}"/>
@@ -1049,6 +1053,7 @@
         <source path="${plugins.dir}/feed/src/java/" />
         <source path="${plugins.dir}/feed/src/test/" />
         <source path="${plugins.dir}/headings/src/java/" />
+        <source path="${plugins.dir}/exchange-jexl/src/java/" />
         <source path="${plugins.dir}/index-anchor/src/java/" />
         <source path="${plugins.dir}/index-anchor/src/test/" />
         <source path="${plugins.dir}/index-basic/src/java/" />
diff --git a/conf/exchanges.xml.template b/conf/exchanges.xml.template
new file mode 100644
index 000000000..ab2eb941a
--- /dev/null
+++ b/conf/exchanges.xml.template
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<exchanges xmlns="http://lucene.apache.org/nutch"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+           xsi:schemaLocation="http://lucene.apache.org/nutch exchanges.xsd">
+  <!--
+  <exchange id="exchange_jexl_1" class="org.apache.nutch.exchange.jexl.JexlExchange">
+    <writers>
+      <writer id="indexer_solr_1" />
+    </writers>
+    <params>
+      <param name="expr" value="doc.getFieldValue('host')=='example.org'" />
+    </params>
+  </exchange>
+
+  <exchange id="default" class="default">
+    <writers>
+      <writer id="indexer_solr_1" />
+    </writers>
+    <params />
+  </exchange>
+  -->
+</exchanges>
\ No newline at end of file
diff --git a/conf/exchanges.xsd b/conf/exchanges.xsd
new file mode 100644
index 000000000..42caeb709
--- /dev/null
+++ b/conf/exchanges.xsd
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+           targetNamespace="http://lucene.apache.org/nutch"
+           xmlns="http://lucene.apache.org/nutch"
+           elementFormDefault="qualified">
+  <xs:element name="exchanges">
+    <xs:complexType>
+      <xs:sequence>
+        <xs:element name="exchange" type="exchangeType" maxOccurs="unbounded" minOccurs="0"/>
+      </xs:sequence>
+    </xs:complexType>
+  </xs:element>
+  <xs:complexType name="exchangeType">
+    <xs:sequence>
+      <xs:element type="writersType" name="writers" maxOccurs="1" minOccurs="1" />
+      <xs:element type="parametersType" name="params" maxOccurs="1" minOccurs="1" />
+    </xs:sequence>
+    <xs:attribute type="xs:string" name="class" use="required" />
+    <xs:attribute type="xs:ID" name="id" use="required" />
+  </xs:complexType>
+  <xs:complexType name="writersType">
+    <xs:sequence>
+      <xs:element type="writerType" name="writer" maxOccurs="unbounded" minOccurs="0" />
+    </xs:sequence>
+  </xs:complexType>
+  <xs:complexType name="writerType">
+    <xs:attribute type="xs:string" name="id" use="required" />
+  </xs:complexType>
+  <xs:complexType name="parametersType">
+    <xs:sequence>
+      <xs:element type="parameterType" name="param" maxOccurs="unbounded" minOccurs="0" />
+    </xs:sequence>
+  </xs:complexType>
+  <xs:complexType name="parameterType">
+    <xs:attribute type="xs:string" name="name" use="required" />
+    <xs:attribute type="xs:string" name="value" use="optional" />
+  </xs:complexType>
+</xs:schema>
\ No newline at end of file
diff --git a/conf/log4j.properties b/conf/log4j.properties
index 1939014dc..211dad5c6 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -47,6 +47,7 @@ log4j.logger.org.apache.nutch.indexer.IndexingJob=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexer.IndexerOutputFormat=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexwriter.solr.SolrIndexWriter=INFO,cmdstdout
 log4j.logger.org.apache.nutch.indexwriter.solr.SolrUtils=INFO,cmdstdout
+log4j.logger.org.apache.nutch.exchange.Exchanges=INFO,cmdstdout
 log4j.logger.org.apache.nutch.parse.ParserChecker=INFO,cmdstdout
 log4j.logger.org.apache.nutch.parse.ParseSegment=INFO,cmdstdout
 log4j.logger.org.apache.nutch.plugin.PluginRepository=WARN
diff --git a/default.properties b/default.properties
index f7be5f995..004a8c311 100644
--- a/default.properties
+++ b/default.properties
@@ -162,6 +162,12 @@ plugins.parsefilter=\
 plugins.publisher=\
     org.apache.nutch.publisher.rabbitmq*
 
+#
+# Exchange Plugins
+#
+plugins.exchange=\
+   org.apache.nutch.exchange.jexl*
+
 #
 # Indexing Filter Plugins
 #
diff --git a/src/java/org/apache/nutch/exchange/Exchange.java b/src/java/org/apache/nutch/exchange/Exchange.java
new file mode 100644
index 000000000..ef67e994f
--- /dev/null
+++ b/src/java/org/apache/nutch/exchange/Exchange.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.exchange;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Pluggable;
+
+import java.util.Map;
+
+public interface Exchange extends Pluggable, Configurable {
+
+  /**
+   * The name of the extension point.
+   */
+  String X_POINT_ID = Exchange.class.getName();
+
+  /**
+   * Initializes the internal variables.
+   *
+   * @param parameters Params from the exchange configuration.
+   */
+  void open(Map<String, String> parameters);
+
+  /**
+   * Determines if the document must go to the related index writers.
+   *
+   * @param doc The given document.
+   * @return True if the given document match with this exchange. False in other case.
+   */
+  boolean match(NutchDocument doc);
+}
diff --git a/src/java/org/apache/nutch/exchange/ExchangeConfig.java b/src/java/org/apache/nutch/exchange/ExchangeConfig.java
new file mode 100644
index 000000000..6e37cfbb4
--- /dev/null
+++ b/src/java/org/apache/nutch/exchange/ExchangeConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.exchange;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ExchangeConfig {
+  private final String id;
+
+  private final String clazz;
+
+  private final String[] writersIDs;
+
+  private final Map<String, String> parameters;
+
+  private ExchangeConfig(String id, String clazz, String[] writersIDs,
+      Map<String, String> parameters) {
+    this.id = id;
+    this.clazz = clazz;
+    this.writersIDs = writersIDs;
+    this.parameters = parameters;
+  }
+
+  public static ExchangeConfig getInstance(Element element) {
+    String id = element.getAttribute("id");
+    String clazz = element.getAttribute("class");
+
+    //Getting the writers IDs
+    NodeList writerList = element.getElementsByTagName("writer");
+    String[] writers = new String[writerList.getLength()];
+    for (int i = 0; i < writerList.getLength(); i++) {
+      writers[i] = ((Element) writerList.item(i)).getAttribute("id");
+    }
+
+    //Getting params
+    NodeList paramList = element.getElementsByTagName("param");
+    Map<String, String> paramsMap = new HashMap<>();
+    for (int i = 0; i < paramList.getLength(); i++) {
+      Element param = (Element) paramList.item(i);
+      paramsMap.put(param.getAttribute("name"), param.getAttribute("value"));
+    }
+
+    return new ExchangeConfig(id, clazz, writers, paramsMap);
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getClazz() {
+    return clazz;
+  }
+
+  String[] getWritersIDs() {
+    return writersIDs;
+  }
+
+  public Map<String, String> getParameters() {
+    return parameters;
+  }
+}
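
For readers following `ExchangeConfig.getInstance` above, here is a minimal stand-alone sketch of the same DOM calls, run against an inline XML string with JDK classes only. The class `ExchangeConfigSketch`, its helper methods, and the sample string are hypothetical and not part of the patch; the sample also leaves out the namespace declarations used in `exchanges.xml.template`.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical sketch, not part of the Nutch patch.
class ExchangeConfigSketch {

  // One <exchange> element, modelled on the commented-out template entry.
  static final String SAMPLE =
      "<exchange id=\"exchange_jexl_1\""
      + " class=\"org.apache.nutch.exchange.jexl.JexlExchange\">"
      + "<writers><writer id=\"indexer_solr_1\"/></writers>"
      + "<params><param name=\"expr\""
      + " value=\"doc.getFieldValue('host')=='example.org'\"/></params>"
      + "</exchange>";

  // Parses the string and returns its root element.
  static Element parse(String xml) {
    try {
      DocumentBuilder builder =
          DocumentBuilderFactory.newInstance().newDocumentBuilder();
      return builder.parse(new InputSource(new StringReader(xml)))
          .getDocumentElement();
    } catch (Exception e) {
      throw new IllegalStateException("Could not parse sample XML", e);
    }
  }

  // Collects the id attribute of every <writer> descendant.
  static List<String> writerIds(Element exchange) {
    NodeList nodes = exchange.getElementsByTagName("writer");
    List<String> ids = new ArrayList<>();
    for (int i = 0; i < nodes.getLength(); i++) {
      ids.add(((Element) nodes.item(i)).getAttribute("id"));
    }
    return ids;
  }

  // Collects name/value pairs of every <param> descendant.
  static Map<String, String> params(Element exchange) {
    NodeList nodes = exchange.getElementsByTagName("param");
    Map<String, String> result = new HashMap<>();
    for (int i = 0; i < nodes.getLength(); i++) {
      Element param = (Element) nodes.item(i);
      result.put(param.getAttribute("name"), param.getAttribute("value"));
    }
    return result;
  }

  public static void main(String[] args) {
    Element exchange = parse(SAMPLE);
    System.out.println(exchange.getAttribute("id"));
    System.out.println(writerIds(exchange));
    System.out.println(params(exchange));
  }
}
```

`Element.getAttribute` returns an empty string when the attribute is absent, so a `<param>` without `value` (allowed by `exchanges.xsd`) would be stored as an empty string rather than `null`.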
diff --git a/src/java/org/apache/nutch/exchange/Exchanges.java b/src/java/org/apache/nutch/exchange/Exchanges.java
new file mode 100644
index 000000000..1f443d4c1
--- /dev/null
+++ b/src/java/org/apache/nutch/exchange/Exchanges.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.exchange;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+public class Exchanges {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+
+  private Map<String, ExchangeConfigRelation> exchanges;
+
+  private ExchangeConfig defaultExchangeConfig;
+
+  private boolean availableExchanges = true;
+
+  public Exchanges(Configuration conf) {
+    try {
+      ExtensionPoint point = PluginRepository.get(conf)
+          .getExtensionPoint(Exchange.X_POINT_ID);
+      if (point == null) {
+        throw new RuntimeException(Exchange.X_POINT_ID + " not found.");
+      }
+
+      HashMap<String, Extension> extensionMap = new HashMap<>();
+      for (Extension extension : point.getExtensions()) {
+        extensionMap.putIfAbsent(extension.getClazz(), extension);
+      }
+
+      exchanges = new HashMap<>();
+
+      ExchangeConfig[] exchangeConfigs = loadConfigurations(conf);
+
+      for (ExchangeConfig exchangeConfig : exchangeConfigs) {
+        final String clazz = exchangeConfig.getClazz();
+
+        // If was enabled in plugin.includes property
+        if (extensionMap.containsKey(clazz)) {
+          ExchangeConfigRelation exchangeConfigRelation = new ExchangeConfigRelation(
+              (Exchange) extensionMap.get(clazz).getExtensionInstance(),
+              exchangeConfig);
+          exchanges.put(exchangeConfig.getId(), exchangeConfigRelation);
+        }
+      }
+
+      if (exchanges.isEmpty() && defaultExchangeConfig == null) {
+        availableExchanges = false;
+        LOG.warn("No exchange was configured. The documents will be routed to all index writers.");
+      }
+    } catch (PluginRuntimeException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public boolean areAvailableExchanges() {
+    return availableExchanges;
+  }
+
+  /**
+   * Loads the configuration of each exchange.
+   *
+   * @param conf Nutch's configuration.
+   * @return An array with each exchange's configuration.
+   */
+  private ExchangeConfig[] loadConfigurations(Configuration conf) {
+    InputSource inputSource = new InputSource(
+        conf.getConfResourceAsInputStream("exchanges.xml"));
+
+    final List<ExchangeConfig> configList = new LinkedList<>();
+
+    try {
+      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder builder = factory.newDocumentBuilder();
+      Element rootElement = builder.parse(inputSource).getDocumentElement();
+      NodeList exchangeList = rootElement.getElementsByTagName("exchange");
+
+      for (int i = 0; i < exchangeList.getLength(); i++) {
+        Element element = (Element) exchangeList.item(i);
+        ExchangeConfig exchangeConfig = ExchangeConfig.getInstance(element);
+
+        if ("default".equals(exchangeConfig.getClazz())) {
+          this.defaultExchangeConfig = exchangeConfig;
+          continue;
+        }
+
+        configList.add(exchangeConfig);
+      }
+
+    } catch (SAXException | IOException | ParserConfigurationException e) {
+      LOG.warn(e.toString());
+    }
+
+    return configList.toArray(new ExchangeConfig[0]);
+  }
+
+  /**
+   * Opens each configured exchange.
+   */
+  public void open() {
+    exchanges.forEach(
+        (id, value) -> value.exchange.open(value.config.getParameters()));
+  }
+
+  /**
+   * Returns all the indexers where the document must be sent to.
+   *
+   * @param nutchDocument The document to process.
+   * @return Indexers.
+   */
+  public String[] indexWriters(final NutchDocument nutchDocument) {
+    final Set<String> writersIDs = new HashSet<>();
+
+    exchanges.forEach((id, value) -> {
+      if (value.exchange.match(nutchDocument)) {
+        writersIDs.addAll(Arrays.asList(value.config.getWritersIDs()));
+      }
+    });
+
+    // Using the default exchange if it's activated and there is not index writers for this document yet.
+    if (defaultExchangeConfig != null && writersIDs.isEmpty()) {
+      return defaultExchangeConfig.getWritersIDs();
+    }
+
+    return writersIDs.toArray(new String[0]);
+  }
+
+  /**
+   * Wrapper for a single exchange and its configuration.
+   */
+  private class ExchangeConfigRelation {
+
+    private final Exchange exchange;
+
+    private final ExchangeConfig config;
+
+    ExchangeConfigRelation(Exchange exchange, ExchangeConfig config) {
+      this.exchange = exchange;
+      this.config = config;
+    }
+  }
+}
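
The routing rule in `Exchanges.indexWriters` above (union of the writers of all matching exchanges, with the default exchange used only when nothing matched) can be sketched without Nutch on the classpath. Everything below is hypothetical and for illustration only: documents are modelled as plain `Map<String, String>`, `Rule` stands in for an exchange paired with its writer IDs, and a `LinkedHashSet` replaces the patch's `HashSet` solely to keep output order predictable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch, not part of the Nutch patch.
class RoutingSketch {

  // Stand-in for one exchange and the writer IDs configured for it.
  static final class Rule {
    final Predicate<Map<String, String>> match;
    final List<String> writerIds;

    Rule(Predicate<Map<String, String>> match, List<String> writerIds) {
      this.match = match;
      this.writerIds = writerIds;
    }
  }

  // Returns the writer IDs for a document; defaultWriters may be null.
  static Set<String> route(Map<String, String> doc, List<Rule> rules,
      List<String> defaultWriters) {
    Set<String> ids = new LinkedHashSet<>();
    for (Rule rule : rules) {
      if (rule.match.test(doc)) {
        ids.addAll(rule.writerIds);
      }
    }
    // Fall back to the default only when no rule matched.
    if (ids.isEmpty() && defaultWriters != null) {
      ids.addAll(defaultWriters);
    }
    return ids;
  }

  // Two sample rules keyed on an assumed "host" field.
  static List<Rule> sampleRules() {
    return Arrays.asList(
        new Rule(doc -> "example.org".equals(doc.get("host")),
            Arrays.asList("indexer_solr_1")),
        new Rule(doc -> "example.com".equals(doc.get("host")),
            Arrays.asList("indexer_solr_1", "indexer_solr_2")));
  }

  public static void main(String[] args) {
    List<String> fallback = Arrays.asList("indexer_default");
    System.out.println(route(
        Collections.singletonMap("host", "example.com"),
        sampleRules(), fallback));
    System.out.println(route(
        Collections.singletonMap("host", "unknown.test"),
        sampleRules(), fallback));
  }
}
```

With no default configured (`defaultWriters` is `null`), a document that matches no rule gets an empty set, which mirrors the empty array the patch returns in that case.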
diff --git a/src/java/org/apache/nutch/exchange/package-info.java b/src/java/org/apache/nutch/exchange/package-info.java
new file mode 100644
index 000000000..c7829f053
--- /dev/null
+++ b/src/java/org/apache/nutch/exchange/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Control code for exchange component, which acts in indexing job and decides to
+ * which index writer a document should be routed, based on plugins behavior.
+ *
+ * @since 1.15
+ */
+package org.apache.nutch.exchange;
\ No newline at end of file
diff --git a/src/java/org/apache/nutch/indexer/IndexWriters.java b/src/java/org/apache/nutch/indexer/IndexWriters.java
index 085a01fa4..db37d62f1 100644
--- a/src/java/org/apache/nutch/indexer/IndexWriters.java
+++ b/src/java/org/apache/nutch/indexer/IndexWriters.java
@@ -17,6 +17,8 @@
 package org.apache.nutch.indexer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.exchange.Exchanges;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
 import org.apache.nutch.plugin.PluginRepository;
@@ -36,10 +38,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
 
 /**
  * Creates and caches {@link IndexWriter} implementing plugins.
@@ -61,6 +60,8 @@ public static synchronized IndexWriters get(Configuration conf) {
 
   private HashMap<String, IndexWriterWrapper> indexWriters;
 
+  private Exchanges exchanges;
+
   private IndexWriters(Configuration conf) {
     //It's not cached yet
     if (this.indexWriters == null) {
@@ -80,8 +81,7 @@ private IndexWriters(Configuration conf) {
           extensionMap.putIfAbsent(extension.getClazz(), extension);
         }
 
-        IndexWriterConfig[] indexWriterConfigs = loadWritersConfiguration(
-            conf);
+        IndexWriterConfig[] indexWriterConfigs = loadWritersConfiguration(conf);
         this.indexWriters = new HashMap<>();
 
         for (IndexWriterConfig indexWriterConfig : indexWriterConfigs) {
@@ -97,6 +97,9 @@ private IndexWriters(Configuration conf) {
             indexWriters.put(indexWriterConfig.getId(), writerWrapper);
           }
         }
+
+        this.exchanges = new Exchanges(conf);
+        this.exchanges.open();
       } catch (PluginRuntimeException e) {
         throw new RuntimeException(e);
       }
@@ -181,6 +184,19 @@ private NutchDocument mapDocument(final NutchDocument document,
     }
   }
 
+  /**
+   * Ensures if there are not available exchanges, the document will be routed to all configured index writers.
+   *
+   * @param doc Document to process.
+   * @return Index writers IDs.
+   */
+  private Collection<String> getIndexWriters(NutchDocument doc) {
+    if (this.exchanges.areAvailableExchanges()) {
+      return Arrays.asList(this.exchanges.indexWriters(doc));
+    }
+    return this.indexWriters.keySet();
+  }
+
   /**
    * Initializes the internal variables of index writers.
    *
@@ -198,29 +214,35 @@ public void open(Configuration conf, String name) throws IOException {
   }
 
   public void write(NutchDocument doc) throws IOException {
-    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
-        .entrySet()) {
+    for (String indexWriterId : getIndexWriters(doc)) {
       NutchDocument mappedDocument = mapDocument(doc,
-          entry.getValue().getIndexWriterConfig().getMapping());
-      entry.getValue().getIndexWriter().write(mappedDocument);
+          this.indexWriters.get(indexWriterId).getIndexWriterConfig()
+              .getMapping());
+      this.indexWriters.get(indexWriterId).getIndexWriter()
+          .write(mappedDocument);
     }
   }
 
   public void update(NutchDocument doc) throws IOException {
-    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
-        .entrySet()) {
-      entry.getValue().getIndexWriter().update(mapDocument(doc,
-          entry.getValue().getIndexWriterConfig().getMapping()));
+    for (String indexWriterId : getIndexWriters(doc)) {
+      NutchDocument mappedDocument = mapDocument(doc,
+          this.indexWriters.get(indexWriterId).getIndexWriterConfig()
+              .getMapping());
+      this.indexWriters.get(indexWriterId).getIndexWriter()
+          .update(mappedDocument);
     }
   }
 
-  public void delete(String key) throws IOException {
-    for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
-        .entrySet()) {
-      entry.getValue().getIndexWriter().delete(key);
+  public void delete(String key, NutchDocument doc) throws IOException {
+    for (String indexWriterId : getIndexWriters(doc)) {
+      this.indexWriters.get(indexWriterId).getIndexWriter().delete(key);
     }
   }
 
+  public void delete(String key) throws IOException {
+
+  }
+
   public void close() throws IOException {
     for (Map.Entry<String, IndexWriterWrapper> entry : this.indexWriters
         .entrySet()) {
diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
index 3ce4f8061..1837e1fcd 100644
--- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
+++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -54,7 +54,7 @@ public void write(Text key, NutchIndexAction indexAction)
         if (indexAction.action == NutchIndexAction.ADD) {
           writers.write(indexAction.doc);
         } else if (indexAction.action == NutchIndexAction.DELETE) {
-          writers.delete(key.toString());
+          writers.delete(key.toString(), indexAction.doc);
         }
       }
     };
diff --git a/src/plugin/build.xml b/src/plugin/build.xml
index a9cb912cc..0744167cc 100755
--- a/src/plugin/build.xml
+++ b/src/plugin/build.xml
@@ -39,6 +39,7 @@
     <ant dir="creativecommons" target="deploy"/>
     <ant dir="feed" target="deploy"/>
     <ant dir="headings" target="deploy"/>
+    <ant dir="exchange-jexl" target="deploy"/>
     <ant dir="index-anchor" target="deploy"/>
     <ant dir="index-basic" target="deploy"/>
     <ant dir="index-geoip" target="deploy"/>
@@ -170,6 +171,7 @@
     <ant dir="creativecommons" target="clean"/>
     <ant dir="feed" target="clean"/>
     <ant dir="headings" target="clean"/>
+    <ant dir="exchange-jexl" target="clean"/>
     <ant dir="index-anchor" target="clean"/>
     <ant dir="index-basic" target="clean"/>
     <ant dir="index-geoip" target="clean"/>
diff --git a/src/plugin/exchange-jexl/build-ivy.xml b/src/plugin/exchange-jexl/build-ivy.xml
new file mode 100644
index 000000000..eb83ef2a9
--- /dev/null
+++ b/src/plugin/exchange-jexl/build-ivy.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="exchange-jexl" default="deps-jar" xmlns:ivy="antlib:org.apache.ivy.ant">
+
+    <property name="ivy.install.version" value="2.1.0" />
+    <condition property="ivy.home" value="${env.IVY_HOME}">
+      <isset property="env.IVY_HOME" />
+    </condition>
+    <property name="ivy.home" value="${user.home}/.ant" />
+    <property name="ivy.checksums" value="" />
+    <property name="ivy.jar.dir" value="${ivy.home}/lib" />
+    <property name="ivy.jar.file" value="${ivy.jar.dir}/ivy.jar" />
+
+    <target name="download-ivy" unless="offline">
+
+        <mkdir dir="${ivy.jar.dir}"/>
+        <!-- download Ivy from web site so that it can be used even without any special installation -->
+        <get src="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.install.version}/ivy-${ivy.install.version}.jar"
+             dest="${ivy.jar.file}" usetimestamp="true"/>
+    </target>
+
+    <target name="init-ivy" depends="download-ivy">
+      <!-- try to load ivy here from ivy home, in case the user has not already dropped
+              it into ant's lib dir (note that the latter copy will always take precedence).
+              We will not fail as long as local lib dir exists (it may be empty) and
+              ivy is in at least one of ant's lib dir or the local lib dir. -->
+        <path id="ivy.lib.path">
+            <fileset dir="${ivy.jar.dir}" includes="*.jar"/>
+
+        </path>
+        <taskdef resource="org/apache/ivy/ant/antlib.xml"
+                 uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
+    </target>
+
+  <target name="deps-jar" depends="init-ivy">
+    <ivy:retrieve pattern="lib/[artifact]-[revision].[ext]"/>
+  </target>
+
+</project>
diff --git a/src/plugin/exchange-jexl/build.xml b/src/plugin/exchange-jexl/build.xml
new file mode 100644
index 000000000..e42304715
--- /dev/null
+++ b/src/plugin/exchange-jexl/build.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project name="exchange-jexl" default="jar-core">
+
+  <import file="../build-plugin.xml" />
+
+</project>
diff --git a/src/plugin/exchange-jexl/ivy.xml b/src/plugin/exchange-jexl/ivy.xml
new file mode 100644
index 000000000..24d76063d
--- /dev/null
+++ b/src/plugin/exchange-jexl/ivy.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<ivy-module version="1.0">
+  <info organisation="org.apache.nutch" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
+    <description>
+        Apache Nutch
+    </description>
+  </info>
+
+  <configurations>
+    <include file="../../../ivy/ivy-configurations.xml"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+
+  <dependencies>
+  </dependencies>
+  
+</ivy-module>
diff --git a/src/plugin/exchange-jexl/plugin.xml b/src/plugin/exchange-jexl/plugin.xml
new file mode 100644
index 000000000..7507d2621
--- /dev/null
+++ b/src/plugin/exchange-jexl/plugin.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<plugin id="exchange-jexl" name="JEXLExchange" version="1.0.0"
+        provider-name="nutch.apache.org">
+
+  <runtime>
+    <library name="exchange-jexl.jar">
+      <export name="*"/>
+    </library>
+  </runtime>
+
+  <requires>
+    <import plugin="nutch-extensionpoints"/>
+  </requires>
+
+  <extension id="org.apache.nutch.exchange.jexl"
+             name="JEXL Exchange"
+             point="org.apache.nutch.exchange.Exchange">
+    <implementation id="JEXLExchange"
+                    class="org.apache.nutch.exchange.jexl.JexlExchange"/>
+  </extension>
+
+</plugin>
diff --git a/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/JexlExchange.java b/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/JexlExchange.java
new file mode 100644
index 000000000..5273a5723
--- /dev/null
+++ b/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/JexlExchange.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.exchange.jexl;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.MapContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.exchange.Exchange;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.JexlUtil;
+
+import java.util.Map;
+
+public class JexlExchange implements Exchange {
+
+  private static final String EXPRESSION_KEY = "expr";
+
+  private Configuration conf;
+
+  private Expression expression;
+
+  /**
+   * Initializes the internal variables.
+   *
+   * @param parameters Params from the exchange configuration.
+   */
+  @Override
+  public void open(Map<String, String> parameters) {
+    expression = JexlUtil.parseExpression(parameters.get(EXPRESSION_KEY));
+  }
+
+  /**
+   * Determines if the document must go to the related index writers.
+   *
+   * @param doc The given document.
+   * @return True if the given document matches this exchange; false otherwise.
+   */
+  @Override
+  public boolean match(NutchDocument doc) {
+    // Create a context and add data
+    JexlContext jexlContext = new MapContext();
+    jexlContext.set("doc", doc);
+
+    try {
+      if (Boolean.TRUE.equals(expression.evaluate(jexlContext))) {
+        return true;
+      }
+    } catch (Exception ignored) {
+    }
+
+    return false;
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
diff --git a/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/package-info.java b/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/package-info.java
new file mode 100644
index 000000000..679306adb
--- /dev/null
+++ b/src/plugin/exchange-jexl/src/java/org/apache/nutch/exchange/jexl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Plugin of Exchange component based on JEXL expressions.
+ *
+ * @since 1.15
+ */
+package org.apache.nutch.exchange.jexl;
\ No newline at end of file
diff --git a/src/plugin/nutch-extensionpoints/plugin.xml b/src/plugin/nutch-extensionpoints/plugin.xml
index b6a2e9dcc..2ae815f59 100644
--- a/src/plugin/nutch-extensionpoints/plugin.xml
+++ b/src/plugin/nutch-extensionpoints/plugin.xml
@@ -24,6 +24,10 @@
    <!-- this file hosts all extension points nutch core code offers. 
    Please not that plugins can define extension points as well to be extendable.-->
 
+<extension-point
+      id="org.apache.nutch.exchange.Exchange"
+      name="Nutch Exchange"/>
+
 <extension-point
       id="org.apache.nutch.indexer.IndexingFilter"
       name="Nutch Indexing Filter"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Exchange component for indexing job
> -----------------------------------
>
>                 Key: NUTCH-2412
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2412
>             Project: Nutch
>          Issue Type: New Feature
>          Components: indexer, plugin
>    Affects Versions: 1.14
>            Reporter: Roannel Fernández Hernández
>            Priority: Minor
>             Fix For: 1.15
>
>
> The exchange component acts in the indexing job and decides which index writer a
> document should go to. It provides an extension point so that developers can write
> plugins with their own logic.
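
To illustrate the idea of an exchange that decides whether a document goes to
its related index writers, here is a minimal, self-contained sketch. It does not
use the Nutch API: SimpleExchange, FieldEqualsExchange and the "field"/"value"
parameter names are hypothetical, and a plain Map stands in for NutchDocument.
Only the open(parameters)/match(doc) shape mirrors the JexlExchange class in the
diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the exchange contract (not the Nutch interface).
interface SimpleExchange {
  // Receives the parameters from the exchange configuration.
  void open(Map<String, String> parameters);

  // Returns true if the document should go to the related index writers.
  boolean match(Map<String, String> doc);
}

// Hypothetical plugin logic: match documents whose configured field
// equals a configured value.
class FieldEqualsExchange implements SimpleExchange {
  private String field;
  private String value;

  @Override
  public void open(Map<String, String> parameters) {
    field = parameters.get("field");
    value = parameters.get("value");
  }

  @Override
  public boolean match(Map<String, String> doc) {
    // A missing configuration value or document field never matches.
    return value != null && value.equals(doc.get(field));
  }
}

public class ExchangeSketch {
  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("field", "host");
    params.put("value", "example.org");

    SimpleExchange exchange = new FieldEqualsExchange();
    exchange.open(params);

    Map<String, String> doc = new HashMap<>();
    doc.put("host", "example.org");

    System.out.println(exchange.match(doc));
  }
}
```

In the actual plugin the decision is delegated to a JEXL expression taken from
the "expr" parameter rather than to a fixed field comparison.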



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

