storm-user mailing list archives

From Nathan Leung <ncle...@gmail.com>
Subject Re: duplicated result
Date Tue, 06 May 2014 13:40:47 GMT
Good point, but it shouldn't matter how many exclamation bolts there are.
The number of spouts does matter, because they are all reading the same file.
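
To illustrate the point, here is a rough stand-alone sketch in plain Java. It is not Storm code: the class DuplicateReadSketch and the method emitFromAllSpouts are hypothetical names made up for this example. It only simulates several independent spout tasks that each read the same file from start to finish, which is what I assume happens when every task registers its own watcher on the same directory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not Storm code: every "spout task" reads the same file.
class DuplicateReadSketch {

    // Each simulated task reads the whole file independently and "emits" every line.
    public static List<String> emitFromAllSpouts(Path file, int spoutTasks) throws IOException {
        List<String> emitted = new ArrayList<>();
        for (int task = 0; task < spoutTasks; task++) {
            emitted.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
        }
        return emitted;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("words", ".txt");
        Files.write(file, Arrays.asList("hello", "world"), StandardCharsets.UTF_8);

        // One task: every line is emitted once.
        System.out.println(emitFromAllSpouts(file, 1));
        // Three tasks: every line is emitted three times.
        System.out.println(emitFromAllSpouts(file, 3));

        Files.delete(file);
    }
}
```

If this is what is going on in your topology, changing the parallelism hint to 1 in builder.setSpout("word", T, 1) should leave a single reader and make the duplicates go away.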
On May 6, 2014 9:37 AM, "padma priya chitturi" <padmapriya30@gmail.com>
wrote:

> Hi,
>
> The issue lies with the number of tasks/executors specified for the spout. Try
> specifying 1 spout and see whether you still get duplicates. I suspect there
> would be no duplicates with 1 spout and 1 exclamation bolt.
>
>
> On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <alfartaj.bilal@gmail.com> wrote:
>
>> Hi Nathan, and thank you for responding; I appreciate it!
>> No, it wasn't; I have just run this topology for the first time.
>>
>> public class Ex {
>>
>>   public static class ExclamationBolt extends BaseRichBolt {
>>     OutputCollector _collector;
>>
>>     @Override
>>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>>       _collector = collector;
>>     }
>>
>>     @Override
>>     public void execute(Tuple tuple) {
>>       _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>>       _collector.ack(tuple);
>>     }
>>
>>     @Override
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>       declarer.declare(new Fields("word"));
>>     }
>>
>>
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     TopologyBuilder builder = new TopologyBuilder();
>>     TryRead T = new TryRead();
>>     PrinterBolty P = new PrinterBolty();
>>     builder.setSpout("word", T, 10);
>>     builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
>>     builder.setBolt("exclaim2", P, 2).shuffleGrouping("exclaim1");
>>
>>
>>
>> -------------------------------------------------------------------------------------
>> My spout, TryRead:
>>
>>
>> public class TryRead extends BaseRichSpout {
>>           SpoutOutputCollector _collector;
>>           Random _rand;
>>           BufferedReader fileReader;
>>           FileSystem f;
>>           WatchService watcher ;
>>           String S;
>>           Path dir;
>>
>>           @Override
>>           public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>               _collector = collector;
>>               try {
>>                   f = FileSystems.getDefault();
>>                   watcher = f.newWatchService();
>>                   S = "/root/src/storm-starter/src/jvm/storm/starter";
>>                   dir = f.getPath(S);
>>                   dir.register(watcher, ENTRY_CREATE);
>>               } catch (IOException e) {
>>                   e.printStackTrace();
>>               }
>>           }
>>
>>           @Override
>>           public void nextTuple() {
>>               Utils.sleep(2000);
>>               for (;;) {
>>                   // wait for key to be signaled
>>                   WatchKey key;
>>                   try {
>>                       key = watcher.take();
>>                   } catch (InterruptedException x) {
>>                       return;
>>                   }
>>
>>                   for (WatchEvent<?> event : key.pollEvents()) {
>>                       WatchEvent.Kind kind = event.kind();
>>                       if (kind == OVERFLOW) {
>>                           continue;
>>                       }
>>
>>                       WatchEvent<Path> ev = (WatchEvent<Path>) event;
>>                       Path filename = ev.context();
>>
>>                       System.out.format("Emailing file %s%n", filename);
>>                       try {
>>                           fileReader = new BufferedReader(new FileReader(new File(S + "/" + filename)));
>>                           RandomAccessFile access = null;
>>                           String line = null;
>>                           try {
>>                               while ((line = fileReader.readLine()) != null) {
>>                                   if (line != null) {
>>                                       _collector.emit(new Values(line));
>>                                   }
>>                               }
>>                           } catch (IOException e) {
>>                               // TODO Auto-generated catch block
>>                               e.printStackTrace();
>>                           }
>>                       } catch (FileNotFoundException e) {
>>                           // TODO Auto-generated catch block
>>                           e.printStackTrace();
>>                       }
>>                   }
>>
>>                   boolean valid = key.reset();
>>                   if (!valid) {
>>                       break;
>>                   }
>>               }
>>           }
>>
>>           @Override
>>           public void ack(Object id) {
>>           }
>>
>>           @Override
>>           public void fail(Object id) {
>>           }
>>
>>           @Override
>>           public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>               declarer.declare(new Fields("word"));
>>           }
>> }
>>
>>
>>
>> 2014-05-06 12:35 GMT+01:00 Nathan Leung <ncleung@gmail.com>:
>>
>>> You are creating your file writer with append set to true. Is it
>>> possible your topology was run more than once?
>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <alfartaj.bilal@gmail.com>
>>> wrote:
>>>
>>>> I'm using a bolt that receives tuples from another bolt (the exclamation
>>>> bolt) and writes them to a file. The problem is that every result is
>>>> duplicated four times: when I emit a word, I find it written to the file
>>>> four times. Where could the problem be?
>>>>
>>>>
>>>>
>>>>
>>>> public class PrinterBolty extends BaseBasicBolt {
>>>>
>>>>   @Override
>>>>   public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>     try {
>>>>       BufferedWriter output;
>>>>       output = new BufferedWriter(new FileWriter("/root/src/storm-starter/hh.txt", true));
>>>>       output.newLine();
>>>>       output.append(tuple.getString(0));
>>>>       output.close();
>>>>     } catch (IOException e) {
>>>>       // TODO Auto-generated catch block
>>>>       e.printStackTrace();
>>>>     }
>>>>   }
>>>>
>>>>   @Override
>>>>   public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>   }
>>>>
>>>> }
>>>>
>>>>
>>>> --
>>>> *Al Fartakh Bilal*
>>>>
>>>
>>
>>
>> --
>> *Al Fartakh Bilal*
>>
>
>
