spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Yu <yuzhih...@gmail.com>
Subject Re: Problems getting expected results from hbase_inputformat.py
Date Mon, 10 Aug 2015 18:11:41 GMT
Eric:
Other than HBaseConverters.scala, examples/src/main/python/hbase_inputformat.py
was also updated.
FYI

On Mon, Aug 10, 2015 at 11:08 AM, Eric Bless <eric.bless@yahoo.com.invalid>
wrote:

> Thank you Gen, the changes to HBaseConverters.scala look to now be
> returning all column qualifiers, as follows -
>
> (u'row1', {u'qualifier': u'a', u'timestamp': u'1438716994027', u'value': u'value1', u'columnFamily':
u'f1', u'type': u'Put', u'row': u'row1'})
> (u'row1', {u'qualifier': u'b', u'timestamp': u'1438717004248', u'value': u'value2', u'columnFamily':
u'f1', u'type': u'Put', u'row': u'row1'})
> (u'row2', {u'qualifier': u'', u'timestamp': u'1438717014529', u'value': u'value3', u'columnFamily':
u'f1', u'type': u'Put', u'row': u'row2'})
> (u'row3', {u'qualifier': u'', u'timestamp': u'1438717022756', u'value': u'value4', u'columnFamily':
u'f1', u'type': u'Put', u'row': u'row3'})
>
>
> Just to be clear, you refer to "Spark update these two scripts
> recently.". What two scripts were you referencing?
>
>
>
> On Friday, August 7, 2015 7:59 PM, gen tang <gen.tang86@gmail.com> wrote:
>
>
> Hi,
>
> In fact, Pyspark use
> org.apache.spark.examples.pythonconverters(./examples/src/main/scala/org/apache/spark/pythonconverters/)
> to transform object of Hbase result to python string.
> Spark update these two scripts recently. However, they are not included in
> the official release of spark. So you are trying to use this new python
> script with old jar.
>
> You can clone the newest code of spark from github and build examples jar.
> Then you can get correct result.
>
> Cheers
> Gen
>
>
> On Sat, Aug 8, 2015 at 5:03 AM, Eric Bless <eric.bless@yahoo.com.invalid>
> wrote:
>
> I’m having some difficulty getting the desired results from the Spark
> Python example hbase_inputformat.py. I’m running with CDH5.4, hbase Version
> 1.0.0, Spark v 1.3.0 Using Python version 2.6.6
>
> I followed the example to create a test HBase table. Here’s the data from
> the table I created –
> hbase(main):001:0> scan 'dev_wx_test'
> ROW                       COLUMN+CELL
> row1                     column=f1:a, timestamp=1438716994027, value=value1
> row1                     column=f1:b, timestamp=1438717004248, value=value2
> row2                     column=f1:, timestamp=1438717014529, value=value3
> row3                     column=f1:, timestamp=1438717022756, value=value4
> 3 row(s) in 0.2620 seconds
>
> When either of these statements are included -
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))”  or
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).countByValue().items()” the result is -
> We only get the following printed; (row1, value2) is not printed:
>         ((u'row1', u'value1'), 1)         ((u'row2', u'value3'), 1)
> ((u'row3', u'value4'), 1)
>  This looks like similar results to the following post I found -
>
> http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-get-column-family-and-qualifier-names-from-hbase-table-td18613.html#a18650
> but it appears the pythonconverter HBaseResultToStringConverter has been
> updated since then.
>
> And this problem will be resolved too.
>
>
>
> When the statement
> “hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).mapValues(json.loads)” is included, the result is –
> ValueError: No JSON object could be decoded
>
>
> **************************************************************************************
> Here is more info on this from the log –
> Traceback (most recent call last):
>   File "hbase_inputformat.py", line 87, in <module>
>     output = hbase_rdd.collect()
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",
> line 701, in collect
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 1.0 (TID 4, stluhdpddev27.monsanto.com):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",
> line 101, in main
>     process()
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/worker.py",
> line 96, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/serializers.py",
> line 236, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/rdd.py",
> line 1807, in <lambda>
>   File "/usr/lib64/python2.6/json/__init__.py", line 307, in loads
>     return _default_decoder.decode(s)
>   File "/usr/lib64/python2.6/json/decoder.py", line 319, in decode
>     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
>   File "/usr/lib64/python2.6/json/decoder.py", line 338, in raw_decode
>     raise ValueError("No JSON object could be decoded")
> ValueError: No JSON object could be decoded
>
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:135)
>         at
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:176)
>         at
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:94)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org
> <http://org.apache.spark.scheduler.dagscheduler.org/>
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>         at scala.Option.foreach(Option.scala:236)
>         at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>         at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Any suggestions would be most welcome.
>
> ****************************************************************
> Below is the code we’re running. We did add a few things to the original
> example in our attempts to get it working.
>
> from __future__ import print_function
>
> import sys
> import json
>
> from pyspark import SparkContext
> from pyspark.conf import SparkConf
>
> import os.path
> os.environ["SPARK_HOME"] =
> "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/"
> conf = (SparkConf().setMaster('local').setAppName('a'))
>
> if __name__ == "__main__":
>    if len(sys.argv) != 3:
>        print("""
>        Usage: hbase_inputformat <host> <table>
>        Run with example jar:
>        ./bin/spark-submit --driver-class-path /path/to/example/jar \
>        /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
>        Assumes you have some data in HBase already, running on <host>, in
> <table>
>          optionally, you can specify parent znode for your hbase cluster -
> <znode>
>        """, file=sys.stderr)
>        exit(-1)
>
> host = sys.argv[1]
> table = sys.argv[2]
> sc = SparkContext(appName="HBaseInputFormat")
>
> # Other options for configuring scan behavior are available. More
> information available at
> #
> https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable":
> table}
> if len(sys.argv) > 3:
>     conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent":
> sys.argv[3],
>             "hbase.mapreduce.inputtable": table}
> keyConv =
> "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
> valueConv =
> "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>
> hbase_rdd = sc.newAPIHadoopRDD(
>     "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
>     "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
>     "org.apache.hadoop.hbase.client.Result",
>     keyConverter=keyConv,
>     valueConverter=valueConv,
>     conf=conf)
> hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).mapValues(json.loads)
> # hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n"))
> # hbase_rdd = hbase_rdd.flatMapValues(lambda v:
> v.split("\n")).countByValue().items()
>
> output = hbase_rdd.collect()
> # output = hbase_rdd
> for (k, v) in output:
>     print((k, v))
>
> sc.stop()
>
>
>
>
>

Mime
View raw message