storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [09/12] storm git commit: Refactoring code based on feedback
Date Fri, 14 Aug 2015 18:01:25 GMT
Refactoring code based on feedback


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c44e02b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c44e02b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c44e02b3

Branch: refs/heads/master
Commit: c44e02b3818c572924d96ae1dc26bb6ab6df66c7
Parents: 4d8c51b
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Fri Aug 14 11:58:54 2015 +0530
Committer: Arun Mahadevan <aiyer@hortonworks.com>
Committed: Fri Aug 14 11:58:54 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/storm/hdfs/trident/HdfsState.java | 14 +++++++++-----
 .../hdfs/trident/rotation/FileRotationPolicy.java     |  6 ++++++
 .../hdfs/trident/rotation/FileSizeRotationPolicy.java |  5 +++++
 .../storm/hdfs/trident/rotation/NoRotationPolicy.java |  5 +++++
 .../hdfs/trident/rotation/TimedRotationPolicy.java    |  1 +
 5 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8b29f66..4448868 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -139,9 +139,7 @@ public class HdfsState implements State {
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(),
e);
             }
 
-            if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                ((TimedRotationPolicy) this.rotationPolicy).start();
-            }
+            rotationPolicy.start();
         }
 
         /**
@@ -455,6 +453,12 @@ public class HdfsState implements State {
     }
 
     /**
+     * Returns temp file path corresponding to a file name.
+     */
+    private Path tmpFilePath(String filename) {
+        return new Path(filename + ".tmp");
+    }
+    /**
      * Reads the last txn record from index file if it exists, if not
      * from .tmp file if exists.
      *
@@ -463,7 +467,7 @@ public class HdfsState implements State {
      * @throws IOException
      */
     private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
-        Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+        Path tmpPath = tmpFilePath(indexFilePath.toString());
         if (this.options.fs.exists(indexFilePath)) {
             return readTxnRecord(indexFilePath);
         } else if (this.options.fs.exists(tmpPath)) {
@@ -489,7 +493,7 @@ public class HdfsState implements State {
         FSDataOutputStream out = null;
         LOG.debug("Starting index update.");
         try {
-            Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+            Path tmpPath = tmpFilePath(indexFilePath.toString());
             out = this.options.fs.create(tmpPath, true);
             BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
             TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 89fd918..f429221 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -55,4 +55,10 @@ public interface FileRotationPolicy extends Serializable {
      *
      */
     void reset();
+
+    /**
+     * Start the policy. Useful in case of policies like timed rotation
+     * where the timer can be started.
+     */
+    void start();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 79b4f75..fad6455 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,6 +83,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.lastOffset = 0;
     }
 
+    @Override
+    public void start() {
+
+    }
+
     public long getMaxBytes() {
         return maxBytes;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index caabee5..8117f95 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -37,4 +37,9 @@ public class NoRotationPolicy implements FileRotationPolicy {
     @Override
     public void reset() {
     }
+
+    @Override
+    public void start() {
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c44e02b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index 74e7fab..f8cfe44 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -84,6 +84,7 @@ public class TimedRotationPolicy implements FileRotationPolicy {
     /**
      * Start the timer to run at fixed intervals.
      */
+    @Override
     public void start() {
         rotationTimer = new Timer(true);
         TimerTask task = new TimerTask() {


Mime
View raw message