storm-user mailing list archives

From "M. Aaron Bossert" <maboss...@gmail.com>
Subject [Apache Storm] New to storm and need more detail on multilingual topologies
Date Sat, 02 Jan 2016 22:27:26 GMT
All,

Forgive me ahead of time...my Java is MIGHTY rusty as I don't use it much...

I am trying to create a Storm topology that uses a number of spouts and
bolts written in Perl.  I have read the documentation for IO::Storm on
metacpan and have a decent understanding of how to write the individual
spouts and bolts, but there is no detail about how to pull it all
together.  I have three questions:

1. In my emit call (in Perl), I am outputting a file name to be
further processed as a simple string (e.g. '/somedir/somefile.txt').  What
format should I be emitting?  Is this correct?  If so, does the bolt
receiving the emitted message (again, written in Perl) need to do any
conversion on the string, or is it just a string that is ready for further
use?

2. Given the source code files in Perl and the Java code for the topology,
I see references to the config, but I have not found what to put in
the config for my cluster. Am I missing some obvious documentation?

3. Finally, here is the code for my initial spout that simply grabs all
files in a directory, then listens for additional files to be added (think
log directory with new files being added periodically).  As the follow-on
bolt requests the next file to process (using the next_tuple method), send
the file name to the bolt and then, once the ack is received, remove the
file from the array of files to process.  Am I going about this the "right"
way?
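
To make question 1 concrete, here is a minimal sketch of the message I would expect to be written to stdout for one emit, going by the Storm multilang protocol: a JSON object whose "tuple" field is an array, followed by a line containing only "end". The helper name emit_message and the id value 'file-1' are made up for illustration, and this is not necessarily how IO::Storm builds the message internally. If this is right, the receiving bolt would see the file name as the first element of the tuple's values. The spout itself (question 3) follows after the sketch.

```perl
use strict;
use warnings;
use JSON::PP;    # core module since Perl 5.14

# Build the text of one multilang "emit" command for a single file name.
# The id is optional in the protocol; Storm uses it to route ack/fail
# notifications back to the spout. 'file-1' below is a hypothetical value.
sub emit_message {
    my ( $file_name, $id ) = @_;
    my %msg = ( command => 'emit', tuple => [$file_name] );
    $msg{id} = $id if defined $id;

    # canonical() sorts the keys so the output is stable between runs
    return JSON::PP->new->canonical->encode( \%msg ) . "\nend\n";
}

print emit_message( '/somedir/somefile.txt', 'file-1' );
```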

#!/usr/bin/env perl

package NetflowFilesToProcessSpout;

use 5.016;
use strict;
use warnings;

#=+ extends is provided by Moo, which the IO::Storm examples use
use Moo;
use Data::Dumper;
use File::Monitor;

extends 'IO::Storm::Spout';

our $VERSION = '0.01';

#=+ Make it easier for portability
use Exporter 'import';
our $SOURCE_DIR = '/someDir/someSubDir/';
our @EXPORT_OK = qw($SOURCE_DIR);

#=+ Some variables that are needed across subs, namely the array of files
#=+ and the File::Monitor object
my (@files, $monitor);


sub initialize {
    my ( $self, $storm_conf, $context ) = @_;

    #=+ Now load all flow files into the shared array, sorted by name
    #=+ (no "my" here, so the file-scoped @files is the one that is filled)
    opendir(my $dh, $SOURCE_DIR) or die "Cannot open $SOURCE_DIR: $!";
    @files = sort grep { !/^\./ } readdir($dh);
    closedir($dh);

    #=+ Add full path to each file
    $_ = $SOURCE_DIR . $_ for @files;

    #=+ Now set up the directory monitor to push new files onto the
    #=+ existing file array (files => 1 also tracks the directory's entries)
    $monitor = File::Monitor->new();
    $monitor->watch( { name => $SOURCE_DIR, files => 1 } );

    #=+ We do the first scan now so that any changes going forward will be
    #=+ recognized; the result is discarded so that @files is not overwritten
    $monitor->scan;
}


sub next_tuple {
  my ($self) = @_;

  #=+ first, check if there are any new files in our flow directory;
  #=+ scan returns File::Monitor::Delta objects, so pull the new names out
  push @files, $_->files_created for $monitor->scan();

  if (@files) {
    #=+ Now emit exactly one file name (oldest first, for now)
    my $nextFileToProc = $files[0];
    $self->emit($nextFileToProc);
  }
  else {
    #=+ in order to keep Storm from thinking the process is dead if there
    #=+ are not currently any more files to process, return a noop (from the
    #=+ IO::Storm documentation)
    return;
  }
}


sub ack {
  my ($self, $id) = @_;

  #=+ For now, this looks like a good way to ensure we don't get rid of a
  #=+ file to process if there was a failure down the pipeline.
  shift @files;
}


sub process {
    my ($self, $tuple) = @_;

    #=+ Let's log that we have pushed out a file for processing
    my $filesize = -s $tuple;
    $self->log('NetflowFilesToProcessSpout: sent file to process ' . $tuple . ' (size: ' . $filesize . ')');
}


#=+ Finally, need to actually run

NetflowFilesToProcessSpout->new->run;


1;
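
For comparison with the ack handling above, here is a minimal sketch of a different bookkeeping scheme, assuming each emit carries a message id that Storm hands back to ack or fail. It is plain Perl with no IO::Storm calls; the names take_next, on_ack, on_fail, %pending and the sample paths are all hypothetical. Each file leaves the queue once, waits in %pending until it is acked, and goes back to the front of the queue if it fails.

```perl
use strict;
use warnings;

my @queue   = ( '/someDir/a.txt', '/someDir/b.txt' );    # files not yet emitted
my %pending;                                              # id => file awaiting ack
my $next_id = 0;

# Take the oldest queued file, remember it under a fresh id, return both.
sub take_next {
    return unless @queue;
    my $file = shift @queue;
    my $id   = ++$next_id;
    $pending{$id} = $file;
    return ( $id, $file );
}

# On ack the file is fully processed, so forget it.
sub on_ack {
    my ($id) = @_;
    delete $pending{$id};
    return;
}

# On fail put the file back at the front so it is retried first.
sub on_fail {
    my ($id) = @_;
    my $file = delete $pending{$id};
    unshift @queue, $file if defined $file;
    return;
}

my ( $id1, $file1 ) = take_next();    # a.txt is now pending
my ( $id2, $file2 ) = take_next();    # b.txt is now pending
on_ack($id1);                         # a.txt is done
on_fail($id2);                        # b.txt is queued again
print "queue: @queue\n";
```

With this layout next_tuple would never emit the same file twice while an ack is still outstanding, which the shared-array version above can do.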
