apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/3] incubator-apex-malhar git commit: MLHR-1858 #comment File input operator that emits the lines of the file
Date Thu, 01 Oct 2015 05:33:25 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 3d0166bfe -> fdc91fa3f


MLHR-1858 #comment File input operator that emits the lines of the file


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c85a4180
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c85a4180
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c85a4180

Branch: refs/heads/devel-3
Commit: c85a4180dbdfc01f224e6fa3ffb37f62aeb1a1c5
Parents: f9c7992
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Sat Sep 26 18:22:41 2015 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Sat Sep 26 18:23:55 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    |  46 ++++++++
 .../io/fs/AbstractFileInputOperatorTest.java    | 116 ++++++++-----------
 2 files changed, 93 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c85a4180/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 63e5594..18fe83b 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -1158,4 +1158,50 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       return result;
     }
   }
+
+  /**
+   * This is an implementation of the {@link AbstractFileInputOperator} that outputs the lines in a file.&nbsp;
+   * Each line is emitted as a separate tuple.&nbsp; It is emitted as a String.
+   * <p>
+   * The directory path where to scan and read files from should be specified using the {@link #directory} property.
+   * </p>
+   * @displayName File Line Input
+   * @category Input
+   * @tags fs, file, line, lines, input operator
+   *
+   */
+  public static class FileLineInputOperator extends AbstractFileInputOperator<String>
+  {
+    public transient final DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+
+    protected transient BufferedReader br;
+
+    @Override
+    protected InputStream openFile(Path path) throws IOException
+    {
+      InputStream is = super.openFile(path);
+      br = new BufferedReader(new InputStreamReader(is));
+      return is;
+    }
+
+    @Override
+    protected void closeFile(InputStream is) throws IOException
+    {
+      super.closeFile(is);
+      br.close();
+      br = null;
+    }
+
+    @Override
+    protected String readEntity() throws IOException
+    {
+      return br.readLine();
+    }
+
+    @Override
+    protected void emit(String tuple)
+    {
+      output.emit(tuple);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c85a4180/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 2333344..d50ec17 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -15,30 +15,42 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import com.datatorrent.api.*;
-import com.datatorrent.api.Partitioner.Partition;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
-import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.google.common.collect.*;
-
-import java.io.*;
-import java.util.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
-import org.junit.*;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
+import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.StatsListener;
+
 public class AbstractFileInputOperatorTest
 {
   public static class TestMeta extends TestWatcher
@@ -70,41 +82,7 @@ public class AbstractFileInputOperatorTest
   }
 
   @Rule public TestMeta testMeta = new TestMeta();
-
-  public static class TestFileInputOperator extends AbstractFileInputOperator<String>
-  {
-    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
-    private transient BufferedReader br = null;
-
-    @Override
-    protected InputStream openFile(Path path) throws IOException
-    {
-      InputStream is = super.openFile(path);
-      br = new BufferedReader(new InputStreamReader(is));
-      return is;
-    }
-
-    @Override
-    protected void closeFile(InputStream is) throws IOException
-    {
-      super.closeFile(is);
-      br.close();
-      br = null;
-    }
-
-    @Override
-    protected String readEntity() throws IOException
-    {
-      return br.readLine();
-    }
-
-    @Override
-    protected void emit(String tuple)
-    {
-      output.emit(tuple);
-    }
-  }
-
+  
   @Test
   public void testSinglePartiton() throws Exception
   {
@@ -119,7 +97,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
     }
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
 
     CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -169,7 +147,7 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioning() throws Exception
   {
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
 
@@ -208,12 +186,12 @@ public class AbstractFileInputOperatorTest
   public void testPartitioningStateTransfer() throws Exception
   {
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
 
-    TestFileInputOperator initialState = new Kryo().copy(oper);
+    FileLineInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -263,7 +241,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance();
+      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -311,13 +289,13 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioningStateTransferInterrupted() throws Exception
   {
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
     oper.setEmitBatchSize(2);
 
-    TestFileInputOperator initialState = new Kryo().copy(oper);
+    FileLineInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -367,7 +345,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance();
+      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -396,13 +374,13 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testPartitioningStateTransferFailure() throws Exception
   {
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setScanIntervalMillis(0);
     oper.setEmitBatchSize(2);
 
-    TestFileInputOperator initialState = new Kryo().copy(oper);
+    FileLineInputOperator initialState = new Kryo().copy(oper);
 
     // Create 4 files with 3 records each.
     Path path = new Path(new File(testMeta.dir).getAbsolutePath());
@@ -452,7 +430,7 @@ public class AbstractFileInputOperatorTest
     /* Collect all operators in a list */
     List<AbstractFileInputOperator<String>> opers = Lists.newArrayList();
     for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
-      TestFileInputOperator oi = (TestFileInputOperator)p.getPartitionedInstance();
+      FileLineInputOperator oi = (FileLineInputOperator)p.getPartitionedInstance();
       oi.setup(testMeta.context);
       oi.output.setSink(sink);
       opers.add(oi);
@@ -486,7 +464,7 @@ public class AbstractFileInputOperatorTest
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.scanner = null;
     oper.failedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 1));
 
@@ -521,7 +499,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.scanner = null;
     oper.unfinishedFiles.add(new AbstractFileInputOperator.FailedFile(testFile.getAbsolutePath(), 2));
 
@@ -556,7 +534,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.scanner = null;
     oper.pendingFiles.add(testFile.getAbsolutePath());
 
@@ -591,7 +569,7 @@ public class AbstractFileInputOperatorTest
     File testFile = new File(testMeta.dir, "file0");
     FileUtils.write(testFile, StringUtils.join(lines, '\n'));
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.scanner = null;
     oper.currentFile = testFile.getAbsolutePath();
     oper.offset = 1;
@@ -629,7 +607,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
 
@@ -678,7 +656,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
 
@@ -721,7 +699,7 @@ public class AbstractFileInputOperatorTest
     }
     FileUtils.write(new File(testMeta.dir, "file0"), StringUtils.join(lines, '\n'));
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
     oper.setEmitBatchSize(5);
@@ -783,7 +761,7 @@ public class AbstractFileInputOperatorTest
       FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
     }
 
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
 
     IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager();
     manager.setRecoveryPath(testMeta.dir + "/recovery");
@@ -833,7 +811,7 @@ public class AbstractFileInputOperatorTest
   @Test
   public void testIdempotentStorageManagerPartitioning() throws Exception
   {
-    TestFileInputOperator oper = new TestFileInputOperator();
+    FileLineInputOperator oper = new FileLineInputOperator();
     oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)");
     oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
     oper.setIdempotentStorageManager(new TestStorageManager());


Mime
View raw message