storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rajeev <rajeev.dal...@gmail.com>
Subject Ack method in Spout does not get called.
Date Sun, 21 Jan 2018 03:12:07 GMT
Hi,

I have a simple topology as below:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "DirSpout", new DirectoryScan() );
builder.setBolt( "printBeacon", new FileSource(), 1 ).shuffleGrouping(
"DirSpout" );
Below is my Spout:

public class DirectoryScan extends BaseRichSpout {

private String inputDirectory;
private String compressionType;
private String fileNamePattern;
private boolean validationNeeded;
private String validPattern;
private File validationFile;
private SpoutOutputCollector collector;
@Override
    public void ack( Object msgId ) {
System.out.println("acked with: " + msgId );
Tuple tuple = ( Tuple ) msgId;
String path = tuple.getString( 0 );
System.out.println( "Path acked: " + path );
    }

@Override
    public void fail(Object msgId) {
System.out.println( "I failed you: " + msgId.toString() );
    }
@Override
public void open( Map conf, TopologyContext context, SpoutOutputCollector
collector ) {

this.collector = collector;
inputDirectory = ( String ) conf.get( "inputDirectory" );
compressionType = ( String ) conf.get( "compressionType" );
fileNamePattern = ( String ) conf.get( "fileNamePattern" );
validationNeeded = ( Boolean ) conf.get( "validationNeeded" );
validPattern = ( String ) conf.get( "validPattern" );
}

@Override
public void nextTuple() {
int i = 1;
File dir = new File( inputDirectory );
FileFilter filteredFiles = new RegexFileFilter( fileNamePattern );

int maxFiles = 4;
while( true ) {
System.out.println( "NIO run" );
long start = System.currentTimeMillis();
Path pdir = FileSystems.getDefault().getPath( inputDirectory );
try {
Thread.sleep(5000);
DirectoryStream<Path> stream = Files.newDirectoryStream( pdir,
fileNamePattern );
for (Path path : stream) {
if ( i > maxFiles) break;
System.out.println( "" + i + ": " + path.getFileName() );
Object msgId = "ID " + i;
this.collector.emit( new Values( path.toString() ), msgId );
i ++;
}
stream.close();
} catch (IOException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long stop = System.currentTimeMillis();
System.out.println( "Elapsed: " + (stop - start) + " ms" );
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "File" ) );
}

public static class FileComparator implements Comparator< File > {

public int compare( File fileA, File fileB ) {
long lastModifiedA = fileA.lastModified();
long lastModifiedB = fileB.lastModified();

if ( lastModifiedA > lastModifiedB )
return 1;
else if ( lastModifiedB > lastModifiedA )
return -1;

return 0;
}
}

}







Below is my Bolt:
public class FileSource extends BaseRichBolt {

private OutputCollector outputCollector = null;
private String processedDirectory;
private boolean deleteFile;
private BufferedReader bufferedReader = null;
private FileReader fileReader = null;
private File thisFile = null;
private String thisFileName = null;
private boolean validationNeeded;
private String validPattern;
private File validationFile;

@Override
public void prepare( Map stormConf, TopologyContext context,
OutputCollector collector ) {

processedDirectory = ( String ) stormConf.get( "processedDirectory" );
deleteFile = ( Boolean ) stormConf.get( "deleteFile" );
validationNeeded = ( Boolean ) stormConf.get( "validationNeeded" );
validPattern = ( String ) stormConf.get( "validPattern" );

outputCollector = collector;
}

@Override
public void execute( Tuple tuple ) {

String file = tuple.getStringByField( "File" );
try {
File thisFile = new File( file );
Path path = thisFile.toPath();
BufferedReader reader = Files.newBufferedReader( path,
Charset.forName("UTF-8") );

String currentLine = null;
while((currentLine = reader.readLine()) != null){
System.out.println( "Printing within filesource: " + currentLine );
outputCollector.emit( tuple, new Values( currentLine ) );
}
outputCollector.ack( tuple );
System.out.println("end of file read: " + path.toString());
thisFile.renameTo( new File( processedDirectory + "/" + thisFile.getName()
) );

} catch ( IOException e ) {
e.printStackTrace();
}
}

@Override
public void declareOutputFields( OutputFieldsDeclarer declarer ) {
declarer.declare( new Fields( "record" ) );
}
}





But the above is not calling the ack method in the spout. Can you please
let me know what is wrong in this code.

Regards,
Rajeev.

Mime
View raw message