storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [05/12] storm git commit: Added doc to explain when exactly once semantics is supported; Auto disable exactly once if file size > 1GB.
Date Fri, 14 Aug 2015 18:01:21 GMT
Added doc to explain when exactly once semantics is supported; Auto disable exactly once if
file size > 1GB.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8505c3ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8505c3ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8505c3ef

Branch: refs/heads/master
Commit: 8505c3ef2d5e32c1716f49f6928851363c25df91
Parents: 85eadd7
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Mon Aug 3 12:02:52 2015 +0530
Committer: Arun Mahadevan <aiyer@hortonworks.com>
Committed: Mon Aug 3 12:02:52 2015 +0530

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   |  9 +++++++
 .../apache/storm/hdfs/trident/HdfsState.java    | 26 +++++++++++++++++---
 .../rotation/FileSizeRotationPolicy.java        |  3 +++
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index b37af7e..bb4a618 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -317,6 +317,15 @@ that of the bolts.
                 .addRotationAction(new MoveFileAction().toDestination("/dest2/"));
 ```
 
+### Note
+Whenever a batch is replayed by storm (due to failures), the trident state implementation automatically removes 
+duplicates from the current data file by copying the data up to the last transaction to another file . Since this 
+operation involves a lot of data copy, the exactly once semantics is enabled only if `FileSizeRotationPolicy` with 
+file size less than 1 GB is specified.
+
+The exactly once semantics is automatically disabled if `FileSizeRotationPolicy` with size greater than 1 GB or
+`TimedRotationPolicy` is in use.
+
 ##Working with Secure HDFS
 If your topology is going to interact with secure HDFS, your bolts/states needs to be authenticated by NameNode. We 
 currently have 2 options to support this:

http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 8d32b32..3dc566c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -35,6 +35,7 @@ import org.apache.storm.hdfs.trident.format.FileNameFormat;
 import org.apache.storm.hdfs.trident.format.RecordFormat;
 import org.apache.storm.hdfs.trident.format.SequenceFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,9 +125,16 @@ public class HdfsState implements State {
             this.writeLock = new Object();
             if (this.rotationPolicy == null) {
                 throw new IllegalStateException("RotationPolicy must be specified.");
+            } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
+                long limit = FileSizeRotationPolicy.Units.GB.getByteCount();
+                if(((FileSizeRotationPolicy) rotationPolicy).getMaxBytes() > limit) {
+                    LOG.warn("*** Exactly once semantics is not supported for FileSizeRotationPolicy with size > 1 GB ***");
+                    LOG.warn("Turning off exactly once.");
+                    this.exactlyOnce = false;
+                }
             } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
-                LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
-                LOG.warn("*** Turning off exactly once.");
+                LOG.warn("*** Exactly once semantics is not supported for TimedRotationPolicy ***");
+                LOG.warn("Turning off exactly once.");
                 this.exactlyOnce = false;
             }
             if (this.fsUrl == null) {
@@ -220,8 +228,18 @@ public class HdfsState implements State {
             return this;
         }
 
-        public HdfsFileOptions withBufferSize(int size) {
-            this.bufferSize = Math.max(4096, size); // at least 4K
+        /**
+         * <p>Set the size of the buffer used for hdfs file copy in case of recovery. The default
+         * value is 131072.</p>
+         *
+         * <p> Note: The lower limit for the parameter is 4096, below which the
+         * option is ignored. </p>
+         *
+         * @param sizeInBytes the buffer size in bytes
+         * @return {@link HdfsFileOptions}
+         */
+        public HdfsFileOptions withBufferSize(int sizeInBytes) {
+            this.bufferSize = Math.max(4096, sizeInBytes); // at least 4K
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/8505c3ef/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 5d99108..79b4f75 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -83,4 +83,7 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
         this.lastOffset = 0;
     }
 
+    public long getMaxBytes() {
+        return maxBytes;
+    }
 }


Mime
View raw message