Hi Susheel,

Thanks for the reply.

I tried emitting path.toString() as the message ID in my spout, as shown below:

this.collector.emit( new Values( path.toString() ), path.toString() );


In my bolt, the emit and ack look like this:

outputCollector.emit( tuple, new Values( currentLine ) );
outputCollector.ack( tuple );

However, I still do not see the ack arriving at the spout. Can you please point out what I am doing wrong here?

Regards,
Rajeev.

On 21 January 2018 at 21:41, Susheel Kumar Gadalay <skgadalay@gmail.com> wrote:
I think this emit from the bolt is not correct:
        outputCollector.emit( tuple, new Values( currentLine ) );
You have to anchor it to the same msgId that the spout emitted.
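
As a rough illustration of the ack path on the spout side, here is a toy model that runs without Storm. It rests on two points from the Storm documentation: the object passed as the message ID to emit is the same object later handed to ack( Object msgId ), and nextTuple, ack and fail are all invoked on one thread, so nextTuple has to return before any ack can be delivered. ToySpout, runLoop and the file paths are made-up names for this sketch, not Storm APIs, and the loop is only a simplified stand-in for the real executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class SpoutLoopSketch {

    /** Hypothetical stand-in for a spout that emits file paths. */
    static class ToySpout {
        private final Queue<String> pendingPaths = new ArrayDeque<>();
        final List<Object> acked = new ArrayList<>();

        ToySpout(List<String> paths) {
            pendingPaths.addAll(paths);
        }

        // Emits at most one tuple per call and returns immediately.
        // The returned object plays the role of the message ID; null means "nothing to emit".
        Object nextTuple() {
            return pendingPaths.poll();
        }

        // Receives the message ID exactly as it was supplied at emit time,
        // here a String path, so it is cast to String and not to a tuple type.
        void ack(Object msgId) {
            String path = (String) msgId;
            acked.add(path);
        }
    }

    /** Simplified single-threaded loop: nextTuple and ack share one thread. */
    static List<Object> runLoop(ToySpout spout, int iterations) {
        Queue<Object> inFlight = new ArrayDeque<>();
        for (int i = 0; i < iterations; i++) {
            // If nextTuple never returned, the ack below would never run.
            Object msgId = spout.nextTuple();
            if (msgId != null) {
                inFlight.add(msgId);
            }
            // Pretend the downstream bolt has fully processed and acked one tuple.
            Object done = inFlight.poll();
            if (done != null) {
                spout.ack(done);
            }
        }
        return spout.acked;
    }

    public static void main(String[] args) {
        // Hypothetical paths, used only for illustration.
        ToySpout spout = new ToySpout(Arrays.asList("/in/a.log", "/in/b.log"));
        System.out.println("Acked message IDs: " + runLoop(spout, 3));
    }
}
```

In this model every emitted path comes back to ack as the same String. A nextTuple that loops forever would leave runLoop stuck on its first call, and ack would never be reached.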


On 1/21/18, Rajeev <rajeev.dalavi@gmail.com> wrote:
> Hi,
>
> I have a simple topology as below:
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout( "DirSpout", new DirectoryScan() );
> builder.setBolt( "printBeacon", new FileSource(), 1 ).shuffleGrouping( "DirSpout" );
>
> Below is my Spout:
>
> public class DirectoryScan extends BaseRichSpout {
>
>     private String inputDirectory;
>     private String compressionType;
>     private String fileNamePattern;
>     private boolean validationNeeded;
>     private String validPattern;
>     private File validationFile;
>     private SpoutOutputCollector collector;
>
>     @Override
>     public void ack( Object msgId ) {
>         System.out.println("acked with: " + msgId );
>         Tuple tuple = ( Tuple ) msgId;
>         String path = tuple.getString( 0 );
>         System.out.println( "Path acked: " + path );
>     }
>
>     @Override
>     public void fail(Object msgId) {
>         System.out.println( "I failed you: " + msgId.toString() );
>     }
>
>     @Override
>     public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) {
>
>         this.collector = collector;
>         inputDirectory = ( String ) conf.get( "inputDirectory" );
>         compressionType = ( String ) conf.get( "compressionType" );
>         fileNamePattern = ( String ) conf.get( "fileNamePattern" );
>         validationNeeded = ( Boolean ) conf.get( "validationNeeded" );
>         validPattern = ( String ) conf.get( "validPattern" );
>     }
>
>     @Override
>     public void nextTuple() {
>         int i = 1;
>         File dir = new File( inputDirectory );
>         FileFilter filteredFiles = new RegexFileFilter( fileNamePattern );
>
>         int maxFiles = 4;
>         while( true ) {
>             System.out.println( "NIO run" );
>             long start = System.currentTimeMillis();
>             Path pdir = FileSystems.getDefault().getPath( inputDirectory );
>             try {
>                 Thread.sleep(5000);
>                 DirectoryStream<Path> stream = Files.newDirectoryStream( pdir, fileNamePattern );
>                 for (Path path : stream) {
>                     if ( i > maxFiles) break;
>                     System.out.println( "" + i + ": " + path.getFileName() );
>                     Object msgId = "ID " + i;
>                     this.collector.emit( new Values( path.toString() ), msgId );
>                     i ++;
>                 }
>                 stream.close();
>             } catch (IOException | InterruptedException e) {
>                 // TODO Auto-generated catch block
>                 e.printStackTrace();
>             }
>             long stop = System.currentTimeMillis();
>             System.out.println( "Elapsed: " + (stop - start) + " ms" );
>         }
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare( new Fields( "File" ) );
>     }
>
>     public static class FileComparator implements Comparator< File > {
>
>         public int compare( File fileA, File fileB ) {
>             long lastModifiedA = fileA.lastModified();
>             long lastModifiedB = fileB.lastModified();
>
>             if ( lastModifiedA > lastModifiedB )
>                 return 1;
>             else if ( lastModifiedB > lastModifiedA )
>                 return -1;
>
>             return 0;
>         }
>     }
>
> }
>
> Below is my Bolt:
> public class FileSource extends BaseRichBolt {
>
>     private OutputCollector outputCollector = null;
>     private String processedDirectory;
>     private boolean deleteFile;
>     private BufferedReader bufferedReader = null;
>     private FileReader fileReader = null;
>     private File thisFile = null;
>     private String thisFileName = null;
>     private boolean validationNeeded;
>     private String validPattern;
>     private File validationFile;
>
>     @Override
>     public void prepare( Map stormConf, TopologyContext context, OutputCollector collector ) {
>
>         processedDirectory = ( String ) stormConf.get( "processedDirectory" );
>         deleteFile = ( Boolean ) stormConf.get( "deleteFile" );
>         validationNeeded = ( Boolean ) stormConf.get( "validationNeeded" );
>         validPattern = ( String ) stormConf.get( "validPattern" );
>
>         outputCollector = collector;
>     }
>
>     @Override
>     public void execute( Tuple tuple ) {
>
>         String file = tuple.getStringByField( "File" );
>         try {
>             File thisFile = new File( file );
>             Path path = thisFile.toPath();
>             BufferedReader reader = Files.newBufferedReader( path, Charset.forName("UTF-8") );
>
>             String currentLine = null;
>             while((currentLine = reader.readLine()) != null){
>                 System.out.println( "Printing within filesource: " + currentLine );
>                 outputCollector.emit( tuple, new Values( currentLine ) );
>             }
>             outputCollector.ack( tuple );
>             System.out.println("end of file read: " + path.toString());
>             thisFile.renameTo( new File( processedDirectory + "/" + thisFile.getName() ) );
>
>         } catch ( IOException e ) {
>             e.printStackTrace();
>         }
>     }
>
>     @Override
>     public void declareOutputFields( OutputFieldsDeclarer declarer ) {
>         declarer.declare( new Fields( "record" ) );
>     }
> }
>
> However, the ack method in the spout is never called. Can you please
> let me know what is wrong with this code?
>
> Regards,
> Rajeev.
>