Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.
The "HadoopSupport" page has been changed by jeremyhanna.
http://wiki.apache.org/cassandra/HadoopSupport?action=diff&rev1=8&rev2=9
--------------------------------------------------
- Cassandra version 0.6 and later support running Hadoop jobs against data in Cassandra, out
of the box. See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for
an example. (Inserting the ''output'' of a Hadoop job into Cassandra has always been possible.)
Cassandra rows or row fragments (that is, pairs of key + `SortedMap` of columns) are input
to Map tasks for processing by your job, as specified by a `SlicePredicate` that describes
which columns to fetch from each row. Here's how this looks in the word_count example, which
selects just one configurable columnName from each row:
+ == Overview ==
+ Cassandra versions 0.6 and later support running certain Hadoop workloads against data
stored in Cassandra. Specifically, support has been added for MapReduce and Pig.
+
+ == MapReduce ==
+ Writing output to Cassandra has always been possible by implementing certain interfaces
from the Hadoop library. Cassandra 0.6 adds support for the other direction: it provides
implementations of `InputSplit`, `InputFormat`, and `RecordReader` so that Hadoop MapReduce
jobs can retrieve data from Cassandra. For an example of how this works, see the
contrib/word_count example in 0.6 or later. Cassandra rows or row fragments (that is, pairs
of key + `SortedMap` of columns) are input to Map tasks for processing by your job, as specified
by a `SlicePredicate` that describes which columns to fetch from each row.
+
+ Here's how this looks in the word_count example, which selects just one configurable columnName
from each row:
{{{
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
}}}
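
To make the "row fragment" idea concrete, here is a minimal plain-Java sketch of what a column-name predicate does to a single row. It does not use the Cassandra or Hadoop classes: `SliceSketch`, `slice`, and the sample column names are hypothetical stand-ins, and columns are modelled as strings rather than byte arrays for readability.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative model only: not the Cassandra SlicePredicate implementation.
final class SliceSketch {

    // Keep only the requested columns of a row; columns the row lacks are skipped.
    static SortedMap<String, String> slice(SortedMap<String, String> row,
                                           List<String> columnNames) {
        SortedMap<String, String> fragment = new TreeMap<>();
        for (String name : columnNames) {
            String value = row.get(name);
            if (value != null) {
                fragment.put(name, value);
            }
        }
        return fragment;
    }

    public static void main(String[] args) {
        // One row: a key plus a sorted map of column name to value.
        String key = "key1";
        SortedMap<String, String> row = new TreeMap<>();
        row.put("author", "jdoe");
        row.put("text", "hello hadoop hello cassandra");
        row.put("timestamp", "1276000000");

        // Selecting a single column mirrors the word_count setup above.
        SortedMap<String, String> fragment = slice(row, Arrays.asList("text"));

        // A Map task would receive the pair (key, fragment).
        System.out.println(key + " -> " + fragment);
    }
}
```

In this model the Map task sees only the `text` column of each row, which is why a narrow predicate keeps the amount of data handed to the job small.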
- Cassandra also provides a [[http://hadoop.apache.org/pig/|Pig]] `LoadFunc` for running jobs
in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.
+ Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design).
Cassandra gives the Hadoop JobTracker a list of locations with each split of data. That
way, the JobTracker can try to preserve data locality when assigning tasks to TaskTrackers.
Therefore, when using Hadoop alongside Cassandra, it is best to run a TaskTracker on each
Cassandra node. This preserves data locality during processing and minimizes copying data
between Cassandra and Hadoop nodes.
- Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design).
Cassandra gives hadoop a list of locations with each split of data, and Hadoop tries to schedule
jobs on instances near that data, which in practice means you should have Hadoop instances
on each of your Cassandra machines.
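
The locality preference described here can be sketched in a few lines of plain Java. This is a simplified illustration, not Hadoop's actual scheduling logic: `LocalitySketch`, `pickTracker`, and the host names are hypothetical, and the fallback rule (first available tracker) is an assumption made for brevity.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only: the real JobTracker scheduler is more involved.
final class LocalitySketch {

    // Prefer a tracker running on a host that holds the split's data;
    // otherwise fall back to the first tracker, which implies a remote read.
    // Assumes the trackers list is non-empty.
    static String pickTracker(List<String> splitLocations, List<String> trackers) {
        for (String host : splitLocations) {
            if (trackers.contains(host)) {
                return host;
            }
        }
        return trackers.get(0);
    }

    public static void main(String[] args) {
        List<String> trackers = Arrays.asList("node1", "node3");

        // The split's data lives on node2 and node3; node3 also runs a tracker.
        List<String> local = Arrays.asList("node2", "node3");
        System.out.println("split on " + local + " -> " + pickTracker(local, trackers));

        // No tracker runs where this split's data lives, so the read is remote.
        List<String> remote = Arrays.asList("node9");
        System.out.println("split on " + remote + " -> " + pickTracker(remote, trackers));
    }
}
```

With a tracker on every Cassandra node, the first branch always finds a match, which is the situation the paragraph above recommends.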
+ As of 0.7, Cassandra will include a basic mechanism for outputting data to Cassandra.
See [[https://issues.apache.org/jira/browse/CASSANDRA-1101|CASSANDRA-1101]] for
details.
- Releases before 0.6.2/0.7 are affected by a small resource leak that may cause jobs to
fail (connections are not released properly, causing a resource leak). Depending on your local
setup you may hit this issue, and workaround it by raising the limit of open file descriptors
for the process (e.g. in linux/bash using `ulimit -n 32000`).
+ Releases before 0.6.2/0.7 are affected by a small resource leak that may cause jobs to
fail (connections are not released properly). Depending on your local setup you may hit
this issue; you can work around it by raising the limit of open file descriptors for the
process (e.g. in Linux/bash using `ulimit -n 32000`). The error will be reported on the
Hadoop job side as a Thrift `TimedOutException`.
- The error will be reported on the hadoop job side as a thrift TimedOutException.
- If you are testing the integration against a single node and you obtain some failures, this
may be normal: you are probably overloading the single machine, which may again result in
timeout errors. You can workaround it by reducing the number of concurrent tasks
+ If you are testing the integration against a single node and you see some failures,
this may be normal: you are probably overloading the single machine, which may again result
in timeout errors. You can work around it by reducing the number of concurrent tasks:
+
{{{
- Configuration conf = job.getConfiguration();
+ Configuration conf = job.getConfiguration();
- conf.setInt("mapred.tasktracker.map.tasks.maximum",1);
+ conf.setInt("mapred.tasktracker.map.tasks.maximum",1);
}}}
+ You can also reduce the size, in rows, of each batch read from Cassandra:
- Also, you may reduce the size in rows of the batch you are reading from cassandra
{{{
ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000);
}}}
+ == Pig ==
+ Cassandra 0.6 also adds support for [[http://hadoop.apache.org/pig/|Pig]] with its own implementation
of LoadFunc. This allows Pig queries to be run against data stored in Cassandra. For an
example of this, see the contrib/pig example in 0.6 and later.
+ == Hive ==
+ Hive is not currently supported in Cassandra, but supporting it in the future has been
considered; see [[https://issues.apache.org/jira/browse/CASSANDRA-913|CASSANDRA-913]].
+