datafu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mha...@apache.org
Subject incubator-datafu git commit: DATAFU-58 Hadoop 2.0
Date Wed, 27 May 2015 20:06:40 GMT
Repository: incubator-datafu
Updated Branches:
  refs/heads/master 745a230f2 -> 163fc26ad


DATAFU-58 Hadoop 2.0

Upgrade dependencies to Hadoop 2.7.0 and Pig 0.14.0 and fix all testing issues


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/163fc26a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/163fc26a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/163fc26a

Branch: refs/heads/master
Commit: 163fc26ad9abe30f3a9c0be4622c6e4e187e8505
Parents: 745a230
Author: Matthew Hayes <matthew.terence.hayes@gmail.com>
Authored: Sat May 23 13:00:44 2015 -0700
Committer: Matthew Hayes <mhayes@Matthews-MacBook-Pro-2.local>
Committed: Wed May 27 13:06:20 2015 -0700

----------------------------------------------------------------------
 README.md                                       |   5 +-
 .../multilinestring/MultilineProcessor.java     |   2 +-
 datafu-hourglass/.gitignore                     |   1 +
 datafu-hourglass/build.gradle                   |  43 +++++--
 datafu-hourglass/find_dupes.rb                  |  16 +++
 .../java/datafu/hourglass/fs/PathUtils.java     |   4 +-
 .../mapreduce/DistributedCacheHelper.java       |   4 +-
 .../java/datafu/hourglass/demo/Examples.java    |  36 +++---
 ...artitionCollapsingExecutionPlannerTests.java |   4 +-
 .../test/PartitionCollapsingJoinTest.java       |   6 +-
 .../test/PartitionCollapsingTests.java          |   4 +-
 ...ionPreservingCollapsingIntegrationTests.java |   8 +-
 .../test/PartitionPreservingJoinTests.java      |   6 +-
 .../test/PartitionPreservingTests.java          |   4 +-
 .../java/datafu/hourglass/test/TestAvroJob.java |   4 +-
 .../java/datafu/hourglass/test/TestBase.java    | 127 +++----------------
 datafu-pig/build.gradle                         |  22 +++-
 .../java/datafu/test/pig/bags/BagTests.java     |  57 ++++++++-
 gradle/dependency-versions.gradle               |   6 +-
 19 files changed, 189 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 8e1b67d..7e73093 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,9 @@ To run tests for a single class, use the `test.single` property.  For example, t
 
     ./gradlew :datafu-pig:test -Dtest.single=QuantileTests
 
-The tests can also be run from within eclipse.  Note that you may run out of heap when executing
tests in Eclipse. To fix this adjust your heap settings for the TestNG plugin. Go to Eclipse->Preferences.
Select TestNG->Run/Debug. Add "-Xmx1G" to the JVM args.
+The tests can also be run from within eclipse.  You'll need to install the TestNG plugin
for Eclipse.  See: http://testng.org/doc/download.html. 
 
+Potential issues and workaround:
+* You may run out of heap when executing tests in Eclipse. To fix this adjust your heap settings
for the TestNG plugin. Go to Eclipse->Preferences. Select TestNG->Run/Debug. Add "-Xmx1G"
to the JVM args.
+* You may get a "broken pipe" error when running tests.  If so right click on the project,
open the TestNG settings, and uncheck "Use project TestNG jar".
 

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/build-plugin/src/main/java/org/adrianwalker/multilinestring/MultilineProcessor.java
----------------------------------------------------------------------
diff --git a/build-plugin/src/main/java/org/adrianwalker/multilinestring/MultilineProcessor.java
b/build-plugin/src/main/java/org/adrianwalker/multilinestring/MultilineProcessor.java
index 9abdba5..ab3fa27 100644
--- a/build-plugin/src/main/java/org/adrianwalker/multilinestring/MultilineProcessor.java
+++ b/build-plugin/src/main/java/org/adrianwalker/multilinestring/MultilineProcessor.java
@@ -15,7 +15,7 @@ import javax.lang.model.SourceVersion;
 import javax.lang.model.element.TypeElement;
 
 @SupportedAnnotationTypes({"org.adrianwalker.multilinestring.Multiline"})
-@SupportedSourceVersion(SourceVersion.RELEASE_6)
+@SupportedSourceVersion(SourceVersion.RELEASE_8)
 public final class MultilineProcessor extends AbstractProcessor {
   private Processor delegator = null;
   

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/.gitignore
----------------------------------------------------------------------
diff --git a/datafu-hourglass/.gitignore b/datafu-hourglass/.gitignore
index 942515e..3b15e90 100644
--- a/datafu-hourglass/.gitignore
+++ b/datafu-hourglass/.gitignore
@@ -1,5 +1,6 @@
 test-logs/
 /bin
+/data
 /bin
 /target
 /test-output

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/build.gradle
----------------------------------------------------------------------
diff --git a/datafu-hourglass/build.gradle b/datafu-hourglass/build.gradle
index 75a2876..e89388f 100644
--- a/datafu-hourglass/build.gradle
+++ b/datafu-hourglass/build.gradle
@@ -43,23 +43,48 @@ dependencies {
   compile "log4j:log4j:$log4jVersion"
   compile "org.json:json:$jsonVersion"
   compile "org.apache.avro:avro:$avroVersion"
-  compile "org.apache.avro:avro-mapred:$avroVersion"
   compile "org.apache.avro:avro-compiler:$avroVersion"
-
-  // needed for compilation and testing, not listed as a dependencies in pom
-  compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
-  compile "org.apache.hadoop:hadoop-tools:$hadoopVersion"
+  compile "org.apache.commons:commons-math:$commonsMathVersion"
 
   // needed for testing, not listed as a dependencies in pom
-  testCompile "org.apache.hadoop:hadoop-test:$hadoopVersion"
   testCompile "com.clearspring.analytics:stream:$streamVersion"
   testCompile "javax.ws.rs:jsr311-api:$jsr311Version"
   testCompile "org.slf4j:slf4j-log4j12:$slf4jVersion"
   testCompile "commons-io:commons-io:$commonsIoVersion"
-  testCompile "org.apache.avro:avro-tools:$avroVersion"
   testCompile "org.testng:testng:$testngVersion"
 }
 
+if (hadoopVersion.startsWith("2.") || hadoopVersion.startsWith("0.23.")) {
+  dependencies {
+    // core dependencies, listed as dependencies in pom
+    compile "org.apache.avro:avro-mapred:$avroVersion:hadoop2"
+
+    // needed for compilation and testing, not listed as a dependencies in pom
+    compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-archives:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-auth:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-mapreduce-client-core:$hadoopVersion"
+
+    // needed for testing, not listed as a dependencies in pom
+    testCompile "org.apache.hadoop:hadoop-minicluster:$hadoopVersion"
+  }
+} else {
+  dependencies {
+    // core dependencies, listed as dependencies in pom
+    compile "org.apache.avro:avro-mapred:$avroVersion"
+
+    // needed for compilation and testing, not listed as a dependencies in pom
+    compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-tools:$hadoopVersion"
+
+    // needed for testing, not listed as a dependencies in pom
+    testCompile "org.apache.hadoop:hadoop-test:$hadoopVersion"
+  }
+}
+ 
+
 // modify the pom dependencies so we don't include hadoop and the testing related artifacts
 modifyPom {
   project {
@@ -97,5 +122,7 @@ test {
   // enable TestNG support (default is JUnit)
   useTestNG()
 
+  systemProperty 'hourglass.data.dir', file('data')
+
   maxHeapSize = "2G"
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/find_dupes.rb
----------------------------------------------------------------------
diff --git a/datafu-hourglass/find_dupes.rb b/datafu-hourglass/find_dupes.rb
new file mode 100644
index 0000000..dc57f1b
--- /dev/null
+++ b/datafu-hourglass/find_dupes.rb
@@ -0,0 +1,16 @@
+# Uses jarfish to find all duplicate classes from jars listed in the .classpath
+
+require 'nokogiri'
+
+doc = Nokogiri::XML(File.open('.classpath'))
+
+# Create a classpath.lst file that contains all jars referenced in .classpath
+output = File.open('classpath.lst','w')
+doc.xpath("//classpathentry[@kind='lib']").each { |x| output.write x["path"]+"\n" }
+output.close
+
+unless File.file?("jarfish-1.0-rc7.jar") then
+  `wget https://jarfish.googlecode.com/files/jarfish-1.0-rc7.jar`
+end 
+
+exec "java -jar jarfish-1.0-rc7.jar dupes classpath.lst"

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
index c270c7b..0e620ac 100644
--- a/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/fs/PathUtils.java
@@ -66,7 +66,7 @@ public class PathUtils
     @Override
     public boolean accept(Path path)
     {
-      String s = path.getName().toString();
+      String s = path.getName();
       return !s.startsWith(".") && !s.startsWith("_");        
     }
   };
@@ -177,7 +177,7 @@ public class PathUtils
    */
   public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
   {
-    FileStatus[] outputPaths = fs.listStatus(path, nonHiddenPathFilter);
+    FileStatus[] outputPaths = fs.globStatus(new Path(path, "*"), nonHiddenPathFilter);
     
     List<DatePath> outputs = new ArrayList<DatePath>();
     

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
index 62975d1..5e549cd 100644
--- a/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
+++ b/datafu-hourglass/src/main/java/datafu/hourglass/mapreduce/DistributedCacheHelper.java
@@ -51,9 +51,9 @@ public class DistributedCacheHelper
     Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(conf);
     for (Path localCacheFile : localCacheFiles)
     {
-      if (localCacheFile.toString().endsWith(path.toString()))
+      if (localCacheFile.getName().endsWith(path.getName()))
       {
-        localPath = localCacheFile.toString();
+        localPath = localCacheFile.getName();
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/demo/Examples.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/demo/Examples.java b/datafu-hourglass/src/test/java/datafu/hourglass/demo/Examples.java
index 039822c..d139d02 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/demo/Examples.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/demo/Examples.java
@@ -93,13 +93,13 @@ public class Examples extends TestBase
     _log.info("*** Running " + method.getName());
     
     _log.info("*** Cleaning input and output paths");  
-    getFileSystem().delete(new Path("/data"), true);
-    getFileSystem().delete(new Path("/output"), true);
-    getFileSystem().mkdirs(new Path("/data"));
-    getFileSystem().mkdirs(new Path("/output"));
+    getFileSystem().delete(new Path(getDataPath(), "data"), true);
+    getFileSystem().delete(new Path(getDataPath(), "output"), true);
+    getFileSystem().mkdirs(new Path(getDataPath(), "data"));
+    getFileSystem().mkdirs(new Path(getDataPath(), "output"));
                
     _record = new GenericData.Record(EVENT_SCHEMA);    
-    _eventWriter = new DailyTrackingWriter(new Path("/data/event"),EVENT_SCHEMA,getFileSystem());
+    _eventWriter = new DailyTrackingWriter(new Path(getDataPath(), "data/event"),EVENT_SCHEMA,getFileSystem());
   }
   
   @Test
@@ -119,13 +119,13 @@ public class Examples extends TestBase
     closeDayForEvent();
             
     // run    
-    new CountById().run(createJobConf(),"/data/event","/output");
+    new CountById().run(createJobConf(),getDataPath() + "/data/event",getDataPath() + "/output");
     
     // verify
     
-    checkOutputFolderCount(new Path("/output"), 1);
+    checkOutputFolderCount(new Path(getDataPath(), "output"), 1);
     
-    HashMap<Long,Integer> counts = loadOutputCounts(new Path("/output"), "20130316");
+    HashMap<Long,Integer> counts = loadOutputCounts(new Path(getDataPath(), "output"), "20130316");
     
     checkSize(counts,3);    
     checkIdCount(counts,1,5);
@@ -140,9 +140,9 @@ public class Examples extends TestBase
     closeDayForEvent();
     
     // run    
-    new CountById().run(createJobConf(),"/data/event","/output");
+    new CountById().run(createJobConf(),getDataPath() + "/data/event",getDataPath() + "/output");
     
-    counts = loadOutputCounts(new Path("/output"), "20130317");
+    counts = loadOutputCounts(new Path(getDataPath(), "output"), "20130317");
     
     checkSize(counts,3);    
     checkIdCount(counts,1,7);
@@ -170,12 +170,12 @@ public class Examples extends TestBase
     }
         
     // run    
-    new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
+    new EstimateCardinality().run(createJobConf(),getDataPath() + "/data/event", getDataPath() + "/output/daily", getDataPath() + "/output/summary",30);
     
     // verify    
-    checkIntermediateFolderCount(new Path("/output/daily"), 30);
-    checkOutputFolderCount(new Path("/output/summary"), 1);
+    Assert.assertTrue(Math.abs(10000L - loadMemberCount(new Path(getDataPath(), "output/summary"),"20130330").longValue())/10000.0 < 0.005);
+    checkIntermediateFolderCount(new Path(getDataPath(), "output/daily"), 30);
+    checkOutputFolderCount(new Path(getDataPath(), "output/summary"), 1);
+    Assert.assertTrue(Math.abs(10000L - loadMemberCount(new Path(getDataPath(), "output/summary"),"20130330").longValue())/10000.0
< 0.005);
 
     // more data
     openDayForEvent(2013, 3, 31);        
@@ -183,12 +183,12 @@ public class Examples extends TestBase
     closeDayForEvent();
     
     // run    
-    new EstimateCardinality().run(createJobConf(),"/data/event","/output/daily","/output/summary",30);
+    new EstimateCardinality().run(createJobConf(),getDataPath() + "/data/event", getDataPath()
+ "/output/daily", getDataPath() + "/output/summary",30);
     
     // verify    
-    checkIntermediateFolderCount(new Path("/output/daily"), 31);
-    checkOutputFolderCount(new Path("/output/summary"), 1);
-    Assert.assertEquals(loadMemberCount(new Path("/output/summary"),"20130331").longValue(),10L);
+    checkIntermediateFolderCount(new Path(getDataPath(), "output/daily"), 31);
+    checkOutputFolderCount(new Path(getDataPath(), "output/summary"), 1);
+    Assert.assertEquals(loadMemberCount(new Path(getDataPath(), "output/summary"),"20130331").longValue(),10L);
   }
   
   private void openDayForEvent(int year, int month, int day) throws IOException

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingExecutionPlannerTests.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingExecutionPlannerTests.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingExecutionPlannerTests.java
index d68ea83..f1003d4 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingExecutionPlannerTests.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingExecutionPlannerTests.java
@@ -55,8 +55,8 @@ public class PartitionCollapsingExecutionPlannerTests extends TestBase
 {
   private Logger _log = Logger.getLogger(PartitionCollapsingTests.class);
   
-  private Path _inputPath = new Path("/input");
-  private Path _outputPath = new Path("/output");
+  private Path _inputPath = new Path(getDataPath(), "input");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private Properties _props;
   

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java
index 02aa342..ca18465 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingJoinTest.java
@@ -56,9 +56,9 @@ public class PartitionCollapsingJoinTest extends TestBase
 {
 private Logger _log = Logger.getLogger(PartitionPreservingJoinTests.class);
   
-  private Path _impressionEventPath = new Path("/data/tracking/impressions");
-  private Path _clickEventPath = new Path("/data/tracking/clicks");
-  private Path _outputPath = new Path("/output");
+  private Path _impressionEventPath = new Path(getDataPath(), "tracking/impressions");
+  private Path _clickEventPath = new Path(getDataPath(), "tracking/clicks");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema IMPRESSION_SCHEMA;
   private static final Schema CLICK_SCHEMA;

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingTests.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingTests.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingTests.java
index fff1cfd..a2007c9 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingTests.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionCollapsingTests.java
@@ -58,8 +58,8 @@ public class PartitionCollapsingTests extends TestBase
 {
   private Logger _log = Logger.getLogger(PartitionCollapsingTests.class);
   
-  private Path _inputPath = new Path("/input");
-  private Path _outputPath = new Path("/output");
+  private Path _inputPath = new Path(getDataPath(), "input");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema EVENT_SCHEMA;
   

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingCollapsingIntegrationTests.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingCollapsingIntegrationTests.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingCollapsingIntegrationTests.java
index a8f020b..5a5550d 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingCollapsingIntegrationTests.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingCollapsingIntegrationTests.java
@@ -59,9 +59,9 @@ public class PartitionPreservingCollapsingIntegrationTests extends TestBase
 {
   private Logger _log = Logger.getLogger(PartitionPreservingCollapsingIntegrationTests.class);
   
-  private Path _inputPath = new Path("/data/tracking/SimpleEvent");
-  private Path _intermediatePath = new Path("/intermediate");
-  private Path _outputPath = new Path("/output");
+  private Path _inputPath = new Path(getDataPath(), "tracking/SimpleEvent");
+  private Path _intermediatePath = new Path(getDataPath(), "intermediate");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema EVENT_SCHEMA;
   
@@ -108,8 +108,10 @@ public class PartitionPreservingCollapsingIntegrationTests extends TestBase
     _log.info("*** Cleaning input and output paths");  
     getFileSystem().delete(_inputPath, true);
     getFileSystem().delete(_outputPath, true);
+    getFileSystem().delete(_intermediatePath, true);
     getFileSystem().mkdirs(_inputPath);
     getFileSystem().mkdirs(_outputPath);
+    getFileSystem().mkdirs(_intermediatePath);
     
     _maxIterations = 20;
     _maxDaysToProcess = 365;

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingJoinTests.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingJoinTests.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingJoinTests.java
index c41fd39..c74d9a8 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingJoinTests.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingJoinTests.java
@@ -58,9 +58,9 @@ public class PartitionPreservingJoinTests extends TestBase
 {
   private Logger _log = Logger.getLogger(PartitionPreservingJoinTests.class);
   
-  private Path _impressionEventPath = new Path("/data/tracking/impressions");
-  private Path _clickEventPath = new Path("/data/tracking/clicks");
-  private Path _outputPath = new Path("/output");
+  private Path _impressionEventPath = new Path(getDataPath(), "tracking/impressions");
+  private Path _clickEventPath = new Path(getDataPath(), "tracking/clicks");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema IMPRESSION_SCHEMA;
   private static final Schema CLICK_SCHEMA;

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingTests.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingTests.java
b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingTests.java
index acae96c..d90c4fc 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingTests.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/PartitionPreservingTests.java
@@ -57,8 +57,8 @@ public class PartitionPreservingTests extends TestBase
 {
   private Logger _log = Logger.getLogger(PartitionPreservingTests.class);
   
-  private Path _inputPath = new Path("/data/tracking/SimpleEvent");
-  private Path _outputPath = new Path("/output");
+  private Path _inputPath = new Path(getDataPath(), "tracking/SimpleEvent");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema EVENT_SCHEMA;
   

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestAvroJob.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestAvroJob.java b/datafu-hourglass/src/test/java/datafu/hourglass/test/TestAvroJob.java
index b428003..758280f 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestAvroJob.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/TestAvroJob.java
@@ -53,8 +53,8 @@ public class TestAvroJob extends TestBase
 {
   private Logger _log = Logger.getLogger(TestAvroJob.class);
   
-  private Path _inputPath = new Path("/data/tracking/SimpleEvent");
-  private Path _outputPath = new Path("/output");
+  private Path _inputPath = new Path(getDataPath(), "tracking/SimpleEvent");
+  private Path _outputPath = new Path(getDataPath(), "output");
   
   private static final Schema EVENT_SCHEMA;
   

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestBase.java
----------------------------------------------------------------------
diff --git a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestBase.java b/datafu-hourglass/src/test/java/datafu/hourglass/test/TestBase.java
index bc52977..eedc9a4 100644
--- a/datafu-hourglass/src/test/java/datafu/hourglass/test/TestBase.java
+++ b/datafu-hourglass/src/test/java/datafu/hourglass/test/TestBase.java
@@ -26,100 +26,36 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.log4j.Logger;
 
 public class TestBase
-{
-  private Logger _log = Logger.getLogger(TestBase.class);
-  
-  private MiniDFSCluster _dfsCluster;
-  private MiniMRCluster _mrCluster;
+{  
   private FileSystem _fileSystem;
   protected String _confDir;
   
-  public static final int LOCAL_MR = 1;
-  public static final int CLUSTER_MR = 2;
-  public static final int LOCAL_FS = 4;
-  public static final int DFS_FS = 8;
-
-  private boolean localMR;
-  private boolean localFS;
-
-  private int taskTrackers;
-  private int dataNodes;
-  
   public TestBase() throws IOException 
   {
-    this(1,1);
-  }
-  
-  public TestBase(int taskTrackers, int dataNodes) throws IOException 
-  {      
-    if (System.getProperty("testlocal") != null && Boolean.parseBoolean(System.getProperty("testlocal")))
-    {
-      localMR = true;
-      localFS = true;
       _confDir = System.getProperty("confdir");
-    }
-    
-    if (taskTrackers < 1) {
-      throw new IllegalArgumentException(
-                                         "Invalid taskTrackers value, must be greater than
0");
-    }
-    if (dataNodes < 1) {
-      throw new IllegalArgumentException(
-                                         "Invalid dataNodes value, must be greater than 0");
-    }
-    this.taskTrackers = taskTrackers;
-    this.dataNodes = dataNodes;
   }
   
-  @SuppressWarnings("deprecation")
   public void beforeClass() throws Exception
   {
     // make sure the log folder exists or it will fail
     new File("test-logs").mkdirs();    
     System.setProperty("hadoop.log.dir", "test-logs");
-        
-    if (localFS) 
-    {
-      _fileSystem = FileSystem.get(new JobConf());
-      _log.info("*** Using local file system: " + _fileSystem.getUri());
-    }
-    else 
-    {
-      _log.info("*** Starting Mini DFS Cluster");    
-      _dfsCluster = new MiniDFSCluster(new JobConf(), dataNodes, true, null);
-      _fileSystem = _dfsCluster.getFileSystem();
-    }
     
-    if (localMR)
-    {
-      _log.info("*** Using local MR Cluster");       
-    }
-    else
-    { 
-      _log.info("*** Starting Mini MR Cluster");  
-      _mrCluster = new MiniMRCluster(taskTrackers, _fileSystem.getName(), 1);
-    }
+    _fileSystem = FileSystem.get(new JobConf());
   }
   
   public void afterClass() throws Exception
   {
-    if (_dfsCluster != null) {
-      _log.info("*** Shutting down Mini DFS Cluster");    
-      _dfsCluster.shutdown();
-      _dfsCluster = null;
-    }
-    if (_mrCluster != null) {
-      _log.info("*** Shutting down Mini MR Cluster");     
-      _mrCluster.shutdown();
-      _mrCluster = null;
-    }
   }
   
   /**
@@ -139,42 +75,9 @@ public class TestBase
    * managed by the testcase.
    * @return configuration that works on the testcase Hadoop instance
    */
-  protected JobConf createJobConf() {    
-    if (localMR)
-    {
-      JobConf conf = new JobConf();
-      String jarName = System.getProperty("testjar");
-      if (jarName == null)
-      {
-        throw new RuntimeException("must set testjar property");
-      }
-      _log.info("Using jar name: " + jarName);
-      conf.setJar(jarName);
-      return conf;
-    }
-    else
-    {
-      return _mrCluster.createJobConf();
-    }
-  }
-  
-  /**
-   * Indicates if the MR is running in local or cluster mode.
-   *
-   * @return returns TRUE if the MR is running locally, FALSE if running in
-   * cluster mode.
-   */
-  public boolean isLocalMR() {
-    return localMR;
-  }
-
-  /**
-   * Indicates if the filesystem is local or DFS.
-   *
-   * @return returns TRUE if the filesystem is local, FALSE if it is DFS.
-   */
-  public boolean isLocalFS() {
-    return localFS;
+  protected JobConf createJobConf() 
+  {
+    return new JobConf();
   }
   
   /**
@@ -189,7 +92,7 @@ public class TestBase
   {
     ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
     DataOutputStream dataStream = new DataOutputStream(arrayOutputStream);    
-    createJobConf().write(dataStream);    
+    createJobConf().write(dataStream);  
     dataStream.flush();
     props.setProperty("test.conf", new String(Base64.encodeBase64(arrayOutputStream.toByteArray())));
   }
@@ -207,4 +110,16 @@ public class TestBase
     storeTestConf(props);  
     return props;
   }
+  
+  protected String getDataPath()
+  {
+    if (System.getProperty("hourglass.data.dir") != null)
+    {
+      return System.getProperty("hourglass.data.dir");
+    }
+    else
+    {
+      return new File(System.getProperty("user.dir"), "data").getAbsolutePath();
+    }  
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-pig/build.gradle
----------------------------------------------------------------------
diff --git a/datafu-pig/build.gradle b/datafu-pig/build.gradle
index ea385d2..b5652a9 100644
--- a/datafu-pig/build.gradle
+++ b/datafu-pig/build.gradle
@@ -159,10 +159,6 @@ dependencies {
   // should be available
   compile "joda-time:joda-time:$jodaTimeVersion"
 
-  // needed for compilation only.  obviously don't need to autojar this.
-  compile "org.apache.pig:pig:$pigVersion"
-  compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
-
   testCompile "org.apache.pig:pigunit:$pigVersion"
   testCompile "log4j:log4j:$log4jVersion"
   testCompile "jline:jline:$jlineVersion"
@@ -183,6 +179,22 @@ modifyPom {
   }
 }
 
+if (hadoopVersion.startsWith("2.") || hadoopVersion.startsWith("0.23.")) {
+  dependencies {
+    // needed for compilation only.  obviously don't need to autojar this.
+    compile "org.apache.pig:pig:$pigVersion:h2"
+    compile "org.apache.hadoop:hadoop-common:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-hdfs:$hadoopVersion"
+    compile "org.apache.hadoop:hadoop-mapreduce-client-jobclient:$hadoopVersion"
+  }
+} else {
+  dependencies {
+    // needed for compilation only.  obviously don't need to autojar this.
+    compile "org.apache.pig:pig:$pigVersion"
+    compile "org.apache.hadoop:hadoop-core:$hadoopVersion"
+  }
+}
+
 compileTestJava.doFirst {
   options.compilerArgs = ['-processor', 'org.adrianwalker.multilinestring.MultilineProcessor']
 }
@@ -218,4 +230,4 @@ test {
   systemProperty 'datafu.data.dir', file('data')
 
   maxHeapSize = "2G"
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
index 9bcc384..11753ba 100644
--- a/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
+++ b/datafu-pig/src/test/java/datafu/test/pig/bags/BagTests.java
@@ -24,9 +24,11 @@ import datafu.pig.bags.DistinctBy;
 import datafu.pig.bags.Enumerate;
 import datafu.test.pig.PigTests;
 import junit.framework.Assert;
+
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.SortedDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.pigunit.PigTest;
@@ -34,10 +36,14 @@ import org.testng.annotations.Test;
 
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import static org.testng.Assert.assertEquals;
 
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 
 public class BagTests extends PigTests
 {
@@ -729,12 +735,12 @@ public class BagTests extends PigTests
     PigTest test = createPigTestFromString(distinctByMultiComplexFieldTest);
 
     writeLinesToFile("input",
-                     "({(a-b,[a#0,b#1],{(a-b,0),(a-b,1)}),(a-c,[b#1,a#0],{(a-b,0),(a-b,1)}),(a-d,[a#1,b#0],{(a-b,1),(a-b,2)})})");
+                     "({(a-b,[b#1],{(a-b,0),(a-b,1)}),(a-c,[b#1],{(a-b,0),(a-b,1)}),(a-d,[b#0],{(a-b,1),(a-b,2)})})");
 
     test.runScript();
 
     assertOutput(test, "data2",
-                 "({(a-b,[b#1,a#0],{(a-b,0),(a-b,1)}),(a-d,[b#0,a#1],{(a-b,1),(a-b,2)})})");
+                 "({(a-b,[b#1],{(a-b,0),(a-b,1)}),(a-d,[b#0],{(a-b,1),(a-b,2)})})");
   }
 
   /**
@@ -1149,9 +1155,21 @@ public class BagTests extends PigTests
                      "1\t{(K1,A1),(K2,B1),(K3,C1)}\t{(K1,A2),(K2,B2),(K2,B22)}\t{(K1,A3),(K3,C3),(K4,D3)}");
 
     test.runScript();
-
-    assertOutput(test, "data2",
-        "(1,{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3)},{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,)})");
+    
+    List<Tuple> tuples = getLinesForAlias(test, "data2");
+    assertEquals(tuples.size(), 1);
+    Tuple tuple = tuples.get(0);
+    DataBag joined1 = (DataBag)tuple.get(1);
+    DataBag joined2 = (DataBag)tuple.get(2);
+    
+    String joined1Schema = "{(bag1::k: chararray,bag1::v: chararray,bag2::k: chararray,bag2::v: chararray,bag3::k3: chararray,bag3::v3: chararray)}";
+    String joined2Schema = "{(bag1::k: chararray,bag1::v: chararray,bag3::k3: chararray,bag3::v3: chararray,bag2::k: chararray,bag2::v: chararray)}";
+    String expectedJoined1 = "{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3)}";
+    String expectedJoined2 = "{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,)}";
+    
+    // compare sorted bags because there is no guarantee on the order
+    assertEquals(getSortedBag(joined1).toString(),getSortedBag(expectedJoined1, joined1Schema).toString());
+    assertEquals(getSortedBag(joined2).toString(),getSortedBag(expectedJoined2, joined2Schema).toString());
   }
 
     /**
@@ -1188,8 +1206,33 @@ public class BagTests extends PigTests
             throw e;
         }
 
-        assertOutput(test, "data2",
-                "(1,{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3),(,,,,K4,D3)},{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,),(,,K4,D3,,)})");
+        List<Tuple> tuples = getLinesForAlias(test, "data2");
+        assertEquals(tuples.size(), 1);
+        Tuple tuple = tuples.get(0);
+        DataBag joined1 = (DataBag)tuple.get(1);
+        DataBag joined2 = (DataBag)tuple.get(2);
+        
+        String joined1Schema = "{(bag1::k: chararray,bag1::v: chararray,bag2::k: chararray,bag2::v: chararray,bag3::k3: chararray,bag3::v3: chararray)}";
+        String joined2Schema = "{(bag1::k: chararray,bag1::v: chararray,bag3::k3: chararray,bag3::v3: chararray,bag2::k: chararray,bag2::v: chararray)}";
+        String expectedJoined1 = "{(K1,A1,K1,A2,K1,A3),(K2,B1,K2,B2,,),(K2,B1,K2,B22,,),(K3,C1,,,K3,C3),(,,,,K4,D3)}";
+        String expectedJoined2 = "{(K1,A1,K1,A3,K1,A2),(K2,B1,,,K2,B2),(K2,B1,,,K2,B22),(K3,C1,K3,C3,,),(,,K4,D3,,)}";
+        
+        // compare sorted bags because there is no guarantee on the order
+        assertEquals(getSortedBag(joined1).toString(),getSortedBag(expectedJoined1, joined1Schema).toString());
+        assertEquals(getSortedBag(joined2).toString(),getSortedBag(expectedJoined2, joined2Schema).toString());
+    }
+    
+    private DataBag getSortedBag(String bagString, String schema) throws Exception {
+        Utf8StorageConverter converter = new Utf8StorageConverter();
+        ResourceFieldSchema parsedSchema = new ResourceFieldSchema(Utils.parseSchema("the_bag: " + schema).getField("the_bag"));
+        DataBag bag = converter.bytesToBag(bagString.getBytes("UTF-8"), parsedSchema);
+        return getSortedBag(bag);
+    }
+    
+    private DataBag getSortedBag(DataBag bag) {
+        DataBag sortedBag = new SortedDataBag(null);
+        sortedBag.addAll(bag);
+        return sortedBag;
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/163fc26a/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 3b0835f..462f971 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -24,13 +24,13 @@ ext {
   commonsMathVersion="2.2"
   commonsIoVersion="2.4"
   fastutilVersion="6.5.7"
-  guavaVersion="17.0"
-  hadoopVersion="1.0.4"
+  guavaVersion="11.0.2"
+  hadoopVersion="2.7.0"
   jodaTimeVersion="1.6"
   log4jVersion="1.2.17"
   mavenVersion="2.1.3"
   jlineVersion="0.9.94"
-  pigVersion="0.12.1"
+  pigVersion="0.14.0"
   testngVersion="6.2"
   toolsVersion="1.4.2"
   wagonHttpVersion="1.0-beta-2"



Mime
View raw message