spark-user mailing list archives

From Nick Pentreath <nick.pentre...@gmail.com>
Subject Re: Using mongo with PySpark
Date Mon, 19 May 2014 09:24:14 GMT
You need to use mapPartitions (or foreachPartition) to instantiate your
client in each partition, since the client itself is not serializable by the
pickle library. Something like:

def mapper(iter):
    from pymongo import MongoClient  # create one client per partition, on the worker
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii','ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)



def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)
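
Note that mapPartitions is lazy, so nothing is written to Mongo until an action
runs over doc_ids (doc_ids.count(), for example). If you don't need the inserted
ids back, here is a minimal sketch using foreachPartition instead (assuming the
same data RDD, indexMap and convertToJSON as above, and that foreachPartition is
available in your PySpark version):

def writer(iter):
    from pymongo import MongoClient  # import on the worker, so nothing needs pickling
    collec = MongoClient()['spark_test_db']['programs']
    for val in iter:
        asc = val.encode('ascii','ignore')
        collec.insert(convertToJSON(asc, indexMap))

data.foreachPartition(writer)  # an action, so the inserts happen right away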




On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <mailinglistsamarth@gmail.com> wrote:

> db = MongoClient()['spark_test_db']
> collec = db['programs']
>
> def mapper(val):
>     asc = val.encode('ascii','ignore')
>     json = convertToJSON(asc, indexMap)
>     collec.insert(json) # this is not working
>
> def convertToJSON(string, indexMap):
>     values = string.strip().split(",")
>     json = {}
>     for i in range(len(values)):
>         json[indexMap[i]] = values[i]
>     return json
>
> jsons = data.map(mapper)
>
>
>
> The last line does the mapping. I am very new to Spark; can you explain
> what explicit serialization, etc. is in the context of Spark? The error I
> am getting:
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>     pickled_command = CloudPickleSerializer().dumps(command)
>   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>     cp.dump(obj)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>     return pickle.Pickler.dump(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>     self.save(obj)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>     save(element)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>     self.save_function_tuple(obj, [themodule])
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>     save(closure)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>     save(x)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>     self.save_function_tuple(obj, [themodule])
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>     save(closure)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>     self._batch_appends(iter(obj))
>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>     save(tmp[0])
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>     self.save_function_tuple(obj, modList)
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>     save(f_globals)
>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>     f(self, obj) # Call unbound method with explicit self
>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>     pickle.Pickler.save_dict(self, obj)
>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>     self._batch_setitems(obj.iteritems())
>   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>     save(v)
>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>     rv = reduce(self.proto)
>   File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>     self.__name.split(".")[-1])
> TypeError: 'Collection' object is not callable. If you meant to call the
> '__getnewargs__' method on a 'Collection' object it is failing because no
> such method exists.
>
>
> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:
>
>> Ideally you would pass the MongoClient object along with your data into the
>> mapper (Python will try to serialize your MongoClient, but explicit is
>> better). If the client is serializable, all should end well; if not, you are
>> better off using mapPartitions, initializing the client in each partition and
>> loading the data of each partition. There was a similar discussion on the
>> list in the past.
>> Regards
>> Mayur
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>>  @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>>
>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:
>>
>>> Where's your driver code (the code interacting with the RDDs)? Are you
>>> getting serialization errors?
>>>
>>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsamarth@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am trying to store the results of a reduce into mongo.
>>>> I want to share the variable "collection" in the mappers.
>>>>
>>>>
>>>> Here's what I have so far (I'm using pymongo)
>>>>
>>>> db = MongoClient()['spark_test_db']
>>>> collec = db['programs']
>>>>
>>>> def mapper(val):
>>>>     asc = val.encode('ascii','ignore')
>>>>     json = convertToJSON(asc, indexMap)
>>>>     collec.insert(json) # this is not working
>>>>
>>>> def convertToJSON(string, indexMap):
>>>>     values = string.strip().split(",")
>>>>     json = {}
>>>>     for i in range(len(values)):
>>>>         json[indexMap[i]] = values[i]
>>>>     return json
>>>>
>>>> How do I do this?
>>>>
>>>
>>
>
