mahout-user mailing list archives

From Pat Ferrel <...@occamsmachete.com>
Subject More mappers in RowId
Date Tue, 05 Jun 2012 15:13:29 GMT
RowID+RowSimilarity are by far the slowest of the steps in my particular 
pipeline, which includes crawl, seq2sparse, and cluster.

What is the status of further parallelization of these? Dan offers a mod 
below. Has it been tested with RowSimilarity?

I'd be happy to give it a try but am having trouble with clustering in 
the trunk right now.
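To make Dan's scheme (quoted below) concrete, here is a plain-Java sketch, with no Hadoop involved and all names hypothetical: one shared, monotonically increasing row id is assigned across several "part" inputs, while a separate matrix segment is emitted per part so downstream jobs can use one mapper per segment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowIdSketch {

  // Assign global row ids across multiple "part" inputs, emitting one
  // matrix segment per part (the idea behind the rowiddist mod): ids stay
  // globally unique and sequential, but rows are spread over many files.
  public static Map<String, List<Integer>> assignRowIds(List<List<String>> parts) {
    Map<String, List<Integer>> segments = new LinkedHashMap<>();
    int docId = 0;        // shared index across all parts
    int matrixCount = 0;  // one output segment per input part
    for (List<String> part : parts) {
      List<Integer> ids = new ArrayList<>();
      for (int k = 0; k < part.size(); k++) {
        ids.add(docId++); // monotonically increasing, never reset per part
      }
      segments.put("matrix-" + matrixCount++, ids);
    }
    return segments;
  }

  public static void main(String[] args) {
    List<List<String>> parts = List.of(List.of("doc-a", "doc-b"), List.of("doc-c"));
    System.out.println(assignRowIds(parts)); // {matrix-0=[0, 1], matrix-1=[2]}
  }
}
```

The key property is that the row id counter is never reset between parts, so the per-part matrix files together still form one consistent matrix against the single shared docIndex.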

On 6/4/12 4:20 AM, DAN HELM wrote:
>   
> I also developed a quick-and-dirty rowid mod so that it generates one matrix output
file for each "part" input file from the seq2sparse command -- so multiple mappers are used
to process the output of rowid (such as when running CVB clustering on rowid output). For
that I just made a derivative rowid command called rowiddist. The final solution should be
the logic rolled into a single rowid command, where the run type (multiple-file output vs.
a single matrix file) is driven by parameters. The code below was just a quick "solution".
>   
> **** code follows ****
> package home.mahout;
> import com.google.common.io.Closeables;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.SequenceFile;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.util.ToolRunner;
> import org.apache.mahout.common.AbstractJob;
> import org.apache.mahout.common.Pair;
> import org.apache.mahout.common.iterator.sequencefile.PathFilters;
> import org.apache.mahout.common.iterator.sequencefile.PathType;
> import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
> import org.apache.mahout.math.VectorWritable;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.util.Map;
>   
> public class RowIdJobDistributed extends AbstractJob {
>
>   private static final Logger log = LoggerFactory.getLogger(RowIdJobDistributed.class);
>
>   @Override
>   public int run(String[] args) throws Exception {
>     addInputOption();
>     addOutputOption();
>     Map<String,String> parsedArgs = parseArguments(args);
>     if (parsedArgs == null) {
>       return -1;
>     }
>
>     Configuration conf = getConf();
>     FileSystem fs = FileSystem.get(conf);
>     Path outputPath = getOutputPath();
>     Path indexPath = new Path(outputPath, "docIndex");
>     SequenceFile.Writer indexWriter = SequenceFile.createWriter(fs, conf, indexPath,
>                                                                 IntWritable.class,
>                                                                 Text.class);
>     FileStatus[] fstatus = fs.listStatus(getInputPath());
>
>     try {
>       IntWritable docId = new IntWritable();
>       int i = 0;
>       int numCols = 0;
>       int matrixCount = 0;
>
>       for (FileStatus f : fstatus) {
>         if (f.isDir()) {
>           continue;
>         }
>         Path inPath = f.getPath();
>         // One matrix segment per input part file, so downstream jobs get
>         // multiple splits (and therefore multiple mappers).
>         Path matrixPath = new Path(outputPath, "matrix-" + matrixCount);
>         SequenceFile.Writer matrixWriter = SequenceFile.createWriter(fs, conf, matrixPath,
>                                                                      IntWritable.class,
>                                                                      VectorWritable.class);
>         try {
>           for (Pair<Text,VectorWritable> record :
>                new SequenceFileDirIterable<Text,VectorWritable>(inPath,
>                                                                 PathType.LIST,
>                                                                 PathFilters.logsCRCFilter(),
>                                                                 null,
>                                                                 true,
>                                                                 conf)) {
>             VectorWritable value = record.getSecond();
>             // docId increases monotonically across ALL part files, so the
>             // per-part matrix files share one consistent row index.
>             docId.set(i);
>             indexWriter.append(docId, record.getFirst());
>             matrixWriter.append(docId, value);
>             i++;
>             numCols = value.get().size();
>           }
>         } finally {
>           // Close each segment writer even if iteration fails.
>           Closeables.closeQuietly(matrixWriter);
>         }
>         matrixCount++;
>       }
>
>       log.info("Wrote out matrix with {} rows and {} columns to {}",
>                new Object[] {i, numCols, outputPath});
>       return 0;
>     } finally {
>       Closeables.closeQuietly(indexWriter);
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     ToolRunner.run(new RowIdJobDistributed(), args);
>   }
> }
>
>
>
> ________________________________
>   From: Suneel Marthi<suneel_marthi@yahoo.com>
> To: "user@mahout.apache.org"<user@mahout.apache.org>
> Sent: Monday, June 4, 2012 1:39 AM
> Subject: Questions on RowId job
>
>
> 3. There was an issue brought up on these forums sometime last week, to which Jake's
response was that a patch to make the RowId job configurable to run in distributed mode
with many mappers would be welcome. I agree that this is a useful change. It means that the
RowSimilarity job, which takes as inputs the matrix and the number of columns generated by
the RowId job, needs to be tested to ensure that nothing's broken.
>
> Should I go ahead and open a Jira for the above issues?
>
>
> Regards,
> Suneel
