spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jim Green <openkbi...@gmail.com>
Subject Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Date Tue, 27 Jan 2015 20:44:39 GMT
I used below code, and it still failed with the same error.
Anyone has experience on bulk loading using scala?
Thanks.

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val put: Put = new Put(Bytes.toBytes(x))
    put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
    (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
})
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13", classOf[ImmutableBytesWritable],
classOf[KeyValue], classOf[HFileOutputFormat], conf)



On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbinfo@gmail.com> wrote:

> Thanks Ted. Could you give me a simple example to load one row data in
> hbase? How should I generate the KeyValue?
> I tried multiple times, and still can not figure it out.
>
> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> Here is the method signature used by HFileOutputFormat :
>>       public void write(ImmutableBytesWritable row, KeyValue kv)
>>
>> Meaning, KeyValue is expected, not Put.
>>
>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbinfo@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I need some help on writing a scala to bulk load some data into hbase.
>>> *Env:*
>>> hbase 0.94
>>> spark-1.0.2
>>>
>>> I am trying below code to just bulk load some data into hbase table “t1”.
>>>
>>> import org.apache.spark._
>>> import org.apache.spark.rdd.NewHadoopRDD
>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>> import org.apache.hadoop.fs.Path;
>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>> import org.apache.hadoop.hbase.util.Bytes
>>> import org.apache.hadoop.hbase.client.Put;
>>> import org.apache.hadoop.hbase.client.HTable;
>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>> import org.apache.hadoop.mapred.JobConf
>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>> import org.apache.hadoop.mapreduce.Job
>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>> import org.apache.hadoop.hbase.KeyValue
>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>
>>> val conf = HBaseConfiguration.create()
>>> val tableName = "t1"
>>> val table = new HTable(conf, tableName)
>>>
>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>> val job = Job.getInstance(conf)
>>> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
>>> job.setMapOutputValueClass (classOf[KeyValue])
>>> HFileOutputFormat.configureIncrementalLoad (job, table)
>>>
>>> val num = sc.parallelize(1 to 10)
>>> val rdd = num.map(x=>{
>>>     val put: Put = new Put(Bytes.toBytes(x))
>>>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>> })
>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8",
>>> classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
>>> conf)
>>>
>>>
>>> However I am allways getting below error:
>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put cannot
>>> be cast to org.apache.hadoop.hbase.KeyValue
>>> at
>>> org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>> at
>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> My questions are:
>>> 1. Do we have a sample code to do bulk load into hbase directly?
>>> Can we use saveAsNewAPIHadoopFile?
>>>
>>> 2. Is there any other way to do this?
>>> For example, firstly write a hfile on hdfs, and then use hbase command
>>> to bulk load?
>>> Any sample code using scala?
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>> --
>>> Thanks,
>>> www.openkb.info
>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>
>>
>>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>



-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)

Mime
View raw message