spark-user mailing list archives

From: Eric Bless <eric.bl...@yahoo.com.INVALID>
Subject: Problems getting expected results from hbase_inputformat.py
Date: Fri, 07 Aug 2015 21:03:38 GMT
I’m having some difficulty getting the desired results from the Spark Python example hbase_inputformat.py.
I’m running CDH 5.4 with HBase version 1.0.0, Spark 1.3.0, and Python 2.6.6.
 
I followed the example to create a test HBase table. Here’s the data from the table I created:

hbase(main):001:0> scan 'dev_wx_test'
ROW                      COLUMN+CELL
 row1                    column=f1:a, timestamp=1438716994027, value=value1
 row1                    column=f1:b, timestamp=1438717004248, value=value2
 row2                    column=f1:, timestamp=1438717014529, value=value3
 row3                    column=f1:, timestamp=1438717022756, value=value4
3 row(s) in 0.2620 seconds
 
When either of these statements is included:

    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))

or

    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()

we only get the following printed; (row1, value2) is not printed:

    ((u'row1', u'value1'), 1)
    ((u'row2', u'value3'), 1)
    ((u'row3', u'value4'), 1)
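To double-check my understanding of what those two statements do, here is a minimal local sketch using made-up (row, value) pairs rather than the converter's actual output. If the converter handed back both of row1's cells joined by "\n", I would expect all four values to survive the split:

    from pyspark import SparkContext

    sc = SparkContext("local", "flatMapValues-sketch")

    # Made-up pairs; this is an assumption about the shape of the converter output,
    # not something captured from the real RDD.
    pairs = sc.parallelize([
        (u'row1', u'value1\nvalue2'),
        (u'row2', u'value3'),
        (u'row3', u'value4'),
    ])

    split_rdd = pairs.flatMapValues(lambda v: v.split("\n"))
    print(split_rdd.collect())
    # [(u'row1', u'value1'), (u'row1', u'value2'), (u'row2', u'value3'), (u'row3', u'value4')]

    print(split_rdd.countByValue().items())
    # one ((row, value), 1) entry per distinct pair; ordering may vary

    sc.stop()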
 
These missing-value results look similar to the following post I found – http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650 – but it appears the python converter HBaseResultToStringConverter has been updated since then.
 
When the statement “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)”
is included, the result is – ValueError: No JSON object could be decoded
**************************************************************************************
Here is more info on this from the log –

Traceback (most recent call last):
  File "hbase_inputformat.py", line 87, in <module>
    output = hbase_rdd.collect()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 701, in collect
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py", line 538, in __call__
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, stluhdpddev27.monsanto.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 101, in main
    process()
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py", line 236, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py", line 1807, in <lambda>
  File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded


 
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
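
For reference, the decode error itself is easy to reproduce outside of Spark. json.loads only succeeds when each split line is itself a JSON document; on a bare cell value it raises exactly this error. The JSON-looking string below is just a hypothetical shape for illustration, not the actual converter output:

    import json

    plain_value = "value1"                                  # bare string, not a JSON document
    json_value = '{"qualifier": "a", "value": "value1"}'    # hypothetical JSON-per-cell shape

    print(json.loads(json_value))    # decodes fine
    try:
        json.loads(plain_value)
    except ValueError as e:
        print(e)                     # "No JSON object could be decoded" on Python 2.6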


 
Any suggestions would be most welcome.
 
****************************************************************
Below is the code we’re running. We did add a few things to the original example in our attempts to get it working.

from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.conf import SparkConf

import os.path
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"
conf = (SparkConf().setMaster('local').setAppName('a'))

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("""
        Usage: hbase_inputformat <host> <table>
        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar \
        /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
        Assumes you have some data in HBase already, running on <host>, in <table>
          optionally, you can specify parent znode for your hbase cluster - <znode>
        """, file=sys.stderr)
        exit(-1)

    host = sys.argv[1]
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseInputFormat")

    # Other options for configuring scan behavior are available. More information available at
    # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    if len(sys.argv) > 3:
        conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
                "hbase.mapreduce.inputtable": table}
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)
    hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
    # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
    # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).countByValue().items()

    output = hbase_rdd.collect()
    # output = hbase_rdd
    for (k, v) in output:
        print((k, v))

    sc.stop()
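
If it helps, here is a small sketch of a check we could drop in right after the newAPIHadoopRDD call above, before the flatMapValues / json.loads lines, to see what the converter is actually handing back:

    # Inspect the raw (key, value) strings from the converter before any splitting
    # or JSON parsing, to see whether the values are plain strings or
    # newline-separated JSON documents. Assumes hbase_rdd as created above.
    for k, v in hbase_rdd.take(5):
        print(repr(k), repr(v))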