storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [01/12] storm git commit: Support for exactly once semantics in HdfsState
Date Fri, 14 Aug 2015 18:01:17 GMT
Repository: storm
Updated Branches:
  refs/heads/master b8d5635e8 -> aa308e116


Support for exactly once semantics in HdfsState


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5631b9d4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5631b9d4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5631b9d4

Branch: refs/heads/master
Commit: 5631b9d4746f34127a1bc89cb4488d2b2d8ec9d7
Parents: 6a21b6a
Author: Arun Mahadevan <aiyer@hortonworks.com>
Authored: Wed Jul 22 00:34:09 2015 +0530
Committer: Arun Iyer <aiyer@Arun-Iyer-MBP.local>
Committed: Wed Jul 22 00:41:31 2015 +0530

----------------------------------------------------------------------
 .../apache/storm/hdfs/trident/HdfsState.java    | 336 +++++++++++++++----
 .../trident/rotation/FileRotationPolicy.java    |   8 +
 .../rotation/FileSizeRotationPolicy.java        |   5 +
 .../hdfs/trident/rotation/NoRotationPolicy.java |   5 +
 .../trident/rotation/TimedRotationPolicy.java   |   5 +
 5 files changed, 301 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
index 67fff88..9b6b48c 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.hdfs.trident;
 
+import backtype.storm.Config;
 import backtype.storm.task.IMetricsContext;
 import backtype.storm.topology.FailedException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
@@ -40,8 +40,7 @@ import storm.trident.operation.TridentCollector;
 import storm.trident.state.State;
 import storm.trident.tuple.TridentTuple;
 
-import java.io.IOException;
-import java.io.Serializable;
+import java.io.*;
 import java.net.URI;
 import java.util.*;
 
@@ -60,6 +59,10 @@ public class HdfsState implements State {
         protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
         protected transient Object writeLock;
         protected transient Timer rotationTimer;
+        /**
+         * This is on by default unless TimedRotationPolicy is in use.
+         */
+        private boolean exactlyOnce = true;
 
         abstract void closeOutputFile() throws IOException;
 
@@ -69,17 +72,28 @@ public class HdfsState implements State {
 
         abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
 
-        protected void rotateOutputFile() throws IOException {
+        abstract long getCurrentOffset() throws  IOException;
+
+        abstract void doCommit(Long txId) throws IOException;
+
+        abstract void doRecover(Path srcPath, long nBytes) throws Exception;
+
+        protected boolean isExactlyOnce() {
+            return this.exactlyOnce;
+        }
+
+        protected void rotateOutputFile(boolean doRotateAction) throws IOException {
             LOG.info("Rotating output file...");
             long start = System.currentTimeMillis();
             synchronized (this.writeLock) {
                 closeOutputFile();
                 this.rotation++;
-
                 Path newFile = createOutputFile();
-                LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
-                for (RotationAction action : this.rotationActions) {
-                    action.execute(this.fs, this.currentFile);
+                if (doRotateAction) {
+                    LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
+                    for (RotationAction action : this.rotationActions) {
+                        action.execute(this.fs, this.currentFile);
+                    }
                 }
                 this.currentFile = newFile;
             }
@@ -89,38 +103,49 @@ public class HdfsState implements State {
 
         }
 
-        void prepare(Map conf, int partitionIndex, int numPartitions){
+        protected void rotateOutputFile() throws IOException {
+            rotateOutputFile(true);
+        }
+
+
+        void prepare(Map conf, int partitionIndex, int numPartitions) {
             this.writeLock = new Object();
-            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
+            if (this.rotationPolicy == null) {
+                throw new IllegalStateException("RotationPolicy must be specified.");
+            } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
+                LOG.warn("*** Exactly once semantics is not supported with TimedRotationPolicy ***");
+                LOG.warn("*** Turning off exactly once.");
+                this.exactlyOnce = false;
+            }
             if (this.fsUrl == null) {
                 throw new IllegalStateException("File system URL must be specified.");
             }
             this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
             this.hdfsConfig = new Configuration();
-            Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
-            if(map != null){
-                for(String key : map.keySet()){
+            Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey);
+            if (map != null) {
+                for (String key : map.keySet()) {
                     this.hdfsConfig.set(key, String.valueOf(map.get(key)));
                 }
             }
-            try{
+            try {
                 HdfsSecurityUtil.login(conf, hdfsConfig);
                 doPrepare(conf, partitionIndex, numPartitions);
                 this.currentFile = createOutputFile();
 
-            } catch (Exception e){
+            } catch (Exception e) {
                 throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
             }
 
-            if(this.rotationPolicy instanceof TimedRotationPolicy){
-                long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+            if (this.rotationPolicy instanceof TimedRotationPolicy) {
+                long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
                 this.rotationTimer = new Timer(true);
                 TimerTask task = new TimerTask() {
                     @Override
                     public void run() {
                         try {
                             rotateOutputFile();
-                        } catch(IOException e){
+                        } catch (IOException e) {
                             LOG.warn("IOException during scheduled file rotation.", e);
                         }
                     }
@@ -129,6 +154,26 @@ public class HdfsState implements State {
             }
         }
 
+        /**
+         * Recovers nBytes from srcFile to the new file created
+         * by calling rotateOutputFile and then deletes the srcFile.
+         */
+        private void recover(String srcFile, long nBytes) {
+            try {
+                Path srcPath = new Path(srcFile);
+                if (nBytes > 0) {
+                    rotateOutputFile(false);
+                    this.rotationPolicy.reset();
+                    doRecover(srcPath, nBytes);
+                    LOG.info("Recovered {} bytes from {} to {}", nBytes, srcFile, currentFile);
+                }
+                fs.delete(srcPath, false);
+            } catch (Exception e) {
+                LOG.warn("Recovery failed.", e);
+                throw new RuntimeException(e);
+            }
+        }
+
     }
 
     public static class HdfsFileOptions extends Options {
@@ -137,32 +182,33 @@ public class HdfsState implements State {
         protected RecordFormat format;
         private long offset = 0;
 
-        public HdfsFileOptions withFsUrl(String fsUrl){
+        public HdfsFileOptions withFsUrl(String fsUrl) {
             this.fsUrl = fsUrl;
             return this;
         }
 
-        public HdfsFileOptions withConfigKey(String configKey){
+        public HdfsFileOptions withConfigKey(String configKey) {
             this.configKey = configKey;
             return this;
         }
 
-        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat){
+        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
             this.fileNameFormat = fileNameFormat;
             return this;
         }
 
-        public HdfsFileOptions withRecordFormat(RecordFormat format){
+        public HdfsFileOptions withRecordFormat(RecordFormat format) {
             this.format = format;
             return this;
         }
 
-        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy){
+        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy) {
             this.rotationPolicy = rotationPolicy;
             return this;
         }
 
-        public HdfsFileOptions addRotationAction(RotationAction action){
+        @Deprecated
+        public HdfsFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
             return this;
         }
@@ -174,6 +220,45 @@ public class HdfsState implements State {
         }
 
         @Override
+        public long getCurrentOffset() {
+            return offset;
+        }
+
+        @Override
+        public void doCommit(Long txId) throws IOException {
+            synchronized (writeLock) {
+                if (this.rotationPolicy.mark(this.offset)) {
+                    rotateOutputFile();
+                    this.offset = 0;
+                    this.rotationPolicy.reset();
+                } else {
+                    if (this.out instanceof HdfsDataOutputStream) {
+                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+                    } else {
+                        this.out.hsync();
+                    }
+                }
+            }
+        }
+
+        @Override
+        void doRecover(Path srcPath, long nBytes) throws IOException {
+            this.offset = 0;
+            FSDataInputStream is = this.fs.open(srcPath);
+            copyBytes(is, out, nBytes);
+            this.offset = nBytes;
+        }
+
+        private void copyBytes(FSDataInputStream is, FSDataOutputStream out, long bytesToCopy) throws IOException {
+            byte[] buf = new byte[4096];
+            int n;
+            while ((n = is.read(buf)) != -1 && bytesToCopy > 0) {
+                out.write(buf, 0, (int) Math.min(n, bytesToCopy));
+                bytesToCopy -= n;
+            }
+        }
+
+        @Override
         void closeOutputFile() throws IOException {
             this.out.close();
         }
@@ -187,26 +272,11 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            boolean rotated = false;
             synchronized (this.writeLock) {
                 for (TridentTuple tuple : tuples) {
                     byte[] bytes = this.format.format(tuple);
                     out.write(bytes);
                     this.offset += bytes.length;
-
-                    if (this.rotationPolicy.mark(tuple, this.offset)) {
-                        rotateOutputFile();
-                        this.offset = 0;
-                        this.rotationPolicy.reset();
-                        rotated = true;
-                    }
-                }
-                if (!rotated) {
-                    if (this.out instanceof HdfsDataOutputStream) {
-                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-                    } else {
-                        this.out.hsync();
-                    }
                 }
             }
         }
@@ -219,7 +289,7 @@ public class HdfsState implements State {
         private String compressionCodec = "default";
         private transient CompressionCodecFactory codecFactory;
 
-        public SequenceFileOptions withCompressionCodec(String codec){
+        public SequenceFileOptions withCompressionCodec(String codec) {
             this.compressionCodec = codec;
             return this;
         }
@@ -229,7 +299,7 @@ public class HdfsState implements State {
             return this;
         }
 
-        public SequenceFileOptions withConfigKey(String configKey){
+        public SequenceFileOptions withConfigKey(String configKey) {
             this.configKey = configKey;
             return this;
         }
@@ -249,12 +319,12 @@ public class HdfsState implements State {
             return this;
         }
 
-        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType){
+        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType) {
             this.compressionType = compressionType;
             return this;
         }
 
-        public SequenceFileOptions addRotationAction(RotationAction action){
+        public SequenceFileOptions addRotationAction(RotationAction action) {
             this.rotationActions.add(action);
             return this;
         }
@@ -269,6 +339,36 @@ public class HdfsState implements State {
         }
 
         @Override
+        public long getCurrentOffset() throws IOException {
+            return this.writer.getLength();
+        }
+
+        @Override
+        public void doCommit(Long txId) throws IOException {
+            synchronized (writeLock) {
+                if (this.rotationPolicy.mark(this.writer.getLength())) {
+                    rotateOutputFile();
+                    this.rotationPolicy.reset();
+                } else {
+                    this.writer.hsync();
+                }
+            }
+        }
+
+
+        @Override
+        void doRecover(Path srcPath, long nBytes) throws Exception {
+            SequenceFile.Reader reader = new SequenceFile.Reader(this.hdfsConfig,
+                    SequenceFile.Reader.file(srcPath), SequenceFile.Reader.length(nBytes));
+
+            Writable key = (Writable) this.format.keyClass().newInstance();
+            Writable value = (Writable) this.format.valueClass().newInstance();
+            while(reader.next(key, value)) {
+                this.writer.append(key, value);
+            }
+        }
+
+        @Override
         Path createOutputFile() throws IOException {
             Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
             this.writer = SequenceFile.createWriter(
@@ -288,45 +388,165 @@ public class HdfsState implements State {
 
         @Override
         public void execute(List<TridentTuple> tuples) throws IOException {
-            long offset;
-            for(TridentTuple tuple : tuples) {
+            for (TridentTuple tuple : tuples) {
                 synchronized (this.writeLock) {
                     this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                    offset = this.writer.getLength();
-                }
-
-                if (this.rotationPolicy.mark(tuple, offset)) {
-                    rotateOutputFile();
-                    this.rotationPolicy.reset();
                 }
             }
         }
 
     }
 
+    /**
+     * TxnRecord [txnid, data_file_path, data_file_offset]
+     * <p>
+     * This is written to the index file during beginCommit() and used for recovery.
+     * </p>
+     */
+    private static class TxnRecord {
+        private long txnid;
+        private String dataFilePath;
+        private long offset;
+
+        private TxnRecord(long txnId, String dataFilePath, long offset) {
+            this.txnid = txnId;
+            this.dataFilePath = dataFilePath;
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return Long.toString(txnid) + "," + dataFilePath + "," + Long.toString(offset);
+        }
+    }
+
+
     public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
     private Options options;
+    private volatile TxnRecord lastSeenTxn;
+    private Path indexFilePath;
 
-    HdfsState(Options options){
+    HdfsState(Options options) {
         this.options = options;
     }
 
-    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions){
     void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         this.options.prepare(conf, partitionIndex, numPartitions);
+        if (options.isExactlyOnce()) {
+            initLastTxn(conf, partitionIndex);
+        }
+    }
+
+    private TxnRecord readTxnRecord(Path path) throws IOException {
+        FSDataInputStream inputStream = null;
+        try {
+            inputStream = this.options.fs.open(path);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
+            String line;
+            if ((line = reader.readLine()) != null) {
+                String[] fields = line.split(",");
+                return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2]));
+            }
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    /**
+     * Reads the last txn record from index file if it exists, if not
+     * from .tmp file if exists.
+     *
+     * @param indexFilePath the index file path
+     * @return the txn record from the index file or a default initial record.
+     * @throws IOException
+     */
+    private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
+        Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
+        if (this.options.fs.exists(indexFilePath)) {
+            return readTxnRecord(indexFilePath);
+        } else if (this.options.fs.exists(tmpPath)) {
+            return readTxnRecord(tmpPath);
+        }
+        return new TxnRecord(0, options.currentFile.toString(), 0);
+    }
+
+    private void initLastTxn(Map conf, int partition) {
+        // include partition id in the file name so that index for different partitions are independent.
+        String indexFileName = String.format(".index.%s.%d", conf.get(Config.TOPOLOGY_NAME), partition);
+        this.indexFilePath = new Path(options.fileNameFormat.getPath(), indexFileName);
+        try {
+            this.lastSeenTxn = getTxnRecord(indexFilePath);
+            LOG.debug("initLastTxn updated lastSeenTxn to [{}]", this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("initLastTxn failed due to IOException.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void updateIndex(long txId) {
+        FSDataOutputStream out = null;
+        LOG.debug("Starting index update.");
+        try {
+            Path tmpPath = new Path(this.indexFilePath.toString() + ".tmp");
+            out = this.options.fs.create(tmpPath, true);
+            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out));
+            TxnRecord txnRecord = new TxnRecord(txId, options.currentFile.toString(), this.options.getCurrentOffset());
+            bw.write(txnRecord.toString());
+            bw.newLine();
+            bw.flush();
+            /*
+             * Delete the current index file and rename the tmp file to atomically
+             * replace the index file. Orphan .tmp files are handled in getTxnRecord.
+             */
+            options.fs.delete(this.indexFilePath, false);
+            options.fs.rename(tmpPath, this.indexFilePath);
+            lastSeenTxn = txnRecord;
+            LOG.debug("updateIndex updated lastSeenTxn to [{}]", this.lastSeenTxn);
+        } catch (IOException e) {
+            LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+            throw new FailedException(e);
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException e) {
+                    LOG.warn("Begin commit failed due to IOException. Failing batch", e);
+                    throw new FailedException(e);
+                }
+            }
+        }
     }
 
     @Override
     public void beginCommit(Long txId) {
+        if (options.isExactlyOnce()) {
+            if (txId <= lastSeenTxn.txnid) {
+                LOG.info("txID {} is already processed, lastSeenTxn {}. Triggering recovery.", txId, lastSeenTxn);
+                long start = System.currentTimeMillis();
+                options.recover(lastSeenTxn.dataFilePath, lastSeenTxn.offset);
+                LOG.info("Recovery took {} ms.", System.currentTimeMillis() - start);
+            }
+            updateIndex(txId);
+        }
     }
 
     @Override
     public void commit(Long txId) {
+        try {
+            options.doCommit(txId);
+        } catch (IOException e) {
+            LOG.warn("Commit failed due to IOException. Failing the batch.", e);
+            throw new FailedException(e);
+        }
     }
 
-    public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector){
-        try{
     public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
+        try {
             this.options.execute(tuples);
-        } catch (IOException e){
+        } catch (IOException e) {
             LOG.warn("Failing batch due to IOException.", e);
             throw new FailedException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
index 3db56f8..89fd918 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileRotationPolicy.java
@@ -41,6 +41,14 @@ public interface FileRotationPolicy extends Serializable {
      */
     boolean mark(TridentTuple tuple, long offset);
 
+    /**
+     * Check if a file rotation should be performed based on
+     * the offset at which file is being written.
+     * 
+     * @param offset the current offset of file being written
+     * @return true if a file rotation should be performed.
+     */
+    boolean mark(long offset);
 
     /**
      * Called after the HdfsBolt rotates a file.

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
index 93f0d58..5d99108 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/FileSizeRotationPolicy.java
@@ -66,6 +66,11 @@ public class FileSizeRotationPolicy implements FileRotationPolicy {
 
     @Override
     public boolean mark(TridentTuple tuple, long offset) {
+        return mark(offset);
+    }
+
+    @Override
+    public boolean mark(long offset) {
         long diff = offset - this.lastOffset;
         this.currentBytesWritten += diff;
         this.lastOffset = offset;

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
index 50130e5..caabee5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/NoRotationPolicy.java
@@ -30,6 +30,11 @@ public class NoRotationPolicy implements FileRotationPolicy {
     }
 
     @Override
+    public boolean mark(long offset) {
+        return false;
+    }
+
+    @Override
     public void reset() {
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5631b9d4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
index a75e2e5..f407d1d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/rotation/TimedRotationPolicy.java
@@ -57,6 +57,11 @@ public class TimedRotationPolicy implements FileRotationPolicy {
         return false;
     }
 
+    @Override
+    public boolean mark(long offset) {
+        return false;
+    }
+
     /**
      * Called after the HdfsBolt rotates a file.
      */


Mime
View raw message