Thank you :D
Specifying 1 spout was the solution.
Thank you, Padma and Nathan, again for your help!


2014-05-06 14:40 GMT+01:00 Nathan Leung <ncleung@gmail.com>:

Good point, but it shouldn't matter how many exclamation bolts there are. The number of spouts does because they are all reading the same file.
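
A minimal, Storm-free sketch of this effect, assuming every spout task independently sees the new file and reads it from start to end. The names `DuplicateReadSketch` and `emitFromAllTasks` are hypothetical and only illustrate the idea; they are not part of Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DuplicateReadSketch {

  // Simulates `taskCount` independent spout tasks (an assumption for this
  // sketch). Each task reads the whole file and emits every line, and the
  // tasks do not coordinate with each other.
  public static List<String> emitFromAllTasks(List<String> fileLines, int taskCount) {
    List<String> emitted = new ArrayList<>();
    for (int task = 0; task < taskCount; task++) {
      emitted.addAll(fileLines);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<String> fileLines = Arrays.asList("hello", "world");
    // One reader: each line is emitted once.
    System.out.println(emitFromAllTasks(fileLines, 1).size());
    // Four readers of the same file: each line is emitted four times.
    System.out.println(emitFromAllTasks(fileLines, 4).size());
  }
}
```

The downstream bolts only transform and forward what they receive, so their parallelism does not multiply the tuples; only the number of readers of the same file does.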

On May 6, 2014 9:37 AM, "padma priya chitturi" <padmapriya30@gmail.com> wrote:
Hi,

The issue lies with the number of tasks/executors specified for the spout. Try specifying 1 spout and check whether you still see duplicates. I suppose there would be no duplicates with 1 spout and 1 exclamation bolt.


On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <alfartaj.bilal@gmail.com> wrote:
Hi Nathan, and thank you for responding, I appreciate it!
No, it was not: I have just run this topology for the first time.






public class Ex {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    TryRead T = new TryRead();
    PrinterBolty P = new PrinterBolty();
    builder.setSpout("word", T, 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");


-------------------------------------------------------------------------------------
My spout, TryRead:


import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class TryRead extends BaseRichSpout {
  SpoutOutputCollector _collector;
  WatchService watcher;
  Path dir;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    try {
      // Watch the directory for newly created files.
      dir = FileSystems.getDefault().getPath("/root/src/storm-starter/src/jvm/storm/starter");
      watcher = dir.getFileSystem().newWatchService();
      dir.register(watcher, ENTRY_CREATE);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public void nextTuple() {
    Utils.sleep(2000);
    for (;;) {
      // Wait for a key to be signaled.
      WatchKey key;
      try {
        key = watcher.take();
      } catch (InterruptedException x) {
        return;
      }

      for (WatchEvent<?> event : key.pollEvents()) {
        if (event.kind() == OVERFLOW) {
          continue;
        }

        // For ENTRY_CREATE events the context is the new file's name,
        // relative to the watched directory.
        Path filename = ((WatchEvent<Path>) event).context();
        System.out.format("Reading file %s%n", filename);

        // Emit every line of the new file as one tuple; the reader is
        // closed automatically when the block exits.
        try (BufferedReader fileReader =
            new BufferedReader(new FileReader(dir.resolve(filename).toFile()))) {
          String line;
          while ((line = fileReader.readLine()) != null) {
            _collector.emit(new Values(line));
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }

      // Stop watching once the key is no longer valid, for example
      // because the directory has become inaccessible.
      if (!key.reset()) {
        break;
      }
    }
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}



2014-05-06 12:35 GMT+01:00 Nathan Leung <ncleung@gmail.com>:

You are creating your file writer with append set to true. Is it possible your topology was run more than once?
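
A small sketch of what append mode does across repeated runs, using only the JDK. The class `AppendModeSketch` and its methods are hypothetical names for illustration, and the temp file stands in for the real output file.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class AppendModeSketch {

  // Mimics one "run" that writes a single word on its own line.
  // With append == true the existing content is kept; with false the
  // file is truncated before writing.
  public static void writeWord(File file, String word, boolean append) throws IOException {
    try (BufferedWriter out = new BufferedWriter(new FileWriter(file, append))) {
      out.write(word);
      out.newLine();
    }
  }

  // Repeats the write `runs` times against one temp file and returns the
  // number of lines the file holds afterwards.
  public static int linesAfterRuns(int runs, boolean append) {
    try {
      File file = File.createTempFile("append-sketch", ".txt");
      try {
        for (int i = 0; i < runs; i++) {
          writeWord(file, "word", append);
        }
        return Files.readAllLines(file.toPath(), StandardCharsets.UTF_8).size();
      } finally {
        file.delete();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Append mode: every run adds another copy of the word.
    System.out.println(linesAfterRuns(3, true));
    // Overwrite mode: only the last run's word remains.
    System.out.println(linesAfterRuns(3, false));
  }
}
```

If the output file is left over from an earlier run, append mode would make old results look like duplicates, which is why it is worth ruling out first.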

On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <alfartaj.bilal@gmail.com> wrote:
I'm using a bolt that receives tuples from another bolt (the exclamation bolt) and writes them to a file. The problem is that every result is duplicated four times: when I emit a word, I find that word written four times. Where could the problem be?




public class PrinterBolty extends BaseBasicBolt {

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    // Append each received word to the output file on its own line.
    // The writer is opened in append mode and closed automatically.
    try (BufferedWriter output =
        new BufferedWriter(new FileWriter("/root/src/storm-starter/hh.txt", true))) {
      output.newLine();
      output.append(tuple.getString(0));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer ofd) {
    // Terminal bolt: it emits nothing, so no output fields are declared.
  }

}


--
Al Fartakh Bilal


