spark-user mailing list archives

From Andrew Holway <andrew.hol...@otternetworks.de>
Subject Re: createDataFrame causing a strange error.
Date Mon, 28 Nov 2016 14:48:30 GMT
I extracted the boto bits and tested them in plain Python on the nodes. I
am fairly sure that the data from S3 is OK. I've applied a public policy to
the bucket s3://time-waits-for-no-man. There is a publicly available object
here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11

I'm using boto because using proxies with Spark, and Hadoop in general, is a
bit of a black art.


[centos@hadoop002 ~]$ python s3_test.py
object key
1973-01-11
Length of List
86400
First row
{u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00',
u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
Last row
{u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59',
u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
[centos@hadoop002 ~]$ cat s3_test.py
import boto3
import ujson
import arrow
import sys
import os
import getpass

os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'

def add_timestamp(dict):
     dict['timestamp'] = arrow.get(
                         int(dict['year']),
                         int(dict['month']),
                         int(dict['day']),
                         int(dict['hour']),
                         int(dict['minute']),
                         int(dict['second'])
                         ).timestamp
     return dict

s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for object in my_bucket.objects.filter(Prefix='1973-01-11'):
     s3_list.append(object.key)

print("object key")
print (s3_list[0])

s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man',
key=s3_list[0])
contents = s3obj.get()['Body'].read().decode()
meow = contents.splitlines()
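# Parse each line as JSON, then add the epoch timestamp to every row.
# List comprehensions return lists on both Python 2 and 3, unlike map().
result_wo_timestamp = [ujson.loads(line) for line in meow]
result_wi_timestamp = [add_timestamp(row) for row in result_wo_timestamp]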

print("Length of List")
print(len(result_wi_timestamp))
print("First row")
print(result_wi_timestamp[0])
print("Last row")
print(result_wi_timestamp[86399])
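
As a cross-check that does not depend on arrow, boto3 or the proxy, the first and last timestamps can be recomputed with the standard library only. This is a minimal sketch: the two JSON lines are hypothetical reconstructions from the rows printed above (the exact layout of the lines in the S3 object is an assumption), and `utc_epoch` is a name made up for illustration.

```python
import calendar
import json


def utc_epoch(row):
    """Return seconds since the Unix epoch for a row of string date fields,
    interpreting the fields as UTC."""
    return calendar.timegm((
        int(row['year']), int(row['month']), int(row['day']),
        int(row['hour']), int(row['minute']), int(row['second']),
    ))


# Hypothetical JSON lines mirroring the first and last rows shown above.
first_line = ('{"year": "1973", "month": "01", "day": "11", "hour": "00", '
              '"minute": "00", "second": "00", "timezone": "-00:00"}')
last_line = ('{"year": "1973", "month": "01", "day": "11", "hour": "23", '
             '"minute": "59", "second": "59", "timezone": "-00:00"}')

first = utc_epoch(json.loads(first_line))
last = utc_epoch(json.loads(last_line))

print(first)
print(last)
print(last - first + 1)  # number of one-second rows the day should contain
```

If the values agree with the 'timestamp' fields and the list length printed by s3_test.py, the parsing side is unlikely to be where the problem lies.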




On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistroni@gmail.com> wrote:

> Hi
>
> Pickle errors normally point to a serialisation issue. I suspect
> something is wrong with your S3 data, but that is just a wild guess...
>
> Is your S3 object publicly available?
>
> A few suggestions to nail down the problem:
>
> 1 - Try to see if you can read your object from S3 using the boto3 library
> 'offline', meaning outside of any Spark code.
>
> 2 - Try to replace your distributedJsonRead. Instead of reading from S3,
> generate a string out of a snippet of your JSON object.
>
> 3 - Spark can read data from S3 as well, just do a
> sc.textFile('s3://....') ==>
> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark
> Try to use Spark entirely to read and process the data, rather than going
> via boto3. It adds extra complexity which you don't need.
>
> If you send a snippet of your JSON content, then everyone on the list can
> run the code and try to reproduce the problem.
>
>
> hth
>
>  Marco
>
>
> On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.holway@otternetworks.de>
> wrote:
>
>> I get a slightly different error when not specifying a schema:
>>
>> Traceback (most recent call last):
>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>> line 61, in <module>
>>     df = sqlContext.createDataFrame(foo)
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>> line 299, in createDataFrame
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>> line 520, in createDataFrame
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>> line 360, in _createFromRDD
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>> line 331, in _inferSchema
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>> line 1328, in first
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>> line 1310, in take
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py",
>> line 941, in runJob
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>> line 2403, in _jrdd
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>> line 2336, in _wrap_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>> line 2315, in _prepare_for_python_RDD
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>> line 428, in dumps
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 657, in dumps
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 107, in dump
>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>     self.save(obj)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 204, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 241, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>     save(tmp[0])
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 198, in save_function
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>> line 246, in save_function_tuple
>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>     self._batch_setitems(obj.iteritems())
>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>     save(v)
>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>     rv = reduce(self.proto)
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>> line 933, in __call__
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>> line 63, in deco
>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>> line 316, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling
>> o33.__getnewargs__. Trace:
>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>> at py4j.Gateway.invoke(Gateway.java:272)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <
>> andrew.holway@otternetworks.de> wrote:
>>
>>> Hi,
>>>
>>> Can anyone tell me what is causing this error?
>>> Spark 2.0.0
>>> Python 2.7.5
>>>
>>> df = sqlContext.createDataFrame(foo, schema)
>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>
>>> Traceback (most recent call last):
>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>> line 61, in <module>
>>>     df = sqlContext.createDataFrame(foo, schema)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>> line 299, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 523, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2220, in _to_java_object_rdd
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2403, in _jrdd
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2336, in _wrap_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2315, in _prepare_for_python_RDD
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 428, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 657, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 107, in dump
>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>     self.save(obj)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>     save(tmp[0])
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 198, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 246, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>     self._batch_setitems(obj.iteritems())
>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>     save(v)
>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>     rv = reduce(self.proto)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>> line 933, in __call__
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 63, in deco
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>> line 316, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> o33.__getnewargs__. Trace:
>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>> at py4j.Gateway.invoke(Gateway.java:272)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
>>> --
>>> Otter Networks UG
>>> http://otternetworks.de
>>> Gotenstraße 17
>>> 10829 Berlin
>>>
>>
>>
>
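
Both quoted tracebacks pass through cloudpickle's save_function and pickle's save_dict before failing on o33.__getnewargs__. That pattern suggests, though nothing in this thread confirms it, that a function being shipped to the executors references a JVM-backed py4j object such as the SparkContext or SQLContext. The sketch below shows one way to structure the code so the mapped function touches only plain Python values. `parse_line`, `build_dataframe` and `COLUMNS` are hypothetical names, the sample line is reconstructed from the rows printed earlier, and `spark` is assumed to be an existing SparkSession.

```python
import json

# Column order shared by the tuples and the DataFrame schema. The names come
# from the keys in the sample rows; the order is an arbitrary choice.
COLUMNS = ['year', 'month', 'day', 'hour', 'minute', 'second', 'timezone']


def parse_line(line):
    """Parse one JSON line into a plain tuple.

    This function references no Spark objects, so serializing it for the
    executors only has to handle plain Python values.
    """
    record = json.loads(line)
    return tuple(record[name] for name in COLUMNS)


def build_dataframe(spark, lines):
    """Hypothetical helper: `spark` is an existing SparkSession and `lines`
    is a list of JSON strings held on the driver."""
    # The session is used only here, on the driver, never inside parse_line.
    rdd = spark.sparkContext.parallelize(lines).map(parse_line)
    return spark.createDataFrame(rdd, COLUMNS)


# Driver-side check that needs no cluster (sample line is an assumption).
sample = ('{"year": "1973", "month": "01", "day": "11", "hour": "00", '
          '"minute": "00", "second": "00", "timezone": "-00:00"}')
print(parse_line(sample))
```

Feeding `build_dataframe` a handful of in-memory strings like `sample` instead of S3 data is also a way to try suggestion 2 from the quoted reply.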


-- 
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin
