spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gen tang <gen.tan...@gmail.com>
Subject Re: Problems getting expected results from hbase_inputformat.py
Date Sat, 08 Aug 2015 00:58:16 GMT
Hi,

In fact, Pyspark use
org.apache.spark.examples.pythonconverters(./examples/src/main/scala/org/apache/spark/pythonconverters/)
to transform object of Hbase result to python string.
Spark update these two scripts recently. However, they are not included in
the official release of spark. So you are trying to use this new python
script with old jar.

You can clone the newest code of spark from github and build examples jar.
Then you can get correct result.

Cheers
Gen


On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless <eric.bless@yahoo.com.invalid>
wrote:

> I’m having some difficulty getting the desired results from the Spark
> Python example hbase_inputformat.py. I’m running with CDH5.4, hbase Version
> 1.0.0, Spark v 1.3.0 Using Python version 2.6.6
>
> I followed the example to create a test HBase table. Here’s the data from
> the table I created –
> hbase(main):001:0> scan 'dev_wx_test'
> ROW                       COLUMN+CELL
> row1                     column=f1:a, timestamp=1438716994027, value=value1
> row1                     column=f1:b, timestamp=1438717004248, value=value2
> row2                     column=f1:, timestamp=1438717014529, value=value3
> row3                     column=f1:, timestamp=1438717022756, value=value4
> 3 row(s) in 0.2620 seconds
>
> When either of these statements are included -
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))”  or
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).countByValue().items()” the result is -
> We only get the following printed; (row1, value2) is not printed:
>         ((u'row1', u'value1'), 1)         ((u'row2', u'value3'), 1)
> ((u'row3', u'value4'), 1)
>  This looks like similar results to the following post I found -
>
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
> but it appears the pythonconverter HBaseResultToStringConverter has been
> updated since then.
>
And this problem will be resolved too.


>
> When the statement
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).mapValues(json.loads)” is included, the result is –
> ValueError: No JSON object could be decoded
>
>
> **************************************************************************************
> Here is more info on this from the log –
> Traceback (most recent call last):
>   File "hbase_inputformat.py", line 87, in <module>
>     output = hbase_rdd.collect()
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",
> line 701, in collect
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 1.0 (TID 4, stluhdpddev27.monsanto.com):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",
> line 101, in main
>     process()
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",
> line 96, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py",
> line 236, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",
> line 1807, in <lambda>
>   File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>     return _default_decoder.decode(s)
>   File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>   File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode
>     raise ValueError("No JSON object could be decoded")
> ValueError: No JSON object could be decoded
>
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
>         at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Any suggestions would be most welcome.
>
> ****************************************************************
> Below is the code we’re running. We did add a few things to the original
> example in our attempts to get it working.
>
> from __future__ import print_function
>
> import sys
> import json
>
> from pyspark import SparkContext
> from pyspark.conf import SparkConf
>
> import os.path
> os.environ["SPARK_HOME"] =
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"
> conf = (SparkConf().setMaster('local').setAppName('a'))
>
> if __name__ == "__main__":
>    if len(sys.argv) != 3:
>        print("""
>        Usage: hbase_inputformat <host> <table>
>        Run with example jar:
>        ./bin/spark-submit --driver-class-path /path/to/example/jar \
>        /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
>        Assumes you have some data in HBase already, running on <host>, in
> <table>
>          optionally, you can specify parent znode for your hbase cluster -
> <znode>
>        """, file=sys.stderr)
>        exit(-1)
>
> host = sys.argv[1]
> table = sys.argv[2]
> sc = SparkContext(appName="HBaseInputFormat")
>
> # Other options for configuring scan behavior are available. More
> information available at
> #
> https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable":
> table}
> if len(sys.argv) > 3:
>     conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent":
> sys.argv[3],
>             "hbase.mapreduce.inputtable": table}
> keyConv =
> "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
> valueConv =
> "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>
> hbase_rdd = sc.newAPIHadoopRDD(
>     "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
>     "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>     "org.apache.hadoop.hbase.client.Result",
>     keyConverter=keyConv,
>     valueConverter=valueConv,
>     conf=conf)
> hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).mapValues(json.loads)
> # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
> # hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).countByValue().items()
>
> output = hbase_rdd.collect()
> # output = hbase_rdd
> for (k, v) in output:
>     print((k, v))
>
> sc.stop()
>

Mime
View raw message