storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bilal Al Fartakh <alfartaj.bi...@gmail.com>
Subject Re: duplicated result
Date Tue, 06 May 2014 11:53:33 GMT
HI ,Nathan and thank you for responding ,I appreciate it  !
no I'm not , I just run this topology for the first time






public class Ex {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector
collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }


  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
TryRead T = new TryRead();
PrinterBolty P = new PrinterBolty();
    builder.setSpout("word", T, 10);
    builder.setBolt("exclaim1", new ExclamationBolt(),
3).shuffleGrouping("word");
    builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");


-------------------------------------------------------------------------------------
my spout Tryread


public class TryRead extends BaseRichSpout {
          SpoutOutputCollector _collector;
          Random _rand;
          BufferedReader fileReader;
          FileSystem f;
          WatchService watcher ;
          String S;
          Path dir;

          @Override
          public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
                  _collector = collector;


try{
                        f = FileSystems.getDefault();
                                 watcher = f.newWatchService();

 S="/root/src/storm-starter/src/jvm/storm/starter";

                                 dir = f.getPath(S);

                                dir.register(watcher, ENTRY_CREATE);

} catch (IOException e) {

                                                e.printStackTrace();
                                        }

          }
 }

          @Override
          public void nextTuple() {


                  Utils.sleep(2000);
                   for (;;) {

                    // wait for key to be signaled
                    WatchKey key;
                    try {
                        key = watcher.take();
                    } catch (InterruptedException x) {
                        return;
                    }

                    for (WatchEvent<?> event: key.pollEvents()) {
                        WatchEvent.Kind kind = event.kind();
 if (kind == OVERFLOW) {
                            continue;
                        }


                        WatchEvent<Path> ev = (WatchEvent<Path>)event;
                        Path filename = ev.context();

                        System.out.format("Emailing file %s%n", filename);
                        try {


                                                 fileReader = new
BufferedReader(new FileReader(new File(S+"/"+filename)));

                                                RandomAccessFile access =
null;
                            String line = null;
                               try
                               {
                                   while ((line = fileReader.readLine()) !=
null)
                                   {
                                       if (line !=null)
                                       {

                                           _collector.emit(new
Values(line));
                                       }
                                   }
                               } catch (IOException e) {
                                                // TODO Auto-generated
catch block
                                                e.printStackTrace();
                                        }
                                        } catch (FileNotFoundException e) {
                                                // TODO Auto-generated
catch block
                                                e.printStackTrace();
                                        }
                    }
   }

                    boolean valid = key.reset();
                    if (!valid) {
                            break;
                    }
                }


          }

          @Override
          public void ack(Object id) {
          }

          @Override
          public void fail(Object id) {
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
          }



2014-05-06 12:35 GMT+01:00 Nathan Leung <ncleung@gmail.com>:

> You are creating your file writer with append set to true. It's it
> possible your topology was run more than once?
> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <alfartaj.bilal@gmail.com>
> wrote:
>
>> I'm using a bolt that receives tuples from another bolt (exclamation bolt
>> ) and writes it on a file , the problem I got is that I have duplicated
>> results four times , like when I emit a word , I found the word Written four
>> times . where's the problem possibly could be ?
>>
>>
>>
>>
>> public class PrinterBolty extends BaseBasicBolt {
>>
>>   @Override
>>   public void execute(Tuple tuple, BasicOutputCollector collector) {
>>
>> try {
>>
>>                         BufferedWriter output;
>>                         output = new BufferedWriter(new
>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>                         output.newLine();
>>                         output.append(tuple.getString(0));
>>                         output.close();
>>
>>                 } catch (IOException e) {
>>                         // TODO Auto-generated catch block
>>                         e.printStackTrace();
>>                 }
>>  }
>>
>>   @Override
>>   public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>   }
>>
>> }
>>
>>
>> --
>> *Al Fartakh Bilal*
>>
>


-- 
*Al Fartakh Bilal*

Mime
View raw message