spark-user mailing list archives

From 林家銘 <robin890...@gmail.com>
Subject pyspark pickle error when using itertools.groupby
Date Fri, 05 Aug 2016 05:31:09 GMT
Hi,
I wrote a map function to aggregate data within a partition. The function
uses itertools.groupby more than once, and when the job runs I get the
pickle error shown below.

Here is what I do:

===Driver Code===
pair_count = df.mapPartitions(lambda iterable: pair_func_cnt(iterable))
pair_count.collect()

===Map Function===
def pair_func_cnt(iterable):
    from itertools import groupby

    ls = [[1,2,3],[1,2,5],[1,3,5],[2,4,6]]
    grp1 = [(k,g) for k,g in groupby(ls, lambda e: e[0])]
    grp2 = [(k,g) for k,g in groupby(grp1, lambda e: e[1])]
    return iter(grp2)
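
For comparison, here is a sketch of the same function with every group
materialized via list(g), so nothing returned from the partition holds a lazy
itertools grouper object; the name pair_func_cnt_lists is only for illustration:

===Map Function (picklable sketch)===
def pair_func_cnt_lists(iterable):
    from itertools import groupby

    ls = [[1, 2, 3], [1, 2, 5], [1, 3, 5], [2, 4, 6]]
    # list(g) copies each group into a plain list, so the returned values
    # contain only lists and tuples instead of itertools._grouper iterators
    grp1 = [(k, list(g)) for k, g in groupby(ls, lambda e: e[0])]
    grp2 = [(k, list(g)) for k, g in groupby(grp1, lambda e: e[1])]
    return iter(grp2)

The sketch does not change what gets grouped, only how the groups are stored,
and as far as I understand that is enough for PySpark's serializer to pickle
the results.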

===Error Message===

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/opt/zeppelin-0.6.0-bin-netinst/interpreter/spark/pyspark/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'itertools._grouper'>: attribute lookup itertools._grouper failed
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
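
For what it's worth, the failure seems reproducible outside Spark on Python 2
(which the <type 'itertools._grouper'> in the trace suggests): the group
objects yielded by groupby are lazy itertools._grouper iterators, and pickling
one of them directly with protocol 2 (which I believe is the protocol PySpark's
serializer passes to pickle.dumps here) should raise the same PicklingError.
A minimal sketch:

===Standalone Repro (sketch)===
import pickle
from itertools import groupby

# Each g below is an itertools._grouper iterator, not a list
groups = [(k, g) for k, g in groupby([[1, 2], [1, 3], [2, 4]], lambda e: e[0])]

# On Python 2 this is expected to fail with:
#   PicklingError: Can't pickle <type 'itertools._grouper'>: ...
pickle.dumps(groups, 2)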
