storm-user mailing list archives

From Rajeev <rajeev.dal...@gmail.com>
Subject Re: Ack method in Spout does not get called.
Date Mon, 22 Jan 2018 02:25:57 GMT
Hi Susheel,

Thanks for the reply.

I tried emitting path.toString() as the messageId in my Spout, like below:

this.collector.emit( new Values( path.toString() ), path.toString() );
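
For reference, a minimal self-contained spout sketch of that contract (Storm 1.x
package names assumed; the class name and the emitted value are illustrative, not
from the original code): Storm hands the exact messageId object passed to emit
back to ack/fail, so a String messageId arrives in ack as a String, not a Tuple.

    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class PathSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private boolean sent = false;

        @Override
        public void open( Map conf, TopologyContext context,
                SpoutOutputCollector collector ) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            if ( sent ) return;                  // emit once for this sketch, then return promptly
            String path = "/tmp/example.txt";    // illustrative value
            // The second argument is the messageId: Storm hands this exact
            // object back to ack()/fail(), so msgId will be a String here.
            collector.emit( new Values( path ), path );
            sent = true;
        }

        @Override
        public void ack( Object msgId ) {
            // msgId is the same String that was passed to emit above
            System.out.println( "acked: " + msgId );
        }

        @Override
        public void fail( Object msgId ) {
            System.out.println( "failed: " + msgId );
        }

        @Override
        public void declareOutputFields( OutputFieldsDeclarer declarer ) {
            declarer.declare( new Fields( "File" ) );
        }
    }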


And in my bolt, the emit and ack are like below:

outputCollector.emit( tuple, new Values( currentLine ) );
outputCollector.ack( tuple );
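
For the bolt side, a minimal sketch under the same assumptions ("LineBolt" is
illustrative): passing the input tuple as the first argument to emit anchors the
new tuple to it, so the spout's ack fires only after every tuple in that tree has
been acked, including by any bolt downstream of this one.

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class LineBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare( Map conf, TopologyContext context,
                OutputCollector collector ) {
            this.collector = collector;
        }

        @Override
        public void execute( Tuple input ) {
            // Passing the input tuple first anchors the emitted tuple to it;
            // the spout's ack fires only once the whole tuple tree is acked.
            collector.emit( input, new Values( input.getStringByField( "File" ) ) );
            collector.ack( input );   // ack the input after emitting
        }

        @Override
        public void declareOutputFields( OutputFieldsDeclarer declarer ) {
            declarer.declare( new Fields( "record" ) );
        }
    }

Note that if a downstream bolt receives an anchored tuple and never acks it, the
spout eventually sees fail after the tuple timeout rather than ack.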

But I still do not see the ack coming in to the Spout. Can you please point
out what I am doing wrong here?

Regards,
Rajeev.

On 21 January 2018 at 21:41, Susheel Kumar Gadalay <skgadalay@gmail.com>
wrote:

> I think this emit from the bolt is not correct:
>         outputCollector.emit( tuple, new Values( currentLine ) );
> You have to anchor with the same msgId as the spout.
>
>
> On 1/21/18, Rajeev <rajeev.dalavi@gmail.com> wrote:
> > Hi,
> >
> > I have a simple topology as below:
> > TopologyBuilder builder = new TopologyBuilder();
> > builder.setSpout( "DirSpout", new DirectoryScan() );
> > builder.setBolt( "printBeacon", new FileSource(), 1 )
> >        .shuffleGrouping( "DirSpout" );
> > Below is my Spout:
> >
> > public class DirectoryScan extends BaseRichSpout {
> >
> >     private String inputDirectory;
> >     private String compressionType;
> >     private String fileNamePattern;
> >     private boolean validationNeeded;
> >     private String validPattern;
> >     private File validationFile;
> >     private SpoutOutputCollector collector;
> >
> >     @Override
> >     public void ack( Object msgId ) {
> >         System.out.println( "acked with: " + msgId );
> >         Tuple tuple = ( Tuple ) msgId;
> >         String path = tuple.getString( 0 );
> >         System.out.println( "Path acked: " + path );
> >     }
> >
> >     @Override
> >     public void fail( Object msgId ) {
> >         System.out.println( "I failed you: " + msgId.toString() );
> >     }
> >
> >     @Override
> >     public void open( Map conf, TopologyContext context,
> >             SpoutOutputCollector collector ) {
> >         this.collector = collector;
> >         inputDirectory = ( String ) conf.get( "inputDirectory" );
> >         compressionType = ( String ) conf.get( "compressionType" );
> >         fileNamePattern = ( String ) conf.get( "fileNamePattern" );
> >         validationNeeded = ( Boolean ) conf.get( "validationNeeded" );
> >         validPattern = ( String ) conf.get( "validPattern" );
> >     }
> >
> >     @Override
> >     public void nextTuple() {
> >         int i = 1;
> >         File dir = new File( inputDirectory );
> >         FileFilter filteredFiles = new RegexFileFilter( fileNamePattern );
> >
> >         int maxFiles = 4;
> >         while( true ) {
> >             System.out.println( "NIO run" );
> >             long start = System.currentTimeMillis();
> >             Path pdir = FileSystems.getDefault().getPath( inputDirectory );
> >             try {
> >                 Thread.sleep( 5000 );
> >                 DirectoryStream<Path> stream =
> >                         Files.newDirectoryStream( pdir, fileNamePattern );
> >                 for ( Path path : stream ) {
> >                     if ( i > maxFiles ) break;
> >                     System.out.println( "" + i + ": " + path.getFileName() );
> >                     Object msgId = "ID " + i;
> >                     this.collector.emit( new Values( path.toString() ), msgId );
> >                     i++;
> >                 }
> >                 stream.close();
> >             } catch ( IOException | InterruptedException e ) {
> >                 e.printStackTrace();
> >             }
> >             long stop = System.currentTimeMillis();
> >             System.out.println( "Elapsed: " + ( stop - start ) + " ms" );
> >         }
> >     }
> >
> >     @Override
> >     public void declareOutputFields( OutputFieldsDeclarer declarer ) {
> >         declarer.declare( new Fields( "File" ) );
> >     }
> >
> >     public static class FileComparator implements Comparator<File> {
> >
> >         public int compare( File fileA, File fileB ) {
> >             long lastModifiedA = fileA.lastModified();
> >             long lastModifiedB = fileB.lastModified();
> >
> >             if ( lastModifiedA > lastModifiedB )
> >                 return 1;
> >             else if ( lastModifiedB > lastModifiedA )
> >                 return -1;
> >
> >             return 0;
> >         }
> >     }
> > }
> >
> > Below is my Bolt:
> > public class FileSource extends BaseRichBolt {
> >
> >     private OutputCollector outputCollector = null;
> >     private String processedDirectory;
> >     private boolean deleteFile;
> >     private BufferedReader bufferedReader = null;
> >     private FileReader fileReader = null;
> >     private File thisFile = null;
> >     private String thisFileName = null;
> >     private boolean validationNeeded;
> >     private String validPattern;
> >     private File validationFile;
> >
> >     @Override
> >     public void prepare( Map stormConf, TopologyContext context,
> >             OutputCollector collector ) {
> >         processedDirectory = ( String ) stormConf.get( "processedDirectory" );
> >         deleteFile = ( Boolean ) stormConf.get( "deleteFile" );
> >         validationNeeded = ( Boolean ) stormConf.get( "validationNeeded" );
> >         validPattern = ( String ) stormConf.get( "validPattern" );
> >
> >         outputCollector = collector;
> >     }
> >
> >     @Override
> >     public void execute( Tuple tuple ) {
> >         String file = tuple.getStringByField( "File" );
> >         try {
> >             File thisFile = new File( file );
> >             Path path = thisFile.toPath();
> >             BufferedReader reader = Files.newBufferedReader( path,
> >                     Charset.forName( "UTF-8" ) );
> >
> >             String currentLine = null;
> >             while ( ( currentLine = reader.readLine() ) != null ) {
> >                 System.out.println( "Printing within filesource: " + currentLine );
> >                 outputCollector.emit( tuple, new Values( currentLine ) );
> >             }
> >             outputCollector.ack( tuple );
> >             System.out.println( "end of file read: " + path.toString() );
> >             thisFile.renameTo( new File( processedDirectory + "/" +
> >                     thisFile.getName() ) );
> >
> >         } catch ( IOException e ) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void declareOutputFields( OutputFieldsDeclarer declarer ) {
> >         declarer.declare( new Fields( "record" ) );
> >     }
> > }
> >
> > But the above does not call the ack method in the spout. Can you please
> > let me know what is wrong in this code?
> >
> > Regards,
> > Rajeev.
> >
>
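
One thing worth double-checking, per the Storm docs: nextTuple, ack, and fail are
all called in a loop on a single spout thread, so the while( true ) loop in
nextTuple above never returns control to Storm, and ack can never run. A minimal
restructuring sketch (same Storm 1.x APIs assumed; only the loop changes, and a
real spout would also track which paths it has already emitted):

    // Sketch: do one bounded directory pass per call and return promptly,
    // so the same spout thread can also service ack()/fail() callbacks.
    // Uses the java.nio.file imports already needed by the original class.
    @Override
    public void nextTuple() {
        Path pdir = FileSystems.getDefault().getPath( inputDirectory );
        try ( DirectoryStream<Path> stream =
                Files.newDirectoryStream( pdir, fileNamePattern ) ) {
            for ( Path path : stream ) {
                // Use the path string itself as the messageId; ack( Object msgId )
                // then receives a String, so it must not cast msgId to Tuple.
                collector.emit( new Values( path.toString() ), path.toString() );
            }
        } catch ( IOException e ) {
            e.printStackTrace();
        }
        // No while( true ) and no sleep: Storm calls nextTuple() again itself.
    }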
