crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [27/28] Rename scrunch packages and add license headers
Date Sat, 07 Jul 2012 21:49:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala b/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala
new file mode 100644
index 0000000..5303c03
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/JoinTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/**
+ * Joins the per-word counts of two text corpora and checks the per-word
+ * count difference for a known word.
+ */
+class JoinTest extends JUnitSuite {
+  // Parameterized with this suite's own class, as the other tests here are
+  // (it previously named CogroupTest).
+  val pipeline = Pipeline.mapReduce[JoinTest]
+
+  /**
+   * Lower-cases every line of `fileName`, splits it on runs of non-word
+   * characters and counts the occurrences of each resulting token.
+   */
+  def wordCount(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+        .flatMap(_.toLowerCase.split("\\W+")).count
+  }
+
+  @Test def join {
+    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+    val maugham = FileHelper.createTempCopyOf("maugham.txt")
+    val output = FileHelper.createOutputPath()
+    try {
+      // For words present in both corpora keep (word, shakesCount - maughamCount),
+      // write those pairs out, and retain only the positive differences.
+      val filtered = wordCount(shakespeare).join(wordCount(maugham))
+          .map((k, v) => (k, v._1 - v._2))
+          .write(to.textFile(output.getAbsolutePath()))
+          .filter((k, d) => d > 0).materialize
+      assert(filtered.exists(_ == ("macbeth", 66)))
+      pipeline.done
+    } finally {
+      // File.delete/deleteOnExit only remove empty directories; the text
+      // target is presumably a directory of output files, so remove it
+      // recursively (also when the assertion above fails).
+      deleteRecursively(output)
+    }
+  }
+
+  /** Deletes `file` and, if it is a directory, everything beneath it. */
+  private def deleteRecursively(file: java.io.File) {
+    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
+    file.delete()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala
new file mode 100644
index 0000000..9ab7897
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/PageRankClassTest.scala
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+/**
+ * Per-URL PageRank state: the current rank `pr`, the rank from the previous
+ * iteration `oldpr`, and the page's outbound links `urls`.
+ */
+case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) {
+  /** Zero ranks and null links; presumably needed for reflective (Avro) construction -- confirm. */
+  def this() = this(0f, 0f, null)
+
+  /** The share of the current rank handed to each outbound link. */
+  def scaledPageRank = pr / urls.length
+
+  /** Moves one iteration forward: `newPageRank` becomes current, the current rank becomes old. */
+  def next(newPageRank: Float) = copy(pr = newPageRank, oldpr = pr)
+
+  /** Absolute rank change between the previous and the current iteration. */
+  def delta = math.abs(pr - oldpr)
+}
+
+/**
+ * Combining DoFn over [[PageRankData]]: accumulates, per link target, the
+ * rank shares contributed by each input page and emits partial sums in batches.
+ */
+class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
+  /** Partial rank sums keyed by target URL; absent keys read as 0. */
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
+    val page = input.second()
+    // A page with no outbound links contributes nothing.
+    if (!page.urls.isEmpty) {
+      val share = page.pr / page.urls.length
+      for (target <- page.urls) {
+        cache.put(target, cache(target) + share)
+      }
+      // Bound memory by flushing once more than 5000 targets are cached.
+      if (cache.size > 5000) cleanup(emitFn)
+    }
+  }
+
+  /** Emits every accumulated (target, partial sum) pair, then empties the cache. */
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach { case (target, sum) => emitFn.emit(P.of(target, sum)) }
+    cache.clear
+  }
+}
+
+/**
+ * PageRank over [[PageRankData]] values, iterated until the largest per-page
+ * rank change drops to 0.01 or below.
+ */
+class PageRankClassTest extends JUnitSuite {
+  // Parameterized with this suite's own class (it previously named PageRankTest).
+  val pipeline = Pipeline.mapReduce[PageRankClassTest]
+
+  /**
+   * Reads tab-separated (url, link) lines and groups them into per-URL state
+   * with rank 1, previous rank 0 and the non-null links.
+   */
+  def initialInput(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
+      .groupByKey
+      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
+  }
+
+  /**
+   * One iteration with damping factor `d`: each page sends its
+   * `scaledPageRank` to every link, then `cg` folds the shares in.
+   */
+  def update(prev: PTable[String, PageRankData], d: Float) = {
+    val outbound = prev.flatMap((url, prd) => {
+      prd.urls.map(link => (link, prd.scaledPageRank))
+    })
+    cg(prev, outbound, d)
+  }
+
+  /**
+   * Cogroups page state with incoming shares and sets each page's rank to
+   * (1 - d) + d * (sum of incoming shares), keeping the old rank as `oldpr`.
+   */
+  def cg(prev: PTable[String, PageRankData],
+         out: PTable[String, Float], d: Float) = {
+    prev.cogroup(out).map((url, v) => {
+      val (p, o) = v
+      // NOTE(review): p.head throws if a key appears only in `out` (a link
+      // target with no state of its own); presumably the input avoids this.
+      val prd = p.head
+      (url, prd.next((1 - d) + d * o.sum))
+    })
+  }
+
+  /** Same as `update`, but computes outbound shares with [[CachingPageRankClassFn]]. */
+  def fastUpdate(prev: PTable[String, PageRankData], d: Float) = {
+    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  @Test def testPageRank {
+    // Presumably turns on Crunch's debug mode for this pipeline.
+    pipeline.getConfiguration.set("crunch.debug", "true")
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = update(prev, 0.5f)
+      delta = prev.values.map(_.delta).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  // Previously missing @Test, so JUnit never ran this case.
+  @Test def testFastPageRank {
+    pipeline.getConfiguration.set("crunch.debug", "true")
+    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      prev = fastUpdate(prev, 0.5f)
+      delta = prev.values.map(_.delta).max.materialize.head
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala
new file mode 100644
index 0000000..cbf7ebf
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/PageRankTest.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import Avros._
+
+import org.apache.crunch.{DoFn, Emitter, Pair => P}
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Assert._
+import _root_.org.junit.Test
+
+/**
+ * Combining DoFn over tuple-encoded page state (rank, previous rank, links):
+ * accumulates per-target rank shares and emits partial sums in batches.
+ */
+class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
+  /** Partial rank sums keyed by target URL; absent keys read as 0. */
+  val cache = new HashMap[String, Float] {
+    override def default(key: String) = 0f
+  }
+
+  override def process(input: P[String, (Float, Float, List[String])],
+                       emitFn: Emitter[P[String, Float]]) {
+    val (rank, _, links) = input.second()
+    // Computed even for an empty link list; the loop below is then a no-op.
+    val share = rank / links.size
+    for (target <- links) {
+      cache.put(target, cache(target) + share)
+    }
+    // Bound memory by flushing once more than 5000 targets are cached.
+    if (cache.size > 5000) cleanup(emitFn)
+  }
+
+  /** Emits every accumulated (target, partial sum) pair, then empties the cache. */
+  override def cleanup(emitFn: Emitter[P[String, Float]]) {
+    cache.foreach { case (target, sum) => emitFn.emit(P.of(target, sum)) }
+    cache.clear
+  }
+}
+
+/**
+ * PageRank over tuple-encoded page state (rank, previous rank, links),
+ * iterated until the largest per-page rank change drops to 0.01 or below.
+ */
+class PageRankTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[PageRankTest]
+
+  /**
+   * Reads tab-separated (url, link) lines and groups them into per-URL
+   * state (1f, 0f, links).
+   */
+  def initialInput(fileName: String) = {
+    val edges = pipeline.read(from.textFile(fileName)).map(line => {
+      val cols = line.split("\\t")
+      (cols(0), cols(1))
+    })
+    edges.groupByKey.map((url, links) => (url, (1f, 0f, links.toList)))
+  }
+
+  /**
+   * One iteration with damping factor `d`: each page splits its current rank
+   * evenly across its links, then `cg` folds the shares in.
+   */
+  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
+    val outbound = prev.flatMap((url, v) => {
+      val (rank, _, links) = v
+      val share = rank / links.size
+      links.map(target => (target, share))
+    })
+    cg(prev, outbound, d)
+  }
+
+  /**
+   * Cogroups page state with incoming shares and produces
+   * ((1 - d) + d * sum of shares, previous current rank, links) per page.
+   */
+  def cg(prev: PTable[String, (Float, Float, List[String])],
+         out: PTable[String, Float], d: Float) = {
+    prev.cogroup(out).map((url, v) => {
+      val (states, shares) = v
+      // NOTE(review): states.head throws for a key present only in `out`;
+      // presumably the input avoids this.
+      val (rank, _, links) = states.head
+      (url, ((1 - d) + d * shares.sum, rank, links))
+    })
+  }
+
+  /** Same as `update`, but computes outbound shares with [[CachingPageRankFn]]. */
+  def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
+    val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
+    cg(prev, outbound, d)
+  }
+
+  /** Largest absolute difference between current and previous rank over all pages. */
+  private def maxDelta(table: PTable[String, (Float, Float, List[String])]): Float =
+    table.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
+
+  @Test def testPageRank {
+    var state = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      state = update(state, 0.5f)
+      delta = maxDelta(state)
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+
+  @Test def testFastPageRank {
+    var state = initialInput(FileHelper.createTempCopyOf("urls.txt"))
+    var delta = 1.0f
+    while (delta > 0.01f) {
+      state = fastUpdate(state, 0.5f)
+      delta = maxDelta(state)
+    }
+    assertEquals(0.0048, delta, 0.001)
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala b/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
new file mode 100644
index 0000000..fe33aac
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/PipelineAppTest.scala
@@ -0,0 +1,45 @@
+/**
+ * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+import org.apache.crunch.test.FileHelper
+import org.apache.scrunch.PipelineApp
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/**
+ * Example PipelineApp: counts the words of two text files and writes the
+ * cogrouped counts to a third path.
+ *
+ * args(0) and args(1) are the input text files; args(2) is the output path.
+ */
+object WordCount extends PipelineApp {
+
+  /** Splits `line` on runs of non-word characters and drops empty tokens. */
+  def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())
+
+  /** Counts occurrences of each word in the text file at `filename`. */
+  def countWords(filename: String) = {
+    // NOTE(review): `from`/`to` are not imported in this file; presumably
+    // PipelineApp supplies them -- confirm.
+    val lines = read(from.textFile(filename))
+    val words = lines.flatMap(wordSplit)
+    words.count
+  }
+
+  // Application body. It reads `args`, so it must run when main is invoked
+  // rather than at object construction (presumably PipelineApp defers it the
+  // way scala.App does -- confirm). Statement order matters here.
+  val w1 = countWords(args(0))
+  val w2 = countWords(args(1))
+  cogroup(w1, w2).write(to.textFile(args(2)))
+}
+
+/** Runs the WordCount app end to end on shakes.txt and maugham.txt. */
+class PipelineAppTest extends JUnitSuite {
+  @Test def run {
+    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+    val maugham = FileHelper.createTempCopyOf("maugham.txt")
+    val outputPath = FileHelper.createOutputPath.getAbsolutePath
+    WordCount.main(Array[String](shakespeare, maugham, outputPath))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala b/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
new file mode 100644
index 0000000..bac27bd
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/TopTest.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/** Tests PTable.top on an in-memory table and on a MapReduce word count. */
+class TopTest extends JUnitSuite {
+
+  @Test def topInMem {
+    val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
+    // The assertion expects top(1, true) to yield the entry with the largest value.
+    assert(ptable.top(1, true).materialize.head == ("baz", 1729))
+  }
+
+  @Test def top2 {
+    val pipeline = Pipeline.mapReduce[TopTest]
+    val input = FileHelper.createTempCopyOf("shakes.txt")
+
+    // Whitespace-split, lower-cased, non-empty word counts.
+    val wc = pipeline.read(from.textFile(input))
+        .flatMap(_.toLowerCase.split("\\s+"))
+        .filter(!_.isEmpty()).count
+    assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
+    // Finish the pipeline, as every other MapReduce-backed test here does;
+    // this call was missing.
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala b/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
new file mode 100644
index 0000000..63fecdb
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/UnionTest.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/** Checks that unioning input collections or count tables yields combined word counts. */
+class UnionTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[UnionTest]
+  val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+  val maugham = FileHelper.createTempCopyOf("maugham.txt")
+
+  /** Lower-cases `col`, splits on runs of non-word characters and counts each token. */
+  def wordCount(col: PCollection[String]) = {
+    val tokens = col.flatMap(_.toLowerCase.split("\\W+"))
+    tokens.count
+  }
+
+  @Test def testUnionCollection {
+    val shakesLines = pipeline.read(from.textFile(shakespeare))
+    val bothLines = shakesLines.union(pipeline.read(from.textFile(maugham)))
+    val counts = wordCount(bothLines).materialize
+    assert(counts.exists(_ == ("you", 3691)))
+    pipeline.done
+  }
+
+  @Test def testUnionTable {
+    val shakesCounts = wordCount(pipeline.read(from.textFile(shakespeare)))
+    val maughamCounts = wordCount(pipeline.read(from.textFile(maugham)))
+    // Union the two count tables, then add up the counts per word.
+    val totals = shakesCounts.union(maughamCounts)
+        .groupByKey
+        .combine(_.sum)
+        .materialize
+    assert(totals.exists(_ == ("you", 3691)))
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala b/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
new file mode 100644
index 0000000..e97a1fd
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/WordCountTest.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.test.FileHelper
+
+import java.io.File
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+/** MapReduce word count over shakes.txt, re-aggregated by each word's first character. */
+class WordCountTest extends JUnitSuite {
+  @Test def wordCount {
+    val pipeline = Pipeline.mapReduce[WordCountTest]
+    val input = FileHelper.createTempCopyOf("shakes.txt")
+    val wordCountOut = FileHelper.createOutputPath
+
+    try {
+      // Count lower-cased whitespace-separated words, write the counts out,
+      // then sum the counts by first character.
+      val fcc = pipeline.read(from.textFile(input))
+          .flatMap(_.toLowerCase.split("\\s+"))
+          .filter(!_.isEmpty()).count
+          .write(to.textFile(wordCountOut.getAbsolutePath)) // Word counts
+          .map((w, c) => (w.slice(0, 1), c))
+          .groupByKey.combine(v => v.sum).materialize
+      assert(fcc.exists(_ == ("w", 1404)))
+
+      pipeline.done
+    } finally {
+      // File.delete only removes empty directories; the text target is
+      // presumably a directory of output files, so remove it recursively
+      // (also when the assertion above fails).
+      deleteRecursively(wordCountOut)
+    }
+  }
+
+  /** Deletes `file` and, if it is a directory, everything beneath it. */
+  private def deleteRecursively(file: File) {
+    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
+    file.delete()
+  }
+}


Mime
View raw message