spark-user mailing list archives

From: Jim Green <openkbi...@gmail.com>
Subject: Re: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
Date: Thu, 29 Jan 2015 02:33:52 GMT
Thanks to all for responding.

I finally figured out how to bulk load into HBase using Scala on Spark.
Here is the sample code, which others can refer to in the future:

http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html

Thanks!
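
For reference, the overall flow discussed in this thread is: build an RDD of
(ImmutableBytesWritable, KeyValue) pairs, write HFiles with saveAsNewAPIHadoopFile
and HFileOutputFormat, then load those HFiles into the table with
LoadIncrementalHFiles. A minimal sketch of that flow follows (it is based on the
code quoted below, not the exact code from the linked post; the output path and
cell values are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val tableName = "t1"
val table = new HTable(conf, tableName)

// Configure HFileOutputFormat against the target table (partitioning, compression, etc.).
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Build (rowkey, KeyValue) pairs -- HFileOutputFormat expects KeyValue, not Put.
// Keys must be sorted within each partition; 1 to 10 as 4-byte ints already are.
val rdd = sc.parallelize(1 to 10).map { x =>
  val rowkey = Bytes.toBytes(x)
  val kv = new KeyValue(rowkey, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
    Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(rowkey), kv)
}

// Write the HFiles to HDFS ...
val hfileDir = "/tmp/hfiles_t1"
rdd.saveAsNewAPIHadoopFile(hfileDir,
  classOf[ImmutableBytesWritable], classOf[KeyValue],
  classOf[HFileOutputFormat], job.getConfiguration)

// ... and bulk load them into the table.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileDir), table)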

On Tue, Jan 27, 2015 at 6:27 PM, Jim Green <openkbinfo@gmail.com> wrote:

> Thanks Sun.
> My understanding is that saveAsNewAPIHadoopFile saves the data as HFiles on HDFS.
>
> Is it doable to use saveAsNewAPIHadoopDataset to load directly into HBase?
> If so, is there any sample code for that?
>
> Thanks!
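
(The thread does not include a sample for saveAsNewAPIHadoopDataset, but a minimal
sketch of writing Puts directly to the table through the new-API TableOutputFormat
would look roughly like the following, assuming a Spark version that provides
PairRDDFunctions.saveAsNewAPIHadoopDataset. Table name "t1" and column family "cf"
are taken from the code later in the thread; everything else is illustrative, and
note this goes through the normal HBase write path rather than a bulk load.)

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, "t1")

// The Job object only carries the output format configuration here.
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// TableOutputFormat writes Puts (unlike HFileOutputFormat, which expects KeyValues).
val puts = sc.parallelize(1 to 10).map { x =>
  val put = new Put(Bytes.toBytes(x))
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_" + x))
  (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
}

puts.saveAsNewAPIHadoopDataset(job.getConfiguration)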
>
> On Tue, Jan 27, 2015 at 6:07 PM, fightfate@163.com <fightfate@163.com>
> wrote:
>
>> Hi, Jim
>> Your generated RDD should be of type RDD[(ImmutableBytesWritable,
>> KeyValue)], while your current one is RDD[(ImmutableBytesWritable, Put)].
>> You can do something like the following, and the result will be an
>> RDD[(ImmutableBytesWritable, KeyValue)] that can be saved with saveAsNewAPIHadoopFile:
>>      val result = num.flatMap(v => keyValueBuilder(v))
>>        .map(kv => (new ImmutableBytesWritable(kv.getBuffer(),
>>          kv.getRowOffset(), kv.getRowLength()), kv))
>>
>> where keyValueBuilder would be defined as a function from one record to a
>> List[KeyValue], for example:
>>            val keyValueBuilder = (data: Int) => {
>>                val rowkeyBytes = Bytes.toBytes(data)
>>                val colfam = Bytes.toBytes("cf")
>>                val qual = Bytes.toBytes("c1")
>>                val value = Bytes.toBytes("val_xxx")
>>
>>                val kv = new KeyValue(rowkeyBytes, colfam, qual, value)
>>                List(kv)
>>            }
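
(To complete the sketch above: the resulting result RDD would then be written out
with the same saveAsNewAPIHadoopFile call that appears elsewhere in this thread,
assuming job has been prepared with HFileOutputFormat.configureIncrementalLoad as
in the quoted code; the output path is arbitrary.)

result.saveAsNewAPIHadoopFile("/tmp/hfiles_out",
  classOf[ImmutableBytesWritable], classOf[KeyValue],
  classOf[HFileOutputFormat], job.getConfiguration)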
>>
>>
>> Thanks,
>> Sun
>> ------------------------------
>> fightfate@163.com
>>
>>
>> From: Jim Green <openkbinfo@gmail.com>
>> Date: 2015-01-28 04:44
>> To: Ted Yu <yuzhihong@gmail.com>
>> CC: user <user@spark.apache.org>
>> Subject: Re: Bulk loading into hbase using saveAsNewAPIHadoopFile
>> I used the code below, and it still failed with the same error.
>> Does anyone have experience with bulk loading using Scala?
>> Thanks.
>>
>> import org.apache.spark._
>> import org.apache.spark.rdd.NewHadoopRDD
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.hbase.HColumnDescriptor
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.hadoop.hbase.client.Put;
>> import org.apache.hadoop.hbase.client.HTable;
>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>> import org.apache.hadoop.mapred.JobConf
>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>> import org.apache.hadoop.mapreduce.Job
>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>> import org.apache.hadoop.hbase.KeyValue
>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>
>> val conf = HBaseConfiguration.create()
>> val tableName = "t1"
>> val table = new HTable(conf, tableName)
>>
>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>> val job = Job.getInstance(conf)
>> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
>> job.setMapOutputValueClass (classOf[KeyValue])
>> HFileOutputFormat.configureIncrementalLoad (job, table)
>>
>> val num = sc.parallelize(1 to 10)
>> val rdd = num.map(x=>{
>>     val put: Put = new Put(Bytes.toBytes(x))
>>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>> })
>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx13",
>> classOf[ImmutableBytesWritable], classOf[KeyValue],
>> classOf[HFileOutputFormat], conf)
>>
>>
>>
>> On Tue, Jan 27, 2015 at 12:17 PM, Jim Green <openkbinfo@gmail.com> wrote:
>>
>>> Thanks Ted. Could you give me a simple example of loading one row of
>>> data into hbase? How should I generate the KeyValue?
>>> I have tried multiple times and still cannot figure it out.
>>>
>>> On Tue, Jan 27, 2015 at 12:10 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>
>>>> Here is the method signature used by HFileOutputFormat:
>>>>       public void write(ImmutableBytesWritable row, KeyValue kv)
>>>>
>>>> Meaning, KeyValue is expected, not Put.
>>>>
>>>> On Tue, Jan 27, 2015 at 10:54 AM, Jim Green <openkbinfo@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I need some help writing Scala code to bulk load some data into hbase.
>>>>> *Env:*
>>>>> hbase 0.94
>>>>> spark-1.0.2
>>>>>
>>>>> I am trying the code below to bulk load some data into hbase table
>>>>> “t1”.
>>>>>
>>>>> import org.apache.spark._
>>>>> import org.apache.spark.rdd.NewHadoopRDD
>>>>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>>>>> import org.apache.hadoop.hbase.client.HBaseAdmin
>>>>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>>>>> import org.apache.hadoop.fs.Path;
>>>>> import org.apache.hadoop.hbase.HColumnDescriptor
>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>> import org.apache.hadoop.hbase.client.Put;
>>>>> import org.apache.hadoop.hbase.client.HTable;
>>>>> import org.apache.hadoop.hbase.mapred.TableOutputFormat
>>>>> import org.apache.hadoop.mapred.JobConf
>>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>>> import org.apache.hadoop.mapreduce.Job
>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
>>>>> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
>>>>> import org.apache.hadoop.hbase.KeyValue
>>>>> import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
>>>>>
>>>>> val conf = HBaseConfiguration.create()
>>>>> val tableName = "t1"
>>>>> val table = new HTable(conf, tableName)
>>>>>
>>>>> conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
>>>>> val job = Job.getInstance(conf)
>>>>> job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
>>>>> job.setMapOutputValueClass (classOf[KeyValue])
>>>>> HFileOutputFormat.configureIncrementalLoad (job, table)
>>>>>
>>>>> val num = sc.parallelize(1 to 10)
>>>>> val rdd = num.map(x=>{
>>>>>     val put: Put = new Put(Bytes.toBytes(x))
>>>>>     put.add("cf".getBytes(), "c1".getBytes(), ("value_xxx").getBytes())
>>>>>     (new ImmutableBytesWritable(Bytes.toBytes(x)), put)
>>>>> })
>>>>> rdd.saveAsNewAPIHadoopFile("/tmp/xxxx8",
>>>>> classOf[ImmutableBytesWritable], classOf[Put], classOf[HFileOutputFormat],
>>>>> conf)
>>>>>
>>>>>
>>>>> However, I always get the error below:
>>>>> java.lang.ClassCastException: org.apache.hadoop.hbase.client.Put
>>>>> cannot be cast to org.apache.hadoop.hbase.KeyValue
>>>>> at
>>>>> org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:161)
>>>>> at
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:718)
>>>>> at
>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:699)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My questions are:
>>>>> 1. Is there any sample code for bulk loading into hbase directly?
>>>>> Can we use saveAsNewAPIHadoopFile?
>>>>>
>>>>> 2. Is there any other way to do this?
>>>>> For example, first write an HFile on hdfs, and then use an hbase command
>>>>> to bulk load it?
>>>>> Any sample code using scala?
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> www.openkb.info
>>>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> www.openkb.info
>>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>>
>>
>>
>>
>> --
>> Thanks,
>> www.openkb.info
>> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>>
>>
>
>
> --
> Thanks,
> www.openkb.info
> (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
>



-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)
