http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
deleted file mode 100644
index bd34881..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSample}.
- *
- * @author ximeng
- *
- */
-public class SimpleRandomSampleTest extends PigTests
-{
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p) as sample_data;
- *
- * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String simpleRandomSampleTest;
-
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p, $n1) as sample_data;
- *
- * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String simpleRandomSampleWithLowerBoundTest;
-
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p1) as sample_1, SRS(data,
- * $p2) AS sample_2;
- *
- * sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
- * AS sample_count_2;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String simpleRandomSampleWithTwoCallsTest;
-
- @Test
- public void testSimpleRandomSample() throws Exception
- {
- writeLinesToFile("input",
- "A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
- "A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
- "A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
-
- int n = 32;
- double p = 0.3;
- int s = (int) Math.ceil(p * n);
-
- PigTest test = createPigTestFromString(simpleRandomSampleTest, "p=" + p);
- test.runScript();
- assertOutput(test, "sampled", "(" + s + ")");
-
- int n1 = 30;
- PigTest testWithLB =
- createPigTestFromString(simpleRandomSampleWithLowerBoundTest, "p=" + p, "n1="
- + n1);
- testWithLB.runScript();
- assertOutput(testWithLB, "sampled", "(" + s + ")");
-
- double p1 = 0.05;
- double p2 = 0.95;
- int s1 = (int) Math.ceil(p1 * n);
- int s2 = (int) Math.ceil(p2 * n);
-
- PigTest testWithTwoCalls =
- createPigTestFromString(simpleRandomSampleWithTwoCallsTest, "p1=" + p1, "p2="
- + p2);
- testWithTwoCalls.runScript();
- assertOutput(testWithTwoCalls, "sampled", "(" + s1 + "," + s2 + ")");
-
- test.runScript();
-
- }
-
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data,
- * $SAMPLING_PROBABILITY) as sample_data;
- *
- * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
- *
- * sampled = ORDER sampled BY group;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String stratifiedSampleTest;
-
- @Test
- public void testStratifiedSample() throws Exception
- {
- writeLinesToFile("input",
- "A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
- "A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
- "A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
-
- double p = 0.5;
-
- PigTest test =
- createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
- test.runScript();
- assertOutput(test,
- "sampled",
- "(A1,2)",
- "(A10,3)",
- "(A2,1)",
- "(A3,2)",
- "(A4,2)",
- "(A5,1)",
- "(A6,2)",
- "(A7,2)",
- "(A8,1)",
- "(A9,2)");
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
deleted file mode 100644
index 15e1fd6..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSample}.
- *
- * @deprecated This tests the deprecated functionality of SimpleRandomSample
- * where the probability can be specified in the constructor.
- * @author ximeng
- *
- */
-public class SimpleRandomSampleTestOld extends PigTests
-{
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
- *
- * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String simpleRandomSampleTest;
-
- @Test
- public void simpleRandomSampleTest() throws Exception
- {
- writeLinesToFile("input",
- "A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
- "A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
- "A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
-
- int n = 32;
- double p = 0.3;
- int s = (int) Math.ceil(p * n);
- PigTest test =
- createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
-
- test.runScript();
-
- assertOutput(test, "sampled", "(" + s + ")");
- }
-
- /**
- * register $JAR_PATH
- *
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
- *
- * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
- *
- * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
- *
- * sampled = ORDER sampled BY group;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String stratifiedSampleTest;
-
- @Test
- public void stratifiedSampleTest() throws Exception
- {
- writeLinesToFile("input",
- "A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
- "A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
- "A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
-
- double p = 0.5;
-
- PigTest test =
- createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
- test.runScript();
- assertOutput(test,
- "sampled",
- "(A1,2)",
- "(A10,3)",
- "(A2,1)",
- "(A3,2)",
- "(A4,2)",
- "(A5,1)",
- "(A6,2)",
- "(A7,2)",
- "(A8,1)",
- "(A9,2)");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
deleted file mode 100644
index 338b3e7..0000000
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleWithReplacementTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.SimpleRandomSampleWithReplacementElect;
-import datafu.pig.sampling.SimpleRandomSampleWithReplacementVote;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link SimpleRandomSampleWithReplacementVote} and
- * {@link SimpleRandomSampleWithReplacementElect}.
- *
- * @author ximeng
- *
- */
-public class SimpleRandomSampleWithReplacementTest extends PigTests
-{
- // @formatter:off
- /**
- * register $JAR_PATH
- *
- * DEFINE SRSWR_VOTE datafu.pig.sampling.SimpleRandomSampleWithReplacementVote();
- *
- * DEFINE SRSWR_ELECT datafu.pig.sampling.SimpleRandomSampleWithReplacementElect();
- *
- * item = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
- *
- * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
- *
- * candidates = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRSWR_VOTE(item, $SAMPLE_SIZE, summary.count));
- *
- * sampled = FOREACH (GROUP candidates BY position) GENERATE FLATTEN(SRSWR_ELECT(candidates));
- *
- * sampled = FOREACH (GROUP sampled ALL) GENERATE COUNT(sampled) AS size;
- *
- * STORE sampled INTO 'output';
- */
- @Multiline
- private String simpleRandomSampleWithReplacementTest;
- // @formatter:on
-
- @Test
- public void testSimpleRandomSampleWithReplacement() throws Exception
- {
- writeLinesToFile("input",
- "A1\tB1\t1",
- "A1\tB1\t4",
- "A1\tB3\t4",
- "A1\tB4\t4",
- "A2\tB1\t4",
- "A2\tB2\t4",
- "A3\tB1\t3",
- "A3\tB1\t1",
- "A3\tB3\t77",
- "A4\tB1\t3",
- "A4\tB2\t3",
- "A4\tB3\t59",
- "A4\tB4\t29",
- "A5\tB1\t4",
- "A6\tB2\t3",
- "A6\tB2\t55",
- "A6\tB3\t1",
- "A7\tB1\t39",
- "A7\tB2\t27",
- "A7\tB3\t85",
- "A8\tB1\t4",
- "A8\tB2\t45",
- "A9\tB3\t92",
- "A9\tB3\t0",
- "A9\tB6\t42",
- "A9\tB5\t1",
- "A10\tB1\t7",
- "A10\tB2\t23",
- "A10\tB2\t1",
- "A10\tB2\t31",
- "A10\tB6\t41",
- "A10\tB7\t52");
-
- int s = 32;
- PigTest test =
- createPigTestFromString(simpleRandomSampleWithReplacementTest, "SAMPLE_SIZE=" + s
- + "L");
-
- test.runScript();
-
- assertOutput(test, "sampled", "(" + s + ")");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
deleted file mode 100644
index 9bab8a0..0000000
--- a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sampling;
-
-import java.io.IOException;
-import java.util.*;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.sampling.WeightedReservoirSample;
-import datafu.test.pig.PigTests;
-
-/**
- * Tests for {@link WeightedReservoirSample}.
- *
- * @author wjian
- *
- */
-public class WeightedReservoirSamplingTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
- DEFINE Assert datafu.pig.util.Assert();
-
- data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
- sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
- sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
- sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String weightedReservoirSampleGroupTest;
-
- /**
- * Verifies that WeightedReservoirSample works when data grouped by a key.
- * In particular it ensures that the reservoir is not reused across keys.
- *
- * <p>
- * This confirms the fix for DATAFU-11.
- * </p>
- *
- * @throws Exception
- */
- @Test
- public void weightedReservoirSampleGroupTest() throws Exception
- {
- // first value is the key. second to last value matches the key so we can
- // verify the register is reset for each key. values should not
- // bleed across to other keys.
- writeLinesToFile("input",
- "1\tB1\t1\t1.0",
- "1\tB1\t1\t1.0",
- "1\tB3\t1\t1.0",
- "1\tB4\t1\t1.0",
- "2\tB1\t2\t1.0",
- "2\tB2\t2\t1.0",
- "3\tB1\t3\t1.0",
- "3\tB1\t3\t1.0",
- "3\tB3\t3\t1.0",
- "4\tB1\t4\t1.0",
- "4\tB2\t4\t1.0",
- "4\tB3\t4\t1.0",
- "4\tB4\t4\t1.0",
- "5\tB1\t5\t1.0",
- "6\tB2\t6\t1.0",
- "6\tB2\t6\t1.0",
- "6\tB3\t6\t1.0",
- "7\tB1\t7\t1.0",
- "7\tB2\t7\t1.0",
- "7\tB3\t7\t1.0",
- "8\tB1\t8\t1.0",
- "8\tB2\t8\t1.0",
- "9\tB3\t9\t1.0",
- "9\tB3\t9\t1.0",
- "9\tB6\t9\t1.0",
- "9\tB5\t9\t1.0",
- "10\tB1\t10\t1.0",
- "10\tB2\t10\t1.0",
- "10\tB2\t10\t1.0",
- "10\tB2\t10\t1.0",
- "10\tB6\t10\t1.0",
- "10\tB7\t10\t1.0");
-
- for(int i=1; i<=3; i++) {
- int reservoirSize = i ;
- PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
- test.runScript();
-
- List<Tuple> tuples = getLinesForAlias(test, "sampled");
-
- for (Tuple tuple : tuples)
- {
- Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
- }
- }
- }
-
- /**
- register $JAR_PATH
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
-
- data = LOAD 'input' AS (v1:chararray,v2:INT);
- data_g = group data all;
- sampled = FOREACH data_g GENERATE WeightedSample(data);
- --describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String weightedSampleTest;
-
- @Test
- public void weightedSampleTest() throws Exception
- {
- Map<String, Integer> count = new HashMap<String, Integer>();
-
- count.put("a", 0);
- count.put("b", 0);
- count.put("c", 0);
- count.put("d", 0);
-
- PigTest test = createPigTestFromString(weightedSampleTest);
-
- writeLinesToFile("input",
- "a\t100","b\t1","c\t5","d\t2");
-
- for(int i = 0; i < 10; i++) {
- test.runScript();
-
- List<Tuple> output = this.getLinesForAlias(test, "sampled");
-
- Tuple t = output.get(0);
-
- DataBag sampleBag = (DataBag)t.get(0);
-
- for(Iterator<Tuple> sampleIter = sampleBag.iterator(); sampleIter.hasNext();) {
- Tuple st = sampleIter.next();
- String key = (String)st.get(0);
- count.put(key, count.get(key) + 1);
- }
- }
-
- String maxKey = "";
- int maxCount = 0;
- for(String key : count.keySet()) {
- if(maxCount < count.get(key)) {
- maxKey = key;
- maxCount = count.get(key);
- }
- }
-
- Assert.assertEquals(maxKey, "a");
- }
-
- @Test
- public void weightedReservoirSampleAccumulateTest() throws IOException
- {
- WeightedReservoirSample sampler = new WeightedReservoirSample("10", "1");
-
- for (int i=0; i<100; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(2);
- t.set(0, i);
- t.set(1, i + 1);
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- bag.add(t);
- Tuple input = TupleFactory.getInstance().newTuple(bag);
- sampler.accumulate(input);
- }
-
- DataBag result = sampler.getValue();
- verifyNoRepeatAllFound(result, 10, 0, 100);
- }
-
- @Test
- public void weightedReservoirSampleAlgebraicTest() throws IOException
- {
- WeightedReservoirSample.Initial initialSampler = new WeightedReservoirSample.Initial("10", "1");
- WeightedReservoirSample.Intermediate intermediateSampler = new WeightedReservoirSample.Intermediate("10", "1");
- WeightedReservoirSample.Final finalSampler = new WeightedReservoirSample.Final("10", "1");
-
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- for (int i=0; i<100; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(2);
- t.set(0, i);
- t.set(1, i + 1);
- bag.add(t);
- }
-
- Tuple input = TupleFactory.getInstance().newTuple(bag);
-
- Tuple intermediateTuple = initialSampler.exec(input);
- DataBag intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
- intermediateTuple = intermediateSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
- intermediateBag = BagFactory.getInstance().newDefaultBag(Arrays.asList(intermediateTuple));
- DataBag result = finalSampler.exec(TupleFactory.getInstance().newTuple(intermediateBag));
- verifyNoRepeatAllFound(result, 10, 0, 100);
- }
-
- private void verifyNoRepeatAllFound(DataBag result,
- int expectedResultSize,
- int left,
- int right) throws ExecException
- {
- Assert.assertEquals(expectedResultSize, result.size());
-
- // all must be found, no repeats
- Set<Integer> found = new HashSet<Integer>();
- for (Tuple t : result)
- {
- Integer i = (Integer)t.get(0);
- Assert.assertTrue(i>=left && i<right);
- Assert.assertFalse(String.format("Found duplicate of %d",i), found.contains(i));
- found.add(i);
- }
- }
-
- @Test
- public void weightedReservoirSampleLimitExecTest() throws IOException
- {
- WeightedReservoirSample sampler = new WeightedReservoirSample("100", "1");
-
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- for (int i=0; i<10; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(2);
- t.set(0, i);
- t.set(1, 1); // score is equal for all
- bag.add(t);
- }
-
- Tuple input = TupleFactory.getInstance().newTuple(1);
- input.set(0, bag);
-
- DataBag result = sampler.exec(input);
-
- verifyNoRepeatAllFound(result, 10, 0, 10);
-
- Set<Integer> found = new HashSet<Integer>();
- for (Tuple t : result)
- {
- Integer i = (Integer)t.get(0);
- found.add(i);
- }
-
- for(int i = 0; i < 10; i++)
- {
- Assert.assertTrue(found.contains(i));
- }
- }
-
- @Test
- public void invalidConstructorArgTest() throws Exception
- {
- try {
- WeightedReservoirSample sampler = new WeightedReservoirSample("1", "-1");
- Assert.fail( "Testcase should fail");
- } catch (Exception ex) {
- Assert.assertTrue(ex.getMessage().indexOf("Invalid negative index of weight field argument for WeightedReserviorSample constructor: -1") >= 0);
- }
- }
-
- @Test
- public void invalidWeightTest() throws Exception
- {
- PigTest test = createPigTestFromString(weightedSampleTest);
-
- writeLinesToFile("input",
- "a\t100","b\t1","c\t0","d\t2");
- try {
- test.runScript();
- List<Tuple> output = this.getLinesForAlias(test, "sampled");
- Assert.fail( "Testcase should fail");
- } catch (Exception ex) {
- }
- }
-
- /**
- register $JAR_PATH
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','1');
-
- data = LOAD 'input' AS (v1:chararray);
- data_g = group data all;
- sampled = FOREACH data_g GENERATE WeightedSample(data);
- describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String invalidInputTupleSizeTest;
-
- @Test
- public void invalidInputTupleSizeTest() throws Exception
- {
- PigTest test = createPigTestFromString(invalidInputTupleSizeTest);
-
- writeLinesToFile("input",
- "a","b","c","d");
- try {
- test.runScript();
- List<Tuple> output = this.getLinesForAlias(test, "sampled");
- Assert.fail( "Testcase should fail");
- } catch (Exception ex) {
- Assert.assertTrue(ex.getMessage().indexOf("The field schema of the input tuple is null or the tuple size is no more than the weight field index: 1") >= 0);
- }
- }
-
- /**
- register $JAR_PATH
-
- define WeightedSample datafu.pig.sampling.WeightedReservoirSample('1','0');
-
- data = LOAD 'input' AS (v1:chararray, v2:INT);
- data_g = group data all;
- sampled = FOREACH data_g GENERATE WeightedSample(data);
- describe sampled;
-
- STORE sampled INTO 'output';
-
- */
- @Multiline
- private String invalidWeightFieldSchemaTest;
-
- @Test
- public void invalidWeightFieldSchemaTest() throws Exception
- {
- PigTest test = createPigTestFromString(invalidWeightFieldSchemaTest);
-
- writeLinesToFile("input",
- "a\t100","b\t1","c\t5","d\t2");
- try {
- test.runScript();
- List<Tuple> output = this.getLinesForAlias(test, "sampled");
- Assert.fail( "Testcase should fail");
- } catch (Exception ex) {
- Assert.assertTrue(ex.getMessage().indexOf("Expect the type of the weight field of the input tuple to be of ([int, long, float, double]), but instead found (chararray), weight field: 0") >= 0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sessions/SessionTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sessions/SessionTests.java b/test/pig/datafu/test/pig/sessions/SessionTests.java
deleted file mode 100644
index 084f970..0000000
--- a/test/pig/datafu/test/pig/sessions/SessionTests.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sessions;
-
-import static org.testng.Assert.*;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.joda.time.DateTime;
-import org.testng.annotations.Test;
-
-import datafu.pig.sessions.SessionCount;
-import datafu.pig.sessions.Sessionize;
-import datafu.test.pig.PigTests;
-
-public class SessionTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- define Sessionize datafu.pig.sessions.Sessionize('$TIME_WINDOW');
-
- views = LOAD 'input' AS (time:$TIME_TYPE, user_id:int, value:int);
-
- views_grouped = GROUP views BY user_id;
- view_counts = FOREACH views_grouped {
- views = ORDER views BY time;
- GENERATE flatten(Sessionize(views)) as (time,user_id,value,session_id);
- }
-
- max_value = GROUP view_counts BY (user_id, session_id);
-
- max_value = FOREACH max_value GENERATE group.user_id, MAX(view_counts.value) AS val;
-
- STORE max_value INTO 'output';
- */
- @Multiline private String sessionizeTest;
-
- private String[] inputData = new String[] {
- "2010-01-01T01:00:00Z\t1\t10",
- "2010-01-01T01:15:00Z\t1\t20",
- "2010-01-01T01:31:00Z\t1\t10",
- "2010-01-01T01:35:00Z\t1\t20",
- "2010-01-01T02:30:00Z\t1\t30",
-
- "2010-01-01T01:00:00Z\t2\t10",
- "2010-01-01T01:31:00Z\t2\t20",
- "2010-01-01T02:10:00Z\t2\t30",
- "2010-01-01T02:40:30Z\t2\t40",
- "2010-01-01T03:30:00Z\t2\t50",
-
- "2010-01-01T01:00:00Z\t3\t10",
- "2010-01-01T01:01:00Z\t3\t20",
- "2010-01-01T01:02:00Z\t3\t5",
- "2010-01-01T01:10:00Z\t3\t25",
- "2010-01-01T01:15:00Z\t3\t50",
- "2010-01-01T01:25:00Z\t3\t30",
- "2010-01-01T01:30:00Z\t3\t15"
- };
-
- @Test
- public void sessionizeTest() throws Exception
- {
- PigTest test = createPigTestFromString(sessionizeTest,
- "TIME_WINDOW=30m",
- "JAR_PATH=" + getJarPath(),
- "TIME_TYPE=chararray");
-
- this.writeLinesToFile("input",
- inputData);
-
- test.runScript();
-
- HashMap<Integer,HashMap<Integer,Boolean>> userValues = new HashMap<Integer,HashMap<Integer,Boolean>>();
-
- for (Tuple t : this.getLinesForAlias(test, "max_value"))
- {
- Integer userId = (Integer)t.get(0);
- Integer max = (Integer)t.get(1);
- if (!userValues.containsKey(userId))
- {
- userValues.put(userId, new HashMap<Integer,Boolean>());
- }
- userValues.get(userId).put(max, true);
- }
-
- assertEquals(userValues.get(1).size(), 2);
- assertEquals(userValues.get(2).size(), 5);
- assertEquals(userValues.get(3).size(), 1);
-
- assertTrue(userValues.get(1).containsKey(20));
- assertTrue(userValues.get(1).containsKey(30));
-
- assertTrue(userValues.get(2).containsKey(10));
- assertTrue(userValues.get(2).containsKey(20));
- assertTrue(userValues.get(2).containsKey(30));
- assertTrue(userValues.get(2).containsKey(40));
- assertTrue(userValues.get(2).containsKey(50));
-
- assertTrue(userValues.get(3).containsKey(50));
- }
-
- private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
-
- @Test
- public void sessionizeLongTest() throws Exception
- {
- PigTest test = createPigTestFromString(sessionizeTest,
- "TIME_WINDOW=30m",
- "JAR_PATH=" + getJarPath(),
- "TIME_TYPE=long");
-
- List<String> lines = new ArrayList<String>();
-
- for (String line : inputData)
- {
- String[] parts = line.split("\t");
- Assert.assertEquals(3, parts.length);
- parts[0] = Long.toString(dateFormat.parse(parts[0]).getTime());
- lines.add(StringUtils.join(parts,"\t"));
- }
-
- this.writeLinesToFile("input",
- lines.toArray(new String[]{}));
-
- test.runScript();
-
- HashMap<Integer,HashMap<Integer,Boolean>> userValues = new HashMap<Integer,HashMap<Integer,Boolean>>();
-
- for (Tuple t : this.getLinesForAlias(test, "max_value"))
- {
- Integer userId = (Integer)t.get(0);
- Integer max = (Integer)t.get(1);
- if (!userValues.containsKey(userId))
- {
- userValues.put(userId, new HashMap<Integer,Boolean>());
- }
- userValues.get(userId).put(max, true);
- }
-
- assertEquals(userValues.get(1).size(), 2);
- assertEquals(userValues.get(2).size(), 5);
-
- assertTrue(userValues.get(1).containsKey(20));
- assertTrue(userValues.get(1).containsKey(30));
-
- assertTrue(userValues.get(2).containsKey(10));
- assertTrue(userValues.get(2).containsKey(20));
- assertTrue(userValues.get(2).containsKey(30));
- assertTrue(userValues.get(2).containsKey(40));
- assertTrue(userValues.get(2).containsKey(50));
- }
-
- @Test
- public void sessionizeExecTest() throws Exception
- {
- Sessionize sessionize = new Sessionize("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- List<Tuple> result;
- DateTime dt;
-
- // test same session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- result = toList(sessionize.exec(input));
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids match
- Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
-
- // test different session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- result = toList(sessionize.exec(input));
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids don't match
- Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
- }
-
- @Test
- public void sessionizeAccumulateTest() throws Exception
- {
- Sessionize sessionize = new Sessionize("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- List<Tuple> result;
- DateTime dt;
-
- // test same session id
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- result = toList(sessionize.getValue());
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids match
- Assert.assertTrue(result.get(0).get(1).equals(result.get(1).get(1)));
-
- // test different session id
- sessionize.cleanup();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- result = toList(sessionize.getValue());
-
- Assert.assertEquals(2, result.size());
- Assert.assertEquals(2,result.get(0).size());
- Assert.assertEquals(2,result.get(1).size());
- // session ids don't match
- Assert.assertFalse(result.get(0).get(1).equals(result.get(1).get(1)));
-
- sessionize.cleanup();
- Assert.assertEquals(0,sessionize.getValue().size());
- }
-
- private List<Tuple> toList(DataBag bag)
- {
- List<Tuple> result = new ArrayList<Tuple>();
- for (Tuple t : bag)
- {
- result.add(t);
- }
- return result;
- }
-
- /**
- register $JAR_PATH
-
- define SessionCount datafu.pig.sessions.SessionCount('$TIME_WINDOW');
-
- views = LOAD 'input' AS (user_id:int, page_id:int, time:chararray);
-
- views_grouped = GROUP views BY (user_id, page_id);
- view_counts = foreach views_grouped {
- views = order views by time;
- generate group.user_id as user_id, group.page_id as page_id, SessionCount(views.(time)) as count;
- }
-
- STORE view_counts INTO 'output';
- */
- @Multiline
- private String sessionCountPageViewsTest;
-
- @Test
- public void sessionCountPageViewsTest() throws Exception
- {
- PigTest test = createPigTestFromString(sessionCountPageViewsTest,
- "TIME_WINDOW=30m",
- "JAR_PATH=" + getJarPath());
-
- String[] input = {
- "1\t100\t2010-01-01T01:00:00Z",
- "1\t100\t2010-01-01T01:15:00Z",
- "1\t100\t2010-01-01T01:31:00Z",
- "1\t100\t2010-01-01T01:35:00Z",
- "1\t100\t2010-01-01T02:30:00Z",
-
- "1\t101\t2010-01-01T01:00:00Z",
- "1\t101\t2010-01-01T01:31:00Z",
- "1\t101\t2010-01-01T02:10:00Z",
- "1\t101\t2010-01-01T02:40:30Z",
- "1\t101\t2010-01-01T03:30:00Z",
-
- "1\t102\t2010-01-01T01:00:00Z",
- "1\t102\t2010-01-01T01:01:00Z",
- "1\t102\t2010-01-01T01:02:00Z",
- "1\t102\t2010-01-01T01:10:00Z",
- "1\t102\t2010-01-01T01:15:00Z",
- "1\t102\t2010-01-01T01:25:00Z",
- "1\t102\t2010-01-01T01:30:00Z"
- };
-
- String[] output = {
- "(1,100,2)",
- "(1,101,5)",
- "(1,102,1)"
- };
-
- test.assertOutput("views",input,"view_counts",output);
- }
-
- @Test
- public void sessionCountExecTest() throws Exception
- {
- SessionCount sessionize = new SessionCount("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- DateTime dt;
-
- // test same session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- Assert.assertEquals(1L,sessionize.exec(input).longValue());
-
- // test different session id
- inputBag.clear();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- Assert.assertEquals(2L,sessionize.exec(input).longValue());
- }
-
- @Test
- public void sessionCountAccumulateTest() throws Exception
- {
- SessionCount sessionize = new SessionCount("30m");
- Tuple input = TupleFactory.getInstance().newTuple(1);
- DataBag inputBag = BagFactory.getInstance().newDefaultBag();
- input.set(0,inputBag);
-
- Tuple item;
- DateTime dt;
-
- // test same session id
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(28).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- Assert.assertEquals(1L,sessionize.getValue().longValue());
-
- // test different session id
- sessionize.cleanup();
- dt = new DateTime();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- item = TupleFactory.getInstance().newTuple(1);
- item.set(0, dt.plusMinutes(31).getMillis());
- inputBag.add(item);
- sessionize.accumulate(input);
- inputBag.clear();
- Assert.assertEquals(2L,sessionize.exec(input).longValue());
-
- sessionize.cleanup();
- Assert.assertEquals(0,sessionize.getValue().longValue());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/sets/SetTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sets/SetTests.java b/test/pig/datafu/test/pig/sets/SetTests.java
deleted file mode 100644
index de6aa51..0000000
--- a/test/pig/datafu/test/pig/sets/SetTests.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.sets;
-
-import java.util.Arrays;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import datafu.pig.sets.SetIntersect;
-import datafu.test.pig.PigTests;
-
-public class SetTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- define SetIntersect datafu.pig.sets.SetIntersect();
-
- data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
-
- data2 = FOREACH data GENERATE SetIntersect(B1,B2);
-
- STORE data2 INTO 'output';
- */
- @Multiline
- private String setIntersectTest;
-
- @Test
- public void setIntersectTest() throws Exception
- {
- PigTest test = createPigTestFromString(setIntersectTest);
-
- writeLinesToFile("input",
- "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
- "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(1,10),(3,30)}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(2,20),(4,40)})",
- "({(1,10),(3,30)})");
- }
-
- /**
- register $JAR_PATH
-
- define SetIntersect datafu.pig.sets.SetIntersect();
-
- docs = LOAD 'docs' USING PigStorage(',') AS (id:int, line:chararray);
- B = FOREACH docs GENERATE id, line;
- C = FOREACH B GENERATE id, TOKENIZE(line) as gu;
-
- filtered = FOREACH C {
- uniq = DISTINCT gu;
- GENERATE id, uniq;
- }
-
- query = LOAD 'query' AS (line_query:chararray);
- bag_query = FOREACH query GENERATE TOKENIZE(line_query) AS query;
- -- sort the bag of tokens, since SetIntersect requires it
- bag_query = FOREACH bag_query {
- query_sorted = ORDER query BY token;
- GENERATE query_sorted;
- }
-
- result = FOREACH filtered {
- -- sort the tokens, since SetIntersect requires it
- tokens_sorted = ORDER uniq BY token;
- GENERATE id,
- SIZE(SetIntersect(tokens_sorted,bag_query.query_sorted)) as cnt;
- }
-
- DUMP result;
-
- */
- @Multiline
- private String setIntersectTestExample;
-
- @Test
- public void setIntersectTestExample() throws Exception
- {
- PigTest test = createPigTestFromString(setIntersectTestExample);
-
- // document with words to check for
- writeLinesToFile("docs",
- "1,word1 word4 word2 word1",
- "2,word2 word6 word1 word5 word3 word7",
- "3,word1 word3 word4 word5");
-
- // query to apply to document
- writeLinesToFile("query",
- "word2 word7 word5");
-
- test.runScript();
-
- assertOutput(test, "result",
- "(1,1)",
- "(2,3)",
- "(3,1)");
- }
-
- @Test
- public void setIntersectEmptyBagsTest() throws Exception
- {
- PigTest test = createPigTestFromString(setIntersectTest);
-
- writeLinesToFile("input",
- "{(1,10),(2,20),(3,30),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}",
- "{(1,10),(1,10),(2,20),(3,30),(3,30),(4,40),(4,40)}\t{(100,10),(300,30)}",
- "{(1,10),(2,20)}\t{(1,10),(2,20)}",
- "{(1,10),(2,20)}\t{}",
- "{}\t{(1,10),(2,20)}",
- "{}\t{}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(2,20),(4,40)})",
- "({})",
- "({(1,10),(2,20)})",
- "({})",
- "({})",
- "({})");
- }
-
- @Test
- public void testIntersectWithNullTuples() throws Exception {
- DataBag one = BagFactory.getInstance().newDefaultBag();
- DataBag two = BagFactory.getInstance().newDefaultBag();
-
- Tuple input = TupleFactory.getInstance().newTuple(Arrays.asList(one, two));
- DataBag output = new SetIntersect().exec(input);
- Assert.assertEquals(0, output.size());
- }
-
- @Test(expectedExceptions=org.apache.pig.impl.logicalLayer.FrontendException.class)
- public void setIntersectOutOfOrderTest() throws Exception
- {
- PigTest test = createPigTestFromString(setIntersectTest);
-
- this.writeLinesToFile("input",
- "{(1,10),(3,30),(2,20),(4,40),(5,50),(6,60)}\t{(0,0),(2,20),(4,40),(8,80)}");
-
- test.runScript();
-
- this.getLinesForAlias(test, "data2");
- }
-
- /**
- register $JAR_PATH
-
- define SetUnion datafu.pig.sets.SetUnion();
-
- data = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
-
- --dump data
-
- data2 = FOREACH data GENERATE SetUnion(B1,B2) AS C;
- data2 = FOREACH data2 {
- C = ORDER C BY val1 ASC, val2 ASC;
- generate C;
- }
-
- --dump data2
-
- STORE data2 INTO 'output';
- */
- @Multiline
- private String setUnionTest;
-
- @Test
- public void setUnionTest() throws Exception
- {
- PigTest test = createPigTestFromString(setUnionTest);
-
- writeLinesToFile("input",
- "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{(1,1),(1,20),(1,25),(1,25),(1,25),(1,40),(1,70),(1,80)}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(1,1),(1,10),(1,20),(1,25),(1,30),(1,40),(1,50),(1,60),(1,70),(1,80)})");
- }
-
- @Test
- public void setUnionEmptyBagsTest() throws Exception
- {
- PigTest test = createPigTestFromString(setUnionTest);
-
- writeLinesToFile("input",
- "{(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)}\t{}",
- "{}\t{(1,10),(1,20),(1,30),(1,40),(1,50)}",
- "{}\t{}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80)})",
- "({(1,10),(1,20),(1,30),(1,40),(1,50)})",
- "({})");
- }
-
- /**
- register $JAR_PATH
-
- define SetDifference datafu.pig.sets.SetDifference();
-
- data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)});
-
- data2 = FOREACH data GENERATE SetDifference(B1,B2);
-
- STORE data2 INTO 'output';
- */
- @Multiline
- private String setDifferenceTwoBagsTest;
-
- @Test
- public void setDifferenceTwoBagsTest() throws Exception
- {
- PigTest test = createPigTestFromString(setDifferenceTwoBagsTest);
-
- writeLinesToFile("input",
- "{(1),(2),(3)}\t",
- "{(1),(2),(3)}\t{}",
- "\t{(1),(2),(3)}",
- "{}\t{(1),(2),(3)}",
- "{(1),(2),(3)}\t{(1)}",
- "{(1),(2),(3)}\t{(1),(2)}",
- "{(1),(2),(3)}\t{(1),(2),(3)}",
- "{(1),(2),(3)}\t{(1),(2),(3),(4)}",
- "{(1),(2),(3),(4),(5),(6)}\t{(3),(4)}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(1),(2),(3)})",
- "({(1),(2),(3)})",
- "({})",
- "({})",
- "({(2),(3)})",
- "({(3)})",
- "({})",
- "({})",
- "({(1),(2),(5),(6)})");
- }
-
- /**
- register $JAR_PATH
-
- define SetDifference datafu.pig.sets.SetDifference();
-
- data = LOAD 'input' AS (B1:bag{T:tuple(val:int)},B2:bag{T:tuple(val:int)},B3:bag{T:tuple(val:int)});
-
- data2 = FOREACH data GENERATE SetDifference(B1,B2,B3);
-
- STORE data2 INTO 'output';
- */
- @Multiline
- private String setDifferenceThreeBagsTest;
-
- @Test
- public void setDifferenceThreeBagsTest() throws Exception
- {
- PigTest test = createPigTestFromString(setDifferenceThreeBagsTest);
-
- writeLinesToFile("input",
- "{(1),(2),(3)}\t\t",
- "{(1),(2),(3)}\t{}\t",
- "{(1),(2),(3)}\t\t{}",
- "{(1),(2),(3)}\t{}\t{}",
- "{(1),(2),(2),(2),(3),(3)}\t{}\t{}",
-
- "{(1),(2),(3)}\t{(2)}\t{}",
- "{(1),(2),(3)}\t{}\t{(2)}",
-
- "{(1),(2),(3)}\t{(2)}\t{(3)}",
-
- "{(1),(2),(3)}\t{(2),(2)}\t{(3),(3)}",
- "{(1),(1),(1),(2),(2),(3),(3),(3)}\t{(2),(2)}\t{(3),(3)}",
- "{(1),(2),(3)}\t{(0),(2)}\t{(3)}",
-
- "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3)}",
- "{(1),(2),(3)}\t{(0),(2)}\t{(1),(3),(4)}");
-
- test.runScript();
-
- assertOutput(test, "data2",
- "({(1),(2),(3)})",
- "({(1),(2),(3)})",
- "({(1),(2),(3)})",
- "({(1),(2),(3)})",
- "({(1),(2),(3)})",
-
- "({(1),(3)})",
- "({(1),(3)})",
-
- "({(1)})",
-
- "({(1)})",
- "({(1)})",
- "({(1)})",
-
- "({})",
- "({})");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/EstimationTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/EstimationTests.java b/test/pig/datafu/test/pig/stats/EstimationTests.java
deleted file mode 100644
index 258f320..0000000
--- a/test/pig/datafu/test/pig/stats/EstimationTests.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import java.util.List;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.test.pig.PigTests;
-import static org.testng.Assert.*;
-
-public class EstimationTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
-
- data_in = LOAD 'input' as (val:int);
-
- data_out = FOREACH (GROUP data_in ALL) GENERATE
- HyperLogLogPlusPlus(data_in) as cardinality;
-
- data_out = FOREACH data_out GENERATE cardinality;
-
- STORE data_out into 'output';
- */
- @Multiline private String hyperLogLogTest;
-
- @Test
- public void hyperLogLogTest() throws Exception
- {
- PigTest test = createPigTestFromString(hyperLogLogTest);
-
- int count = 1000000;
- String[] input = new String[count];
- for (int i=0; i<count; i++)
- {
- input[i] = Integer.toString(i*10);
- }
-
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- double error = Math.abs(count-((Long)output.get(0).get(0)))/(double)count;
- System.out.println("error: " + error*100.0 + "%");
- assertTrue(error < 0.01);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/MarkovPairTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/MarkovPairTests.java b/test/pig/datafu/test/pig/stats/MarkovPairTests.java
deleted file mode 100644
index b2f618e..0000000
--- a/test/pig/datafu/test/pig/stats/MarkovPairTests.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import static org.testng.Assert.*;
-
-import java.util.Iterator;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.test.pig.PigTests;
-
-public class MarkovPairTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- define markovPairs datafu.pig.stats.MarkovPairs();
-
- data = load 'input' as $schema;
- --describe data;
-
- data_out1 = foreach data generate data as orig_bag;
- --describe data_out1;
-
- data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
- --describe data_out;
-
- store data_out into 'output';
- */
- @Multiline private String markovPairDefault;
-
- @Test
- public void markovPairDefaultTest() throws Exception
- {
- PigTest test = createPigTestFromString(markovPairDefault,
- "schema=(data: bag {t: tuple(val:int)})");
-
- writeLinesToFile("input", "{(10),(20),(30),(40),(50),(60)}");
-
- String[] expectedOutput = {
- "({((10),(20)),((20),(30)),((30),(40)),((40),(50)),((50),(60))})"
- };
-
- test.runScript();
-
- Iterator<Tuple> actualOutput = test.getAlias("data_out");
-
- assertTuplesMatch(expectedOutput, actualOutput);
- }
-
- @Test
- public void markovPairMultipleInput() throws Exception
- {
- PigTest test = createPigTestFromString(markovPairDefault,
- "schema=(data: bag {t: tuple(val1:int,val2:int)})");
-
- writeLinesToFile("input", "{(10,100),(20,200),(30,300),(40,400),(50,500),(60,600)}");
-
- String[] expectedOutput = {
- "({((10,100),(20,200)),((20,200),(30,300)),((30,300),(40,400)),((40,400),(50,500)),((50,500),(60,600))})"
- };
-
-
- test.runScript();
-
- Iterator<Tuple> actualOutput = test.getAlias("data_out");
-
- assertTuplesMatch(expectedOutput, actualOutput);
- }
-
- /**
- register $JAR_PATH
-
- define markovPairs datafu.pig.stats.MarkovPairs('$lookahead');
-
- data = load 'input' as $schema;
- --describe data;
-
- data_out1 = foreach data generate data as orig_bag;
- --describe data_out1;
-
- data_out = foreach data_out1 generate markovPairs(orig_bag) as markov_bag;
- --describe data_out;
-
- store data_out into 'output';
- */
- @Multiline private String markovPairLookahead;
-
- @Test
- public void markovPairLookaheadTest() throws Exception
- {
- PigTest test = createPigTestFromString(markovPairLookahead,
- "schema=(data: bag {t: tuple(val:int)})",
- "lookahead=3");
-
- writeLinesToFile("input", "{(10),(20),(30),(40),(50)}");
-
- String[] expectedOutput = {
- "({((10),(20)),((10),(30)),((10),(40)),((20),(30)),((20),(40)),((20),(50)),((30),(40)),((30),(50)),((40),(50))})"
- };
-
- test.runScript();
-
- Iterator<Tuple> actualOutput = test.getAlias("data_out");
-
- assertTuplesMatch(expectedOutput, actualOutput);
- }
-
- private void assertTuplesMatch(String[] expectedOutput, Iterator<Tuple> actualOutput)
- {
- Iterator<Tuple> tuples = actualOutput;
-
- for (String outputLine : expectedOutput)
- {
- assertTrue(tuples.hasNext());
- Tuple outputTuple = tuples.next();
- System.out.println(String.format("expected: %s", outputLine));
- System.out.println(String.format("actual: %s", outputTuple.toString()));
- assertEquals(outputLine,outputTuple.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/778bef1e/test/pig/datafu/test/pig/stats/QuantileTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/stats/QuantileTests.java b/test/pig/datafu/test/pig/stats/QuantileTests.java
deleted file mode 100644
index 1814016..0000000
--- a/test/pig/datafu/test/pig/stats/QuantileTests.java
+++ /dev/null
@@ -1,696 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package datafu.test.pig.stats;
-
-import static org.testng.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.pigunit.PigTest;
-import org.testng.annotations.Test;
-
-import datafu.pig.stats.QuantileUtil;
-import datafu.pig.stats.StreamingQuantile;
-import datafu.test.pig.PigTests;
-
-public class QuantileTests extends PigTests
-{
- /**
- register $JAR_PATH
-
- define Quantile datafu.pig.stats.Quantile($QUANTILES);
-
- data_in = LOAD 'input' as (val:int);
-
- --describe data_in;
-
- data_out = GROUP data_in ALL;
-
- --describe data_out;
-
- data_out = FOREACH data_out {
- sorted = ORDER data_in BY val;
- GENERATE Quantile(sorted) as quantiles;
- }
- data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-
- --describe data_out;
-
- STORE data_out into 'output';
- */
- @Multiline private String quantileTest;
-
- @Test
- public void quantileTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='0.0','0.25','0.5','0.75','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
- }
-
- @Test
- public void quantile2Test() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='5'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.5,8.0,10.0)");
- }
-
- @Test
- public void quantile3Test() throws Exception {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
-
- List<String> input = new ArrayList<String>();
- for (int i=100000; i>=0; i--)
- {
- input.add(Integer.toString(i));
- }
-
- writeLinesToFile("input", input.toArray(new String[0]));
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
- }
-
- @Test
- public void quantile4aTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='4'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
- }
-
- @Test
- public void quantile4bTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='0.0','0.333','0.666','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
- }
-
- @Test
- public void quantile5aTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='10'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
- }
-
- @Test
- public void quantile5bTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileTest,
- "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
- }
-
- /**
- register $JAR_PATH
-
- define Median datafu.pig.stats.Median();
-
- data_in = LOAD 'input' as (val:int);
-
- --describe data_in;
-
- data_out = GROUP data_in ALL;
-
- --describe data_out;
-
- data_out = FOREACH data_out {
- sorted = ORDER data_in BY val;
- GENERATE Median(sorted) as medians;
- }
- data_out = FOREACH data_out GENERATE FLATTEN(medians);
-
- --describe data_out;
-
- STORE data_out into 'output';
- */
- @Multiline private String medianTest;
-
- @Test
- public void medianTest() throws Exception
- {
- PigTest test = createPigTestFromString(medianTest);
-
- String[] input = {"4","5","6","9","10","7","8","2","3","1"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(5.5)");
- }
-
- /**
- register $JAR_PATH
-
- define Median datafu.pig.stats.StreamingMedian();
-
- data_in = LOAD 'input' as (val:int);
-
- --describe data_in;
-
- data_out = GROUP data_in ALL;
-
- --describe data_out;
-
- data_out = FOREACH data_out {
- GENERATE Median(data_in) as medians;
- }
- data_out = FOREACH data_out GENERATE FLATTEN(medians);
-
- --describe data_out;
-
- STORE data_out into 'output';
- */
- @Multiline private String streamingMedianTest;
-
- @Test
- public void streamingMedianTest() throws Exception
- {
- PigTest test = createPigTestFromString(streamingMedianTest);
-
- String[] input = {"0","4","5","6","9","10","7","8","2","3","1"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(5.0)");
- }
-
- /**
- register $JAR_PATH
-
- define Quantile datafu.pig.stats.StreamingQuantile($QUANTILES);
-
- data_in = LOAD 'input' as (val:int);
-
- --describe data_in;
-
- data_out = GROUP data_in ALL;
-
- --describe data_out;
-
- data_out = FOREACH data_out GENERATE Quantile(data_in.val) as quantiles;
- data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-
- --describe data_out;
-
- STORE data_out into 'output';
- */
- @Multiline private String streamingQuantileTest;
-
- @Test
- public void streamingQuantileTest() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='5'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.0,8.0,10.0)");
- }
-
- @Test
- public void streamingQuantile2Test() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='0.5','0.75','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(5.0,8.0,10.0)");
- }
-
- @Test
- public void streamingQuantile3Test() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='0.07','0.03','0.37','1.0','0.0'");
-
- List<String> input = new ArrayList<String>();
- for (int i=1000; i>=1; i--)
- {
- input.add(Integer.toString(i));
- }
-
- writeLinesToFile("input", input.toArray(new String[0]));
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(70.0,30.0,370.0,1000.0,1.0)");
- }
-
- @Test
- public void streamingQuantile4Test() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='0.0013','0.0228','0.1587','0.5','0.8413','0.9772','0.9987'");
-
- List<String> input = new ArrayList<String>();
- for (int i=100000; i>=0; i--)
- {
- input.add(Integer.toString(i));
- }
-
- writeLinesToFile("input", input.toArray(new String[0]));
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(130.0,2280.0,15870.0,50000.0,84130.0,97720.0,99870.0)");
- }
-
- @Test
- public void streamingQuantile5aTest() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='10'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
- }
-
- @Test
- public void streamingQuantile5bTest() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='0.0','0.111','0.222','0.333','0.444','0.555','0.666','0.777','0.888','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0)");
- }
-
- @Test
- public void streamingQuantile6Test() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='0.0','0.333','0.666','1.0'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
- }
-
- @Test
- public void streamingQuantile7Test() throws Exception {
- PigTest test = createPigTestFromString(streamingQuantileTest,
- "QUANTILES='4'");
-
- String[] input = {"1","2","3","4","10","5","6","7","8","9"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,4.0,7.0,10.0)");
- }
-
- @Test
- public void streamingQuantileExecTest() throws Exception {
- StreamingQuantile quantile = new StreamingQuantile("4");
-
- DataBag bag;
- Tuple input;
- Tuple result;
-
- bag = BagFactory.getInstance().newDefaultBag();
- for (int i=1; i<=10; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(1);
- t.set(0, i);
- bag.add(t);
- }
- input = TupleFactory.getInstance().newTuple(bag);
- result = quantile.exec(input);
- Assert.assertEquals(4, result.size());
- Assert.assertEquals(1.0, result.get(0));
- Assert.assertEquals(4.0, result.get(1));
- Assert.assertEquals(7.0, result.get(2));
- Assert.assertEquals(10.0, result.get(3));
-
- // do twice to check cleanup works
-
- bag = BagFactory.getInstance().newDefaultBag();
- for (int i=11; i<=20; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(1);
- t.set(0, i);
- bag.add(t);
- }
- input = TupleFactory.getInstance().newTuple(bag);
- result = quantile.exec(input);
- Assert.assertEquals(4, result.size());
- Assert.assertEquals(11.0, result.get(0));
- Assert.assertEquals(14.0, result.get(1));
- Assert.assertEquals(17.0, result.get(2));
- Assert.assertEquals(20.0, result.get(3));
- }
-
- @Test
- public void streamingQuantileAccumulateTest() throws Exception {
- StreamingQuantile quantile = new StreamingQuantile("4");
-
- Tuple result;
-
- for (int i=1; i<=10; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(1);
- t.set(0, i);
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- bag.add(t);
- Tuple input = TupleFactory.getInstance().newTuple(bag);
- quantile.accumulate(input);
- }
- result = quantile.getValue();
- Assert.assertEquals(4, result.size());
- Assert.assertEquals(1.0, result.get(0));
- Assert.assertEquals(4.0, result.get(1));
- Assert.assertEquals(7.0, result.get(2));
- Assert.assertEquals(10.0, result.get(3));
-
- // do twice to check cleanup works
- quantile.cleanup();
-
- for (int i=11; i<=20; i++)
- {
- Tuple t = TupleFactory.getInstance().newTuple(1);
- t.set(0, i);
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- bag.add(t);
- Tuple input = TupleFactory.getInstance().newTuple(bag);
- quantile.accumulate(input);
- }
- result = quantile.getValue();
- Assert.assertEquals(4, result.size());
- Assert.assertEquals(11.0, result.get(0));
- Assert.assertEquals(14.0, result.get(1));
- Assert.assertEquals(17.0, result.get(2));
- Assert.assertEquals(20.0, result.get(3));
- }
-
- @Test
- public void quantileParamsTest() throws Exception {
- List<Double> quantiles = QuantileUtil.getQuantilesFromParams("5");
-
- assertEquals(quantiles.size(),5);
- assertAboutEqual(quantiles.get(0), 0.0);
- assertAboutEqual(quantiles.get(1), 0.25);
- assertAboutEqual(quantiles.get(2), 0.5);
- assertAboutEqual(quantiles.get(3), 0.75);
- assertAboutEqual(quantiles.get(4), 1.0);
- }
-
- @Test
- public void quantileParamsTest2() throws Exception {
- List<Double> quantiles = QuantileUtil.getQuantilesFromParams("2");
-
- assertEquals(quantiles.size(),2);
- assertAboutEqual(quantiles.get(0), 0.0);
- assertAboutEqual(quantiles.get(1), 1.0);
- }
-
- @Test
- public void quantileParamsTest3() throws Exception {
- List<Double> quantiles = QuantileUtil.getQuantilesFromParams("11");
-
- assertEquals(quantiles.size(),11);
- assertAboutEqual(quantiles.get(0), 0.0);
- assertAboutEqual(quantiles.get(1), 0.1);
- assertAboutEqual(quantiles.get(2), 0.2);
- assertAboutEqual(quantiles.get(3), 0.3);
- assertAboutEqual(quantiles.get(4), 0.4);
- assertAboutEqual(quantiles.get(5), 0.5);
- assertAboutEqual(quantiles.get(6), 0.6);
- assertAboutEqual(quantiles.get(7), 0.7);
- assertAboutEqual(quantiles.get(8), 0.8);
- assertAboutEqual(quantiles.get(9), 0.9);
- assertAboutEqual(quantiles.get(10), 1.0);
- }
-
- @Test
- public void quantileParamsTest4() throws Exception {
- List<Double> quantiles = QuantileUtil.getQuantilesFromParams("10");
-
- assertEquals(quantiles.size(),10);
- assertAboutEqual(quantiles.get(0), 0.0);
- assertAboutEqual(quantiles.get(1), 0.11111);
- assertAboutEqual(quantiles.get(2), 0.22222);
- assertAboutEqual(quantiles.get(3), 0.33333);
- assertAboutEqual(quantiles.get(4), 0.44444);
- assertAboutEqual(quantiles.get(5), 0.55555);
- assertAboutEqual(quantiles.get(6), 0.66666);
- assertAboutEqual(quantiles.get(7), 0.77777);
- assertAboutEqual(quantiles.get(8), 0.88888);
- assertAboutEqual(quantiles.get(9), 1.0);
- }
-
- @Test
- public void quantileParamsTest5() throws Exception {
- List<Double> quantiles = QuantileUtil.getQuantilesFromParams("4");
-
- assertEquals(quantiles.size(),4);
- assertAboutEqual(quantiles.get(0), 0.0);
- assertAboutEqual(quantiles.get(1), 0.333);
- assertAboutEqual(quantiles.get(2), 0.666);
- assertAboutEqual(quantiles.get(3), 1.0);
- }
-
- private void assertAboutEqual(double actual, double expected) {
- assertTrue(Math.abs(actual-expected) < 0.001);
- }
-
- /**
- register $JAR_PATH
-
- define Quantile datafu.pig.stats.$UDF($QUANTILES);
-
- data_in = LOAD 'input' as (val:int);
-
- --describe data_in;
-
- data_out = GROUP data_in ALL;
-
- --describe data_out;
-
- data_out = FOREACH data_out {
- sorted = ORDER data_in BY val;
- GENERATE Quantile(sorted) as quantiles;
- }
- data_out = FOREACH data_out GENERATE FLATTEN(quantiles);
-
- --describe data_out;
-
- data_out = FOREACH data_out GENERATE $EXPECTED_OUTPUT;
-
- STORE data_out into 'output';
- */
- @Multiline private String quantileSchemaTest;
-
- @Test
- public void quantileSchemaTest() throws Exception
- {
- PigTest test = createPigTestFromString(quantileSchemaTest,
- "UDF=Quantile",
- "QUANTILES='0.0','0.5','1.0'",
- "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
- "quantiles::quantile_0_5, "+
- "quantiles::quantile_1_0");
-
- String[] input = {"1","5","3","4","2"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
- }
-
- @Test
- public void quantileSchemaTest2() throws Exception
- {
- PigTest test = createPigTestFromString(quantileSchemaTest,
- "UDF=Quantile",
- "QUANTILES='3'",
- "EXPECTED_OUTPUT=quantiles::quantile_0, "+
- "quantiles::quantile_1, "+
- "quantiles::quantile_2");
-
- String[] input = {"1","5","3","4","2"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
- }
-
- @Test
- public void quantileSchemaTest3() throws Exception
- {
- PigTest test = createPigTestFromString(quantileSchemaTest,
- "UDF=StreamingQuantile",
- "QUANTILES='0.0','0.5','1.0'",
- "EXPECTED_OUTPUT=quantiles::quantile_0_0, "+
- "quantiles::quantile_0_5, "+
- "quantiles::quantile_1_0");
-
- String[] input = {"1","5","3","4","2"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
- }
-
- @Test
- public void quantileSchemaTest4() throws Exception
- {
- PigTest test = createPigTestFromString(quantileSchemaTest,
- "UDF=StreamingQuantile",
- "QUANTILES='3'",
- "EXPECTED_OUTPUT=quantiles::quantile_0, "+
- "quantiles::quantile_1, "+
- "quantiles::quantile_2");
-
- String[] input = {"1","5","3","4","2"};
- writeLinesToFile("input", input);
-
- test.runScript();
-
- List<Tuple> output = getLinesForAlias(test, "data_out", true);
-
- assertEquals(output.size(),1);
- assertEquals(output.get(0).toString(), "(1.0,3.0,5.0)");
- }
-}
|