spark-user mailing list archives

From Eric Bless <eric.bl...@yahoo.com.INVALID>
Subject Re: Problems getting expected results from hbase_inputformat.py
Date Mon, 10 Aug 2015 18:08:34 GMT
Thank you Gen, the changes to HBaseConverters.scala now appear to return all column qualifiers, as follows:

(u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': u'value1', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': u'value2', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row1'})
(u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': u'value3', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row2'})
(u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': u'value4', u'columnFamily': u'f1', u'type': u'Put', u'row': u'row3'})
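
For our own processing we will probably collapse these per-cell records into one dict per row. A rough local sketch, using only the data printed above (this is not part of the example script):

from collections import defaultdict

# (row, cell-dict) pairs as printed above (only the fields needed here)
cells = [
    (u'row1', {u'columnFamily': u'f1', u'qualifier': u'a', u'value': u'value1'}),
    (u'row1', {u'columnFamily': u'f1', u'qualifier': u'b', u'value': u'value2'}),
    (u'row2', {u'columnFamily': u'f1', u'qualifier': u'', u'value': u'value3'}),
    (u'row3', {u'columnFamily': u'f1', u'qualifier': u'', u'value': u'value4'}),
]

rows = defaultdict(dict)
for row_key, cell in cells:
    column = cell[u'columnFamily'] + u':' + cell[u'qualifier']
    rows[row_key][column] = cell[u'value']

for row_key in sorted(rows):
    print((row_key, rows[row_key]))
# (u'row1', {u'f1:a': u'value1', u'f1:b': u'value2'})   (dict ordering may vary)
# (u'row2', {u'f1:': u'value3'})
# (u'row3', {u'f1:': u'value4'})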
Just to be clear, you said "Spark updated these two scripts recently." Which two scripts were you referring to?


     On Friday, August 7, 2015 7:59 PM, gen tang <gen.tang86@gmail.com> wrote:
   

Hi,

In fact, PySpark uses org.apache.spark.examples.pythonconverters (./examples/src/main/scala/org/apache/spark/pythonconverters/) to transform HBase Result objects into Python strings. Spark updated these two scripts recently. However, the updates are not included in the official release of Spark, so you are trying to use the new Python script with an old jar.

You can clone the newest Spark code from GitHub and build the examples jar; then you will get the correct result.
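
For example, once the new examples jar is on the classpath, a quick way to check which converter you actually get is to print one raw value before any JSON parsing (just a sketch to drop into hbase_inputformat.py right after hbase_rdd is created):

# With the updated converter, each value is a newline-joined list of JSON objects,
# one per cell; with the old jar you will only see the raw cell value (e.g. value1).
sample = hbase_rdd.values().first()
print(repr(sample))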
Cheers,
Gen

On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless <eric.bless@yahoo.com.invalid> wrote:

I’m having some difficulty getting the desired results from the Spark Python example hbase_inputformat.py. I’m running with CDH 5.4, HBase version 1.0.0, Spark v1.3.0, and Python version 2.6.6. I followed the example to create a test HBase table. Here’s the data from the table I created:

hbase(main):001:0> scan 'dev_wx_test'
ROW      COLUMN+CELL
 row1    column=f1:a, timestamp=1438716994027, value=value1
 row1    column=f1:b, timestamp=1438717004248, value=value2
 row2    column=f1:, timestamp=1438717014529, value=value3
 row3    column=f1:, timestamp=1438717022756, value=value4
3 row(s) in 0.2620 seconds
When either of these statements is included:

hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))

or

hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()

we only get the following printed; (row1, value2) is not printed:

((u'row1', u'value1'), 1)
((u'row2', u'value3'), 1)
((u'row3', u'value4'), 1)

This looks similar to the results in the following post I found -
http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
but it appears the pythonconverter HBaseResultToStringConverter has been updated since then, and that this problem will be resolved too.
When the statement

hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)

is included, the result is:

ValueError: No JSON object could be decoded
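
This error is easy to reproduce locally if the values coming back are plain strings like value1 rather than JSON (a small sketch, assuming that is what the old converter returns):

import json

try:
    json.loads("value1")
except ValueError as e:
    print(e)  # No JSON object could be decoded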
**************************************************************************************
Here is more info on this from the log:

Traceback (most recent call last):
  File "hbase_inputformat.py", line 87, in <module>
    output = hbase_rdd.collect()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 701, in collect
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 101, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 1807, in <lambda>
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Any suggestions would be most welcome.

****************************************************************
Below is the code we’re running. We did add a few things to the original example in our attempts to get it working.

from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.conf import SparkConf
import os.path

os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"
conf = (SparkConf().setMaster('local').setAppName('a'))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("""
        Usage: hbase_inputformat <host> <table>

        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar \
        /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
        Assumes you have some data in HBase already, running on <host>, in <table>
        optionally, you can specify parent znode for your hbase cluster - <znode>
        """, file=sys.stderr)
        exit(-1)

    host = sys.argv[1]
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseInputFormat")

    # Other options for configuring scan behavior are available. More information available at
    # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    if len(sys.argv) > 3:
        conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
                "hbase.mapreduce.inputtable": table}
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)
    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
    # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
    # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()

    output = hbase_rdd.collect()
    # output = hbase_rdd
    for (k, v) in output:
        print((k, v))

    sc.stop()
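
One workaround we have considered (not part of the original example, just a sketch) is to fall back to the raw string whenever a value is not valid JSON, so the same script would run against either jar:

import json

def parse_value(v):
    # Decode the converter output as JSON if possible (new converter);
    # otherwise keep the raw string (old converter).
    try:
        return json.loads(v)
    except ValueError:
        return v

# Works for both shapes of value seen in this thread:
for raw in ['{"qualifier": "a", "columnFamily": "f1", "value": "value1"}', 'value1']:
    print(parse_value(raw))

With something like this we could use hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(parse_value) instead of mapValues(json.loads).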



  