spark-user mailing list archives

From Samarth Mailinglist <mailinglistsama...@gmail.com>
Subject Re: Using mongo with PySpark
Date Mon, 19 May 2014 06:00:14 GMT
db = MongoClient()['spark_test_db']
*collec = db['programs']*

def mapper(val):
    asc = val.encode('ascii','ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json) # *this is not working*

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

*jsons = data.map(mapper)*



*The last line does the mapping. I am very new to Spark; can you explain
what explicit serialization etc. means in the context of Spark? The error I
am getting:*
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
    pickled_command = CloudPickleSerializer().dumps(command)
  File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
    def dumps(self, obj): return cloudpickle.dumps(obj, 2)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
    cp.dump(obj)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, modList)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
    self.__name.split(".")[-1])
TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.
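
A possible reading of the traceback above: the failure happens while the
function's globals are being pickled (save_f_globals, then save_dict, then
pymongo's collection.py), which suggests Spark is trying to serialize the
global `collec` that `mapper` refers to. The sketch below reproduces the
same kind of failure with the standard library only. `FakeCollection` is a
hypothetical stand-in, not pymongo, and the lock merely represents a live
resource such as a socket.

```python
import pickle
import threading


class FakeCollection(object):
    """Hypothetical stand-in for a live database handle (not pymongo)."""

    def __init__(self, name):
        self.name = name
        # Live resources such as locks and sockets cannot be pickled.
        self._lock = threading.Lock()


def can_pickle(obj):
    """Return True if obj survives pickling with protocol 2, else False."""
    try:
        pickle.dumps(obj, 2)
        return True
    except Exception:
        return False


# The live handle fails to pickle; plain connection settings do not.
handle_ok = can_pickle(FakeCollection("programs"))
settings_ok = can_pickle({"db": "spark_test_db", "collection": "programs"})
print(handle_ok, settings_ok)
```

If this reading is right, shipping plain settings (or nothing at all) to the
workers and opening the connection there avoids pickling the handle.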


On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:

> Ideally, you should pass the MongoClient object along with your data in
> the mapper (Python should try to serialize your MongoClient, but explicit
> is better).
> If the client is serializable then all should end well. If not, you are
> better off using mapPartitions, initializing the driver in each iteration
> and loading the data of each partition. There is a similar discussion in
> the list archives.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
>
> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <
> nicholas.chammas@gmail.com> wrote:
>
>> Where's your driver code (the code interacting with the RDDs)? Are you
>> getting serialization errors?
>>
>> On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsamarth@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to store the results of a reduce into mongo.
>>> I want to share the variable "collection" in the mappers.
>>>
>>>
>>> Here's what I have so far (I'm using pymongo)
>>>
>>> db = MongoClient()['spark_test_db']
>>> *collec = db['programs']*
>>>
>>> def mapper(val):
>>>     asc = val.encode('ascii','ignore')
>>>     json = convertToJSON(asc, indexMap)
>>>     collec.insert(json) # *this is not working*
>>>
>>> def convertToJSON(string, indexMap):
>>>     values = string.strip().split(",")
>>>     json = {}
>>>     for i in range(len(values)):
>>>         json[indexMap[i]] = values[i]
>>>     return json
>>>
>>> How do I do this?
>>>
>>
>
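
A minimal sketch of the per-partition approach suggested in the quoted
reply, assuming the connection is opened inside the function that handles
each partition so that it never has to be pickled. To keep it runnable
without Spark or MongoDB, `InMemoryCollection`, `open_collection`,
`save_partition` and the `index_map` values are all hypothetical stand-ins,
and the partitions are simulated with plain lists.

```python
def convert_to_json(line, index_map):
    """Same idea as convertToJSON in the original post."""
    values = line.strip().split(",")
    return {index_map[i]: values[i] for i in range(len(values))}


class InMemoryCollection(object):
    """Hypothetical stand-in for a pymongo collection."""

    def __init__(self):
        self.docs = []

    def insert(self, doc):
        self.docs.append(doc)


opened = []  # records every collection opened, for inspection only


def open_collection():
    # In a real job this would create the MongoClient on the worker instead.
    collection = InMemoryCollection()
    opened.append(collection)
    return collection


def save_partition(lines, index_map, opener):
    """Handle one partition: open the connection here, insert rows, yield a count."""
    collection = opener()
    count = 0
    for line in lines:
        collection.insert(convert_to_json(line, index_map))
        count += 1
    yield count


index_map = {0: "id", 1: "title"}  # hypothetical column names
partitions = [["1,news", "2,sports"], ["3,weather"]]  # simulated RDD partitions
counts = [n for part in partitions
          for n in save_partition(iter(part), index_map, open_collection)]
print(counts)
```

With Spark, the equivalent call would presumably look like
`data.mapPartitions(lambda it: save_partition(it, index_map, opener)).collect()`,
where `opener` builds the MongoClient on the worker. That part is untested
here, and since mapPartitions is lazy, an action such as collect() is needed
to trigger the inserts.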
