datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [12/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
new file mode 100644
index 0000000..20f5984
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.SimpleRandomSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link SimpleRandomSample}.
+ * 
+ * @deprecated This tests the deprecated functionality of SimpleRandomSample
+ *             where the probability can be specified in the constructor.  
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSampleTestOld extends PigTests
+{
+  /**
+   * 
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
+   * 
+   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String simpleRandomSampleTest;
+
+  /**
+   * Samples all 32 input rows as a single group using the constructor-supplied
+   * probability and checks that the sample holds ceil(p * n) rows.
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void simpleRandomSampleTest() throws Exception
+  {
+    final String[] inputRows = {
+        "A1\tB1\t1",
+        "A1\tB1\t4",
+        "A1\tB3\t4",
+        "A1\tB4\t4",
+        "A2\tB1\t4",
+        "A2\tB2\t4",
+        "A3\tB1\t3",
+        "A3\tB1\t1",
+        "A3\tB3\t77",
+        "A4\tB1\t3",
+        "A4\tB2\t3",
+        "A4\tB3\t59",
+        "A4\tB4\t29",
+        "A5\tB1\t4",
+        "A6\tB2\t3",
+        "A6\tB2\t55",
+        "A6\tB3\t1",
+        "A7\tB1\t39",
+        "A7\tB2\t27",
+        "A7\tB3\t85",
+        "A8\tB1\t4",
+        "A8\tB2\t45",
+        "A9\tB3\t92",
+        "A9\tB3\t0",
+        "A9\tB6\t42",
+        "A9\tB5\t1",
+        "A10\tB1\t7",
+        "A10\tB2\t23",
+        "A10\tB2\t1",
+        "A10\tB2\t31",
+        "A10\tB6\t41",
+        "A10\tB7\t52"
+    };
+    writeLinesToFile("input", inputRows);
+
+    // populationSize must equal the number of rows written above.
+    final int populationSize = 32;
+    final double samplingProbability = 0.3;
+    final int expectedSampleSize = (int) Math.ceil(samplingProbability * populationSize);
+
+    PigTest test = createPigTestFromString(simpleRandomSampleTest,
+                                           "SAMPLING_PROBABILITY=" + samplingProbability);
+    test.runScript();
+
+    assertOutput(test, "sampled", "(" + expectedSampleSize + ")");
+  }
+
+  /**
+   * 
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
+   * 
+   * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
+   * 
+   * sampled = ORDER sampled BY group;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String stratifiedSampleTest;
+
+  /**
+   * Samples each A_id group separately with probability 0.5 and checks the
+   * per-group sample counts, ordered by group key.
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void stratifiedSampleTest() throws Exception
+  {
+    final String[] inputRows = {
+        "A1\tB1\t1",
+        "A1\tB1\t4",
+        "A1\tB3\t4",
+        "A1\tB4\t4",
+        "A2\tB1\t4",
+        "A2\tB2\t4",
+        "A3\tB1\t3",
+        "A3\tB1\t1",
+        "A3\tB3\t77",
+        "A4\tB1\t3",
+        "A4\tB2\t3",
+        "A4\tB3\t59",
+        "A4\tB4\t29",
+        "A5\tB1\t4",
+        "A6\tB2\t3",
+        "A6\tB2\t55",
+        "A6\tB3\t1",
+        "A7\tB1\t39",
+        "A7\tB2\t27",
+        "A7\tB3\t85",
+        "A8\tB1\t4",
+        "A8\tB2\t45",
+        "A9\tB3\t92",
+        "A9\tB3\t0",
+        "A9\tB6\t42",
+        "A9\tB5\t1",
+        "A10\tB1\t7",
+        "A10\tB2\t23",
+        "A10\tB2\t1",
+        "A10\tB2\t31",
+        "A10\tB6\t41",
+        "A10\tB7\t52"
+    };
+    writeLinesToFile("input", inputRows);
+
+    final double samplingProbability = 0.5;
+    PigTest test = createPigTestFromString(stratifiedSampleTest,
+                                           "SAMPLING_PROBABILITY=" + samplingProbability);
+    test.runScript();
+
+    // Groups are chararray keys, so "A10" sorts directly after "A1".
+    assertOutput(test,
+                 "sampled",
+                 "(A1,2)",
+                 "(A10,3)",
+                 "(A2,1)",
+                 "(A3,2)",
+                 "(A4,2)",
+                 "(A5,1)",
+                 "(A6,2)",
+                 "(A7,2)",
+                 "(A8,1)",
+                 "(A9,2)");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
new file mode 100644
index 0000000..4132203
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.SimpleRandomSampleWithReplacementElect;
+import datafu.pig.sampling.SimpleRandomSampleWithReplacementVote;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link SimpleRandomSampleWithReplacementVote} and
+ * {@link SimpleRandomSampleWithReplacementElect}.
+ * 
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSampleWithReplacementTest extends PigTests
+{
+  // @formatter:off
+  /**
+   * 
+   * 
+   * DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
+   * 
+   * DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
+   * 
+   * item = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int); 
+   * 
+   * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+   * 
+   * candidates = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRSWR_VOTE(item, $SAMPLE_SIZE, summary.count));
+   * 
+   * sampled = FOREACH (GROUP candidates BY position) GENERATE FLATTEN(SRSWR_ELECT(candidates));
+   * 
+   * sampled = FOREACH (GROUP sampled ALL) GENERATE COUNT(sampled) AS size;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String simpleRandomSampleWithReplacementTest;
+  // @formatter:on
+
+  /**
+   * Runs the vote/elect sampling-with-replacement script with a requested
+   * sample size of 32 and checks that exactly that many rows are produced.
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void testSimpleRandomSampleWithReplacement() throws Exception
+  {
+    final String[] inputRows = {
+        "A1\tB1\t1",
+        "A1\tB1\t4",
+        "A1\tB3\t4",
+        "A1\tB4\t4",
+        "A2\tB1\t4",
+        "A2\tB2\t4",
+        "A3\tB1\t3",
+        "A3\tB1\t1",
+        "A3\tB3\t77",
+        "A4\tB1\t3",
+        "A4\tB2\t3",
+        "A4\tB3\t59",
+        "A4\tB4\t29",
+        "A5\tB1\t4",
+        "A6\tB2\t3",
+        "A6\tB2\t55",
+        "A6\tB3\t1",
+        "A7\tB1\t39",
+        "A7\tB2\t27",
+        "A7\tB3\t85",
+        "A8\tB1\t4",
+        "A8\tB2\t45",
+        "A9\tB3\t92",
+        "A9\tB3\t0",
+        "A9\tB6\t42",
+        "A9\tB5\t1",
+        "A10\tB1\t7",
+        "A10\tB2\t23",
+        "A10\tB2\t1",
+        "A10\tB2\t31",
+        "A10\tB6\t41",
+        "A10\tB7\t52"
+    };
+    writeLinesToFile("input", inputRows);
+
+    final int sampleSize = 32;
+    // The trailing "L" makes the substituted parameter a long literal in the script.
+    final String sampleSizeParam = "SAMPLE_SIZE=" + sampleSize + "L";
+
+    PigTest test = createPigTestFromString(simpleRandomSampleWithReplacementTest, sampleSizeParam);
+    test.runScript();
+
+    assertOutput(test, "sampled", "(" + sampleSize + ")");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
new file mode 100644
index 0000000..24e7ec7
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import java.io.IOException;
+import java.util.*;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.WeightedReservoirSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link WeightedReservoirSample}.
+ *
+ * @author wjian 
+ *
+ */
+public class WeightedReservoirSamplingTests extends PigTests
+{
+  /**
+  
+
+  DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
+  DEFINE Assert datafu.pig.util.Assert();
+  
+  data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
+  sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
+  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+  sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
+  STORE sampled INTO 'output';
+
+   */
+  @Multiline
+  private String weightedReservoirSampleGroupTest;
+  
+  /**
+   * Checks WeightedReservoirSample on data grouped by a key, for reservoir
+   * sizes 1 through 3, making sure that sampled rows never carry over from
+   * one key to another.
+   *
+   * <p>
+   * This confirms the fix for DATAFU-11.
+   * </p>
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void weightedReservoirSampleGroupTest() throws Exception
+  {
+    // Column 0 is the group key and column 2 repeats it, so every sampled row
+    // can be checked against the group it was emitted for.
+    final String[] inputRows = {
+        "1\tB1\t1\t1.0",
+        "1\tB1\t1\t1.0",
+        "1\tB3\t1\t1.0",
+        "1\tB4\t1\t1.0",
+        "2\tB1\t2\t1.0",
+        "2\tB2\t2\t1.0",
+        "3\tB1\t3\t1.0",
+        "3\tB1\t3\t1.0",
+        "3\tB3\t3\t1.0",
+        "4\tB1\t4\t1.0",
+        "4\tB2\t4\t1.0",
+        "4\tB3\t4\t1.0",
+        "4\tB4\t4\t1.0",
+        "5\tB1\t5\t1.0",
+        "6\tB2\t6\t1.0",
+        "6\tB2\t6\t1.0",
+        "6\tB3\t6\t1.0",
+        "7\tB1\t7\t1.0",
+        "7\tB2\t7\t1.0",
+        "7\tB3\t7\t1.0",
+        "8\tB1\t8\t1.0",
+        "8\tB2\t8\t1.0",
+        "9\tB3\t9\t1.0",
+        "9\tB3\t9\t1.0",
+        "9\tB6\t9\t1.0",
+        "9\tB5\t9\t1.0",
+        "10\tB1\t10\t1.0",
+        "10\tB2\t10\t1.0",
+        "10\tB2\t10\t1.0",
+        "10\tB2\t10\t1.0",
+        "10\tB6\t10\t1.0",
+        "10\tB7\t10\t1.0"
+    };
+    writeLinesToFile("input", inputRows);
+
+    for (int reservoirSize = 1; reservoirSize <= 3; reservoirSize++)
+    {
+      PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest,
+                                             "RESERVOIR_SIZE=" + reservoirSize);
+      test.runScript();
+
+      for (Tuple sampledRow : getLinesForAlias(test, "sampled"))
+      {
+        int groupKey = ((Number) sampledRow.get(0)).intValue();
+        int echoedKey = ((Number) sampledRow.get(2)).intValue();
+        Assert.assertEquals(groupKey, echoedKey);
+      }
+    }
+  }
+  
+ /** 
+   
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+
+  data = LOAD 'input' AS (v1:chararray,v2:INT);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  --describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String weightedSampleTest; 
+   
+  /**
+   * Draws a single weighted sample ten times and checks that the key with the
+   * dominant weight ("a", weight 100) is drawn more often than any other key.
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void weightedSampleTest() throws Exception
+  {
+    // Number of times each key was drawn; every key starts at zero.
+    Map<String, Integer> drawsPerKey = new HashMap<String, Integer>();
+    drawsPerKey.put("a", 0);
+    drawsPerKey.put("b", 0);
+    drawsPerKey.put("c", 0);
+    drawsPerKey.put("d", 0);
+
+    PigTest test = createPigTestFromString(weightedSampleTest);
+
+    writeLinesToFile("input", "a\t100", "b\t1", "c\t5", "d\t2");
+
+    for (int run = 0; run < 10; run++)
+    {
+      test.runScript();
+
+      Tuple firstRow = this.getLinesForAlias(test, "sampled").get(0);
+      DataBag sampleBag = (DataBag) firstRow.get(0);
+
+      for (Tuple sampledTuple : sampleBag)
+      {
+        String key = (String) sampledTuple.get(0);
+        drawsPerKey.put(key, drawsPerKey.get(key) + 1);
+      }
+    }
+
+    // Find the key that was drawn most often (first one wins on ties).
+    String mostDrawnKey = "";
+    int mostDraws = 0;
+    for (Map.Entry<String, Integer> entry : drawsPerKey.entrySet())
+    {
+      if (entry.getValue() > mostDraws)
+      {
+        mostDrawnKey = entry.getKey();
+        mostDraws = entry.getValue();
+      }
+    }
+
+    Assert.assertEquals(mostDrawnKey, "a");
+  }
+
+  /**
+   * Feeds 100 single-tuple bags through accumulate() and checks that the
+   * resulting reservoir holds 10 distinct values from [0, 100).
+   *
+   * @throws IOException if the sampler fails
+   */
+  @Test
+  public void weightedReservoirSampleAccumulateTest() throws IOException
+  {
+    WeightedReservoirSample sampler = new WeightedReservoirSample("10", "1");
+
+    for (int value = 0; value < 100; value++)
+    {
+      // Field 0 is the value, field 1 (the weight field) is value + 1.
+      Tuple weighted = TupleFactory.getInstance().newTuple(2);
+      weighted.set(0, value);
+      weighted.set(1, value + 1);
+
+      DataBag singleton = BagFactory.getInstance().newDefaultBag();
+      singleton.add(weighted);
+
+      sampler.accumulate(TupleFactory.getInstance().newTuple(singleton));
+    }
+
+    verifyNoRepeatAllFound(sampler.getValue(), 10, 0, 100);
+  }
+
+  /**
+   * Pushes 100 weighted tuples through the Initial, Intermediate and Final
+   * stages in turn and checks that the final reservoir holds 10 distinct
+   * values from [0, 100).
+   *
+   * @throws IOException if any stage fails
+   */
+  @Test
+  public void weightedReservoirSampleAlgebraicTest() throws IOException
+  {
+    WeightedReservoirSample.Initial initialStage = new WeightedReservoirSample.Initial("10", "1");
+    WeightedReservoirSample.Intermediate intermediateStage = new WeightedReservoirSample.Intermediate("10", "1");
+    WeightedReservoirSample.Final finalStage = new WeightedReservoirSample.Final("10", "1");
+
+    DataBag allTuples = BagFactory.getInstance().newDefaultBag();
+    for (int value = 0; value < 100; value++)
+    {
+      Tuple weighted = TupleFactory.getInstance().newTuple(2);
+      weighted.set(0, value);
+      weighted.set(1, value + 1);
+      allTuples.add(weighted);
+    }
+
+    // Initial stage: raw bag in, one partial-result tuple out.
+    Tuple partial = initialStage.exec(TupleFactory.getInstance().newTuple(allTuples));
+
+    // Intermediate stage: consumes a bag of partial results.
+    DataBag partials = BagFactory.getInstance().newDefaultBag(Arrays.asList(partial));
+    partial = intermediateStage.exec(TupleFactory.getInstance().newTuple(partials));
+
+    // Final stage: consumes a bag of intermediate results and yields the sample.
+    partials = BagFactory.getInstance().newDefaultBag(Arrays.asList(partial));
+    DataBag sample = finalStage.exec(TupleFactory.getInstance().newTuple(partials));
+
+    verifyNoRepeatAllFound(sample, 10, 0, 100);
+  }
+
+  /**
+   * Asserts that the bag has the expected size and that the first field of
+   * every tuple is an integer in [left, right) that occurs only once.
+   *
+   * @param result bag to inspect
+   * @param expectedResultSize number of tuples the bag must contain
+   * @param left inclusive lower bound for the first field
+   * @param right exclusive upper bound for the first field
+   * @throws ExecException if a tuple field cannot be read
+   */
+  private void verifyNoRepeatAllFound(DataBag result,
+                                      int expectedResultSize,
+                                      int left,
+                                      int right) throws ExecException
+  {
+    Assert.assertEquals(expectedResultSize, result.size());
+
+    // every value must be in range and must not repeat
+    Set<Integer> seen = new HashSet<Integer>();
+    for (Tuple tuple : result)
+    {
+      Integer value = (Integer) tuple.get(0);
+      Assert.assertTrue(left <= value && value < right);
+      // Set.add returns false when the value was already present.
+      Assert.assertTrue(String.format("Found duplicate of %d", value), seen.add(value));
+    }
+  }
+
+  /**
+   * Calls exec() with 10 equally weighted tuples and a reservoir size of 100,
+   * and checks that every one of the 10 values comes back exactly once.
+   *
+   * @throws IOException if the sampler fails
+   */
+  @Test
+  public void weightedReservoirSampleLimitExecTest() throws IOException
+  {
+    WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
+
+    DataBag population = BagFactory.getInstance().newDefaultBag();
+    for (int value = 0; value < 10; value++)
+    {
+      Tuple weighted = TupleFactory.getInstance().newTuple(2);
+      weighted.set(0, value);
+      weighted.set(1, 1); // score is equal for all
+      population.add(weighted);
+    }
+
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    input.set(0, population);
+
+    DataBag sample = sampler.exec(input);
+
+    verifyNoRepeatAllFound(sample, 10, 0, 10);
+
+    // Collect the sampled values, then confirm none of 0..9 is missing.
+    Set<Integer> sampledValues = new HashSet<Integer>();
+    for (Tuple sampledTuple : sample)
+    {
+      sampledValues.add((Integer) sampledTuple.get(0));
+    }
+
+    for (int value = 0; value < 10; value++)
+    {
+      Assert.assertTrue(sampledValues.contains(value));
+    }
+  }
+
+  /**
+   * Checks that a negative weight-field index is rejected by the constructor
+   * with the expected error message.
+   *
+   * @throws Exception never expected; declared for consistency with the other tests
+   */
+  @Test
+  public void invalidConstructorArgTest() throws Exception
+  {
+    try
+    {
+      new WeightedReservoirSample("1", "-1");
+      Assert.fail( "Testcase should fail");
+    }
+    catch (Exception expected)
+    {
+      String message = expected.getMessage();
+      Assert.assertTrue(message.contains("Invalid negative index of weight field argument for WeightedReserviorSample constructor: -1"));
+    }
+  }
+
+  /**
+   * Checks that an input row with a weight of zero ("c\t0") makes the script
+   * fail; the test fails if the output can be read without an exception.
+   *
+   * @throws Exception if creating the test or writing the input fails
+   */
+  @Test
+  public void invalidWeightTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(weightedSampleTest);
+
+    writeLinesToFile("input", "a\t100", "b\t1", "c\t0", "d\t2");
+
+    try
+    {
+      test.runScript();
+      this.getLinesForAlias(test, "sampled");
+      Assert.fail( "Testcase should fail");
+    }
+    catch (Exception expected)
+    {
+      // Any exception counts as the expected failure; its message is not checked.
+    }
+  }
+
+ /** 
+   
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
+
+  data = LOAD 'input' AS (v1:chararray);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String invalidInputTupleSizeTest; 
+ 
+  /**
+   * Checks that input tuples with a single field are rejected when the weight
+   * field index is 1, and that the error message says so.
+   *
+   * @throws Exception if creating the test or writing the input fails
+   */
+  @Test
+  public void invalidInputTupleSizeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
+
+    writeLinesToFile("input", "a", "b", "c", "d");
+
+    try
+    {
+      test.runScript();
+      this.getLinesForAlias(test, "sampled");
+      Assert.fail( "Testcase should fail");
+    }
+    catch (Exception expected)
+    {
+      String message = expected.getMessage();
+      Assert.assertTrue(message.contains("The field schema of the input tuple is null or the tuple size is no more than the weight field index: 1"));
+    }
+  }
+
+ /** 
+   
+ 
+  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0'); 
+
+  data = LOAD 'input' AS (v1:chararray, v2:INT);
+  data_g = group data all;
+  sampled = FOREACH data_g GENERATE WeightedSample(data);
+  describe sampled; 
+   
+  STORE sampled INTO 'output'; 
+ 
+  */ 
+  @Multiline 
+  private String invalidWeightFieldSchemaTest; 
+ 
+  /**
+   * Checks that pointing the weight field index at a chararray column
+   * (index 0) is rejected with the expected type error message.
+   *
+   * @throws Exception if creating the test or writing the input fails
+   */
+  @Test
+  public void invalidWeightFieldSchemaTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
+
+    writeLinesToFile("input", "a\t100", "b\t1", "c\t5", "d\t2");
+
+    try
+    {
+      test.runScript();
+      this.getLinesForAlias(test, "sampled");
+      Assert.fail( "Testcase should fail");
+    }
+    catch (Exception expected)
+    {
+      String message = expected.getMessage();
+      Assert.assertTrue(message.contains("Expect the type of the weight field of the input tuple to be of ([int, long, float, double]), but instead found (chararray), weight field: 0"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
new file mode 100644
index 0000000..76ec1d3
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sessions/SessionTests.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sessions;
+
+import static org.testng.Assert.*;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.joda.time.DateTime;
+import org.testng.annotations.Test;
+
+import datafu.pig.sessions.SessionCount;
+import datafu.pig.sessions.Sessionize;
+import datafu.test.pig.PigTests;
+
+public class SessionTests extends PigTests
+{
+  /**
+  
+
+  define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
+  
+  views = LOAD 'input' AS (time:$TIME_TYPE, user_id:int, value:int);
+  
+  views_grouped = GROUP views BY user_id;
+  view_counts = FOREACH views_grouped {
+    views = ORDER views BY time;
+    GENERATE flatten(Sessionize(views)) as (time,user_id,value,session_id);
+  }
+  
+  max_value = GROUP view_counts BY (user_id, session_id);
+  
+  max_value = FOREACH max_value GENERATE group.user_id, MAX(view_counts.value) AS val;
+  
+  STORE max_value INTO 'output';
+   */
+  @Multiline private String sessionizeTest;
+  
+  // Shared input rows for the sessionize script tests. Each row is
+  // tab-separated as (time, user_id, value), matching the LOAD schema of the
+  // sessionize script; rows are grouped below by user id 1, 2 and 3.
+  private String[] inputData = new String[] {
+      "2010-01-01T01:00:00Z\t1\t10",
+      "2010-01-01T01:15:00Z\t1\t20",
+      "2010-01-01T01:31:00Z\t1\t10",
+      "2010-01-01T01:35:00Z\t1\t20",
+      "2010-01-01T02:30:00Z\t1\t30",
+
+      "2010-01-01T01:00:00Z\t2\t10",
+      "2010-01-01T01:31:00Z\t2\t20",
+      "2010-01-01T02:10:00Z\t2\t30",
+      "2010-01-01T02:40:30Z\t2\t40",
+      "2010-01-01T03:30:00Z\t2\t50",
+
+      "2010-01-01T01:00:00Z\t3\t10",
+      "2010-01-01T01:01:00Z\t3\t20",
+      "2010-01-01T01:02:00Z\t3\t5",
+      "2010-01-01T01:10:00Z\t3\t25",
+      "2010-01-01T01:15:00Z\t3\t50",
+      "2010-01-01T01:25:00Z\t3\t30",
+      "2010-01-01T01:30:00Z\t3\t15"  
+  };
+  
+  /**
+   * Sessionizes the shared input with chararray timestamps and a 30 minute
+   * window, then checks the set of per-session maximum values for each user.
+   *
+   * @throws Exception if writing the input or running the script fails
+   */
+  @Test
+  public void sessionizeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(sessionizeTest,
+                                           "TIME_WINDOW=30m",
+                                           "TIME_TYPE=chararray");
+
+    this.writeLinesToFile("input", inputData);
+
+    test.runScript();
+
+    // user id -> distinct per-session maxima (the Boolean is only a marker)
+    HashMap<Integer,HashMap<Integer,Boolean>> maximaByUser = new HashMap<Integer,HashMap<Integer,Boolean>>();
+
+    for (Tuple row : this.getLinesForAlias(test, "max_value"))
+    {
+      Integer userId = (Integer)row.get(0);
+      Integer sessionMax = (Integer)row.get(1);
+
+      HashMap<Integer,Boolean> maxima = maximaByUser.get(userId);
+      if (maxima == null)
+      {
+        maxima = new HashMap<Integer,Boolean>();
+        maximaByUser.put(userId, maxima);
+      }
+      maxima.put(sessionMax, true);
+    }
+
+    assertEquals(maximaByUser.get(1).size(), 2);
+    assertEquals(maximaByUser.get(2).size(), 5);
+    assertEquals(maximaByUser.get(3).size(), 1);
+
+    for (int expectedMax : new int[] { 20, 30 })
+    {
+      assertTrue(maximaByUser.get(1).containsKey(expectedMax));
+    }
+
+    for (int expectedMax : new int[] { 10, 20, 30, 40, 50 })
+    {
+      assertTrue(maximaByUser.get(2).containsKey(expectedMax));
+    }
+
+    assertTrue(maximaByUser.get(3).containsKey(50));
+  }
+  
+  // Formatter used by sessionizeLongTest to turn the timestamp column of
+  // inputData into epoch milliseconds. The pattern stops at the seconds, and
+  // no time zone is set on the formatter here.
+  // NOTE(review): the trailing "Z" of the input is presumably left unparsed
+  // and the JVM default zone applied -- confirm this is intended.
+  private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+     
+  /**
+   * Same scenario as the chararray variant, but with the time column
+   * converted to epoch milliseconds and loaded as a long. Checks the
+   * per-session maximum values for users 1 and 2.
+   *
+   * @throws Exception if parsing, writing the input or running the script fails
+   */
+  @Test
+  public void sessionizeLongTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(sessionizeTest,
+                                           "TIME_WINDOW=30m",
+                                           "TIME_TYPE=long");
+
+    // Rewrite each row with its timestamp replaced by milliseconds since the epoch.
+    List<String> convertedRows = new ArrayList<String>();
+    for (String row : inputData)
+    {
+      String[] fields = row.split("\t");
+      Assert.assertEquals(3, fields.length);
+      long millis = dateFormat.parse(fields[0]).getTime();
+      fields[0] = Long.toString(millis);
+      convertedRows.add(StringUtils.join(fields,"\t"));
+    }
+
+    this.writeLinesToFile("input", convertedRows.toArray(new String[]{}));
+
+    test.runScript();
+
+    // user id -> distinct per-session maxima (the Boolean is only a marker)
+    HashMap<Integer,HashMap<Integer,Boolean>> maximaByUser = new HashMap<Integer,HashMap<Integer,Boolean>>();
+
+    for (Tuple row : this.getLinesForAlias(test, "max_value"))
+    {
+      Integer userId = (Integer)row.get(0);
+      Integer sessionMax = (Integer)row.get(1);
+
+      HashMap<Integer,Boolean> maxima = maximaByUser.get(userId);
+      if (maxima == null)
+      {
+        maxima = new HashMap<Integer,Boolean>();
+        maximaByUser.put(userId, maxima);
+      }
+      maxima.put(sessionMax, true);
+    }
+
+    assertEquals(maximaByUser.get(1).size(), 2);
+    assertEquals(maximaByUser.get(2).size(), 5);
+
+    for (int expectedMax : new int[] { 20, 30 })
+    {
+      assertTrue(maximaByUser.get(1).containsKey(expectedMax));
+    }
+
+    for (int expectedMax : new int[] { 10, 20, 30, 40, 50 })
+    {
+      assertTrue(maximaByUser.get(2).containsKey(expectedMax));
+    }
+  }
+  
+  /**
+   * Calls Sessionize.exec() directly with a 30 minute window: two events 28
+   * minutes apart must share a session id, two events 31 minutes apart must not.
+   *
+   * @throws Exception if the UDF fails
+   */
+  @Test
+  public void sessionizeExecTest() throws Exception
+  {
+    Sessionize sessionize = new Sessionize("30m");
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    DataBag events = BagFactory.getInstance().newDefaultBag();
+    input.set(0,events);
+
+    Tuple event;
+    List<Tuple> sessionized;
+    DateTime start;
+
+    // two events inside the window -> same session id
+    events.clear();
+    start = new DateTime();
+    event = TupleFactory.getInstance().newTuple(1);
+    event.set(0, start.getMillis());
+    events.add(event);
+    event = TupleFactory.getInstance().newTuple(1);
+    event.set(0, start.plusMinutes(28).getMillis());
+    events.add(event);
+    sessionized = toList(sessionize.exec(input));
+
+    Assert.assertEquals(2, sessionized.size());
+    // each output tuple is the input field plus the appended session id
+    Assert.assertEquals(2,sessionized.get(0).size());
+    Assert.assertEquals(2,sessionized.get(1).size());
+    Object firstSessionId = sessionized.get(0).get(1);
+    Object secondSessionId = sessionized.get(1).get(1);
+    Assert.assertTrue(firstSessionId.equals(secondSessionId));
+
+    // two events further apart than the window -> different session ids
+    events.clear();
+    start = new DateTime();
+    event = TupleFactory.getInstance().newTuple(1);
+    event.set(0, start.getMillis());
+    events.add(event);
+    event = TupleFactory.getInstance().newTuple(1);
+    event.set(0, start.plusMinutes(31).getMillis());
+    events.add(event);
+    sessionized = toList(sessionize.exec(input));
+
+    Assert.assertEquals(2, sessionized.size());
+    Assert.assertEquals(2,sessionized.get(0).size());
+    Assert.assertEquals(2,sessionized.get(1).size());
+    firstSessionId = sessionized.get(0).get(1);
+    secondSessionId = sessionized.get(1).get(1);
+    Assert.assertFalse(firstSessionId.equals(secondSessionId));
+  }
+  
+  /**
+   * Exercises Sessionize through accumulate()/getValue()/cleanup() with a 30
+   * minute window. Events are fed one per accumulate() call through a single
+   * reused bag that is cleared after each call. Two events 28 minutes apart
+   * must share a session id; after cleanup(), two events 31 minutes apart
+   * must not. A final cleanup() must leave getValue() empty.
+   *
+   * @throws Exception if the UDF fails
+   */
+  @Test
+  public void sessionizeAccumulateTest() throws Exception
+  {
+    Sessionize sessionize = new Sessionize("30m");
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+    // input wraps inputBag, so refilling the bag changes what accumulate() sees
+    input.set(0,inputBag);
+    
+    Tuple item;
+    List<Tuple> result;
+    DateTime dt;
+    
+    // test same session id
+    dt = new DateTime();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    // empty the shared bag so the next accumulate() call only sees the new event
+    inputBag.clear();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.plusMinutes(28).getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    result = toList(sessionize.getValue());
+    
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals(2,result.get(0).size());
+    Assert.assertEquals(2,result.get(1).size());
+    // session ids match
+    Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1))); 
+    
+    // test different session id
+    // cleanup() is called first so the second scenario starts from a fresh state
+    sessionize.cleanup();
+    dt = new DateTime();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.plusMinutes(31).getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    result = toList(sessionize.getValue());
+    
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals(2,result.get(0).size());
+    Assert.assertEquals(2,result.get(1).size());
+    // session ids don't match
+    Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
+    
+    // after cleanup() no accumulated output may remain
+    sessionize.cleanup();
+    Assert.assertEquals(0,sessionize.getValue().size());
+  }
+  
+  private List<Tuple> toList(DataBag bag)
+  {
+    // materialize the bag, preserving its iteration order
+    List<Tuple> copied = new ArrayList<Tuple>();
+    for (Tuple tuple : bag) {
+      copied.add(tuple);
+    }
+    return copied;
+  }
+  
+  /**
+  
+
+  define SessionCount datafu.pig.sessions.SessionCount('$TIME_WINDOW');
+  
+  views = LOAD 'input' AS (user_id:int, page_id:int, time:chararray);
+  
+  views_grouped = GROUP views BY (user_id, page_id);
+  view_counts = foreach views_grouped {
+    views = order views by time;
+    generate group.user_id as user_id, group.page_id as page_id, SessionCount(views.(time)) as count;
+  }
+  
+  STORE view_counts INTO 'output';
+   */
+  @Multiline
+  private String sessionCountPageViewsTest;
+  
+  @Test
+  public void sessionCountPageViewsTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(sessionCountPageViewsTest,
+                                 "TIME_WINDOW=30m");
+    // page 100: only the 01:35 -> 02:30 gap is longer than 30 minutes
+    String[] input = {
+      "1\t100\t2010-01-01T01:00:00Z",
+      "1\t100\t2010-01-01T01:15:00Z",
+      "1\t100\t2010-01-01T01:31:00Z",
+      "1\t100\t2010-01-01T01:35:00Z",
+      "1\t100\t2010-01-01T02:30:00Z",
+      // page 101: every gap below is longer than 30 minutes
+      "1\t101\t2010-01-01T01:00:00Z",
+      "1\t101\t2010-01-01T01:31:00Z",
+      "1\t101\t2010-01-01T02:10:00Z",
+      "1\t101\t2010-01-01T02:40:30Z",
+      "1\t101\t2010-01-01T03:30:00Z",      
+      // page 102: every gap below is 10 minutes or less
+      "1\t102\t2010-01-01T01:00:00Z",
+      "1\t102\t2010-01-01T01:01:00Z",
+      "1\t102\t2010-01-01T01:02:00Z",
+      "1\t102\t2010-01-01T01:10:00Z",
+      "1\t102\t2010-01-01T01:15:00Z",
+      "1\t102\t2010-01-01T01:25:00Z",
+      "1\t102\t2010-01-01T01:30:00Z"
+    };
+    // expected rows are (user_id, page_id, count), one per page above
+    String[] output = {
+        "(1,100,2)",
+        "(1,101,5)",
+        "(1,102,1)"
+      };
+    
+    test.assertOutput("views",input,"view_counts",output);
+  }
+  
+  @Test
+  public void sessionCountExecTest() throws Exception
+  {
+    // exec() receives one tuple whose only field is the bag of timestamp tuples
+    TupleFactory tuples = TupleFactory.getInstance();
+    SessionCount udf = new SessionCount("30m");
+    DataBag events = BagFactory.getInstance().newDefaultBag();
+    Tuple wrapper = tuples.newTuple(1);
+    wrapper.set(0,events);
+
+    // case 1: 28 minutes apart -> a single session
+    events.clear();
+    DateTime firstStart = new DateTime();
+    Tuple first = tuples.newTuple(1);
+    first.set(0, firstStart.getMillis());
+    events.add(first);
+    Tuple second = tuples.newTuple(1);
+    second.set(0, firstStart.plusMinutes(28).getMillis());
+    events.add(second);
+    long oneSession = udf.exec(wrapper).longValue();
+    Assert.assertEquals(1L,oneSession);
+
+    // case 2: 31 minutes apart -> two sessions
+    events.clear();
+    DateTime secondStart = new DateTime();
+    Tuple third = tuples.newTuple(1);
+    third.set(0, secondStart.getMillis());
+    events.add(third);
+    Tuple fourth = tuples.newTuple(1);
+    fourth.set(0, secondStart.plusMinutes(31).getMillis());
+    events.add(fourth);
+    Assert.assertEquals(2L,udf.exec(wrapper).longValue());
+  }
+  
+  @Test
+  public void sessionCountAccumulateTest() throws Exception
+  {
+    SessionCount sessionize = new SessionCount("30m");
+    Tuple input = TupleFactory.getInstance().newTuple(1);
+    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
+    input.set(0,inputBag);
+    // each accumulate() call below sees a bag holding a single timestamp tuple
+    Tuple item;
+    DateTime dt;
+    
+    // same session: 28 minutes apart is inside the 30m window
+    dt = new DateTime();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.plusMinutes(28).getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    Assert.assertEquals(1L,sessionize.getValue().longValue()); 
+    
+    // different sessions: reset the state, then use a 31 minute gap
+    sessionize.cleanup();
+    dt = new DateTime();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    item = TupleFactory.getInstance().newTuple(1);
+    item.set(0, dt.plusMinutes(31).getMillis());
+    inputBag.add(item);
+    sessionize.accumulate(input);
+    inputBag.clear();
+    // read back through getValue(), as in the first half; exec() is covered by sessionCountExecTest
+    Assert.assertEquals(2L,sessionize.getValue().longValue());
+    sessionize.cleanup();
+    Assert.assertEquals(0,sessionize.getValue().longValue());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/sets/SetTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/sets/SetTests.java b/datafu-pig/src/test/java/datafu/test/pig/sets/SetTests.java
new file mode 100644
index 0000000..9b24797
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/sets/SetTests.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sets;
+
+import java.util.Arrays;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.pig.sets.SetIntersect;
+import datafu.test.pig.PigTests;
+
+public class SetTests extends PigTests
+{
+  /**
+  
+
+  define SetIntersect datafu.pig.sets.SetIntersect();
+  
+  data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
+  
+  data2 = FOREACH data GENERATE SetIntersect(B1,B2);
+  
+  STORE data2 INTO 'output';
+   */
+  @Multiline
+  private String setIntersectTest;
+  
+  @Test
+  public void setIntersectTest() throws Exception
+  {
+    // each row is "B1<TAB>B2"; the expected rows hold the tuples common to both bags
+    PigTest pigTest = createPigTestFromString(setIntersectTest);
+    String[] rows = {
+      "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
+      "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(1,10),(3,30)}"
+    };
+    writeLinesToFile("input", rows);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2",
+                 "({(2,20),(4,40)})",
+                 "({(1,10),(3,30)})");
+  }
+  
+  /**
+  
+
+  define SetIntersect datafu.pig.sets.SetIntersect();
+  
+  docs = LOAD 'docs' USING PigStorage(',') AS (id:int, line:chararray);
+  B = FOREACH docs GENERATE id, line;
+  C = FOREACH B GENERATE id, TOKENIZE(line) as gu;
+  
+  filtered = FOREACH C {
+    uniq = DISTINCT gu;
+    GENERATE id, uniq;
+  }
+    
+  query = LOAD 'query' AS (line_query:chararray);
+  bag_query = FOREACH query GENERATE TOKENIZE(line_query) AS query;
+  -- sort the bag of tokens, since SetIntersect requires it
+  bag_query = FOREACH bag_query {
+    query_sorted = ORDER query BY token;
+    GENERATE query_sorted;
+  }
+    
+  result = FOREACH filtered {
+    -- sort the tokens, since SetIntersect requires it
+    tokens_sorted = ORDER uniq BY token;
+    GENERATE id, 
+             SIZE(SetIntersect(tokens_sorted,bag_query.query_sorted)) as cnt;
+  }
+    
+  DUMP result;
+  
+   */
+  @Multiline
+  private String setIntersectTestExample;
+  
+  @Test
+  public void setIntersectTestExample() throws Exception
+  {
+    PigTest pigTest = createPigTestFromString(setIntersectTestExample);
+
+    // documents, one "id,line" record each
+    String[] docs = {
+      "1,word1 word4 word2 word1",
+      "2,word2 word6 word1 word5 word3 word7",
+      "3,word1 word3 word4 word5"
+    };
+    writeLinesToFile("docs", docs);
+
+    // the query whose tokens are intersected with each document's tokens
+    writeLinesToFile("query", "word2 word7 word5");
+
+    pigTest.runScript();
+
+    // per document: (id, number of distinct tokens shared with the query)
+    assertOutput(pigTest, "result",
+                 "(1,1)", "(2,3)", "(3,1)");
+  }
+  
+  @Test
+  public void setIntersectEmptyBagsTest() throws Exception
+  {
+    PigTest pigTest = createPigTestFromString(setIntersectTest);
+    // rows 2 and 4-6 have disjoint or empty bags and must yield an empty bag
+    String[] rows = {
+      "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
+      "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(100,10),(300,30)}",
+      "{(1,10),(2,20)}\t{(1,10),(2,20)}",
+      "{(1,10),(2,20)}\t{}",
+      "{}\t{(1,10),(2,20)}",
+      "{}\t{}"
+    };
+    writeLinesToFile("input", rows);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2",
+                 "({(2,20),(4,40)})",
+                 "({})",
+                 "({(1,10),(2,20)})",
+                 "({})",
+                 "({})",
+                 "({})");
+  }
+  
+  @Test
+  public void testIntersectWithNullTuples() throws Exception {
+     // despite the name, the two inputs are empty bags; nothing is ever added to them
+     DataBag one = BagFactory.getInstance().newDefaultBag();
+     DataBag two = BagFactory.getInstance().newDefaultBag();
+     Tuple input = TupleFactory.getInstance().newTuple(Arrays.asList(one, two));
+     DataBag output = new SetIntersect().exec(input);
+     Assert.assertEquals(output.size(), 0L);  // TestNG order: (actual, expected)
+  }
+
+  @Test(expectedExceptions=org.apache.pig.impl.logicalLayer.FrontendException.class)
+  public void setIntersectOutOfOrderTest() throws Exception
+  {
+    // B1 is not sorted ((3,30) precedes (2,20)); the test expects a FrontendException
+    String unsortedRow =
+      "{(1,10),(3,30),(2,20),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}";
+    PigTest pigTest = createPigTestFromString(setIntersectTest);
+    writeLinesToFile("input", unsortedRow);
+    pigTest.runScript();
+
+    getLinesForAlias(pigTest, "data2");
+  }
+  
+  /**
+  
+
+  define SetUnion datafu.pig.sets.SetUnion();
+  
+  data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
+  
+  --dump data
+  
+  data2 = FOREACH data GENERATE SetUnion(B1,B2) AS C;
+  data2 = FOREACH data2 {
+    C = ORDER C BY val1 ASC, val2 ASC;
+    generate C;
+  }
+  
+  --dump data2
+  
+  STORE data2 INTO 'output';
+   */
+  @Multiline
+  private String setUnionTest;
+  
+  @Test
+  public void setUnionTest() throws Exception
+  {
+    // the script orders the union; repeated tuples such as (1,25) are expected once
+    PigTest pigTest = createPigTestFromString(setUnionTest);
+    String row =
+      "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{(1,1),(1,20),(1,25),(1,25),(1,25),(1,40),(1,70),(1,80)}";
+    writeLinesToFile("input", row);
+    pigTest.runScript();
+    String expected =
+      "({(1,1),(1,10),(1,20),(1,25),(1,30),(1,40),(1,50),(1,60),(1,70),(1,80)})";
+    assertOutput(pigTest, "data2", expected);
+  }
+  
+  @Test
+  public void setUnionEmptyBagsTest() throws Exception
+  {
+    // an empty bag on one side leaves the other bag as the union; two empties give {}
+    PigTest pigTest = createPigTestFromString(setUnionTest);
+    String[] rows = {
+      "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{}",
+      "{}\t{(1,10),(1,20),(1,30),(1,40),(1,50)}",
+      "{}\t{}"
+    };
+    writeLinesToFile("input", rows);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2",
+                 "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)})",
+                 "({(1,10),(1,20),(1,30),(1,40),(1,50)})",
+                 "({})");
+  }
+  
+  /**
+  
+
+  define SetDifference datafu.pig.sets.SetDifference();
+  
+  data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
+  
+  data2 = FOREACH data GENERATE SetDifference(B1,B2);
+  
+  STORE data2 INTO 'output';
+   */
+  @Multiline
+  private String setDifferenceTwoBagsTest;
+  
+  @Test
+  public void setDifferenceTwoBagsTest() throws Exception
+  {
+    // rows 1 and 3 leave a column empty instead of passing {}; same result is expected
+    PigTest pigTest = createPigTestFromString(setDifferenceTwoBagsTest);
+    String[] rows = {
+      "{(1),(2),(3)}\t",
+      "{(1),(2),(3)}\t{}",
+      "\t{(1),(2),(3)}",
+      "{}\t{(1),(2),(3)}",
+      "{(1),(2),(3)}\t{(1)}",
+      "{(1),(2),(3)}\t{(1),(2)}",
+      "{(1),(2),(3)}\t{(1),(2),(3)}",
+      "{(1),(2),(3)}\t{(1),(2),(3),(4)}",
+      "{(1),(2),(3),(4),(5),(6)}\t{(3),(4)}"
+    };
+    writeLinesToFile("input", rows);
+    pigTest.runScript();
+    assertOutput(pigTest, "data2",
+                 "({(1),(2),(3)})",
+                 "({(1),(2),(3)})",
+                 "({})",
+                 "({})",
+                 "({(2),(3)})",
+                 "({(3)})",
+                 "({})",
+                 "({})",
+                 "({(1),(2),(5),(6)})");
+  }
+  
+  /**
+  
+
+  define SetDifference datafu.pig.sets.SetDifference();
+  
+  data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)},B3:bag{T:tuple(val:int)});
+  
+  data2 = FOREACH data GENERATE SetDifference(B1,B2,B3);
+  
+  STORE data2 INTO 'output';
+   */
+  @Multiline
+  private String setDifferenceThreeBagsTest;
+  
+  @Test
+  public void setDifferenceThreeBagsTest() throws Exception
+  {    
+    PigTest test = createPigTestFromString(setDifferenceThreeBagsTest);    
+    // each row is "B1<TAB>B2<TAB>B3"; the first rows leave B2 and/or B3 empty or missing
+    writeLinesToFile("input", 
+                     "{(1),(2),(3)}\t\t",
+                     "{(1),(2),(3)}\t{}\t",
+                     "{(1),(2),(3)}\t\t{}",
+                     "{(1),(2),(3)}\t{}\t{}",
+                     "{(1),(2),(2),(2),(3),(3)}\t{}\t{}",
+                     // only one of B2/B3 removes an element
+                     "{(1),(2),(3)}\t{(2)}\t{}",
+                     "{(1),(2),(3)}\t{}\t{(2)}",
+                     // B2 and B3 each remove a different element
+                     "{(1),(2),(3)}\t{(2)}\t{(3)}",
+                     // duplicates, and elements of B2 that do not occur in B1
+                     "{(1),(2),(3)}\t{(2),(2)}\t{(3),(3)}",
+                     "{(1),(1),(1),(2),(2),(3),(3),(3)}\t{(2),(2)}\t{(3),(3)}",
+                     "{(1),(2),(3)}\t{(0),(2)}\t{(3)}",
+                     // B2 and B3 together cover all of B1
+                     "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3)}",
+                     "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3),(4)}");
+                  
+    test.runScript();
+    // expected rows, in the same order and grouping as the input rows above
+    assertOutput(test, "data2",
+                 "({(1),(2),(3)})",
+                 "({(1),(2),(3)})",
+                 "({(1),(2),(3)})",
+                 "({(1),(2),(3)})",
+                 "({(1),(2),(3)})",
+
+                 "({(1),(3)})",
+                 "({(1),(3)})",
+                 
+                 "({(1)})",
+                 
+                 "({(1)})",
+                 "({(1)})",
+                 "({(1)})",
+                 
+                 "({})",
+                 "({})");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/stats/EstimationTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/stats/EstimationTests.java b/datafu-pig/src/test/java/datafu/test/pig/stats/EstimationTests.java
new file mode 100644
index 0000000..174da84
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/stats/EstimationTests.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.stats;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+import static org.testng.Assert.*;
+
+public class EstimationTests extends PigTests
+{
+  /**
+  
+  
+  define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
+  
+  data_in = LOAD 'input' as (val:int);
+    
+  data_out = FOREACH (GROUP data_in ALL) GENERATE
+    HyperLogLogPlusPlus(data_in) as cardinality;
+  
+  data_out = FOREACH data_out GENERATE cardinality;
+    
+  STORE data_out into 'output';
+   */
+  @Multiline private String hyperLogLogTest;
+  
+  @Test
+  public void hyperLogLogTest() throws Exception
+  {
+    PigTest pigTest = createPigTestFromString(hyperLogLogTest);
+    // one million distinct values: 0, 10, 20, ...
+    final int distinctCount = 1000000;
+    String[] values = new String[distinctCount];
+    for (int idx=0; idx<distinctCount; idx++)
+    {
+      values[idx] = Integer.toString(idx*10);
+    }
+    writeLinesToFile("input", values);
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    // the estimate must be within 1% of the true cardinality
+    long estimate = (Long)rows.get(0).get(0);
+    double error = Math.abs(distinctCount-estimate)/(double)distinctCount;
+    System.out.println("error: " + error*100.0 + "%");
+    assertTrue(error < 0.01);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/stats/MarkovPairTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/stats/MarkovPairTests.java b/datafu-pig/src/test/java/datafu/test/pig/stats/MarkovPairTests.java
new file mode 100644
index 0000000..5d701a7
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/stats/MarkovPairTests.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.stats;
+
+import static org.testng.Assert.*;
+
+import java.util.Iterator;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+public class MarkovPairTests extends PigTests
+{
+  /**
+  
+
+  define markovPairs datafu.pig.stats.MarkovPairs();
+  
+  data = load 'input' as $schema;
+  --describe data;
+  
+  data_out1 = foreach data generate data as orig_bag;
+  --describe data_out1;
+  
+  data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
+  --describe data_out;
+  
+  store data_out into 'output';
+   */
+  @Multiline private String markovPairDefault;
+  
+  @Test
+  public void markovPairDefaultTest() throws Exception
+  {
+    // the expected output pairs each element with its immediate successor
+    String schema = "schema=(data: bag {t: tuple(val:int)})";
+    PigTest pigTest = createPigTestFromString(markovPairDefault, schema);
+
+    writeLinesToFile("input", "{(10),(20),(30),(40),(50),(60)}");
+
+    String[] expected = {
+        "({((10),(20)),((20),(30)),((30),(40)),((40),(50)),((50),(60))})"
+      };
+
+    pigTest.runScript();
+
+    Iterator<Tuple> produced = pigTest.getAlias("data_out");
+    assertTuplesMatch(expected, produced);
+  }
+  
+  @Test
+  public void markovPairMultipleInput() throws Exception
+  {
+    // same script as the default test, but every bag element carries two fields
+    String schema = "schema=(data: bag {t: tuple(val1:int,val2:int)})";
+    PigTest pigTest = createPigTestFromString(markovPairDefault, schema);
+
+    writeLinesToFile("input", "{(10,100),(20,200),(30,300),(40,400),(50,500),(60,600)}");
+
+    String[] expected = {
+        "({((10,100),(20,200)),((20,200),(30,300)),((30,300),(40,400)),((40,400),(50,500)),((50,500),(60,600))})"
+      };
+
+    pigTest.runScript();
+
+    Iterator<Tuple> produced = pigTest.getAlias("data_out");
+
+    assertTuplesMatch(expected, produced);
+  }
+  
+  /**
+  
+
+  define markovPairs datafu.pig.stats.MarkovPairs('$lookahead');
+  
+  data = load 'input' as $schema;
+  --describe data;
+  
+  data_out1 = foreach data generate data as orig_bag;
+  --describe data_out1;
+  
+  data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
+  --describe data_out;
+  
+  store data_out into 'output';
+   */
+  @Multiline private String markovPairLookahead;
+  
+  @Test
+  public void markovPairLookaheadTest() throws Exception
+  {
+    // lookahead=3: the expected output pairs each element with up to the next three
+    String schema = "schema=(data: bag {t: tuple(val:int)})";
+    PigTest pigTest = createPigTestFromString(markovPairLookahead, schema, "lookahead=3");
+
+    writeLinesToFile("input", "{(10),(20),(30),(40),(50)}");
+
+    String[] expected = {
+        "({((10),(20)),((10),(30)),((10),(40)),((20),(30)),((20),(40)),((20),(50)),((30),(40)),((30),(50)),((40),(50))})"
+      };
+
+    pigTest.runScript();
+
+    Iterator<Tuple> produced = pigTest.getAlias("data_out");
+
+    assertTuplesMatch(expected, produced);
+  }
+  
+  /** Asserts that the iterator yields exactly the expected tuple strings, in order. */
+  private void assertTuplesMatch(String[] expectedOutput, Iterator<Tuple> actualOutput)
+  {
+    for (String outputLine : expectedOutput)
+    {
+      assertTrue(actualOutput.hasNext(), "missing output tuple for: " + outputLine);
+      Tuple outputTuple = actualOutput.next();
+      System.out.println(String.format("expected: %s", outputLine));
+      System.out.println(String.format("actual: %s", outputTuple.toString()));
+      assertEquals(outputTuple.toString(), outputLine);  // TestNG order: (actual, expected)
+    }
+    assertFalse(actualOutput.hasNext(), "unexpected extra output tuple");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/datafu-pig/src/test/java/datafu/test/pig/stats/QuantileTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/stats/QuantileTests.java b/datafu-pig/src/test/java/datafu/test/pig/stats/QuantileTests.java
new file mode 100644
index 0000000..b1940db
--- /dev/null
+++ b/datafu-pig/src/test/java/datafu/test/pig/stats/QuantileTests.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.stats;
+
+import static org.testng.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.stats.QuantileUtil;
+import datafu.pig.stats.StreamingQuantile;
+import datafu.test.pig.PigTests;
+
+public class QuantileTests  extends PigTests
+{
+  /**
+  
+  
+  define Quantile datafu.pig.stats.Quantile($QUANTILES);
+  
+  data_in = LOAD 'input' as (val:int);
+  
+  --describe data_in;
+  
+  data_out = GROUP data_in ALL;
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out {
+    sorted = ORDER data_in BY val;
+    GENERATE Quantile(sorted) as quantiles;
+  }
+  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
+  
+  --describe data_out;
+  
+  STORE data_out into 'output';
+   */
+  @Multiline private String quantileTest;
+  
+  @Test
+  public void quantileTest() throws Exception
+  {
+    // quantiles requested as explicit fractions over the values 1..10
+    String quantiles = "QUANTILES='0.0','0.25','0.5','0.75','1.0'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,3.0,5.5,8.0,10.0)");
+  }
+  
+  @Test
+  public void quantile2Test() throws Exception
+  {
+    // a single count argument ('5'); the expected row equals the one in quantileTest
+    String quantiles = "QUANTILES='5'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,3.0,5.5,8.0,10.0)");
+  }
+
+  @Test
+  public void quantile3Test() throws Exception {
+    String quantiles = "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    // the values 100000 down to 0, written in descending order
+    String[] lines = new String[100001];
+    for (int value=100000; value>=0; value--)
+    {
+      lines[100000-value] = Integer.toString(value);
+    }
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
+  }
+  
+  @Test
+  public void quantile4aTest() throws Exception
+  {
+    // a single count argument ('4'); four values are expected back
+    String quantiles = "QUANTILES='4'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,4.0,7.0,10.0)");
+  }
+  
+  @Test
+  public void quantile4bTest() throws Exception
+  {
+    // four explicit fractions; the expected row equals the one in quantile4aTest
+    String quantiles = "QUANTILES='0.0','0.333','0.666','1.0'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,4.0,7.0,10.0)");
+  }
+  
+  @Test
+  public void quantile5aTest() throws Exception
+  {
+    // a single count argument ('10'); every one of the ten inputs is expected back
+    String quantiles = "QUANTILES='10'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
+  }
+  
+  @Test
+  public void quantile5bTest() throws Exception
+  {
+    // ten explicit fractions; the expected row equals the one in quantile5aTest
+    String quantiles = "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'";
+    PigTest pigTest = createPigTestFromString(quantileTest, quantiles);
+
+    writeLinesToFile("input", "1","2","3","4","10","5","6","7","8","9");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
+  }
+  
+  /**
+  
+
+  define Median datafu.pig.stats.Median();
+  
+  data_in = LOAD 'input' as (val:int);
+  
+  --describe data_in;
+  
+  data_out = GROUP data_in ALL;
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out {
+    sorted = ORDER data_in BY val;
+    GENERATE Median(sorted) as medians;
+  }
+  data_out = FOREACH data_out GENERATE FLATTEN(medians);
+  
+  --describe data_out;
+  
+  STORE data_out into 'output';
+   */
+  @Multiline private String medianTest;
+  
+  @Test
+  public void medianTest() throws Exception
+  {
+    // the values 1..10 in shuffled order; the expected median is 5.5
+    PigTest pigTest = createPigTestFromString(medianTest);
+
+    writeLinesToFile("input", "4","5","6","9","10","7","8","2","3","1");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(5.5)");
+  }
+  
+  /**
+  
+
+  define Median datafu.pig.stats.StreamingMedian();
+  
+  data_in = LOAD 'input' as (val:int);
+  
+  --describe data_in;
+  
+  data_out = GROUP data_in ALL;
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out {
+    GENERATE Median(data_in) as medians;
+  }
+  data_out = FOREACH data_out GENERATE FLATTEN(medians);
+  
+  --describe data_out;
+  
+  STORE data_out into 'output';
+   */
+  @Multiline private String streamingMedianTest;
+  
+  @Test
+  public void streamingMedianTest() throws Exception
+  {
+    // the eleven values 0..10 in shuffled order; the expected median is 5.0
+    PigTest pigTest = createPigTestFromString(streamingMedianTest);
+
+    writeLinesToFile("input", "0","4","5","6","9","10","7","8","2","3","1");
+
+    pigTest.runScript();
+
+    List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+    String flattened = rows.get(0).toString();
+    assertEquals(flattened, "(5.0)");
+  }
+  
+  // Pig script shared by the streamingQuantile*Test methods. The field has no
+  // initializer; the @Multiline annotation is expected to fill it from the Javadoc
+  // text below, so that text is the script itself and should not be reworded.
+  // The script defines Quantile as datafu.pig.stats.StreamingQuantile($QUANTILES),
+  // loads 'input' as (val:int), groups all rows, applies Quantile to data_in.val,
+  // flattens the result into data_out and stores it into 'output'. Each test
+  // supplies the $QUANTILES parameter.
+  /**
+  
+
+  define Quantile datafu.pig.stats.StreamingQuantile($QUANTILES);
+  
+  data_in = LOAD 'input' as (val:int);
+  
+  --describe data_in;
+  
+  data_out = GROUP data_in ALL;
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out GENERATE Quantile(data_in.val) as quantiles;
+  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
+  
+  --describe data_out;
+  
+  STORE data_out into 'output';
+   */
+  @Multiline private String streamingQuantileTest;
+
+  /**
+   * Streaming quantile script with QUANTILES='5' over the integers 1..10:
+   * expects the single row "(1.0,3.0,5.0,8.0,10.0)".
+   */
+  @Test
+  public void streamingQuantileTest() throws Exception {
+    final String quantilesParam = "QUANTILES='5'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,3.0,5.0,8.0,10.0)");
+  }
+  
+  /**
+   * Streaming quantile script with explicit quantiles 0.5, 0.75 and 1.0 over
+   * the integers 1..10: expects the single row "(5.0,8.0,10.0)".
+   */
+  @Test
+  public void streamingQuantile2Test() throws Exception {
+    final String quantilesParam = "QUANTILES='0.5','0.75','1.0'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(5.0,8.0,10.0)");
+  }
+  
+  /**
+   * Streaming quantile script with unordered quantiles over the integers
+   * 1000 down to 1: expects the single row "(70.0,30.0,370.0,1000.0,1.0)",
+   * i.e. results in the same order as the requested quantiles.
+   */
+  @Test
+  public void streamingQuantile3Test() throws Exception {
+    final String quantilesParam = "QUANTILES='0.07','0.03','0.37','1.0','0.0'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    // Descending input: 1000, 999, ..., 1.
+    final int highest = 1000;
+    final String[] lines = new String[highest];
+    for (int idx = 0; idx < lines.length; idx++)
+    {
+      lines[idx] = Integer.toString(highest - idx);
+    }
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(70.0,30.0,370.0,1000.0,1.0)");
+  }
+  
+  /**
+   * Streaming quantile script with seven fractional quantiles over the
+   * integers 100000 down to 0 (100001 values): expects the single row
+   * "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)".
+   */
+  @Test
+  public void streamingQuantile4Test() throws Exception {
+    final String quantilesParam =
+        "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    // Descending input: 100000, 99999, ..., 0 (zero is included).
+    final int highest = 100000;
+    final String[] lines = new String[highest + 1];
+    for (int idx = 0; idx < lines.length; idx++)
+    {
+      lines[idx] = Integer.toString(highest - idx);
+    }
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
+  }
+  
+  /**
+   * Streaming quantile script with QUANTILES='10' over the integers 1..10:
+   * expects the single row listing 1.0 through 10.0.
+   */
+  @Test
+  public void streamingQuantile5aTest() throws Exception {
+    final String quantilesParam = "QUANTILES='10'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
+  }
+  
+  /**
+   * Streaming quantile script with ten explicit quantiles (0.0, 0.111, ...,
+   * 0.888, 1.0) over the integers 1..10: expects the single row listing
+   * 1.0 through 10.0.
+   */
+  @Test
+  public void streamingQuantile5bTest() throws Exception {
+    final String quantilesParam =
+        "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
+  }
+  
+  /**
+   * Streaming quantile script with explicit quantiles 0.0, 0.333, 0.666 and
+   * 1.0 over the integers 1..10: expects the single row "(1.0,4.0,7.0,10.0)".
+   */
+  @Test
+  public void streamingQuantile6Test() throws Exception {
+    final String quantilesParam = "QUANTILES='0.0','0.333','0.666','1.0'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,4.0,7.0,10.0)");
+  }
+  
+  /**
+   * Streaming quantile script with QUANTILES='4' over the integers 1..10:
+   * expects the single row "(1.0,4.0,7.0,10.0)".
+   */
+  @Test
+  public void streamingQuantile7Test() throws Exception {
+    final String quantilesParam = "QUANTILES='4'";
+    final PigTest pigTest = createPigTestFromString(streamingQuantileTest, quantilesParam);
+
+    final String[] lines = {"1","2","3","4","10","5","6","7","8","9"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,4.0,7.0,10.0)");
+  }
+  
+  /**
+   * Calls exec() directly on one StreamingQuantile("4") instance, twice in a
+   * row: first with a bag holding 1..10, then with a bag holding 11..20.
+   * Each call must return four values; the second call checks that state
+   * from the first call does not carry over.
+   */
+  @Test
+  public void streamingQuantileExecTest() throws Exception {
+    final StreamingQuantile udf = new StreamingQuantile("4");
+
+    // One entry per pass: first value fed in, and the four values expected back.
+    final int[] firstValues = {1, 11};
+    final double[][] expectedPerPass = {
+      {1.0, 4.0, 7.0, 10.0},
+      {11.0, 14.0, 17.0, 20.0}
+    };
+
+    for (int pass = 0; pass < firstValues.length; pass++)
+    {
+      final int lastValue = firstValues[pass] + 9;
+
+      DataBag values = BagFactory.getInstance().newDefaultBag();
+      for (int i = firstValues[pass]; i <= lastValue; i++)
+      {
+        Tuple single = TupleFactory.getInstance().newTuple(1);
+        single.set(0, i);
+        values.add(single);
+      }
+
+      Tuple wrapped = TupleFactory.getInstance().newTuple(values);
+      Tuple outcome = udf.exec(wrapped);
+
+      Assert.assertEquals(4, outcome.size());
+      for (int pos = 0; pos < expectedPerPass[pass].length; pos++)
+      {
+        Assert.assertEquals(expectedPerPass[pass][pos], outcome.get(pos));
+      }
+    }
+  }
+  
+  /**
+   * Drives one StreamingQuantile("4") instance through accumulate() and
+   * getValue(): values 1..10 are fed one single-tuple bag at a time, the result
+   * is checked, cleanup() is called, and the same is repeated for 11..20 to
+   * check that cleanup() resets the instance.
+   */
+  @Test
+  public void streamingQuantileAccumulateTest() throws Exception {
+    final StreamingQuantile udf = new StreamingQuantile("4");
+
+    // One entry per pass: first value fed in, and the four values expected back.
+    final int[] firstValues = {1, 11};
+    final double[][] expectedPerPass = {
+      {1.0, 4.0, 7.0, 10.0},
+      {11.0, 14.0, 17.0, 20.0}
+    };
+
+    for (int pass = 0; pass < firstValues.length; pass++)
+    {
+      if (pass > 0)
+      {
+        // do twice to check cleanup works
+        udf.cleanup();
+      }
+
+      final int lastValue = firstValues[pass] + 9;
+      for (int i = firstValues[pass]; i <= lastValue; i++)
+      {
+        Tuple single = TupleFactory.getInstance().newTuple(1);
+        single.set(0, i);
+        DataBag oneElementBag = BagFactory.getInstance().newDefaultBag();
+        oneElementBag.add(single);
+        Tuple wrapped = TupleFactory.getInstance().newTuple(oneElementBag);
+        udf.accumulate(wrapped);
+      }
+
+      Tuple outcome = udf.getValue();
+      Assert.assertEquals(4, outcome.size());
+      for (int pos = 0; pos < expectedPerPass[pass].length; pos++)
+      {
+        Assert.assertEquals(expectedPerPass[pass][pos], outcome.get(pos));
+      }
+    }
+  }
+  
+  /**
+   * getQuantilesFromParams("5") must yield five values approximately equal
+   * to 0.0, 0.25, 0.5, 0.75 and 1.0.
+   */
+  @Test
+  public void quantileParamsTest() throws Exception {
+    final double[] expected = {0.0, 0.25, 0.5, 0.75, 1.0};
+    final List<Double> computed = QuantileUtil.getQuantilesFromParams("5");
+
+    assertEquals(computed.size(),5);
+    for (int idx = 0; idx < expected.length; idx++)
+    {
+      assertAboutEqual(computed.get(idx), expected[idx]);
+    }
+  }
+  
+  /**
+   * getQuantilesFromParams("2") must yield two values approximately equal
+   * to 0.0 and 1.0.
+   */
+  @Test
+  public void quantileParamsTest2() throws Exception {
+    final double[] expected = {0.0, 1.0};
+    final List<Double> computed = QuantileUtil.getQuantilesFromParams("2");
+
+    assertEquals(computed.size(),2);
+    for (int idx = 0; idx < expected.length; idx++)
+    {
+      assertAboutEqual(computed.get(idx), expected[idx]);
+    }
+  }
+  
+  /**
+   * getQuantilesFromParams("11") must yield eleven values approximately equal
+   * to 0.0, 0.1, ..., 0.9, 1.0.
+   */
+  @Test
+  public void quantileParamsTest3() throws Exception {
+    final double[] expected = {0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0};
+    final List<Double> computed = QuantileUtil.getQuantilesFromParams("11");
+
+    assertEquals(computed.size(),11);
+    for (int idx = 0; idx < expected.length; idx++)
+    {
+      assertAboutEqual(computed.get(idx), expected[idx]);
+    }
+  }
+  
+  /**
+   * getQuantilesFromParams("10") must yield ten values approximately equal
+   * to 0.0, 0.11111, 0.22222, ..., 0.88888, 1.0.
+   */
+  @Test
+  public void quantileParamsTest4() throws Exception {
+    final double[] expected = {
+      0.0, 0.11111, 0.22222, 0.33333, 0.44444,
+      0.55555, 0.66666, 0.77777, 0.88888, 1.0
+    };
+    final List<Double> computed = QuantileUtil.getQuantilesFromParams("10");
+
+    assertEquals(computed.size(),10);
+    for (int idx = 0; idx < expected.length; idx++)
+    {
+      assertAboutEqual(computed.get(idx), expected[idx]);
+    }
+  }
+  
+  /**
+   * getQuantilesFromParams("4") must yield four values approximately equal
+   * to 0.0, 0.333, 0.666 and 1.0.
+   */
+  @Test
+  public void quantileParamsTest5() throws Exception {
+    final double[] expected = {0.0, 0.333, 0.666, 1.0};
+    final List<Double> computed = QuantileUtil.getQuantilesFromParams("4");
+
+    assertEquals(computed.size(),4);
+    for (int idx = 0; idx < expected.length; idx++)
+    {
+      assertAboutEqual(computed.get(idx), expected[idx]);
+    }
+  }
+  
+  private void assertAboutEqual(double actual, double expected) {
+    assertTrue(Math.abs(actual-expected) < 0.001);
+  }
+  
+  // Pig script shared by the quantileSchemaTest* methods. The field has no
+  // initializer; the @Multiline annotation is expected to fill it from the Javadoc
+  // text below, so that text is the script itself and should not be reworded.
+  // The script defines Quantile as datafu.pig.stats.$UDF($QUANTILES), loads 'input'
+  // as (val:int), groups all rows, sorts them by val, applies Quantile, flattens
+  // the result and then projects $EXPECTED_OUTPUT from it before storing into
+  // 'output'. Each test supplies $UDF, $QUANTILES and $EXPECTED_OUTPUT.
+  /**
+  
+  
+  define Quantile datafu.pig.stats.$UDF($QUANTILES);
+  
+  data_in = LOAD 'input' as (val:int);
+  
+  --describe data_in;
+  
+  data_out = GROUP data_in ALL;
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out {
+    sorted = ORDER data_in BY val;
+    GENERATE Quantile(sorted) as quantiles;
+  }
+  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
+  
+  --describe data_out;
+  
+  data_out = FOREACH data_out GENERATE $EXPECTED_OUTPUT;
+  
+  STORE data_out into 'output';
+   */
+  @Multiline private String quantileSchemaTest;
+  
+  /**
+   * Schema script with UDF=Quantile and explicit quantiles 0.0, 0.5, 1.0:
+   * projects the fields quantile_0_0, quantile_0_5 and quantile_1_0 and
+   * expects the single row "(1.0,3.0,5.0)" for the input 1..5.
+   */
+  @Test
+  public void quantileSchemaTest() throws Exception
+  {
+    final String udfParam = "UDF=Quantile";
+    final String quantilesParam = "QUANTILES='0.0','0.5','1.0'";
+    final String projectionParam = "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
+                                   "quantiles::quantile_0_5, "+
+                                   "quantiles::quantile_1_0";
+    final PigTest pigTest = createPigTestFromString(quantileSchemaTest,
+                                                    udfParam,
+                                                    quantilesParam,
+                                                    projectionParam);
+
+    final String[] lines = {"1","5","3","4","2"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,3.0,5.0)");
+  }
+  
+  /**
+   * Schema script with UDF=Quantile and QUANTILES='3': projects the fields
+   * quantile_0, quantile_1 and quantile_2 and expects the single row
+   * "(1.0,3.0,5.0)" for the input 1..5.
+   */
+  @Test
+  public void quantileSchemaTest2() throws Exception
+  {
+    final String udfParam = "UDF=Quantile";
+    final String quantilesParam = "QUANTILES='3'";
+    final String projectionParam = "EXPECTED_OUTPUT=quantiles::quantile_0, "+
+                                   "quantiles::quantile_1, "+
+                                   "quantiles::quantile_2";
+    final PigTest pigTest = createPigTestFromString(quantileSchemaTest,
+                                                    udfParam,
+                                                    quantilesParam,
+                                                    projectionParam);
+
+    final String[] lines = {"1","5","3","4","2"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,3.0,5.0)");
+  }
+  
+  /**
+   * Schema script with UDF=StreamingQuantile and explicit quantiles 0.0, 0.5,
+   * 1.0: projects the fields quantile_0_0, quantile_0_5 and quantile_1_0 and
+   * expects the single row "(1.0,3.0,5.0)" for the input 1..5.
+   */
+  @Test
+  public void quantileSchemaTest3() throws Exception
+  {
+    final String udfParam = "UDF=StreamingQuantile";
+    final String quantilesParam = "QUANTILES='0.0','0.5','1.0'";
+    final String projectionParam = "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
+                                   "quantiles::quantile_0_5, "+
+                                   "quantiles::quantile_1_0";
+    final PigTest pigTest = createPigTestFromString(quantileSchemaTest,
+                                                    udfParam,
+                                                    quantilesParam,
+                                                    projectionParam);
+
+    final String[] lines = {"1","5","3","4","2"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,3.0,5.0)");
+  }
+  
+  /**
+   * Schema script with UDF=StreamingQuantile and QUANTILES='3': projects the
+   * fields quantile_0, quantile_1 and quantile_2 and expects the single row
+   * "(1.0,3.0,5.0)" for the input 1..5.
+   */
+  @Test
+  public void quantileSchemaTest4() throws Exception
+  {
+    final String udfParam = "UDF=StreamingQuantile";
+    final String quantilesParam = "QUANTILES='3'";
+    final String projectionParam = "EXPECTED_OUTPUT=quantiles::quantile_0, "+
+                                   "quantiles::quantile_1, "+
+                                   "quantiles::quantile_2";
+    final PigTest pigTest = createPigTestFromString(quantileSchemaTest,
+                                                    udfParam,
+                                                    quantilesParam,
+                                                    projectionParam);
+
+    final String[] lines = {"1","5","3","4","2"};
+    writeLinesToFile("input", lines);
+
+    pigTest.runScript();
+
+    final List<Tuple> rows = getLinesForAlias(pigTest, "data_out", true);
+    assertEquals(rows.size(),1);
+
+    final String onlyRow = rows.get(0).toString();
+    assertEquals(onlyRow, "(1.0,3.0,5.0)");
+  }
+}


Mime
View raw message