From: Apache Wiki
To: Apache Wiki
Date: Wed, 16 Jun 2010 22:55:33 -0000
Message-ID: <20100616225533.20077.89285@eos.apache.org>
Subject: [Cassandra Wiki] Update of "HadoopSupport" by jeremyhanna

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Cassandra Wiki" for change notification.

The "HadoopSupport" page has been changed by jeremyhanna.
http://wiki.apache.org/cassandra/HadoopSupport?action=diff&rev1=8&rev2=9

--------------------------------------------------

- Cassandra version 0.6 and later support running Hadoop jobs against data in Cassandra, out of the box. See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for an example. (Inserting the ''output'' of a Hadoop job into Cassandra has always been possible.) Cassandra rows or row fragments (that is, pairs of key + `SortedMap` of columns) are input to Map tasks for processing by your job, as specified by a `SlicePredicate` that describes which columns to fetch from each row. Here's how this looks in the word_count example, which selects just one configurable columnName from each row:
+ == Overview ==
+ Cassandra version 0.6 and later enable certain Hadoop functionality against Cassandra's data store. Specifically, support has been added for MapReduce and Pig.
+
+ == MapReduce ==
+ While writing output to Cassandra has always been possible by implementing certain interfaces from the Hadoop library, Cassandra 0.6 added support for retrieving data as well: it provides implementations of InputSplit, InputFormat, and RecordReader so that Hadoop MapReduce jobs can read data from Cassandra. For an example of how this works, see the contrib/word_count example in 0.6 or later.
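+
+ To make the input side concrete, here is a minimal sketch of a Mapper consuming Cassandra rows, modeled on the 0.6 word_count contrib; the class name and the emit-each-column-value logic are illustrative, not part of the contrib:
+ {{{
+ import java.io.IOException;
+ import java.util.SortedMap;
+
+ import org.apache.cassandra.db.IColumn;
+ import org.apache.hadoop.io.IntWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapreduce.Mapper;
+
+ // Each input record is one Cassandra row: the row key plus a SortedMap of
+ // the columns selected by the job's SlicePredicate.
+ public class ColumnValueMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
+ {
+     private static final IntWritable ONE = new IntWritable(1);
+
+     public void map(String key, SortedMap<byte[], IColumn> columns, Context context)
+         throws IOException, InterruptedException
+     {
+         // Emit each selected column's value once for the row it appears in.
+         for (IColumn column : columns.values())
+             context.write(new Text(new String(column.value())), ONE);
+     }
+ }
+ }}}
+ The driver would pair this with job.setInputFormatClass(ColumnFamilyInputFormat.class) and the ConfigHelper calls shown below.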
+
+ Cassandra rows or row fragments (that is, pairs of key + `SortedMap` of columns) are the input to Map tasks for processing by your job, as specified by a `SlicePredicate` that describes which columns to fetch from each row.
+
+ Here's how this looks in the word_count example, which selects just one configurable columnName from each row:
  {{{
          ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
          SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
          ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
  }}}
- Cassandra also provides a [[http://hadoop.apache.org/pig/|Pig]] `LoadFunc` for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.
+ Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design): Cassandra gives the Hadoop JobTracker a list of locations with each split of data, so the JobTracker can try to preserve data locality when assigning tasks to TaskTrackers. Therefore, if you want data locality while processing, and want to minimize copying between Cassandra and Hadoop nodes, it is best to run a TaskTracker on each Cassandra node.
- Cassandra's splits are location-aware (this is the nature of the Hadoop InputSplit design). Cassandra gives hadoop a list of locations with each split of data, and Hadoop tries to schedule jobs on instances near that data, which in practice means you should have Hadoop instances on each of your Cassandra machines.
+ As of 0.7, there will be a basic mechanism included in Cassandra for outputting data to Cassandra. See [[https://issues.apache.org/jira/browse/CASSANDRA-1101|CASSANDRA-1101]] for details.
- Releases before 0.6.2/0.7 are affected by a small resource leak that may cause jobs to fail (connections are not released properly, causing a resource leak). Depending on your local setup you may hit this issue, and workaround it by raising the limit of open file descriptors for the process (e.g. in linux/bash using `ulimit -n 32000`).
+ Releases before 0.6.2/0.7 are affected by a resource leak that may cause jobs to fail: connections are not released properly. Depending on your local setup you may hit this issue; you can work around it by raising the limit of open file descriptors for the process (e.g. in Linux/bash using `ulimit -n 32000`). The error will be reported on the Hadoop job side as a Thrift TimedOutException.
- The error will be reported on the hadoop job side as a thrift TimedOutException.
- If you are testing the integration against a single node and you obtain some failures, this may be normal: you are probably overloading the single machine, which may again result in timeout errors. You can workaround it by reducing the number of concurrent tasks
+ If you are testing the integration against a single node and you obtain some failures, this may be normal: you are probably overloading the single machine, which may again result in timeout errors. You can work around it by reducing the number of concurrent tasks:
+
  {{{
          Configuration conf = job.getConfiguration();
          conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
  }}}
+ Also, you may reduce the size, in rows, of the batches you read from Cassandra:
- Also, you may reduce the size in rows of the batch you are reading from cassandra
  {{{
        ConfigHelper.setRangeBatchSize(job.getConfiguration(), 1000);
  }}}
+ == Pig ==
+ Cassandra 0.6 also adds support for [[http://hadoop.apache.org/pig/|Pig]] with its own implementation of LoadFunc. This allows Pig queries to be run against data stored in Cassandra. For an example, see the contrib/pig example in 0.6 and later (and the sketch at the end of this message).
+
+ == Hive ==
+ Hive is currently not supported in Cassandra, but there has been thought given to supporting Hive in the future - see [[https://issues.apache.org/jira/browse/CASSANDRA-913|CASSANDRA-913]].
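+
+ For illustration, here is a minimal sketch of the kind of Pig query the Pig section above describes, modeled on the contrib/pig README; the keyspace and column family names (Keyspace1/Standard1) are placeholders, and the contrib jar must be registered with Pig so that CassandraStorage resolves:
+ {{{
+ -- load rows; each is (key, {(column_name, column_value), ...})
+ rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage();
+ -- flatten each row's bag of columns into (name, value) tuples
+ cols = FOREACH rows GENERATE FLATTEN($1);
+ -- count how often each column name occurs across all rows
+ namegroups = GROUP cols BY $0;
+ namecounts = FOREACH namegroups GENERATE group, COUNT($1);
+ DUMP namecounts;
+ }}}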