nutch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sna...@apache.org
Subject [nutch] branch master updated: NUTCH-2627 Fetcher to optionally filter URLs - filter and normalize URLs in QueueFeeder if fetcher.filter.urls resp. fetcher.normalize.urls are true (default is false, i.e., fetcher does not filter and normalize)
Date Fri, 22 Feb 2019 14:51:20 GMT
This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 546237d  NUTCH-2627 Fetcher to optionally filter URLs - filter and normalize URLs
in QueueFeeder if fetcher.filter.urls resp. fetcher.normalize.urls are true (default is
false, i.e., fetcher does not filter and normalize)
     new 8cf9e67  Merge pull request #370 from sebastian-nagel/NUTCH-2627-fetcher-filter-urls
546237d is described below

commit 546237d4789b2df958752a722053a89c31c24597
Author: Sebastian Nagel <snagel@apache.org>
AuthorDate: Fri Jul 27 13:52:27 2018 +0200

    NUTCH-2627 Fetcher to optionally filter URLs
    - filter and normalize URLs in QueueFeeder
      if fetcher.filter.urls resp. fetcher.normalize.urls are true
      (default is false, i.e., fetcher does not filter and normalize)
---
 conf/nutch-default.xml                             | 12 +++
 src/java/org/apache/nutch/fetcher/QueueFeeder.java | 91 ++++++++++++++++------
 2 files changed, 78 insertions(+), 25 deletions(-)

diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index a42e6a9..77ba170 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1172,6 +1172,18 @@
 	Publisher implementation specific properties</description>
 </property> 
 
+<property>
+  <name>fetcher.filter.urls</name>
+  <value>false</value>
+  <description>Whether fetcher will filter URLs (with the configured URL filters).</description>
+</property>
+
+<property>
+  <name>fetcher.normalize.urls</name>
+  <value>false</value>
+  <description>Whether fetcher will normalize URLs (with the configured URL normalizers).</description>
+</property>
+
 <!--  any23 plugin properties -->
 
 <property>
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index 72009ad..f5fa663 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -18,10 +18,15 @@ package org.apache.nutch.fetcher;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.fetcher.Fetcher.FetcherRun;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +43,9 @@ public class QueueFeeder extends Thread {
   private FetchItemQueues queues;
   private int size;
   private long timelimit = -1;
+  private URLFilters urlFilters = null;
+  private URLNormalizers urlNormalizers = null;
+  private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
 
   public QueueFeeder(FetcherRun.Context context,
       FetchItemQueues queues, int size) {
@@ -46,20 +54,43 @@ public class QueueFeeder extends Thread {
     this.size = size;
     this.setDaemon(true);
     this.setName("QueueFeeder");
+    Configuration conf = context.getConfiguration();
+    if (conf.getBoolean("fetcher.filter.urls", false)) {
+      urlFilters = new URLFilters(conf);
+    }
+    if (conf.getBoolean("fetcher.normalize.urls", false)) {
+      urlNormalizers = new URLNormalizers(conf, urlNormalizerScope);
+    }
   }
 
   public void setTimeLimit(long tl) {
     timelimit = tl;
   }
 
+  /** Filter and normalize the url */
+  private String filterNormalize(String url) {
+    if (url != null) {
+      try {
+        if (urlNormalizers != null)
+          url = urlNormalizers.normalize(url, urlNormalizerScope); // normalize the url
+        if (urlFilters != null)
+          url = urlFilters.filter(url);
+      } catch (MalformedURLException | URLFilterException e) {
+        LOG.warn("Skipping {}: {}", url, e);
+        url = null;
+      }
+    }
+    return url;
+  }
+
   public void run() {
     boolean hasMore = true;
     int cnt = 0;
     int timelimitcount = 0;
     while (hasMore) {
       if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-        // enough .. lets' simply
-        // read all the entries from the input without processing them
+        // enough ... lets' simply read all the entries from the input without
+        // processing them
         try {
           hasMore = context.nextKeyValue();
           timelimitcount++;
@@ -77,33 +108,43 @@ public class QueueFeeder extends Thread {
         // queues are full - spin-wait until they have some free space
         try {
           Thread.sleep(1000);
-        } catch (Exception e) {
+        } catch (InterruptedException e) {
         }
-        ;
         continue;
-      } else {
-        LOG.debug("-feeding {} input urls ...", feed);
-        while (feed > 0 && hasMore) {
-          try {
-            hasMore = context.nextKeyValue();
-            if (hasMore) {
-              /*
-               * Need to copy key and value objects because MapReduce will reuse
-               * the original objects while the objects are stored in the queue.
-               */
-              Text url = new Text((Text)context.getCurrentKey());
-              CrawlDatum datum = new CrawlDatum();
-              datum.set((CrawlDatum)context.getCurrentValue());
-              queues.addFetchItem(url, datum);
-              cnt++;
-              feed--;
+      }
+      LOG.debug("-feeding {} input urls ...", feed);
+      while (feed > 0 && hasMore) {
+        try {
+          hasMore = context.nextKeyValue();
+          if (hasMore) {
+            Text url = context.getCurrentKey();
+            if (urlFilters != null || urlNormalizers != null) {
+              String u = filterNormalize(url.toString());
+              if (u == null) {
+                // filtered or failed to normalize
+                context.getCounter("FetcherStatus", "filtered").increment(1);
+                continue;
+              }
+              url = new Text(u);
             }
-          } catch (IOException e) {
-            LOG.error("QueueFeeder error reading input, record " + cnt, e);
-            return;
-          } catch (InterruptedException e) {
-            LOG.info("QueueFeeder interrupted, exception:", e);
+            /*
+             * Need to copy key and value objects because MapReduce will reuse
+             * the original objects while the objects are stored in the queue.
+             */
+            else {
+              url = new Text(url);
+            }
+            CrawlDatum datum = new CrawlDatum();
+            datum.set((CrawlDatum) context.getCurrentValue());
+            queues.addFetchItem(url, datum);
+            cnt++;
+            feed--;
           }
+        } catch (IOException e) {
+          LOG.error("QueueFeeder error reading input, record " + cnt, e);
+          return;
+        } catch (InterruptedException e) {
+          LOG.info("QueueFeeder interrupted, exception:", e);
         }
       }
     }


Mime
View raw message