mahout-user mailing list archives

From Angelo Immediata <angelo...@gmail.com>
Subject KMeans cluster analysis
Date Thu, 05 Dec 2013 11:38:22 GMT
Hi

First of all, I'm sorry if I'm repeating this question, but it's a pretty old one
and I really need some help, since I'm a complete newbie to Mahout and Hadoop.

I need to do some cluster analysis on my data. At the beginning this data
won't be very large, but over time it can become really huge (by my
calculations, after one year it could be around 37 billion records). Since
the data will grow this much, I decided to do the cluster analysis with
Mahout on top of Apache Hadoop and its HDFS. To store this big amount of
data I decided to use Apache HBase, again on top of Apache Hadoop HDFS.

Now I need to do this cluster analysis by considering some environment
variables. These variables are the following:

   - *recordId* = id of the record
   - *arcId* = id of the arc between 2 points of my "street graph"
   - *mediumVelocity* = average velocity on the considered arc in the specified period
   - *vehiclesNumber* = number of vehicles monitored in order to get that velocity
   - *meteo* = weather condition (a numeric code representing whether there is sun, rain, etc.)
   - *manifestation* = a numeric code representing whether there is any kind of event (sport event or other)
   - *weekDay* = day of the week
   - *yearMonth* = month of the year
   - *dayHour* = hour of the day
   - *vacation* = a numeric code representing whether it is a vacation day or a working day

So my data is formatted like this (raw representation):

recordId  arcId  mediumVelocity  vehiclesNumber  meteo  manifestation  weekDay  yearMonth  dayHour  vacation
1         1      34.5            20              1      3              4        2011       10       3
2         156    66.5            3               2      5              1        2008       6        2

The clustering should be done taking into account at least these variables:
meteo, manifestation, weekDay, dayHour, vacation.
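
From what I've read so far, Mahout's k-means wants its input as a SequenceFile
whose values are VectorWritable, so I suppose each record has to become a
numeric vector built from those five variables. This is only a sketch of the
encoding I have in mind (the HistoricalDataVectorizer class, the buildVector
helper and the order of the components are my own invention, nothing that
exists yet):

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;

public class HistoricalDataVectorizer {

    // Builds a Mahout vector from the five variables I want to cluster on.
    // Component order is my own choice: meteo, manifestation, weekDay, dayHour, vacation.
    public static Vector buildVector(String recordId, int meteo, int manifestation,
                                     int weekDay, int dayHour, int vacation) {
        double[] features = new double[] { meteo, manifestation, weekDay, dayHour, vacation };
        // NamedVector keeps the record id attached, so after clustering I can
        // map each clustered point back to the original HBase row.
        return new NamedVector(new DenseVector(features), recordId);
    }

    public static void main(String[] args) {
        // First row of the sample data above: meteo=1, manifestation=3,
        // weekDay=4, dayHour=10, vacation=3
        Vector v = buildVector("1", 1, 3, 4, 10, 3);
        System.out.println(v.asFormatString());
    }
}

One thing I'm unsure about: meteo and manifestation are really categorical
codes, so I don't know whether feeding them to k-means as plain numbers
(with a Euclidean distance) makes sense, or whether they should be encoded
differently; any advice on that would be welcome too.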

Now, in order to take the data out of HBase, I used the MapReduce
functionality provided by HBase; basically I wrote this code:

My Mapper/Reducer class:

package hadoop.mapred;

import hadoop.hbase.model.HistoricalDataModel;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.join.TupleWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class HistoricalDataMapRed {

    public static class HistoricalDataMapper extends TableMapper<Text, TupleWritable> {

        private static final Log logger = LogFactory.getLog(HistoricalDataMapper.class.getName());

        private int numRecords = 0;

        @Override
        protected void map(ImmutableBytesWritable key, Result result, Context context)
                throws IOException, InterruptedException {
            try {
                // Read the four columns of interest from the HBase row.
                Writable[] vals = new Writable[4];
                vals[0] = new IntWritable(Bytes.toInt(result.getValue(
                        HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.CALENDARIO_FESTIVO)));
                vals[1] = new IntWritable(Bytes.toInt(result.getValue(
                        HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.CALENDARIO_EVENTI)));
                vals[2] = new IntWritable(Bytes.toInt(result.getValue(
                        HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.EVENTO_METEO)));
                vals[3] = new IntWritable(Bytes.toInt(result.getValue(
                        HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.MANIFESTAZIONE)));

                // The HBase row key becomes the map output key.
                String chiave = Bytes.toString(result.getRow());
                Text text = new Text();
                text.set(chiave);
                context.write(text, new TupleWritable(vals));

                numRecords++;
                if ((numRecords % 10000) == 0) {
                    context.setStatus("mapper processed " + numRecords + " records so far");
                }
            } catch (Exception e) {
                String message = "Error in the mapper; error message: " + e.getMessage();
                logger.fatal(message, e);
                throw new IOException(message, e);
            }
        }
    }

    public static class HistoricalDataReducer extends Reducer<Text, TupleWritable, Text, TupleWritable> {

        private static final Log logger = LogFactory.getLog(HistoricalDataReducer.class.getName());

        @Override
        protected void reduce(Text key, Iterable<TupleWritable> values, Context context)
                throws IOException, InterruptedException {
            try {
                // Simply pass every tuple through to the output file.
                for (TupleWritable value : values) {
                    context.write(key, value);
                }
            } catch (Exception e) {
                String message = "Error in the reducer; error message: " + e.getMessage();
                logger.fatal(message, e);
                throw new IOException(message, e);
            }
        }
    }
}
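
One doubt I have about the code above: the javadoc of TupleWritable says it is
not meant as a general-purpose tuple type (it belongs to the join framework),
and if I read it correctly a tuple built directly with the Writable[]
constructor may come back empty when it is deserialized, because the "written"
bits are only set by the join code. Also, as far as I understand, Mahout's
k-means cannot consume TupleWritable values anyway. So I was thinking of
letting the mapper emit a Mahout VectorWritable directly; this is just a
sketch of that variant (assuming mahout-math/mahout-core are on the job
classpath; the class name is a placeholder):

import hadoop.hbase.model.HistoricalDataModel;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.VectorWritable;

public class HistoricalDataVectorMapper extends TableMapper<Text, VectorWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context)
            throws IOException, InterruptedException {
        // Read the same columns as before and pack them into a dense vector.
        double[] features = new double[] {
                Bytes.toInt(result.getValue(HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.EVENTO_METEO)),
                Bytes.toInt(result.getValue(HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.MANIFESTAZIONE)),
                Bytes.toInt(result.getValue(HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.CALENDARIO_FESTIVO)),
                Bytes.toInt(result.getValue(HistoricalDataModel.HISTORICAL_DATA_FAMILY, HistoricalDataModel.CALENDARIO_EVENTI))
        };
        String rowKey = Bytes.toString(result.getRow());
        // NamedVector keeps the HBase row key attached to the clustered point.
        context.write(new Text(rowKey), new VectorWritable(new NamedVector(new DenseVector(features), rowKey)));
    }
}

With a mapper like this (and the output format set to SequenceFileOutputFormat)
the job output would already be a SequenceFile of <Text, VectorWritable>, which
I believe is the format the Mahout k-means job expects as input.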

And then I wrote:

try {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "HBase_historicaldataJob");
    job.setJarByClass(HistoricalDataMapper.class);

    Scan scan = new Scan();
    scan.addFamily(HistoricalDataModel.HISTORICAL_DATA_FAMILY);
    scan.setCaching(500);        // fetch 500 rows per RPC
    scan.setCacheBlocks(false);  // don't pollute the block cache with a full scan

    // The mapper emits Text keys and TupleWritable values.
    TableMapReduceUtil.initTableMapperJob(
            ClusteringHistoricalDataDao.HBASE_TABLE_NAME,
            scan,
            HistoricalDataMapper.class,
            Text.class,
            TupleWritable.class,
            job);

    job.setReducerClass(HistoricalDataReducer.class);
    job.setNumReduceTasks(2);

    // Write the reducer output as a SequenceFile.
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(TupleWritable.class);

    Path path = new Path("/tmp/mr/mySummaryFile");
    HadoopUtil.delete(conf, path);
    FileOutputFormat.setOutputPath(job, path);  // adjust directories as required

    job.waitForCompletion(true);
} catch (Exception e) {
    logger.fatal("Error ", e);
    throw new IllegalStateException(e);
}

By doing it this way I can generate my SequenceFile with the input data; now
I should use it to do the cluster analysis, and here is the problem: how can
I use the generated file to make a cluster analysis that takes into account
the variables listed above?
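
For the record, what I found so far in the Mahout documentation suggests two
steps: pick k random seed clusters and then run KMeansDriver on the vector
SequenceFile. This is only a sketch of what I think it should look like (I'm
assuming Mahout 0.8, where KMeansDriver.run no longer takes a DistanceMeasure;
the exact arguments seem to change between releases, and as far as I
understand the input must be a SequenceFile of VectorWritable, not of
TupleWritable; all the paths below are hypothetical):

// Sketch only. Uses org.apache.mahout.clustering.kmeans.{KMeansDriver, RandomSeedGenerator}
// and org.apache.mahout.common.distance.EuclideanDistanceMeasure.
Configuration conf = new Configuration();
Path vectors = new Path("/tmp/mr/mySummaryFile");       // output of the vectorizing job
Path seeds = new Path("/tmp/mr/kmeans/seed-clusters");  // hypothetical path
Path output = new Path("/tmp/mr/kmeans/output");        // hypothetical path

// Pick k random points from the input as the initial cluster centers.
int k = 10;  // number of clusters: just a guess, to be tuned
Path initialClusters = RandomSeedGenerator.buildRandom(
        conf, vectors, seeds, k, new EuclideanDistanceMeasure());

// Iterate k-means: convergence delta 0.01, at most 20 iterations, then
// classify ("cluster") the input points into the final clusters.
KMeansDriver.run(conf, vectors, initialClusters, output,
        0.01, 20, true, 0.0, false);

Or maybe the equivalent "mahout kmeans" command line would be simpler? Does
this look like the right direction, and is the TupleWritable vs. VectorWritable
point the piece I'm missing?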

Moreover, is this a good approach for doing the cluster analysis?

I searched around, but I was not able to find any good sample. Any
suggestion would be really appreciated.

Thank you
Angelo
