nutch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NUTCH-2474) CrawlDbReader -stats fails with ClassCastException
Date Thu, 14 Dec 2017 15:13:00 GMT

    [ https://issues.apache.org/jira/browse/NUTCH-2474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290969#comment-16290969 ]

ASF GitHub Bot commented on NUTCH-2474:
---------------------------------------

sebastian-nagel closed pull request #255: NUTCH-2474 CrawlDbReader -stats fails with ClassCastException
URL: https://github.com/apache/nutch/pull/255
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index 42b5a3bde..af30664c1 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -202,14 +202,28 @@ public void map(Text key, CrawlDatum value,
       output.collect(new Text("status " + value.getStatus()), COUNT_1);
       output.collect(new Text("retry " + value.getRetriesSinceFetch()),
           COUNT_1);
-      output.collect(new Text("sc"), new NutchWritable(
-          new FloatWritable(value.getScore())));
+
+      if (Float.isNaN(value.getScore())) {
+        output.collect(new Text("scNaN"), COUNT_1);
+      } else {
+        NutchWritable score = new NutchWritable(
+            new FloatWritable(value.getScore()));
+        output.collect(new Text("sc"), score);
+        output.collect(new Text("sct"), score);
+        output.collect(new Text("scd"), score);
+      }
+
       // fetch time (in minutes to prevent from overflows when summing up)
-      output.collect(new Text("ft"), new NutchWritable(
-          new LongWritable(value.getFetchTime() / (1000 * 60))));
+      NutchWritable fetchTime = new NutchWritable(
+          new LongWritable(value.getFetchTime() / (1000 * 60)));
+      output.collect(new Text("ft"), fetchTime);
+      output.collect(new Text("ftt"), fetchTime);
+
       // fetch interval (in seconds)
-      output.collect(new Text("fi"),
-          new NutchWritable(new LongWritable(value.getFetchInterval())));
+      NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval()));
+      output.collect(new Text("fi"), fetchInterval);
+      output.collect(new Text("fit"), fetchInterval);
+
       if (sort) {
         URL u = new URL(key.toString());
         String host = u.getHost();
@@ -219,88 +233,6 @@ public void map(Text key, CrawlDatum value,
     }
   }
 
-  public static class CrawlDbStatCombiner implements
-      Reducer<Text, NutchWritable, Text, NutchWritable> {
-    LongWritable val = new LongWritable();
-
-    public CrawlDbStatCombiner() {
-    }
-
-    public void configure(JobConf job) {
-    }
-
-    public void close() {
-    }
-
-    private void reduceMinMaxTotal(String keyPrefix, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      long total = 0;
-      long min = Long.MAX_VALUE;
-      long max = Long.MIN_VALUE;
-      while (values.hasNext()) {
-        long cnt = ((LongWritable) values.next().get()).get();
-        if (cnt < min)
-          min = cnt;
-        if (cnt > max)
-          max = cnt;
-        total += cnt;
-      }
-      output.collect(new Text(keyPrefix + "n"),
-          new NutchWritable(new LongWritable(min)));
-      output.collect(new Text(keyPrefix + "x"),
-          new NutchWritable(new LongWritable(max)));
-      output.collect(new Text(keyPrefix + "t"),
-          new NutchWritable(new LongWritable(total)));
-    }
-    
-    private void reduceMinMaxTotalFloat(String keyPrefix, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      double total = 0;
-      float min = Float.MAX_VALUE;
-      float max = Float.MIN_VALUE;
-      TDigest tdigest = TDigest.createMergingDigest(100.0);
-      while (values.hasNext()) {
-        float val = ((FloatWritable) values.next().get()).get();
-        tdigest.add(val);
-        if (val < min)
-          min = val;
-        if (val > max)
-          max = val;
-        total += val;
-      }
-      output.collect(new Text(keyPrefix + "n"),
-          new NutchWritable(new FloatWritable(min)));
-      output.collect(new Text(keyPrefix + "x"),
-          new NutchWritable(new FloatWritable(max)));
-      output.collect(new Text(keyPrefix + "t"),
-          new NutchWritable(new FloatWritable((float) total)));
-      ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
-      tdigest.asSmallBytes(tdigestBytes);
-      output.collect(new Text(keyPrefix + "d"),
-          new NutchWritable(new BytesWritable(tdigestBytes.array())));
-    }
-
-    public void reduce(Text key, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      val.set(0L);
-      String k = key.toString();
-      if (k.equals("sc")) {
-        reduceMinMaxTotalFloat(k, values, output, reporter);
-      } else if (k.equals("ft") || k.equals("fi")) {
-        reduceMinMaxTotal(k, values, output, reporter);
-      } else {
-        while (values.hasNext()) {
-          LongWritable cnt = (LongWritable) values.next().get();
-          val.set(val.get() + cnt.get());
-        }
-        output.collect(key, new NutchWritable(val));
-      }
-    }
-  }
-
   public static class CrawlDbStatReducer implements
       Reducer<Text, NutchWritable, Text, NutchWritable> {
     public void configure(JobConf job) {
@@ -314,7 +246,8 @@ public void reduce(Text key, Iterator<NutchWritable> values,
         throws IOException {
 
       String k = key.toString();
-      if (k.equals("T")) {
+      if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
+          || k.equals("ftt") || k.equals("fit")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
@@ -323,68 +256,62 @@ public void reduce(Text key, Iterator<NutchWritable> values,
         }
         // output sum
         output.collect(key, new NutchWritable(new LongWritable(sum)));
-      } else if (k.startsWith("status") || k.startsWith("retry")) {
-        LongWritable cnt = new LongWritable();
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("scx")) {
-        FloatWritable max = new FloatWritable(Float.MIN_VALUE);
-        while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          if (max.get() < val.get())
-            max.set(val.get());
-        }
-        output.collect(key, new NutchWritable(max));
-      } else if (k.equals("ftx") || k.equals("fix")) {
-        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          if (cnt.get() < val.get())
-            cnt.set(val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("scn")) {
-        FloatWritable min = new FloatWritable(Float.MAX_VALUE);
+      } else if (k.equals("sc")) {
+        float min = Float.MAX_VALUE;
+        float max = Float.MIN_VALUE;
         while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          if (min.get() > val.get())
-            min.set(val.get());
+          float value = ((FloatWritable) values.next().get()).get();
+          if (max < value) {
+            max = value;
+          }
+          if (min > value) {
+            min = value;
+          }
         }
-        output.collect(key, new NutchWritable(min));
-      } else if (k.equals("ftn") || k.equals("fin")) {
-        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+        output.collect(key, new NutchWritable(new FloatWritable(min)));
+        output.collect(key, new NutchWritable(new FloatWritable(max)));
+      } else if (k.equals("ft") || k.equals("fi")) {
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
         while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          if (cnt.get() > val.get())
-            cnt.set(val.get());
+          long value = ((LongWritable) values.next().get()).get();
+          if (max < value) {
+            max = value;
+          }
+          if (min > value) {
+            min = value;
+          }
         }
-        output.collect(key, new NutchWritable(cnt));
+        output.collect(key, new NutchWritable(new LongWritable(min)));
+        output.collect(key, new NutchWritable(new LongWritable(max)));
       } else if (k.equals("sct")) {
-        FloatWritable cnt = new FloatWritable();
-        while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("ftt") || k.equals("fit")) {
-        LongWritable cnt = new LongWritable();
+        float cnt = 0.0f;
         while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
+          float value = ((FloatWritable) values.next().get()).get();
+          cnt += value;
         }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
+        output.collect(key, new NutchWritable(new FloatWritable(cnt)));
+      } else if (k.equals("scd")) {
         MergingDigest tdigest = null;
         while (values.hasNext()) {
-          byte[] bytes = ((BytesWritable) values.next().get()).getBytes();
-          MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
-          if (tdigest == null) {
-            tdigest = tdig;
-          } else {
-            tdigest.add(tdig);
+          Writable value = values.next().get();
+          if (value instanceof BytesWritable) {
+            byte[] bytes = ((BytesWritable) value).getBytes();
+            MergingDigest tdig = MergingDigest
+                .fromBytes(ByteBuffer.wrap(bytes));
+            if (tdigest == null) {
+              tdigest = tdig;
+            } else {
+              tdigest.add(tdig);
+            }
+          } else if (value instanceof FloatWritable) {
+            float val = ((FloatWritable) value).get();
+            if (!Float.isNaN(val)) {
+              if (tdigest == null) {
+                tdigest = (MergingDigest) TDigest.createMergingDigest(100.0);
+              }
+              tdigest.add(val);
+            }
           }
         }
         ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
@@ -455,7 +382,7 @@ public void close() {
 	  job.setInputFormat(SequenceFileInputFormat.class);
 
 	  job.setMapperClass(CrawlDbStatMapper.class);
-	  job.setCombinerClass(CrawlDbStatCombiner.class);
+	  job.setCombinerClass(CrawlDbStatReducer.class);
 	  job.setReducerClass(CrawlDbStatReducer.class);
 
 	  FileOutputFormat.setOutputPath(job, tmpFolder);
@@ -486,27 +413,57 @@ public void close() {
 			    stats.put(k, value.get());
 			    continue;
 			  }
-			  if (k.equals("scx")) {
-			    FloatWritable fvalue = (FloatWritable) value.get();
-			    if (((FloatWritable) val).get() < fvalue.get())
-			      ((FloatWritable) val).set(fvalue.get());
-        } else if (k.equals("ftx") || k.equals("fix")) {
-          LongWritable lvalue = (LongWritable) value.get();
-          if (((LongWritable) val).get() < lvalue.get())
-            ((LongWritable) val).set(lvalue.get());
-        } else if (k.equals("scn")) {
-          FloatWritable fvalue = (FloatWritable) value.get();
-          if (((FloatWritable) val).get() > fvalue.get())
-            ((FloatWritable) val).set(fvalue.get());
-			  } else if (k.equals("ftn") || k.equals("fin")) {
-          LongWritable lvalue = (LongWritable) value.get();
-				  if (((LongWritable) val).get() > lvalue.get())
-				    ((LongWritable) val).set(lvalue.get());
+			  if (k.equals("sc")) {
+			    float min = Float.MAX_VALUE;
+          float max = Float.MIN_VALUE;
+			    if (stats.containsKey("scn")) {
+			      min = ((FloatWritable) stats.get("scn")).get();
+			    } else {
+			      min = ((FloatWritable) stats.get("sc")).get();
+			    }
+          if (stats.containsKey("scx")) {
+            max = ((FloatWritable) stats.get("scx")).get();
+          } else {
+            max = ((FloatWritable) stats.get("sc")).get();
+          }
+			    float fvalue = ((FloatWritable) value.get()).get();
+			    if (min > fvalue) {
+			      min = fvalue;
+			    }
+          if (max < fvalue) {
+            max = fvalue;
+          }
+          stats.put("scn", new FloatWritable(min));
+          stats.put("scx", new FloatWritable(max));
+        } else if (k.equals("ft") || k.equals("fi")) {
+          long min = Long.MAX_VALUE;
+          long max = Long.MIN_VALUE;
+          String minKey = k + "n";
+          String maxKey = k + "x";
+          if (stats.containsKey(minKey)) {
+            min = ((LongWritable) stats.get(minKey)).get();
+          } else if (stats.containsKey(k)) {
+            min = ((LongWritable) stats.get(k)).get();
+          }
+          if (stats.containsKey(maxKey)) {
+            max = ((LongWritable) stats.get(maxKey)).get();
+          } else if (stats.containsKey(k)) {
+            max = ((LongWritable) stats.get(k)).get();
+          }
+          long lvalue = ((LongWritable) value.get()).get();
+          if (min > lvalue) {
+            min = lvalue;
+          }
+          if (max < lvalue) {
+            max = lvalue;
+          }
+          stats.put(k + "n", new LongWritable(min));
+          stats.put(k + "x", new LongWritable(max));
 			  } else if (k.equals("sct")) {
           FloatWritable fvalue = (FloatWritable) value.get();
           ((FloatWritable) val)
               .set(((FloatWritable) val).get() + fvalue.get());
-        } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
+        } else if (k.equals("scd")) {
           MergingDigest tdigest = null;
           MergingDigest tdig = MergingDigest.fromBytes(
               ByteBuffer.wrap(((BytesWritable) value.get()).getBytes()));
@@ -529,6 +486,11 @@ public void close() {
 		  }
 		  reader.close();
 	  }
+    // remove score, fetch interval, and fetch time
+    // (used for min/max calculation)
+    stats.remove("sc");
+    stats.remove("fi");
+    stats.remove("ft");
 	  // removing the tmp folder
 	  fileSystem.delete(tmpFolder, true);
 	  return stats;
@@ -566,6 +528,8 @@ public void processStatJob(String crawlDb, Configuration config, boolean sort)
           LOG.info("max score:\t" + fvalue);
         } else if (k.equals("sct")) {
           LOG.info("avg score:\t" + (fvalue / totalCnt.get()));
+        } else if (k.equals("scNaN")) {
+          LOG.info("score == NaN:\t" + value);
         } else if (k.equals("ftn")) {
           LOG.info("earliest fetch time:\t" + new Date(1000 * 60 * value));
         } else if (k.equals("ftx")) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> CrawlDbReader -stats fails with ClassCastException
> --------------------------------------------------
>
>                 Key: NUTCH-2474
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2474
>             Project: Nutch
>          Issue Type: Bug
>          Components: crawldb
>    Affects Versions: 1.14
>         Environment: Java 8, distributed mode: Hadoop CDH 5.13.0
>            Reporter: Sebastian Nagel
>            Priority: Critical
>             Fix For: 1.14
>
>
> In distributed mode CrawlDbReader / readdb -stats fails with a ClassCastException in the combiner:
> {noformat}
> 17/12/08 04:57:13 INFO mapreduce.Job: Task Id : attempt_1512553291624_0022_m_000039_0, Status : FAILED
> Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.io.LongWritable
>         at org.apache.nutch.crawl.CrawlDbReader$CrawlDbStatCombiner.reduce(CrawlDbReader.java:296)
>         at org.apache.nutch.crawl.CrawlDbReader$CrawlDbStatCombiner.reduce(CrawlDbReader.java:222)
>         at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1639)
>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1946)
>         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1514)
>         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
> {noformat}
> FloatWritables have been used since NUTCH-2470, which is when this bug was introduced.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message