Dear Wiki user, You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification. The "HadoopSupport" page has been changed by jeremyhanna. http://wiki.apache.org/cassandra/HadoopSupport?action=diff&rev1=8&rev2=9 -------------------------------------------------- - Cassandra version 0.6 and later support running Hadoop jobs against data in Cassandra, out of the box. See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for an example. (Inserting the ''output'' of a Hadoop job into Cassandra has always been possible.) Cassandra rows or row fragments (that is, pairs of key + `SortedMap` of columns) are input to Map tasks for processing by your job, as specified by a `SlicePredicate` that describes which columns to fetch from each row. Here's how this looks in the word_count example, which selects just one configurable columnName from each row: + == Overview == + Cassandra version 0.6 and later enable certain Hadoop functionality against Cassandra's data store. Specifically, support has been added for MapReduce and Pig. + + == MapReduce == + While writing output to Cassandra has always been possible by implementing certain interfaces from the Hadoop library, version 0.6 of Cassandra added support for retrieving data from Cassandra. Cassandra 0.6 adds implementations of InputSplit, InputFormat, and RecordReader so that Hadoop MapReduce jobs can retrieve data from Cassandra. For an example of how this works, see the contrib/word_count example in 0.6 or later. Cassandra rows or row fragments (that is, pairs of key + `SortedMap` of columns) are input to Map tasks for processing by your job, as specified by a `SlicePredicate` that describes which columns to fetch from each row. + + Here's how this looks in the word_count example, which selects just one configurable columnName from each row: {{{ ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY); SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes())); ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate); }}} - Cassandra also provides a [[http://hadoop.apache.org/pig/|Pig]] `LoadFunc` for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/. + Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design). Cassandra gives the Hadoop JobTracker a list of locations with each split of data. That way, the JobTracker can try to preserve data locality when assigning tasks to TaskTrackers. Therefore, when using Hadoop alongside Cassandra, it is best to have a TaskTracker running on the same node as the Cassandra nodes, if data locality while processing is desired and to minimize copying data between Cassandra and Hadoop nodes. - Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design). Cassandra gives hadoop a list of locations with each split of data, and Hadoop tries to schedule jobs on instances near that data, which in practice means you should have Hadoop instances on each of your Cassandra machines. + As of 0.7, there will be a basic mechanism included in Cassandra for outputting data to cassandra. See [[https://issues.apache.org/jira/browse/CASSANDRA-1101|CASSANDRA-1101]] for details. - Releases before 0.6.2/0.7 are affected by a small resource leak that may cause jobs to fail (connections are not released properly, causing a resource leak). Depending on your local setup you may hit this issue, and workaround it by raising the limit of open file descriptors for the process (e.g. in linux/bash using `ulimit -n 32000`). + Releases before 0.6.2/0.7 are affected by a small resource leak that may cause jobs to fail (connections are not released properly, causing a resource leak). Depending on your local setup you may hit this issue, and workaround it by raising the limit of open file descriptors for the process (e.g. in linux/bash using `ulimit -n 32000`). The error will be reported on the hadoop job side as a thrift TimedOutException. - The error will be reported on the hadoop job side as a thrift TimedOutException. - If you are testing the integration against a single node and you obtain some failures, this may be normal: you are probably overloading the single machine, which may again result in timeout errors. You can workaround it by reducing the number of concurrent tasks + If you are testing the integration against a single node and you obtain some failures, this may be normal: you are probably overloading the single machine, which may again result in timeout errors. You can workaround it by reducing the number of concurrent tasks + {{{ - Configuration conf = job.getConfiguration(); + Configuration conf = job.getConfiguration(); - conf.setInt("mapred.tasktracker.map.tasks.maximum",1); + conf.setInt("mapred.tasktracker.map.tasks.maximum",1); }}} + Also, you may reduce the size in rows of the batch you are reading from cassandra - Also, you may reduce the size in rows of the batch you are reading from cassandra {{{ ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000); }}} + == Pig == + Cassandra 0.6 also adds support for [[http://hadoop.apache.org/pig/|Pig]] with its own implementation of LoadFunc. This allows Pig queries to be run against data stored in Cassandra. For an example of this, see the contrib/pig example in 0.6 and later. + == Hive == + Hive is currently not supported in Cassandra but there has been thought given to support Hive in the future - [[https://issues.apache.org/jira/browse/CASSANDRA-913|CASSANDRA-913]] +