spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Error when mapping a schema RDD when converting lists
Date Mon, 08 Dec 2014 18:25:19 GMT
This is fixed in 1.2. Also, in 1.2+ you can call row.asDict() to
convert a Row object into a dict.
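
For anyone still on a release before 1.2, one possible workaround is to
copy nested values into plain Python dicts and lists inside the mapper, so
that nothing of type pyspark.sql.List reaches the pickler. The sketch below
is only an illustration: to_plain is a hypothetical helper, and the
namedtuple classes stand in for Spark rows, on the assumption that rows
behave like tuples exposing _fields and that nested lists and maps subclass
list and dict. That assumption may not hold for the 1.1 Row classes.

```python
import pickle
from collections import namedtuple
from collections.abc import Mapping


def to_plain(value):
    """Recursively copy row-like values into plain dicts and lists.

    Hypothetical helper, not part of PySpark.
    """
    if isinstance(value, Mapping):
        # Map columns: keep the keys, convert the values.
        return {k: to_plain(v) for k, v in value.items()}
    if isinstance(value, tuple) and hasattr(value, "_fields"):
        # Assumption: struct rows look like namedtuples.
        return {name: to_plain(getattr(value, name)) for name in value._fields}
    if isinstance(value, (list, tuple)):
        # Array columns and plain tuples become plain lists.
        return [to_plain(v) for v in value]
    return value


# Stand-ins for a row with a map column and an array-of-struct column.
Field6 = namedtuple("Field6", ["field61", "field62", "field63"])
Record = namedtuple("Record", ["field1", "field3", "field6"])

record = Record("a", {1: 2}, [Field6("x", "y", 1)])
plain = to_plain(record)

# Plain dicts and lists survive a pickle round trip.
restored = pickle.loads(pickle.dumps(plain, 2))
```

In generateRecords this would amount to something like
summary['field6'] = to_plain(line.field6). On 1.2+, row.asDict() is the
simpler route.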

On Mon, Dec 8, 2014 at 6:38 AM, sahanbull <sahan@skimlinks.com> wrote:
> Hi Guys,
>
> I used applySchema to store a set of nested dictionaries and lists in a
> parquet file.
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-td20228.html#a20461
>
> It was successful, and I could load the data as well. Now I'm
> trying to convert this SchemaRDD to an RDD of dictionaries so that I can run
> some reduces on them.
>
> The schema of my RDD is as follows:
>  |-- field1: string (nullable = true)
>  |-- field2: integer (nullable = true)
>  |-- field3: map (nullable = true)
>  |    |-- key: integer
>  |    |-- value: integer (valueContainsNull = true)
>  |-- field4: map (nullable = true)
>  |    |-- key: string
>  |    |-- value: integer (valueContainsNull = true)
>  |-- field5: array (nullable = true)
>  |    |-- element: string (containsNull = true)
>  |-- field6: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- field61: string (nullable = true)
>  |    |    |-- field62: string (nullable = true)
>  |    |    |-- field63: integer (nullable = true)
>
> I'm using the following mapper to map these fields to an RDD that I can
> reduce later.
>
> def generateRecords(line):
>         # input : the row stored in parquet file
>         # output : a python dictionary with all the key value pairs
>         field1 = line.field1
>         summary = {}
>         summary['field2'] = line.field2
>         summary['field3'] = line.field3
>         summary['field4'] = line.field4
>         summary['field5'] = line.field5
>         summary['field6'] = line.field6
>         return (field1, summary)
>
> profiles = sqc.parquetFile(path)
> profileRecords = profiles.map(lambda line: generateRecords(line))
>
> This code works perfectly well when field6 is not mapped, i.e. when you
> comment out the line that maps field6 in generateRecords. The RDD is then
> generated without errors. Even field5 gets mapped. The key difference between
> field5 and field6 is that field5 is a list of strings, whereas field6 is a
> list of tuples of the form (String, String, Int). But when you try to map
> field6, it throws:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/rdd.py", line 847, in count
>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>   File "/root/spark/python/pyspark/rdd.py", line 838, in sum
>     return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
>   File "/root/spark/python/pyspark/rdd.py", line 759, in reduce
>     vals = self.mapPartitions(func).collect()
>   File "/root/spark/python/pyspark/rdd.py", line 723, in collect
>     bytesInJava = self._jrdd.collect().iterator()
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o88.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 32
> in stage 3.0 failed 4 times, most recent failure: Lost task 32.3 in stage
> 3.0 (TID 1829, ip-172-31-18-36.ec2.internal):
> org.apache.spark.api.python.PythonException: Traceback (most recent call
> last):
>   File "/root/spark/python/pyspark/worker.py", line 79, in main
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
>     self.serializer.dump_stream(self._batched(iterator), stream)
>   File "/root/spark/python/pyspark/serializers.py", line 128, in dump_stream
>     self._write_with_length(obj, stream)
>   File "/root/spark/python/pyspark/serializers.py", line 138, in _write_with_length
>     serialized = self.dumps(obj)
>   File "/root/spark/python/pyspark/serializers.py", line 356, in dumps
>     return cPickle.dumps(obj, 2)
> PicklingError: Can't pickle <class 'pyspark.sql.List'>: attribute lookup pyspark.sql.List failed
>
> Can someone help me understand what is going wrong here?
>
> Many thanks
> SahanB
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-mapping-a-schema-RDD-when-converting-lists-tp20577.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
