spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Using sparkSQL to convert a collection of python dictionary of dictionaries to schema RDD
Date Thu, 04 Dec 2014 19:14:26 GMT
Which version of Spark are you using? inferSchema() was improved to
support empty dicts in 1.2+; could you try 1.2-RC1?
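
For example, something like this (untested; it assumes an existing
SparkContext called sc, and the sample rows stand in for your data)
should work in 1.2:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
# sample rows; the first one has an empty field3
rdd = sc.parallelize([{'field1': 5, 'field2': 'string', 'field3': {}},
                      {'field1': 6, 'field2': 'other', 'field3': {'k': 1}}])
srdd = sqlContext.inferSchema(rdd)  # no longer raises on empty dicts in 1.2+
srdd.printSchema()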

Also, you can use applySchema():

from pyspark.sql import *

fields = [StructField('field1', IntegerType(), True),
          StructField('field2', StringType(), True),
          StructField('field3', MapType(StringType(), IntegerType(), True), True)]
schema = StructType(fields)
rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x['field3']))
srdd = sqlContext.applySchema(rdd2, schema)

PS: the above code is not tested.
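
If some records are missing field3 entirely, it should be enough to
normalize them in the same map step (again untested; dict.get falls back
to an empty dict, which is fine here because the map type comes from the
explicit schema instead of being inferred):

rdd2 = rdd.map(lambda x: (x['field1'], x['field2'], x.get('field3', {})))
srdd = sqlContext.applySchema(rdd2, schema)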

Davies

On Thu, Dec 4, 2014 at 4:22 AM, sahanbull <sahan@skimlinks.com> wrote:
> Hi Davies,
>
> Thanks for the reply
>
> The problem is that I have empty dictionaries in my field3 as well. It gives
> me an error:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/sql.py", line 1042, in inferSchema
>     schema = _infer_schema(first)
>   File "/root/spark/python/pyspark/sql.py", line 495, in _infer_schema
>     fields = [StructField(k, _infer_type(v), True) for k, v in items]
>   File "/root/spark/python/pyspark/sql.py", line 460, in _infer_type
>     raise ValueError("Can not infer type for empty dict")
> ValueError: Can not infer type for empty dict
>
> When I remove the empty dictionary items from each record (that is, when
> mapping to the main dictionary, if field3 is an empty dict, I do not include
> it), the record converts from
>
> {
>     'field1': 5,
>     'field2': 'string',
>     'field3': {}
> }
>
> to
>
> {
>     'field1': 5,
>     'field2': 'string'
> }
>
> At this point, I get:
>
>  ERROR TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/sql.py", line 1044, in inferSchema
>     return self.applySchema(rdd, schema)
>   File "/root/spark/python/pyspark/sql.py", line 1117, in applySchema
>     rows = rdd.take(10)
>   File "/root/spark/python/pyspark/rdd.py", line 1153, in take
>     res = self.context.runJob(self, takeUpToNumLeft, p, True)
>   File "/root/spark/python/pyspark/context.py", line 770, in runJob
>     it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd,
> javaPartitions, allowLocal)
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 14.0 (TID 22628, ip-172-31-30-89.ec2.internal):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/root/spark/python/pyspark/worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
>     for obj in iterator:
>   File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
>     for item in iterator:
>   File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
>     yield next(iterator)
>   File "/root/spark/python/pyspark/sql.py", line 552, in _drop_schema
>     yield converter(i)
>   File "/root/spark/python/pyspark/sql.py", line 540, in nested_conv
>     return tuple(f(v) for f, v in zip(convs, conv(row)))
>   File "/root/spark/python/pyspark/sql.py", line 540, in <genexpr>
>     return tuple(f(v) for f, v in zip(convs, conv(row)))
>   File "/root/spark/python/pyspark/sql.py", line 508, in <lambda>
>     return lambda row: dict((k, conv(v)) for k, v in row.iteritems())
> AttributeError: 'int' object has no attribute 'iteritems'
>
> I have no clue what to do about this. Hope you can help :)
>
> Many thanks
> SahanB
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-tp20228p20364.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

