spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Hbase
Date Fri, 01 Aug 2014 14:27:32 GMT
Here's a piece of code. In your case, you are missing the call() method
inside the map function.


import java.util.Iterator;

import java.util.List;

import org.apache.commons.configuration.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.rdd.NewHadoopRDD;

import org.apache.spark.streaming.Duration;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import com.google.common.collect.Lists;

import scala.Function1;

import scala.Tuple2;

import scala.collection.JavaConversions;

import scala.collection.Seq;

import scala.collection.JavaConverters.*;

import scala.reflect.ClassTag;

public class SparkHBaseMain {

 @SuppressWarnings("deprecation")

public static void main(String[] arg){

 try{

 List<String> jars =
> Lists.newArrayList("/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",

"/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",

"/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",

"/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",

"/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",

"/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",

"/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

SparkConf spconf = new SparkConf();

spconf.setMaster("local");

spconf.setAppName("SparkHBase");

spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");

spconf.setJars(jars.toArray(new String[jars.size()]));

spconf.set("spark.executor.memory", "1g");

final JavaSparkContext sc = new JavaSparkContext(spconf);

 org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();

conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");

conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

 NewHadoopRDD<ImmutableBytesWritable, Result> rdd = new
> NewHadoopRDD<ImmutableBytesWritable,
> Result>(JavaSparkContext.toSparkContext(sc), TableInputFormat.class,
> ImmutableBytesWritable.class, Result.class, conf);

 JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

 *ForEachFunction f = new ForEachFunction();*

* JavaRDD<Iterator<String>> retrdd = jrdd.map(f);*



> System.out.println("Count =>" + retrdd.count());

 }catch(Exception e){

 e.printStackTrace();

System.out.println("Craaaashed : " + e);

 }

 }

 @SuppressWarnings("serial")

    private static class ForEachFunction extends
> Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{

            *public Iterator<String> call(Tuple2<ImmutableBytesWritable,
> Result> test) {*

*            Result tmp = (Result) test._2;*

* List<KeyValue> kvl = tmp.getColumn("post".getBytes(),
> "title".getBytes());*

* for(KeyValue kl:kvl){*

* String sb = new String(kl.getValue());*

* System.out.println("Value :" + sb);*

* }*

*            return null;*

*            }*

     }


> }


Hope it helps.


Thanks
Best Regards


On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar <
mrajaforu@gmail.com> wrote:

> Hi Akhil,
>
> Thank you for your response. I'm facing below issues.
>
> I'm not able to print the values. Am I missing anything? Could you please
> look into this issue?
>
>     JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
> sc.newAPIHadoopRDD(
>             conf,
>     TableInputFormat.class,
>     ImmutableBytesWritable.class,
>     Result.class);
>
>     System.out.println(" ROWS COUNT = "+ hBaseRDD.count());
>
>   JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable,
> Result>, Iterator<String>>(){
>
>         public Iterator<String> call(Tuple2<ImmutableBytesWritable,
> Result> test)
>         {
>             Result tmp = (Result) test._2;
>
>             System.out.println("Inside ");
>
>         //    List<KeyValue> kvl = tmp.getColumn("post".getBytes(),
> "title".getBytes());
>             for(KeyValue kl:tmp.raw())
>
>             {
>             String sb = new String(kl.getValue());
>             System.out.println(sb);
>             }
>             return null;
>         }
>     }
>     );
>
> *Output :*
>
> ROWS COUNT = 8
>
> It is not printing the "Inside" statement either. I think it is not going into
> this function.
>
> Could you please help me on this issue.
>
> Thank you for your support and help
>
> Regards,
> Rajesh
>
>
>
> On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>> You can use a map function like the following and do whatever you want
>> with the Result.
>>
>> Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>{
>>>             public Iterator<String> call(Tuple2<ImmutableBytesWritable,
>>> Result> test) {
>>>             Result tmp = (Result) test._2;
>>>  List<KeyValue> kvl = *tmp.getColumn("post".getBytes(),
>>> "title".getBytes());*
>>> for(KeyValue kl:kvl){
>>>  String sb = new String(kl.getValue());
>>> System.out.println(sb);
>>>  }
>>
>>
>>
>>
>> Thanks
>> Best Regards
>>
>>
>> On Thu, Jul 31, 2014 at 10:19 PM, Madabhattula Rajesh Kumar <
>> mrajaforu@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I'm using below code to read table from hbase
>>>
>>> Configuration conf = HBaseConfiguration.create();
>>> conf.set(TableInputFormat.INPUT_TABLE, "table1");
>>>
>>> JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(
>>>             conf,
>>>     TableInputFormat.class,
>>>     ImmutableBytesWritable.class,
>>>     Result.class);
>>>
>>> I got hBaseRDD. I'm not able to read the column values from hBaseRDD.
>>>
>>> *Could you please let me know, how to read the column values from
>>> hBaseRDD?*
>>>
>>> Thank you for your help.
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>

Mime
View raw message