datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wvaug...@apache.org
Subject [03/19] DATAFU-27 Migrate build system to Gradle
Date Tue, 04 Mar 2014 07:09:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
deleted file mode 100644
index bd34881..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSample}.
- * 
- * @author ximeng
- * 
- */
-public class SimpleRandomSampleTest extends PigTests
-{
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p) as sample_data;
-   * 
-   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String simpleRandomSampleTest;
-
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p, $n1) as sample_data;
-   * 
-   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String simpleRandomSampleWithLowerBoundTest;
-
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p1) as sample_1, SRS(data,
-   * $p2) AS sample_2;
-   * 
-   * sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
-   * AS sample_count_2;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String simpleRandomSampleWithTwoCallsTest;
-
-  @Test
-  public void testSimpleRandomSample() throws Exception
-  {
-    writeLinesToFile("input",
-                     "A1\tB1\t1",
-                     "A1\tB1\t4",
-                     "A1\tB3\t4",
-                     "A1\tB4\t4",
-                     "A2\tB1\t4",
-                     "A2\tB2\t4",
-                     "A3\tB1\t3",
-                     "A3\tB1\t1",
-                     "A3\tB3\t77",
-                     "A4\tB1\t3",
-                     "A4\tB2\t3",
-                     "A4\tB3\t59",
-                     "A4\tB4\t29",
-                     "A5\tB1\t4",
-                     "A6\tB2\t3",
-                     "A6\tB2\t55",
-                     "A6\tB3\t1",
-                     "A7\tB1\t39",
-                     "A7\tB2\t27",
-                     "A7\tB3\t85",
-                     "A8\tB1\t4",
-                     "A8\tB2\t45",
-                     "A9\tB3\t92",
-                     "A9\tB3\t0",
-                     "A9\tB6\t42",
-                     "A9\tB5\t1",
-                     "A10\tB1\t7",
-                     "A10\tB2\t23",
-                     "A10\tB2\t1",
-                     "A10\tB2\t31",
-                     "A10\tB6\t41",
-                     "A10\tB7\t52");
-
-    int n = 32;
-    double p = 0.3;
-    int s = (int) Math.ceil(p * n);
-
-    PigTest test = createPigTestFromString(simpleRandomSampleTest, "p=" + p);
-    test.runScript();
-    assertOutput(test, "sampled", "(" + s + ")");
-
-    int n1 = 30;
-    PigTest testWithLB =
-        createPigTestFromString(simpleRandomSampleWithLowerBoundTest, "p=" + p, "n1="
-            + n1);
-    testWithLB.runScript();
-    assertOutput(testWithLB, "sampled", "(" + s + ")");
-
-    double p1 = 0.05;
-    double p2 = 0.95;
-    int s1 = (int) Math.ceil(p1 * n);
-    int s2 = (int) Math.ceil(p2 * n);
-
-    PigTest testWithTwoCalls =
-        createPigTestFromString(simpleRandomSampleWithTwoCallsTest, "p1=" + p1, "p2="
-            + p2);
-    testWithTwoCalls.runScript();
-    assertOutput(testWithTwoCalls, "sampled", "(" + s1 + "," + s2 + ")");
-
-    test.runScript();
-
-  }
-
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data,
-   * $SAMPLING_PROBABILITY) as sample_data;
-   * 
-   * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
-   * 
-   * sampled = ORDER sampled BY group;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String stratifiedSampleTest;
-
-  @Test
-  public void testStratifiedSample() throws Exception
-  {
-    writeLinesToFile("input",
-                     "A1\tB1\t1",
-                     "A1\tB1\t4",
-                     "A1\tB3\t4",
-                     "A1\tB4\t4",
-                     "A2\tB1\t4",
-                     "A2\tB2\t4",
-                     "A3\tB1\t3",
-                     "A3\tB1\t1",
-                     "A3\tB3\t77",
-                     "A4\tB1\t3",
-                     "A4\tB2\t3",
-                     "A4\tB3\t59",
-                     "A4\tB4\t29",
-                     "A5\tB1\t4",
-                     "A6\tB2\t3",
-                     "A6\tB2\t55",
-                     "A6\tB3\t1",
-                     "A7\tB1\t39",
-                     "A7\tB2\t27",
-                     "A7\tB3\t85",
-                     "A8\tB1\t4",
-                     "A8\tB2\t45",
-                     "A9\tB3\t92",
-                     "A9\tB3\t0",
-                     "A9\tB6\t42",
-                     "A9\tB5\t1",
-                     "A10\tB1\t7",
-                     "A10\tB2\t23",
-                     "A10\tB2\t1",
-                     "A10\tB2\t31",
-                     "A10\tB6\t41",
-                     "A10\tB7\t52");
-
-    double p = 0.5;
-
-    PigTest test =
-        createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
-    test.runScript();
-    assertOutput(test,
-                 "sampled",
-                 "(A1,2)",
-                 "(A10,3)",
-                 "(A2,1)",
-                 "(A3,2)",
-                 "(A4,2)",
-                 "(A5,1)",
-                 "(A6,2)",
-                 "(A7,2)",
-                 "(A8,1)",
-                 "(A9,2)");
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
deleted file mode 100644
index 15e1fd6..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSample}.
- * 
- * @deprecated This tests the deprecated functionality of SimpleRandomSample
- *             where the probability can be specified in the constructor.  
- * @author ximeng
- * 
- */
-public class SimpleRandomSampleTestOld extends PigTests
-{
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
-   * 
-   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String simpleRandomSampleTest;
-
-  @Test
-  public void simpleRandomSampleTest() throws Exception
-  {
-    writeLinesToFile("input",
-                     "A1\tB1\t1",
-                     "A1\tB1\t4",
-                     "A1\tB3\t4",
-                     "A1\tB4\t4",
-                     "A2\tB1\t4",
-                     "A2\tB2\t4",
-                     "A3\tB1\t3",
-                     "A3\tB1\t1",
-                     "A3\tB3\t77",
-                     "A4\tB1\t3",
-                     "A4\tB2\t3",
-                     "A4\tB3\t59",
-                     "A4\tB4\t29",
-                     "A5\tB1\t4",
-                     "A6\tB2\t3",
-                     "A6\tB2\t55",
-                     "A6\tB3\t1",
-                     "A7\tB1\t39",
-                     "A7\tB2\t27",
-                     "A7\tB3\t85",
-                     "A8\tB1\t4",
-                     "A8\tB2\t45",
-                     "A9\tB3\t92",
-                     "A9\tB3\t0",
-                     "A9\tB6\t42",
-                     "A9\tB5\t1",
-                     "A10\tB1\t7",
-                     "A10\tB2\t23",
-                     "A10\tB2\t1",
-                     "A10\tB2\t31",
-                     "A10\tB6\t41",
-                     "A10\tB7\t52");
-
-    int n = 32;
-    double p = 0.3;
-    int s = (int) Math.ceil(p * n);
-    PigTest test =
-        createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
-
-    test.runScript();
-
-    assertOutput(test, "sampled", "(" + s + ")");
-  }
-
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
-   * 
-   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
-   * 
-   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
-   * 
-   * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
-   * 
-   * sampled = ORDER sampled BY group;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String stratifiedSampleTest;
-
-  @Test
-  public void stratifiedSampleTest() throws Exception
-  {
-    writeLinesToFile("input",
-                     "A1\tB1\t1",
-                     "A1\tB1\t4",
-                     "A1\tB3\t4",
-                     "A1\tB4\t4",
-                     "A2\tB1\t4",
-                     "A2\tB2\t4",
-                     "A3\tB1\t3",
-                     "A3\tB1\t1",
-                     "A3\tB3\t77",
-                     "A4\tB1\t3",
-                     "A4\tB2\t3",
-                     "A4\tB3\t59",
-                     "A4\tB4\t29",
-                     "A5\tB1\t4",
-                     "A6\tB2\t3",
-                     "A6\tB2\t55",
-                     "A6\tB3\t1",
-                     "A7\tB1\t39",
-                     "A7\tB2\t27",
-                     "A7\tB3\t85",
-                     "A8\tB1\t4",
-                     "A8\tB2\t45",
-                     "A9\tB3\t92",
-                     "A9\tB3\t0",
-                     "A9\tB6\t42",
-                     "A9\tB5\t1",
-                     "A10\tB1\t7",
-                     "A10\tB2\t23",
-                     "A10\tB2\t1",
-                     "A10\tB2\t31",
-                     "A10\tB6\t41",
-                     "A10\tB7\t52");
-
-    double p = 0.5;
-
-    PigTest test =
-        createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
-    test.runScript();
-    assertOutput(test,
-                 "sampled",
-                 "(A1,2)",
-                 "(A10,3)",
-                 "(A2,1)",
-                 "(A3,2)",
-                 "(A4,2)",
-                 "(A5,1)",
-                 "(A6,2)",
-                 "(A7,2)",
-                 "(A8,1)",
-                 "(A9,2)");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
deleted file mode 100644
index 338b3e7..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSampleWithReplacementElect;
-import datafu.pig.sampling.SimpleRandomSampleWithReplacementVote;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSampleWithReplacementVote} and
- * {@link SimpleRandomSampleWithReplacementElect}.
- * 
- * @author ximeng
- * 
- */
-public class SimpleRandomSampleWithReplacementTest extends PigTests
-{
-  // @formatter:off
-  /**
-   * register $JAR_PATH
-   * 
-   * DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
-   * 
-   * DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
-   * 
-   * item = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int); 
-   * 
-   * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
-   * 
-   * candidates = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRSWR_VOTE(item, $SAMPLE_SIZE, summary.count));
-   * 
-   * sampled = FOREACH (GROUP candidates BY position) GENERATE FLATTEN(SRSWR_ELECT(candidates));
-   * 
-   * sampled = FOREACH (GROUP sampled ALL) GENERATE COUNT(sampled) AS size;
-   * 
-   * STORE sampled INTO 'output';
-   */
-  @Multiline
-  private String simpleRandomSampleWithReplacementTest;
-  // @formatter:on
-
-  @Test
-  public void testSimpleRandomSampleWithReplacement() throws Exception
-  {
-    writeLinesToFile("input",
-                     "A1\tB1\t1",
-                     "A1\tB1\t4",
-                     "A1\tB3\t4",
-                     "A1\tB4\t4",
-                     "A2\tB1\t4",
-                     "A2\tB2\t4",
-                     "A3\tB1\t3",
-                     "A3\tB1\t1",
-                     "A3\tB3\t77",
-                     "A4\tB1\t3",
-                     "A4\tB2\t3",
-                     "A4\tB3\t59",
-                     "A4\tB4\t29",
-                     "A5\tB1\t4",
-                     "A6\tB2\t3",
-                     "A6\tB2\t55",
-                     "A6\tB3\t1",
-                     "A7\tB1\t39",
-                     "A7\tB2\t27",
-                     "A7\tB3\t85",
-                     "A8\tB1\t4",
-                     "A8\tB2\t45",
-                     "A9\tB3\t92",
-                     "A9\tB3\t0",
-                     "A9\tB6\t42",
-                     "A9\tB5\t1",
-                     "A10\tB1\t7",
-                     "A10\tB2\t23",
-                     "A10\tB2\t1",
-                     "A10\tB2\t31",
-                     "A10\tB6\t41",
-                     "A10\tB7\t52");
-
-    int s = 32;
-    PigTest test =
-        createPigTestFromString(simpleRandomSampleWithReplacementTest, "SAMPLE_SIZE=" + s
-            + "L");
-
-    test.runScript();
-
-    assertOutput(test, "sampled", "(" + s + ")");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
deleted file mode 100644
index 9bab8a0..0000000
--- a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import java.io.IOException;
-import java.util.*;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.WeightedReservoirSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link WeightedReservoirSample}.
- *
- * @author wjian 
- *
- */
-public class WeightedReservoirSamplingTests extends PigTests
-{
-  /**
-  register $JAR_PATH
-
-  DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
-  DEFINE Assert datafu.pig.util.Assert();
-  
-  data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
-  sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
-  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
-  sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
-  STORE sampled INTO 'output';
-
-   */
-  @Multiline
-  private String weightedReservoirSampleGroupTest;
-  
-  /**
-   * Verifies that WeightedReservoirSample works when data grouped by a key.
-   * In particular it ensures that the reservoir is not reused across keys.
-   * 
-   * <p>
-   * This confirms the fix for DATAFU-11.
-   * </p>
-   * 
-   * @throws Exception
-   */
-  @Test
-  public void weightedReservoirSampleGroupTest() throws Exception
-  {    
-    // first value is the key.  second to last value matches the key so we can
-    // verify the register is reset for each key.  values should not
-    // bleed across to other keys.
-    writeLinesToFile("input",
-                     "1\tB1\t1\t1.0",
-                     "1\tB1\t1\t1.0",
-                     "1\tB3\t1\t1.0",
-                     "1\tB4\t1\t1.0",
-                     "2\tB1\t2\t1.0",
-                     "2\tB2\t2\t1.0",
-                     "3\tB1\t3\t1.0",
-                     "3\tB1\t3\t1.0",
-                     "3\tB3\t3\t1.0",
-                     "4\tB1\t4\t1.0",
-                     "4\tB2\t4\t1.0",
-                     "4\tB3\t4\t1.0",
-                     "4\tB4\t4\t1.0",
-                     "5\tB1\t5\t1.0",
-                     "6\tB2\t6\t1.0",
-                     "6\tB2\t6\t1.0",
-                     "6\tB3\t6\t1.0",
-                     "7\tB1\t7\t1.0",
-                     "7\tB2\t7\t1.0",
-                     "7\tB3\t7\t1.0",
-                     "8\tB1\t8\t1.0",
-                     "8\tB2\t8\t1.0",
-                     "9\tB3\t9\t1.0",
-                     "9\tB3\t9\t1.0",
-                     "9\tB6\t9\t1.0",
-                     "9\tB5\t9\t1.0",
-                     "10\tB1\t10\t1.0",
-                     "10\tB2\t10\t1.0",
-                     "10\tB2\t10\t1.0",
-                     "10\tB2\t10\t1.0",
-                     "10\tB6\t10\t1.0",
-                     "10\tB7\t10\t1.0");
-   
-    for(int i=1; i<=3; i++) {
-      int reservoirSize = i ;
-      PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
-      test.runScript();
-      
-      List<Tuple> tuples = getLinesForAlias(test, "sampled");
-      
-      for (Tuple tuple : tuples)
-      {
-        Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
-      }
-    }
-  }
-  
- /** 
-  register $JAR_PATH 
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
-
-  data = LOAD 'input' AS (v1:chararray,v2:INT);
-  data_g = group data all;
-  sampled = FOREACH data_g GENERATE WeightedSample(data);
-  --describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String weightedSampleTest; 
-   
-  @Test 
-  public void weightedSampleTest() throws Exception 
-  {
-     Map<String, Integer> count = new HashMap<String, Integer>();
-
-     count.put("a", 0);
-     count.put("b", 0);
-     count.put("c", 0);
-     count.put("d", 0);
-
-     PigTest test = createPigTestFromString(weightedSampleTest); 
- 
-     writeLinesToFile("input",  
-                "a\t100","b\t1","c\t5","d\t2");
-
-     for(int i = 0; i < 10; i++) {
-        test.runScript();
-
-        List<Tuple> output = this.getLinesForAlias(test, "sampled");
-
-        Tuple t = output.get(0);
-
-        DataBag sampleBag = (DataBag)t.get(0);
-
-        for(Iterator<Tuple> sampleIter = sampleBag.iterator(); sampleIter.hasNext();) {
-           Tuple st = sampleIter.next();
-           String key = (String)st.get(0);
-           count.put(key, count.get(key) + 1);
-        }              
-     }
-
-     String maxKey = "";
-     int maxCount = 0;
-     for(String key : count.keySet()) {
-        if(maxCount < count.get(key)) {
-           maxKey = key; 
-           maxCount = count.get(key);
-        } 
-     }
-
-     Assert.assertEquals(maxKey, "a");
-  }
-
-  @Test
-  public void weightedReservoirSampleAccumulateTest() throws IOException
-  {
-     WeightedReservoirSample sampler = new WeightedReservoirSample("10", "1");
-
-     for (int i=0; i<100; i++)
-     {
-       Tuple t = TupleFactory.getInstance().newTuple(2);
-       t.set(0, i);
-       t.set(1, i + 1);
-       DataBag bag = BagFactory.getInstance().newDefaultBag();
-       bag.add(t);
-       Tuple input = TupleFactory.getInstance().newTuple(bag);
-       sampler.accumulate(input);
-     }
-
-     DataBag result = sampler.getValue();
-     verifyNoRepeatAllFound(result, 10, 0, 100); 
-  }
-
-  @Test
-  public void weightedReservoirSampleAlgebraicTest() throws IOException
-  {
-    WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
-    WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
-    WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
-    
-    DataBag bag = BagFactory.getInstance().newDefaultBag();
-    for (int i=0; i<100; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(2);
-      t.set(0, i);
-      t.set(1, i + 1);
-      bag.add(t);
-    }
-    
-    Tuple input = TupleFactory.getInstance().newTuple(bag);
-    
-    Tuple intermediateTuple = initialSampler.exec(input);  
-    DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
-    intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));  
-    intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
-    DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
-    verifyNoRepeatAllFound(result, 10, 0, 100); 
-   }
-
-  private void verifyNoRepeatAllFound(DataBag result,
-                                      int expectedResultSize,
-                                      int left,
-                                      int right) throws ExecException
-  {
-    Assert.assertEquals(expectedResultSize, result.size());
-    
-    // all must be found, no repeats
-    Set<Integer> found = new HashSet<Integer>();
-    for (Tuple t : result)
-    {
-      Integer i = (Integer)t.get(0);
-      Assert.assertTrue(i>=left && i<right);
-      Assert.assertFalse(String.format("Found duplicate of %d",i), found.contains(i));
-      found.add(i);
-    }
-  }
-
-  @Test
-  public void weightedReservoirSampleLimitExecTest() throws IOException
-  {
-    WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
-   
-    DataBag bag = BagFactory.getInstance().newDefaultBag();
-    for (int i=0; i<10; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(2);
-      t.set(0, i);
-      t.set(1, 1); // score is equal for all
-      bag.add(t);
-    }
-   
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    input.set(0, bag);
-   
-    DataBag result = sampler.exec(input);
-   
-    verifyNoRepeatAllFound(result, 10, 0, 10); 
-
-    Set<Integer> found = new HashSet<Integer>();
-    for (Tuple t : result)
-    {
-      Integer i = (Integer)t.get(0);
-      found.add(i);
-    }
-
-    for(int i = 0; i < 10; i++)
-    {
-      Assert.assertTrue(found.contains(i));
-    }
-  }
-
-  @Test
-  public void invalidConstructorArgTest() throws Exception
-  {
-    try {
-         WeightedReservoirSample sampler = new WeightedReservoirSample("1", "-1");
-         Assert.fail( "Testcase should fail");
-    } catch (Exception ex) {
-         Assert.assertTrue(ex.getMessage().indexOf("Invalid negative index of weight field argument for WeightedReserviorSample constructor: -1") >= 0);
-    }
-  }
-
-  @Test
-  public void invalidWeightTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(weightedSampleTest);
-
-    writeLinesToFile("input",  
-                "a\t100","b\t1","c\t0","d\t2");
-    try {
-         test.runScript();
-         List<Tuple> output = this.getLinesForAlias(test, "sampled");
-         Assert.fail( "Testcase should fail");
-    } catch (Exception ex) {
-    }
-  }
-
- /** 
-  register $JAR_PATH 
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1'); 
-
-  data = LOAD 'input' AS (v1:chararray);
-  data_g = group data all;
-  sampled = FOREACH data_g GENERATE WeightedSample(data);
-  describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String invalidInputTupleSizeTest; 
- 
-  @Test
-  public void invalidInputTupleSizeTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
-
-    writeLinesToFile("input",  
-                "a","b","c","d");
-    try {
-         test.runScript();
-         List<Tuple> output = this.getLinesForAlias(test, "sampled");
-         Assert.fail( "Testcase should fail");
-    } catch (Exception ex) {
-         Assert.assertTrue(ex.getMessage().indexOf("The field schema of the input tuple is null or the tuple size is no more than the weight field index: 1") >= 0);
-    }
-  }
-
- /** 
-  register $JAR_PATH 
- 
-  define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0'); 
-
-  data = LOAD 'input' AS (v1:chararray, v2:INT);
-  data_g = group data all;
-  sampled = FOREACH data_g GENERATE WeightedSample(data);
-  describe sampled; 
-   
-  STORE sampled INTO 'output'; 
- 
-  */ 
-  @Multiline 
-  private String invalidWeightFieldSchemaTest; 
- 
-  @Test
-  public void invalidWeightFieldSchemaTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
-
-    writeLinesToFile("input",  
-                "a\t100","b\t1","c\t5","d\t2");
-    try {
-         test.runScript();
-         List<Tuple> output = this.getLinesForAlias(test, "sampled");
-         Assert.fail( "Testcase should fail");
-    } catch (Exception ex) {
-         Assert.assertTrue(ex.getMessage().indexOf("Expect the type of the weight field of the input tuple to be of ([int, long, float, double]), but instead found (chararray), weight field: 0") >= 0);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sessions/SessionTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sessions/SessionTests.java b/test/pig/datafu/test/pig/sessions/SessionTests.java
deleted file mode 100644
index 084f970..0000000
--- a/test/pig/datafu/test/pig/sessions/SessionTests.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sessions;
-
-import static org.testng.Assert.*;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.joda.time.DateTime;
-import org.testng.annotations.Test;
-
-import datafu.pig.sessions.SessionCount;
-import datafu.pig.sessions.Sessionize;
-import datafu.test.pig.PigTests;
-
-public class SessionTests extends PigTests
-{
-  /**
-  register $JAR_PATH
-
-  define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
-  
-  views = LOAD 'input' AS (time:$TIME_TYPE, user_id:int, value:int);
-  
-  views_grouped = GROUP views BY user_id;
-  view_counts = FOREACH views_grouped {
-    views = ORDER views BY time;
-    GENERATE flatten(Sessionize(views)) as (time,user_id,value,session_id);
-  }
-  
-  max_value = GROUP view_counts BY (user_id, session_id);
-  
-  max_value = FOREACH max_value GENERATE group.user_id, MAX(view_counts.value) AS val;
-  
-  STORE max_value INTO 'output';
-   */
-  @Multiline private String sessionizeTest;
-  
-  private String[] inputData = new String[] {
-      "2010-01-01T01:00:00Z\t1\t10",
-      "2010-01-01T01:15:00Z\t1\t20",
-      "2010-01-01T01:31:00Z\t1\t10",
-      "2010-01-01T01:35:00Z\t1\t20",
-      "2010-01-01T02:30:00Z\t1\t30",
-
-      "2010-01-01T01:00:00Z\t2\t10",
-      "2010-01-01T01:31:00Z\t2\t20",
-      "2010-01-01T02:10:00Z\t2\t30",
-      "2010-01-01T02:40:30Z\t2\t40",
-      "2010-01-01T03:30:00Z\t2\t50",
-
-      "2010-01-01T01:00:00Z\t3\t10",
-      "2010-01-01T01:01:00Z\t3\t20",
-      "2010-01-01T01:02:00Z\t3\t5",
-      "2010-01-01T01:10:00Z\t3\t25",
-      "2010-01-01T01:15:00Z\t3\t50",
-      "2010-01-01T01:25:00Z\t3\t30",
-      "2010-01-01T01:30:00Z\t3\t15"  
-  };
-  
-  @Test
-  public void sessionizeTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(sessionizeTest,
-                                 "TIME_WINDOW=30m",
-                                 "JAR_PATH=" + getJarPath(),
-                                 "TIME_TYPE=chararray");
-
-    this.writeLinesToFile("input", 
-                          inputData);
-    
-    test.runScript();
-    
-    HashMap<Integer,HashMap<Integer,Boolean>> userValues = new HashMap<Integer,HashMap<Integer,Boolean>>();
-    
-    for (Tuple t : this.getLinesForAlias(test, "max_value"))
-    {
-      Integer userId = (Integer)t.get(0);
-      Integer max = (Integer)t.get(1);
-      if (!userValues.containsKey(userId))
-      {
-        userValues.put(userId, new HashMap<Integer,Boolean>());
-      }
-      userValues.get(userId).put(max, true);
-    }
-    
-    assertEquals(userValues.get(1).size(), 2);
-    assertEquals(userValues.get(2).size(), 5);
-    assertEquals(userValues.get(3).size(), 1);    
-    
-    assertTrue(userValues.get(1).containsKey(20));
-    assertTrue(userValues.get(1).containsKey(30));
-    
-    assertTrue(userValues.get(2).containsKey(10));
-    assertTrue(userValues.get(2).containsKey(20));
-    assertTrue(userValues.get(2).containsKey(30));
-    assertTrue(userValues.get(2).containsKey(40));
-    assertTrue(userValues.get(2).containsKey(50));    
-
-    assertTrue(userValues.get(3).containsKey(50));
-  }
-  
-  private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
-     
-  @Test
-  public void sessionizeLongTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(sessionizeTest,
-                                 "TIME_WINDOW=30m",
-                                 "JAR_PATH=" + getJarPath(),
-                                 "TIME_TYPE=long");
-
-    List<String> lines = new ArrayList<String>();
-        
-    for (String line : inputData)
-    {
-      String[] parts = line.split("\t");
-      Assert.assertEquals(3, parts.length);
-      parts[0] = Long.toString(dateFormat.parse(parts[0]).getTime());
-      lines.add(StringUtils.join(parts,"\t"));
-    }
-    
-    this.writeLinesToFile("input", 
-                          lines.toArray(new String[]{}));
-    
-    test.runScript();
-    
-    HashMap<Integer,HashMap<Integer,Boolean>> userValues = new HashMap<Integer,HashMap<Integer,Boolean>>();
-    
-    for (Tuple t : this.getLinesForAlias(test, "max_value"))
-    {
-      Integer userId = (Integer)t.get(0);
-      Integer max = (Integer)t.get(1);
-      if (!userValues.containsKey(userId))
-      {
-        userValues.put(userId, new HashMap<Integer,Boolean>());
-      }
-      userValues.get(userId).put(max, true);
-    }
-    
-    assertEquals(userValues.get(1).size(), 2);
-    assertEquals(userValues.get(2).size(), 5);
-    
-    assertTrue(userValues.get(1).containsKey(20));
-    assertTrue(userValues.get(1).containsKey(30));
-    
-    assertTrue(userValues.get(2).containsKey(10));
-    assertTrue(userValues.get(2).containsKey(20));
-    assertTrue(userValues.get(2).containsKey(30));
-    assertTrue(userValues.get(2).containsKey(40));
-    assertTrue(userValues.get(2).containsKey(50));
-  }
-  
-  @Test
-  public void sessionizeExecTest() throws Exception
-  {
-    Sessionize sessionize = new Sessionize("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    List<Tuple> result;
-    DateTime dt;
-    
-    // test same session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    result = toList(sessionize.exec(input));
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids match
-    Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1))); 
-    
-    // test different session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    result = toList(sessionize.exec(input));
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids don't match
-    Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
-  }
-  
-  @Test
-  public void sessionizeAccumulateTest() throws Exception
-  {
-    Sessionize sessionize = new Sessionize("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    List<Tuple> result;
-    DateTime dt;
-    
-    // test same session id
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    result = toList(sessionize.getValue());
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids match
-    Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1))); 
-    
-    // test different session id
-    sessionize.cleanup();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    result = toList(sessionize.getValue());
-    
-    Assert.assertEquals(2, result.size());
-    Assert.assertEquals(2,result.get(0).size());
-    Assert.assertEquals(2,result.get(1).size());
-    // session ids don't match
-    Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
-    
-    sessionize.cleanup();
-    Assert.assertEquals(0,sessionize.getValue().size());
-  }
-  
-  private List<Tuple> toList(DataBag bag)
-  {
-    List<Tuple> result = new ArrayList<Tuple>();
-    for (Tuple t : bag)
-    {
-      result.add(t);
-    }
-    return result;
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define SessionCount datafu.pig.sessions.SessionCount('$TIME_WINDOW');
-  
-  views = LOAD 'input' AS (user_id:int, page_id:int, time:chararray);
-  
-  views_grouped = GROUP views BY (user_id, page_id);
-  view_counts = foreach views_grouped {
-    views = order views by time;
-    generate group.user_id as user_id, group.page_id as page_id, SessionCount(views.(time)) as count;
-  }
-  
-  STORE view_counts INTO 'output';
-   */
-  @Multiline
-  private String sessionCountPageViewsTest;
-  
-  @Test
-  public void sessionCountPageViewsTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(sessionCountPageViewsTest,
-                                 "TIME_WINDOW=30m",
-                                 "JAR_PATH=" + getJarPath());
-        
-    String[] input = {
-      "1\t100\t2010-01-01T01:00:00Z",
-      "1\t100\t2010-01-01T01:15:00Z",
-      "1\t100\t2010-01-01T01:31:00Z",
-      "1\t100\t2010-01-01T01:35:00Z",
-      "1\t100\t2010-01-01T02:30:00Z",
-
-      "1\t101\t2010-01-01T01:00:00Z",
-      "1\t101\t2010-01-01T01:31:00Z",
-      "1\t101\t2010-01-01T02:10:00Z",
-      "1\t101\t2010-01-01T02:40:30Z",
-      "1\t101\t2010-01-01T03:30:00Z",      
-
-      "1\t102\t2010-01-01T01:00:00Z",
-      "1\t102\t2010-01-01T01:01:00Z",
-      "1\t102\t2010-01-01T01:02:00Z",
-      "1\t102\t2010-01-01T01:10:00Z",
-      "1\t102\t2010-01-01T01:15:00Z",
-      "1\t102\t2010-01-01T01:25:00Z",
-      "1\t102\t2010-01-01T01:30:00Z"
-    };
-    
-    String[] output = {
-        "(1,100,2)",
-        "(1,101,5)",
-        "(1,102,1)"
-      };
-    
-    test.assertOutput("views",input,"view_counts",output);
-  }
-  
-  @Test
-  public void sessionCountExecTest() throws Exception
-  {
-    SessionCount sessionize = new SessionCount("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    DateTime dt;
-    
-    // test same session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    Assert.assertEquals(1L,sessionize.exec(input).longValue()); 
-    
-    // test different session id
-    inputBag.clear();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    Assert.assertEquals(2L,sessionize.exec(input).longValue());
-  }
-  
-  @Test
-  public void sessionCountAccumulateTest() throws Exception
-  {
-    SessionCount sessionize = new SessionCount("30m");
-    Tuple input = TupleFactory.getInstance().newTuple(1);
-    DataBag inputBag = BagFactory.getInstance().newDefaultBag();
-    input.set(0,inputBag);
-    
-    Tuple item;
-    DateTime dt;
-    
-    // test same session id
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(28).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    Assert.assertEquals(1L,sessionize.getValue().longValue()); 
-    
-    // test different session id
-    sessionize.cleanup();
-    dt = new DateTime();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    item = TupleFactory.getInstance().newTuple(1);
-    item.set(0, dt.plusMinutes(31).getMillis());
-    inputBag.add(item);
-    sessionize.accumulate(input);
-    inputBag.clear();
-    Assert.assertEquals(2L,sessionize.exec(input).longValue());
-    
-    sessionize.cleanup();
-    Assert.assertEquals(0,sessionize.getValue().longValue());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sets/SetTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sets/SetTests.java b/test/pig/datafu/test/pig/sets/SetTests.java
deleted file mode 100644
index de6aa51..0000000
--- a/test/pig/datafu/test/pig/sets/SetTests.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sets;
-
-import java.util.Arrays;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import datafu.pig.sets.SetIntersect;
-import datafu.test.pig.PigTests;
-
-public class SetTests extends PigTests
-{
-  /**
-  register $JAR_PATH
-
-  define SetIntersect datafu.pig.sets.SetIntersect();
-  
-  data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
-  
-  data2 = FOREACH data GENERATE SetIntersect(B1,B2);
-  
-  STORE data2 INTO 'output';
-   */
-  @Multiline
-  private String setIntersectTest;
-  
-  @Test
-  public void setIntersectTest() throws Exception
-  {    
-    PigTest test = createPigTestFromString(setIntersectTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
-                     "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(1,10),(3,30)}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(2,20),(4,40)})",
-                 "({(1,10),(3,30)})");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define SetIntersect datafu.pig.sets.SetIntersect();
-  
-  docs = LOAD 'docs' USING PigStorage(',') AS (id:int, line:chararray);
-  B = FOREACH docs GENERATE id, line;
-  C = FOREACH B GENERATE id, TOKENIZE(line) as gu;
-  
-  filtered = FOREACH C {
-    uniq = DISTINCT gu;
-    GENERATE id, uniq;
-  }
-    
-  query = LOAD 'query' AS (line_query:chararray);
-  bag_query = FOREACH query GENERATE TOKENIZE(line_query) AS query;
-  -- sort the bag of tokens, since SetIntersect requires it
-  bag_query = FOREACH bag_query {
-    query_sorted = ORDER query BY token;
-    GENERATE query_sorted;
-  }
-    
-  result = FOREACH filtered {
-    -- sort the tokens, since SetIntersect requires it
-    tokens_sorted = ORDER uniq BY token;
-    GENERATE id, 
-             SIZE(SetIntersect(tokens_sorted,bag_query.query_sorted)) as cnt;
-  }
-    
-  DUMP result;
-  
-   */
-  @Multiline
-  private String setIntersectTestExample;
-  
-  @Test
-  public void setIntersectTestExample() throws Exception
-  {    
-    PigTest test = createPigTestFromString(setIntersectTestExample);    
-    
-    // document with words to check for
-    writeLinesToFile("docs", 
-                     "1,word1 word4 word2 word1",
-                     "2,word2 word6 word1 word5 word3 word7",
-                     "3,word1 word3 word4 word5");
-    
-    // query to apply to document
-    writeLinesToFile("query", 
-                     "word2 word7 word5");
-                  
-    test.runScript();
-    
-    assertOutput(test, "result",
-                 "(1,1)",
-                 "(2,3)",
-                 "(3,1)");
-  }
-  
-  @Test
-  public void setIntersectEmptyBagsTest() throws Exception
-  { 
-    PigTest test = createPigTestFromString(setIntersectTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
-                     "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(100,10),(300,30)}",
-                     "{(1,10),(2,20)}\t{(1,10),(2,20)}",
-                     "{(1,10),(2,20)}\t{}",
-                     "{}\t{(1,10),(2,20)}",
-                     "{}\t{}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(2,20),(4,40)})",
-                 "({})",
-                 "({(1,10),(2,20)})",
-                 "({})",
-                 "({})",
-                 "({})");
-  }
-  
-  @Test
-  public void testIntersectWithNullTuples() throws Exception {
-     DataBag one = BagFactory.getInstance().newDefaultBag();
-     DataBag two = BagFactory.getInstance().newDefaultBag();
-
-     Tuple input = TupleFactory.getInstance().newTuple(Arrays.asList(one, two));
-     DataBag output = new SetIntersect().exec(input);
-     Assert.assertEquals(0, output.size());
-  }
-
-  @Test(expectedExceptions=org.apache.pig.impl.logicalLayer.FrontendException.class)
-  public void setIntersectOutOfOrderTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(setIntersectTest);
-    
-    this.writeLinesToFile("input", 
-                          "{(1,10),(3,30),(2,20),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}");
-        
-    test.runScript();
-    
-    this.getLinesForAlias(test, "data2");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define SetUnion datafu.pig.sets.SetUnion();
-  
-  data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
-  
-  --dump data
-  
-  data2 = FOREACH data GENERATE SetUnion(B1,B2) AS C;
-  data2 = FOREACH data2 {
-    C = ORDER C BY val1 ASC, val2 ASC;
-    generate C;
-  }
-  
-  --dump data2
-  
-  STORE data2 INTO 'output';
-   */
-  @Multiline
-  private String setUnionTest;
-  
-  @Test
-  public void setUnionTest() throws Exception
-  {    
-    PigTest test = createPigTestFromString(setUnionTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{(1,1),(1,20),(1,25),(1,25),(1,25),(1,40),(1,70),(1,80)}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(1,1),(1,10),(1,20),(1,25),(1,30),(1,40),(1,50),(1,60),(1,70),(1,80)})");
-  }
-  
-  @Test
-  public void setUnionEmptyBagsTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(setUnionTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{}",
-                     "{}\t{(1,10),(1,20),(1,30),(1,40),(1,50)}",
-                     "{}\t{}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)})",
-                 "({(1,10),(1,20),(1,30),(1,40),(1,50)})",
-                 "({})");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define SetDifference datafu.pig.sets.SetDifference();
-  
-  data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
-  
-  data2 = FOREACH data GENERATE SetDifference(B1,B2);
-  
-  STORE data2 INTO 'output';
-   */
-  @Multiline
-  private String setDifferenceTwoBagsTest;
-  
-  @Test
-  public void setDifferenceTwoBagsTest() throws Exception
-  {    
-    PigTest test = createPigTestFromString(setDifferenceTwoBagsTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1),(2),(3)}\t",
-                     "{(1),(2),(3)}\t{}",
-                     "\t{(1),(2),(3)}",
-                     "{}\t{(1),(2),(3)}",
-                     "{(1),(2),(3)}\t{(1)}",
-                     "{(1),(2),(3)}\t{(1),(2)}",
-                     "{(1),(2),(3)}\t{(1),(2),(3)}",
-                     "{(1),(2),(3)}\t{(1),(2),(3),(4)}",
-                     "{(1),(2),(3),(4),(5),(6)}\t{(3),(4)}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(1),(2),(3)})",
-                 "({(1),(2),(3)})",
-                 "({})",
-                 "({})",
-                 "({(2),(3)})",
-                 "({(3)})",
-                 "({})",
-                 "({})",
-                 "({(1),(2),(5),(6)})");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define SetDifference datafu.pig.sets.SetDifference();
-  
-  data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)},B3:bag{T:tuple(val:int)});
-  
-  data2 = FOREACH data GENERATE SetDifference(B1,B2,B3);
-  
-  STORE data2 INTO 'output';
-   */
-  @Multiline
-  private String setDifferenceThreeBagsTest;
-  
-  @Test
-  public void setDifferenceThreeBagsTest() throws Exception
-  {    
-    PigTest test = createPigTestFromString(setDifferenceThreeBagsTest);    
-    
-    writeLinesToFile("input", 
-                     "{(1),(2),(3)}\t\t",
-                     "{(1),(2),(3)}\t{}\t",
-                     "{(1),(2),(3)}\t\t{}",
-                     "{(1),(2),(3)}\t{}\t{}",
-                     "{(1),(2),(2),(2),(3),(3)}\t{}\t{}",
-                     
-                     "{(1),(2),(3)}\t{(2)}\t{}",
-                     "{(1),(2),(3)}\t{}\t{(2)}",
-                     
-                     "{(1),(2),(3)}\t{(2)}\t{(3)}",
-                     
-                     "{(1),(2),(3)}\t{(2),(2)}\t{(3),(3)}",
-                     "{(1),(1),(1),(2),(2),(3),(3),(3)}\t{(2),(2)}\t{(3),(3)}",
-                     "{(1),(2),(3)}\t{(0),(2)}\t{(3)}",
-                     
-                     "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3)}",
-                     "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3),(4)}");
-                  
-    test.runScript();
-            
-    assertOutput(test, "data2",
-                 "({(1),(2),(3)})",
-                 "({(1),(2),(3)})",
-                 "({(1),(2),(3)})",
-                 "({(1),(2),(3)})",
-                 "({(1),(2),(3)})",
-
-                 "({(1),(3)})",
-                 "({(1),(3)})",
-                 
-                 "({(1)})",
-                 
-                 "({(1)})",
-                 "({(1)})",
-                 "({(1)})",
-                 
-                 "({})",
-                 "({})");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/EstimationTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/EstimationTests.java b/test/pig/datafu/test/pig/stats/EstimationTests.java
deleted file mode 100644
index 258f320..0000000
--- a/test/pig/datafu/test/pig/stats/EstimationTests.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import java.util.List;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.test.pig.PigTests;
-import static org.testng.Assert.*;
-
-public class EstimationTests extends PigTests
-{
-  /**
-  register $JAR_PATH
-  
-  define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
-  
-  data_in = LOAD 'input' as (val:int);
-    
-  data_out = FOREACH (GROUP data_in ALL) GENERATE
-    HyperLogLogPlusPlus(data_in) as cardinality;
-  
-  data_out = FOREACH data_out GENERATE cardinality;
-    
-  STORE data_out into 'output';
-   */
-  @Multiline private String hyperLogLogTest;
-  
-  @Test
-  public void hyperLogLogTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(hyperLogLogTest);
-
-    int count = 1000000;
-    String[] input = new String[count];
-    for (int i=0; i<count; i++)
-    {
-      input[i] = Integer.toString(i*10);
-    }
-    
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    double error = Math.abs(count-((Long)output.get(0).get(0)))/(double)count;
-    System.out.println("error: " + error*100.0 + "%");
-    assertTrue(error < 0.01);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/MarkovPairTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/MarkovPairTests.java b/test/pig/datafu/test/pig/stats/MarkovPairTests.java
deleted file mode 100644
index b2f618e..0000000
--- a/test/pig/datafu/test/pig/stats/MarkovPairTests.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import static org.testng.Assert.*;
-
-import java.util.Iterator;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.test.pig.PigTests;
-
-public class MarkovPairTests extends PigTests
-{
-  /**
-  register $JAR_PATH
-
-  define markovPairs datafu.pig.stats.MarkovPairs();
-  
-  data = load 'input' as $schema;
-  --describe data;
-  
-  data_out1 = foreach data generate data as orig_bag;
-  --describe data_out1;
-  
-  data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
-  --describe data_out;
-  
-  store data_out into 'output';
-   */
-  @Multiline private String markovPairDefault;
-  
-  @Test
-  public void markovPairDefaultTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(markovPairDefault,
-                                 "schema=(data: bag {t: tuple(val:int)})");
-    
-    writeLinesToFile("input", "{(10),(20),(30),(40),(50),(60)}");
-    
-    String[] expectedOutput = {
-        "({((10),(20)),((20),(30)),((30),(40)),((40),(50)),((50),(60))})"
-      };
-    
-    test.runScript();
-    
-    Iterator<Tuple> actualOutput = test.getAlias("data_out");
-    
-    assertTuplesMatch(expectedOutput, actualOutput);
-  }
-  
-  @Test
-  public void markovPairMultipleInput() throws Exception
-  {    
-    PigTest test = createPigTestFromString(markovPairDefault,
-                                 "schema=(data: bag {t: tuple(val1:int,val2:int)})");
-    
-    writeLinesToFile("input", "{(10,100),(20,200),(30,300),(40,400),(50,500),(60,600)}");
-    
-    String[] expectedOutput = {
-        "({((10,100),(20,200)),((20,200),(30,300)),((30,300),(40,400)),((40,400),(50,500)),((50,500),(60,600))})"
-      };    
-    
-    
-    test.runScript();
-    
-    Iterator<Tuple> actualOutput = test.getAlias("data_out");
-    
-    assertTuplesMatch(expectedOutput, actualOutput);
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define markovPairs datafu.pig.stats.MarkovPairs('$lookahead');
-  
-  data = load 'input' as $schema;
-  --describe data;
-  
-  data_out1 = foreach data generate data as orig_bag;
-  --describe data_out1;
-  
-  data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
-  --describe data_out;
-  
-  store data_out into 'output';
-   */
-  @Multiline private String markovPairLookahead;
-  
-  @Test
-  public void markovPairLookaheadTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(markovPairLookahead, 
-                                 "schema=(data: bag {t: tuple(val:int)})",
-                                 "lookahead=3");
-    
-    writeLinesToFile("input", "{(10),(20),(30),(40),(50)}");
-    
-    String[] expectedOutput = {
-        "({((10),(20)),((10),(30)),((10),(40)),((20),(30)),((20),(40)),((20),(50)),((30),(40)),((30),(50)),((40),(50))})"
-      };
-    
-    test.runScript();
-    
-    Iterator<Tuple> actualOutput = test.getAlias("data_out");
-    
-    assertTuplesMatch(expectedOutput, actualOutput);
-  }
-  
-  private void assertTuplesMatch(String[] expectedOutput, Iterator<Tuple> actualOutput)
-  {
-    Iterator<Tuple> tuples = actualOutput;
-    
-    for (String outputLine : expectedOutput)
-    {
-      assertTrue(tuples.hasNext());
-      Tuple outputTuple = tuples.next();
-      System.out.println(String.format("expected: %s", outputLine));
-      System.out.println(String.format("actual: %s", outputTuple.toString()));
-      assertEquals(outputLine,outputTuple.toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/QuantileTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/QuantileTests.java b/test/pig/datafu/test/pig/stats/QuantileTests.java
deleted file mode 100644
index 1814016..0000000
--- a/test/pig/datafu/test/pig/stats/QuantileTests.java
+++ /dev/null
@@ -1,696 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import static org.testng.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.stats.QuantileUtil;
-import datafu.pig.stats.StreamingQuantile;
-import datafu.test.pig.PigTests;
-
-public class QuantileTests  extends PigTests
-{
-  /**
-  register $JAR_PATH
-  
-  define Quantile datafu.pig.stats.Quantile($QUANTILES);
-  
-  data_in = LOAD 'input' as (val:int);
-  
-  --describe data_in;
-  
-  data_out = GROUP data_in ALL;
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out {
-    sorted = ORDER data_in BY val;
-    GENERATE Quantile(sorted) as quantiles;
-  }
-  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-  
-  --describe data_out;
-  
-  STORE data_out into 'output';
-   */
-  @Multiline private String quantileTest;
-  
-  @Test
-  public void quantileTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='0.0','0.25','0.5','0.75','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
-  }
-  
-  @Test
-  public void quantile2Test() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='5'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
-  }
-
-  @Test
-  public void quantile3Test() throws Exception {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
-
-    List<String> input = new ArrayList<String>();
-    for (int i=100000; i>=0; i--)
-    {
-      input.add(Integer.toString(i));
-    }
-    
-    writeLinesToFile("input", input.toArray(new String[0]));
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
-  }
-  
-  @Test
-  public void quantile4aTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='4'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
-  }
-  
-  @Test
-  public void quantile4bTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='0.0','0.333','0.666','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
-  }
-  
-  @Test
-  public void quantile5aTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='10'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
-  }
-  
-  @Test
-  public void quantile5bTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileTest,
-                                 "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define Median datafu.pig.stats.Median();
-  
-  data_in = LOAD 'input' as (val:int);
-  
-  --describe data_in;
-  
-  data_out = GROUP data_in ALL;
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out {
-    sorted = ORDER data_in BY val;
-    GENERATE Median(sorted) as medians;
-  }
-  data_out = FOREACH data_out GENERATE FLATTEN(medians);
-  
-  --describe data_out;
-  
-  STORE data_out into 'output';
-   */
-  @Multiline private String medianTest;
-  
-  @Test
-  public void medianTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(medianTest);
-
-    String[] input = {"4","5","6","9","10","7","8","2","3","1"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(5.5)");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define Median datafu.pig.stats.StreamingMedian();
-  
-  data_in = LOAD 'input' as (val:int);
-  
-  --describe data_in;
-  
-  data_out = GROUP data_in ALL;
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out {
-    GENERATE Median(data_in) as medians;
-  }
-  data_out = FOREACH data_out GENERATE FLATTEN(medians);
-  
-  --describe data_out;
-  
-  STORE data_out into 'output';
-   */
-  @Multiline private String streamingMedianTest;
-  
-  @Test
-  public void streamingMedianTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(streamingMedianTest);
-
-    String[] input = {"0","4","5","6","9","10","7","8","2","3","1"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(5.0)");
-  }
-  
-  /**
-  register $JAR_PATH
-
-  define Quantile datafu.pig.stats.StreamingQuantile($QUANTILES);
-  
-  data_in = LOAD 'input' as (val:int);
-  
-  --describe data_in;
-  
-  data_out = GROUP data_in ALL;
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out GENERATE Quantile(data_in.val) as quantiles;
-  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-  
-  --describe data_out;
-  
-  STORE data_out into 'output';
-   */
-  @Multiline private String streamingQuantileTest;
-
-  @Test
-  public void streamingQuantileTest() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='5'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.0,8.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantile2Test() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='0.5','0.75','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(5.0,8.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantile3Test() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='0.07','0.03','0.37','1.0','0.0'");
-
-    List<String> input = new ArrayList<String>();
-    for (int i=1000; i>=1; i--)
-    {
-      input.add(Integer.toString(i));
-    }
-    
-    writeLinesToFile("input", input.toArray(new String[0]));
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(70.0,30.0,370.0,1000.0,1.0)");
-  }
-  
-  @Test
-  public void streamingQuantile4Test() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
-
-    List<String> input = new ArrayList<String>();
-    for (int i=100000; i>=0; i--)
-    {
-      input.add(Integer.toString(i));
-    }
-    
-    writeLinesToFile("input", input.toArray(new String[0]));
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
-  }
-  
-  @Test
-  public void streamingQuantile5aTest() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='10'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantile5bTest() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantile6Test() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='0.0','0.333','0.666','1.0'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantile7Test() throws Exception {
-    PigTest test = createPigTestFromString(streamingQuantileTest,
-                                 "QUANTILES='4'");
-
-    String[] input = {"1","2","3","4","10","5","6","7","8","9"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
-  }
-  
-  @Test
-  public void streamingQuantileExecTest() throws Exception {
-    StreamingQuantile quantile = new StreamingQuantile("4");
-    
-    DataBag bag;
-    Tuple input;
-    Tuple result;
-    
-    bag = BagFactory.getInstance().newDefaultBag();
-    for (int i=1; i<=10; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(1);
-      t.set(0, i);
-      bag.add(t);
-    }
-    input = TupleFactory.getInstance().newTuple(bag);
-    result = quantile.exec(input);
-    Assert.assertEquals(4, result.size());
-    Assert.assertEquals(1.0, result.get(0));
-    Assert.assertEquals(4.0, result.get(1));
-    Assert.assertEquals(7.0, result.get(2));
-    Assert.assertEquals(10.0, result.get(3));
-    
-    // do twice to check cleanup works
-    
-    bag = BagFactory.getInstance().newDefaultBag();
-    for (int i=11; i<=20; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(1);
-      t.set(0, i);
-      bag.add(t);
-    }
-    input = TupleFactory.getInstance().newTuple(bag);
-    result = quantile.exec(input);
-    Assert.assertEquals(4, result.size());
-    Assert.assertEquals(11.0, result.get(0));
-    Assert.assertEquals(14.0, result.get(1));
-    Assert.assertEquals(17.0, result.get(2));
-    Assert.assertEquals(20.0, result.get(3));
-  }
-  
-  @Test
-  public void streamingQuantileAccumulateTest() throws Exception {
-    StreamingQuantile quantile = new StreamingQuantile("4");
-    
-    Tuple result;    
-    
-    for (int i=1; i<=10; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(1);
-      t.set(0, i);
-      DataBag bag = BagFactory.getInstance().newDefaultBag();
-      bag.add(t);
-      Tuple input = TupleFactory.getInstance().newTuple(bag);
-      quantile.accumulate(input);
-    }
-    result = quantile.getValue();
-    Assert.assertEquals(4, result.size());
-    Assert.assertEquals(1.0, result.get(0));
-    Assert.assertEquals(4.0, result.get(1));
-    Assert.assertEquals(7.0, result.get(2));
-    Assert.assertEquals(10.0, result.get(3));
-    
-    // do twice to check cleanup works
-    quantile.cleanup();
-    
-    for (int i=11; i<=20; i++)
-    {
-      Tuple t = TupleFactory.getInstance().newTuple(1);
-      t.set(0, i);
-      DataBag bag = BagFactory.getInstance().newDefaultBag();
-      bag.add(t);
-      Tuple input = TupleFactory.getInstance().newTuple(bag);
-      quantile.accumulate(input);
-    }
-    result = quantile.getValue();
-    Assert.assertEquals(4, result.size());
-    Assert.assertEquals(11.0, result.get(0));
-    Assert.assertEquals(14.0, result.get(1));
-    Assert.assertEquals(17.0, result.get(2));
-    Assert.assertEquals(20.0, result.get(3));
-  }
-  
-  @Test
-  public void quantileParamsTest() throws Exception {
-    List<Double> quantiles = QuantileUtil.getQuantilesFromParams("5");
-    
-    assertEquals(quantiles.size(),5);
-    assertAboutEqual(quantiles.get(0), 0.0);
-    assertAboutEqual(quantiles.get(1), 0.25);
-    assertAboutEqual(quantiles.get(2), 0.5);
-    assertAboutEqual(quantiles.get(3), 0.75);
-    assertAboutEqual(quantiles.get(4), 1.0);
-  }
-  
-  @Test
-  public void quantileParamsTest2() throws Exception {
-    List<Double> quantiles = QuantileUtil.getQuantilesFromParams("2");
-    
-    assertEquals(quantiles.size(),2);
-    assertAboutEqual(quantiles.get(0), 0.0);
-    assertAboutEqual(quantiles.get(1), 1.0);
-  }
-  
-  @Test
-  public void quantileParamsTest3() throws Exception {
-    List<Double> quantiles = QuantileUtil.getQuantilesFromParams("11");
-    
-    assertEquals(quantiles.size(),11);
-    assertAboutEqual(quantiles.get(0), 0.0);
-    assertAboutEqual(quantiles.get(1), 0.1);
-    assertAboutEqual(quantiles.get(2), 0.2);
-    assertAboutEqual(quantiles.get(3), 0.3);
-    assertAboutEqual(quantiles.get(4), 0.4);
-    assertAboutEqual(quantiles.get(5), 0.5);
-    assertAboutEqual(quantiles.get(6), 0.6);
-    assertAboutEqual(quantiles.get(7), 0.7);
-    assertAboutEqual(quantiles.get(8), 0.8);
-    assertAboutEqual(quantiles.get(9), 0.9);
-    assertAboutEqual(quantiles.get(10), 1.0);
-  }
-  
-  @Test
-  public void quantileParamsTest4() throws Exception {
-    List<Double> quantiles = QuantileUtil.getQuantilesFromParams("10");
-    
-    assertEquals(quantiles.size(),10);
-    assertAboutEqual(quantiles.get(0), 0.0);
-    assertAboutEqual(quantiles.get(1), 0.11111);
-    assertAboutEqual(quantiles.get(2), 0.22222);
-    assertAboutEqual(quantiles.get(3), 0.33333);
-    assertAboutEqual(quantiles.get(4), 0.44444);
-    assertAboutEqual(quantiles.get(5), 0.55555);
-    assertAboutEqual(quantiles.get(6), 0.66666);
-    assertAboutEqual(quantiles.get(7), 0.77777);
-    assertAboutEqual(quantiles.get(8), 0.88888);
-    assertAboutEqual(quantiles.get(9), 1.0);
-  }
-  
-  @Test
-  public void quantileParamsTest5() throws Exception {
-    List<Double> quantiles = QuantileUtil.getQuantilesFromParams("4");
-    
-    assertEquals(quantiles.size(),4);
-    assertAboutEqual(quantiles.get(0), 0.0);
-    assertAboutEqual(quantiles.get(1), 0.333);
-    assertAboutEqual(quantiles.get(2), 0.666);
-    assertAboutEqual(quantiles.get(3), 1.0);
-  }
-  
-  private void assertAboutEqual(double actual, double expected) {
-    assertTrue(Math.abs(actual-expected) < 0.001);
-  }
-  
-  /**
-  register $JAR_PATH
-  
-  define Quantile datafu.pig.stats.$UDF($QUANTILES);
-  
-  data_in = LOAD 'input' as (val:int);
-  
-  --describe data_in;
-  
-  data_out = GROUP data_in ALL;
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out {
-    sorted = ORDER data_in BY val;
-    GENERATE Quantile(sorted) as quantiles;
-  }
-  data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-  
-  --describe data_out;
-  
-  data_out = FOREACH data_out GENERATE $EXPECTED_OUTPUT;
-  
-  STORE data_out into 'output';
-   */
-  @Multiline private String quantileSchemaTest;
-  
-  @Test
-  public void quantileSchemaTest() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileSchemaTest,
-                                 "UDF=Quantile",
-                                 "QUANTILES='0.0','0.5','1.0'",
-                                 "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
-                                                 "quantiles::quantile_0_5, "+
-                                                 "quantiles::quantile_1_0");
-
-    String[] input = {"1","5","3","4","2"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
-  }
-  
-  @Test
-  public void quantileSchemaTest2() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileSchemaTest,
-                                 "UDF=Quantile", 
-                                 "QUANTILES='3'",
-                                 "EXPECTED_OUTPUT=quantiles::quantile_0, "+
-                                                 "quantiles::quantile_1, "+
-                                                 "quantiles::quantile_2");
-
-    String[] input = {"1","5","3","4","2"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
-  }
-  
-  @Test
-  public void quantileSchemaTest3() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileSchemaTest,
-                                 "UDF=StreamingQuantile", 
-                                 "QUANTILES='0.0','0.5','1.0'",
-                                 "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
-                                                 "quantiles::quantile_0_5, "+
-                                                 "quantiles::quantile_1_0");
-
-    String[] input = {"1","5","3","4","2"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
-  }
-  
-  @Test
-  public void quantileSchemaTest4() throws Exception
-  {
-    PigTest test = createPigTestFromString(quantileSchemaTest,
-                                 "UDF=StreamingQuantile", 
-                                 "QUANTILES='3'",
-                                 "EXPECTED_OUTPUT=quantiles::quantile_0, "+
-                                                 "quantiles::quantile_1, "+
-                                                 "quantiles::quantile_2");
-
-    String[] input = {"1","5","3","4","2"};
-    writeLinesToFile("input", input);
-        
-    test.runScript();
-    
-    List<Tuple> output = getLinesForAlias(test, "data_out", true);
-    
-    assertEquals(output.size(),1);
-    assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
-  }
-}


Mime
View raw message