spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: numpy arrays and spark sql
Date Tue, 02 Dec 2014 05:51:56 GMT
applySchema() only accepts an RDD of Row/list/tuple; it does not work with
numpy.array.

After applySchema(), the Python RDD will be pickled and unpickled in the
JVM, so you will not get any benefit from using numpy.array.

It will work if you convert each ndarray into a list:

schemaRDD = sqlContext.applySchema(rdd.map(list), schema)
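
A small sketch of the conversion itself, using only NumPy (no Spark needed to
run it). It shows the two usual ways to turn each row of a 2-D array into a
list. Note that list(row) keeps the elements as NumPy scalars, while
row.tolist() yields built-in Python ints. Whether applySchema() accepts NumPy
scalars for an IntegerType field is an assumption I have not verified here, so
tolist() may be the safer choice. The variable names are illustrative only.

```python
import numpy as np

# Same shape as the failing example: 2 rows, 10 columns.
arr = np.arange(20).reshape(2, 10)

# list(row) gives a Python list, but each element is still a NumPy scalar.
rows_as_list = [list(row) for row in arr]

# row.tolist() gives a Python list whose elements are built-in Python ints.
rows_as_python = [row.tolist() for row in arr]

print(type(rows_as_list[0][0]))
print(type(rows_as_python[0][0]))

# With a live SparkContext this would become something like (not run here):
#   schemaRDD = sqlContext.applySchema(rdd.map(lambda a: a.tolist()), schema)
# where the schema fields use a Spark type such as IntegerType(),
# not np.ndarray.
```

In both cases the schema fields have to be Spark SQL types (for example
IntegerType()), since StructField expects a Spark data type rather than a
NumPy type.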

On Mon, Dec 1, 2014 at 6:33 PM, Joseph Winston <josephwinston@me.com> wrote:
> This works as expected in the 1.1 branch:
>
> from pyspark.sql import *
>
> rdd = sc.parallelize([range(0, 10), range(10, 20), range(20, 30)])
>
> # define the schema
> schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
> fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]
> schema = StructType(fields)
>
> # Apply the schema to the RDD.
> schemaRDD = sqlContext.applySchema(rdd, schema)
>
> # Register the table
> schemaRDD.registerTempTable("slice")
>
> # SQL can be run over SchemaRDDs that have been registered as a table.
> results = sqlContext.sql("SELECT value1 FROM slice")
>
> # The results of SQL queries are RDDs and support all the normal RDD operations.
> print results.collect()
>
> However changing the rdd to use a numpy array fails:
>
> import numpy as np
> rdd = sc.parallelize(np.arange(20).reshape(2, 10))
>
> # define the schema
> schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
> fields = [StructField(field_name, np.ndarray, True) for field_name in schemaString.split()]
> schema = StructType(fields)
>
> # Apply the schema to the RDD.
> schemaRDD = sqlContext.applySchema(rdd, schema)
>
> The error is:
> Traceback (most recent call last):
>   File "<stdin>", line 2, in <module>
>   File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in applySchema
>     _verify_type(row, schema)
>   File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in _verify_type
>     % (dataType, type(obj)))
> TypeError: StructType(List(StructField(value1,<type 'numpy.ndarray'>,true),StructField(value2,<type
> 'numpy.ndarray'>,true),StructField(value3,<type 'numpy.ndarray'>,true),StructField(value4,<type
> 'numpy.ndarray'>,true),StructField(value5,<type 'numpy.ndarray'>,true),StructField(value6,<type
> 'numpy.ndarray'>,true),StructField(value7,<type 'numpy.ndarray'>,true),StructField(value8,<type
> 'numpy.ndarray'>,true),StructField(value9,<type 'numpy.ndarray'>,true),StructField(value10,<type
> 'numpy.ndarray'>,true))) can not accept abject in type <type 'numpy.ndarray'>
>
> I’ve tried np.int_ and np.int32 and they fail too. What type should I use to make
> numpy arrays work?
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
