storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Bilal Al Fartakh <alfartaj.bi...@gmail.com>
Subject Re: duplicated result
Date Tue, 06 May 2014 14:16:25 GMT
Thank you  :D
specifying 1 spout was the solution .
thank you Padma and Nathan again for  your help !


2014-05-06 14:40 GMT+01:00 Nathan Leung <ncleung@gmail.com>:

> Good point, but it shouldn't matter how many exclamation bolts there are.
> The number of spouts does because they are all reading the same file.
> On May 6, 2014 9:37 AM, "padma priya chitturi" <padmapriya30@gmail.com>
> wrote:
>
>> Hi,
>>
>> The issue lies with the number of tasks/executors specified for spout.
>> Try specifying 1 spout and see if you could see duplicates. I suppose there
>> would be no duplicates in specifying 1 spout and 1 exclamtion bolt.
>>
>>
>> On Tue, May 6, 2014 at 5:23 PM, Bilal Al Fartakh <
>> alfartaj.bilal@gmail.com> wrote:
>>
>>> HI ,Nathan and thank you for responding ,I appreciate it  !
>>> no I'm not , I just run this topology for the first time
>>>
>>>
>>>
>>>
>>>
>>>
>>> public class Ex {
>>>
>>>   public static class ExclamationBolt extends BaseRichBolt {
>>>     OutputCollector _collector;
>>>
>>>     @Override
>>>     public void prepare(Map conf, TopologyContext context,
>>> OutputCollector collector) {
>>>       _collector = collector;
>>>     }
>>>
>>>     @Override
>>>      public void execute(Tuple tuple) {
>>>       _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
>>>       _collector.ack(tuple);
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>       declarer.declare(new Fields("word"));
>>>     }
>>>
>>>
>>>   }
>>>
>>>   public static void main(String[] args) throws Exception {
>>>     TopologyBuilder builder = new TopologyBuilder();
>>> TryRead T = new TryRead();
>>> PrinterBolty P = new PrinterBolty();
>>>     builder.setSpout("word", T, 10);
>>>     builder.setBolt("exclaim1", new ExclamationBolt(),
>>> 3).shuffleGrouping("word");
>>>     builder.setBolt("exclaim2", P , 2).shuffleGrouping("exclaim1");
>>>
>>>
>>>
>>> -------------------------------------------------------------------------------------
>>> my spout Tryread
>>>
>>>
>>> public class TryRead extends BaseRichSpout {
>>>           SpoutOutputCollector _collector;
>>>           Random _rand;
>>>           BufferedReader fileReader;
>>>           FileSystem f;
>>>           WatchService watcher ;
>>>           String S;
>>>           Path dir;
>>>
>>>           @Override
>>>           public void open(Map conf, TopologyContext context,
>>> SpoutOutputCollector collector) {
>>>                   _collector = collector;
>>>
>>>
>>> try{
>>>                         f = FileSystems.getDefault();
>>>                                  watcher = f.newWatchService();
>>>
>>>  S="/root/src/storm-starter/src/jvm/storm/starter";
>>>
>>>                                  dir = f.getPath(S);
>>>
>>>                                 dir.register(watcher, ENTRY_CREATE);
>>>
>>> } catch (IOException e) {
>>>
>>>                                                 e.printStackTrace();
>>>                                         }
>>>
>>>           }
>>>  }
>>>
>>>           @Override
>>>           public void nextTuple() {
>>>
>>>
>>>                   Utils.sleep(2000);
>>>                    for (;;) {
>>>
>>>                     // wait for key to be signaled
>>>                     WatchKey key;
>>>                     try {
>>>                         key = watcher.take();
>>>                     } catch (InterruptedException x) {
>>>                         return;
>>>                     }
>>>
>>>                     for (WatchEvent<?> event: key.pollEvents()) {
>>>                         WatchEvent.Kind kind = event.kind();
>>>  if (kind == OVERFLOW) {
>>>                             continue;
>>>                         }
>>>
>>>
>>>                         WatchEvent<Path> ev = (WatchEvent<Path>)event;
>>>                         Path filename = ev.context();
>>>
>>>                         System.out.format("Emailing file %s%n",
>>> filename);
>>>                         try {
>>>
>>>
>>>                                                  fileReader = new
>>> BufferedReader(new FileReader(new File(S+"/"+filename)));
>>>
>>>                                                 RandomAccessFile access
>>> = null;
>>>                             String line = null;
>>>                                try
>>>                                {
>>>                                    while ((line = fileReader.readLine())
>>> != null)
>>>                                    {
>>>                                        if (line !=null)
>>>                                        {
>>>
>>>                                            _collector.emit(new
>>> Values(line));
>>>                                         }
>>>                                    }
>>>                                } catch (IOException e) {
>>>                                                 // TODO Auto-generated
>>> catch block
>>>                                                 e.printStackTrace();
>>>                                         }
>>>                                         } catch (FileNotFoundException
>>> e) {
>>>                                                  // TODO Auto-generated
>>> catch block
>>>                                                 e.printStackTrace();
>>>                                         }
>>>                     }
>>>    }
>>>
>>>                     boolean valid = key.reset();
>>>                     if (!valid) {
>>>                             break;
>>>                     }
>>>                 }
>>>
>>>
>>>           }
>>>
>>>           @Override
>>>           public void ack(Object id) {
>>>           }
>>>
>>>           @Override
>>>           public void fail(Object id) {
>>>           }
>>>
>>>           @Override
>>>           public void declareOutputFields(OutputFieldsDeclarer declarer)
>>> {
>>>             declarer.declare(new Fields("word"));
>>>           }
>>>
>>>
>>>
>>> 2014-05-06 12:35 GMT+01:00 Nathan Leung <ncleung@gmail.com>:
>>>
>>> You are creating your file writer with append set to true. It's it
>>>> possible your topology was run more than once?
>>>> On May 6, 2014 6:39 AM, "Bilal Al Fartakh" <alfartaj.bilal@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm using a bolt that receives tuples from another bolt (exclamation
>>>>> bolt ) and writes it on a file , the problem I got is that I have
>>>>> duplicated results four times , like when I emit a word , I found the
word
>>>>> Written four times . where's the problem possibly could be ?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> public class PrinterBolty extends BaseBasicBolt {
>>>>>
>>>>>   @Override
>>>>>   public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>>>
>>>>> try {
>>>>>
>>>>>                         BufferedWriter output;
>>>>>                         output = new BufferedWriter(new
>>>>> FileWriter("/root/src/storm-starter/hh.txt", true));
>>>>>                         output.newLine();
>>>>>                         output.append(tuple.getString(0));
>>>>>                         output.close();
>>>>>
>>>>>                 } catch (IOException e) {
>>>>>                         // TODO Auto-generated catch block
>>>>>                         e.printStackTrace();
>>>>>                 }
>>>>>  }
>>>>>
>>>>>   @Override
>>>>>   public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> --
>>>>> *Al Fartakh Bilal*
>>>>>
>>>>
>>>
>>>
>>> --
>>> *Al Fartakh Bilal*
>>>
>>
>>


-- 
*Al Fartakh Bilal*

Mime
View raw message