Hi Akhil,

Thank you very much for your help and support.

Regards,
Rajesh


On Fri, Aug 1, 2014 at 7:57 PM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
Here's a piece of code. In your case, you are missing the call() method inside the map function.


import java.util.Iterator;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.NewHadoopRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import com.google.common.collect.Lists;
import scala.Function1;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.JavaConverters.*;
import scala.reflect.ClassTag;
public class SparkHBaseMain {

    @SuppressWarnings("deprecation")
    public static void main(String[] arg) {
        try {
            // Jars that have to be shipped with the job.
            List<String> jars = Lists.newArrayList(
                "/home/akhld/Desktop/tools/spark-9/jars/spark-assembly-0.9.0-incubating-hadoop2.3.0-mr1-cdh5.0.0.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-server-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-protocol-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-hadoop2-compat-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-common-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/hbase-client-0.96.0-hadoop2.jar",
                "/home/akhld/Downloads/sparkhbasecode/htrace-core-2.02.jar");

            SparkConf spconf = new SparkConf();
            spconf.setMaster("local");
            spconf.setAppName("SparkHBase");
            spconf.setSparkHome("/home/akhld/Desktop/tools/spark-9");
            spconf.setJars(jars.toArray(new String[jars.size()]));
            spconf.set("spark.executor.memory", "1g");

            final JavaSparkContext sc = new JavaSparkContext(spconf);

            // HBase configuration: point TableInputFormat at the "blogposts" table.
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.addResource("/home/akhld/Downloads/sparkhbasecode/hbase-site.xml");
            conf.set(TableInputFormat.INPUT_TABLE, "blogposts");

            NewHadoopRDD<ImmutableBytesWritable, Result> rdd =
                new NewHadoopRDD<ImmutableBytesWritable, Result>(
                    JavaSparkContext.toSparkContext(sc),
                    TableInputFormat.class,
                    ImmutableBytesWritable.class,
                    Result.class,
                    conf);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> jrdd = rdd.toJavaRDD();

            ForEachFunction f = new ForEachFunction();
            JavaRDD<Iterator<String>> retrdd = jrdd.map(f);

            // count() is an action, so it forces the map (and the printing inside call()) to run.
            System.out.println("Count =>" + retrdd.count());
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Craaaashed : " + e);
        }
    }

    @SuppressWarnings("serial")
    private static class ForEachFunction extends Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>> {
        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
            Result tmp = (Result) test._2;
            List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : kvl) {
                String sb = new String(kl.getValue());
                System.out.println("Value :" + sb);
            }
            return null;
        }
    }
}
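
If you would rather have the values back on the driver instead of printed on the executors, the map can return the strings and you can collect() them. A quick, untested sketch (it assumes the same "post:title" column as above and an extra java.util.ArrayList import) that would go next to the existing map in main():

            // Return the column values instead of printing them (needs: import java.util.ArrayList;)
            JavaRDD<List<String>> titles = jrdd.map(
                new Function<Tuple2<ImmutableBytesWritable, Result>, List<String>>() {
                    public List<String> call(Tuple2<ImmutableBytesWritable, Result> row) {
                        List<String> values = new ArrayList<String>();
                        for (KeyValue kv : row._2.getColumn("post".getBytes(), "title".getBytes())) {
                            values.add(new String(kv.getValue()));
                        }
                        return values;
                    }
                });
            // collect() is an action: it runs the map and brings the values to the driver.
            for (List<String> rowTitles : titles.collect()) {
                System.out.println("Titles : " + rowTitles);
            }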

Hope it helps. 


Thanks
Best Regards


On Fri, Aug 1, 2014 at 4:44 PM, Madabhattula Rajesh Kumar <mrajaforu@gmail.com> wrote:
Hi Akhil,

Thank you for your response. I'm facing the issue below.

I'm not able to print the values. Am I missing anything? Could you please look into this issue?

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(
            conf,
            TableInputFormat.class,
            ImmutableBytesWritable.class,
            Result.class);

    System.out.println(" ROWS COUNT = " + hBaseRDD.count());

    JavaRDD R = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {

        public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test)
        {
            Result tmp = (Result) test._2;

            System.out.println("Inside ");

            // List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
            for (KeyValue kl : tmp.raw())
            {
                String sb = new String(kl.getValue());
                System.out.println(sb);
            }
            return null;
        }
    });


Output :

ROWS COUNT = 8

It is not printing the "Inside" statement either. I think it is not going into this function.
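
Do I need to run an action on R (since map is lazy) for the call() method to actually execute? Something like this, maybe:

    // Guess: force the lazy map to run by calling an action on R
    System.out.println(" MAPPED COUNT = " + R.count());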

Could you please help me on this issue.

Thank you for your support and help

Regards,
Rajesh



On Fri, Aug 1, 2014 at 12:17 PM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
You can use a map function like the following and do whatever you want with the Result.

new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
    public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
        Result tmp = (Result) test._2;
        List<KeyValue> kvl = tmp.getColumn("post".getBytes(), "title".getBytes());
        for (KeyValue kl : kvl) {
            String sb = new String(kl.getValue());
            System.out.println(sb);
        }
        return null;
    }
}
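
To actually see the output you have to pass that function to map() and then run an action on the result; map() is lazy, so call() only executes once an action such as count() or collect() is invoked. Roughly (untested, assuming hBaseRDD is the JavaPairRDD you get from newAPIHadoopRDD in your code):

    JavaRDD<Iterator<String>> mapped = hBaseRDD.map(
        new Function<Tuple2<ImmutableBytesWritable, Result>, Iterator<String>>() {
            public Iterator<String> call(Tuple2<ImmutableBytesWritable, Result> test) {
                for (KeyValue kl : test._2.getColumn("post".getBytes(), "title".getBytes())) {
                    System.out.println(new String(kl.getValue()));
                }
                return null;
            }
        });
    // count() is an action, so it triggers the map and the println above.
    mapped.count();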

 

Thanks
Best Regards


On Thu, Jul 31, 2014 at 10:19 PM, Madabhattula Rajesh Kumar <mrajaforu@gmail.com> wrote:
Hi Team,

I'm using the code below to read a table from HBase:

Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "table1");

JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(
        conf,
        TableInputFormat.class,
        ImmutableBytesWritable.class,
        Result.class);

I got hBaseRDD, but I'm not able to read the column values from it.

Could you please let me know how to read the column values from hBaseRDD?

Thank you for your help.

Regards,
Rajesh