spark-user mailing list archives

From Andrew Holway <andrew.hol...@otternetworks.de>
Subject Re: createDataFrame causing a strange error.
Date Tue, 29 Nov 2016 14:53:00 GMT
Hi Marco,

I was not able to find out what was causing the problem but a "git stash"
seems to have fixed it :/

Thanks for your help... :)

On Mon, Nov 28, 2016 at 10:50 PM, Marco Mistroni <mmistroni@gmail.com>
wrote:

> Hi Andrew,
> sorry, but to me it seems s3 is the culprit....
> I have downloaded your json file and stored it locally, then wrote this
> simple app (a subset of what you have in your github; sorry, I'm a little bit
> rusty on how to create a new column out of existing ones) which basically
> reads the json file.
> It's in Scala, but the Python equivalent shouldn't be difficult.
> I noticed that in your schema you forgot the timezone column.... was that
> intentional?
> Anyway, the code is below. I ran it with Spark 2.0 and similarly with 1.6 and
> found no issues in reading the data. If I have some time I'll try to store
> your json in one of my s3 buckets and read it via spark from EC2.
>
>
> def main(args:Array[String]) = {
>     import org.apache.spark.sql.types._
>     import org.apache.spark.SparkContext
>     import org.apache.spark.SparkConf
>     import org.apache.spark.rdd._
>     import org.apache.spark.SparkContext._
>     import org.apache.spark.sql._
>
>
>     val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
>     val sc = new SparkContext(conf)
>     val sqlContext = new SQLContext(sc)
>
>     // no schema
>     val jsonContentNoSchema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
>     jsonContentNoSchema.printSchema()
>     println(s"The Json content with no schema has ${jsonContentNoSchema.count()} rows")
>     // with schema
>
>     val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
>     import sqlContext.implicits._
>
>     val schema = (new StructType).add("hour", StringType).add("month", StringType)
>                   .add("second", StringType).add("year", StringType)
>                   .add("timezone", StringType).add("day", StringType)
>                   .add("minute", StringType)
>
>     val jsonContentWithSchema = sqlContext.jsonRDD(jsonRdd, schema)
>     println(s"----- And the Json with schema has ${jsonContentWithSchema.count()} rows")
>
>
>   }
>
> hope this helps
> kr
> marco
>
>
>
> On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <
> andrew.holway@otternetworks.de> wrote:
>
>> I extracted out the boto bits and tested in vanilla python on the nodes.
>> I am pretty sure that the data from S3 is ok. I've applied a public policy
>> to the bucket s3://time-waits-for-no-man. There is a publicly available
>> object here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
>>
>> I'm using boto because using proxies with spark and hadoop in general is
>> a bit of a black art.
>>
>>
>> [centos@hadoop002 ~]$ python s3_test.py
>> object key
>> 1973-01-11
>> Length of List
>> 86400
>> First row
>> {u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second':
>> u'00', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute':
>> u'00'}
>> Last row
>> {u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second':
>> u'59', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute':
>> u'59'}
>> [centos@hadoop002 ~]$ cat s3_test.py
>> import boto3
>> import ujson
>> import arrow
>> import sys
>> import os
>> import getpass
>>
>> os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'
>>
>> def add_timestamp(dict):
>>      dict['timestamp'] = arrow.get(
>>                          int(dict['year']),
>>                          int(dict['month']),
>>                          int(dict['day']),
>>                          int(dict['hour']),
>>                          int(dict['minute']),
>>                          int(dict['second'])
>>                          ).timestamp
>>      return dict
>>
>> s3_list = []
>> s3 = boto3.resource('s3')
>> my_bucket = s3.Bucket('time-waits-for-no-man')
>> for object in my_bucket.objects.filter(Prefix='1973-01-11'):
>>      s3_list.append(object.key)
>>
>> print("object key")
>> print (s3_list[0])
>>
>> s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man',
>> key=s3_list[0])
>> contents = s3obj.get()['Body'].read().decode()
>> meow = contents.splitlines()
>> result_wo_timestamp = map(ujson.loads, meow)
>> result_wo_timestamp[0]
>> result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
>>
>> print("Length of List")
>> print(len(result_wi_timestamp))
>> print("First row")
>> print(result_wi_timestamp[0])
>> print("Last row")
>> print(result_wi_timestamp[86399])
>>
>>
>>
>>
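As an aside, the `add_timestamp` helper in the script above can be reproduced with the standard library alone (no arrow), which makes it easy to sanity-check the expected timestamps offline. `add_timestamp_stdlib` is an invented name for this sketch; the sample row is taken from the output printed above:

```python
import calendar
import datetime

def add_timestamp_stdlib(d):
    # Build a UTC epoch timestamp from the string fields, mirroring the
    # arrow-based add_timestamp helper, using only the standard library.
    dt = datetime.datetime(
        int(d['year']), int(d['month']), int(d['day']),
        int(d['hour']), int(d['minute']), int(d['second']))
    d['timestamp'] = calendar.timegm(dt.timetuple())
    return d

first = {'year': '1973', 'month': '01', 'day': '11',
         'hour': '00', 'minute': '00', 'second': '00', 'timezone': '-00:00'}
print(add_timestamp_stdlib(first)['timestamp'])  # 95558400, matching the first row above
```

`calendar.timegm` interprets the tuple as UTC, which matches the `-00:00` timezone in the data.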
>> On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistroni@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> pickle errors normally point to a serialisation issue. I suspect
>>> something wrong with your S3 data, but that's just a wild guess...
>>>
>>> Is your s3 object publicly available?
>>>
>>> A few suggestions to nail down the problem:
>>>
>>> 1 - try to see if you can read your object from s3 using the boto3 library
>>> 'offline', meaning not in spark code
>>>
>>> 2 - try to replace your distributedJsonRead: instead of reading from s3,
>>> generate a string out of a snippet of your json object
>>>
>>> 3 - Spark can read data from s3 as well, just do a
>>> sc.textFile('s3://....') ==> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark.
>>> Try to use spark entirely to read and process the data, rather than going
>>> via boto3, which adds extra complexity you don't need.
>>>
>>> If you send a snippet of your json content, everyone on the list can
>>> run the code and try to reproduce it.
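Marco's third suggestion, reading the object with Spark alone, might look like the sketch below. It assumes the s3a connector (hadoop-aws) is on the classpath; the `fs.s3a.proxy.*` settings are the standard Hadoop options for routing s3a through a proxy, and the proxy host here is an assumption borrowed from the script earlier in the thread:

```python
def read_from_s3(spark):
    # Let Spark read the S3 object itself, no boto3 involved.
    # Requires the hadoop-aws (s3a) connector on driver and executors.
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    hconf.set("fs.s3a.proxy.host", "webproxy")  # assumed proxy host
    hconf.set("fs.s3a.proxy.port", "8080")
    df = spark.read.json("s3a://time-waits-for-no-man/1973-01-11")
    print("rows read from s3: %d" % df.count())
    return df
```

Going through `spark.read.json` directly also sidesteps `createDataFrame` and its schema inference entirely, which is where the error below is raised.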
>>>
>>>
>>> hth
>>>
>>>  Marco
>>>
>>>
>>> On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.holway@otternetworks.de>
>>> wrote:
>>>
>>>> I get a slightly different error when not specifying a schema:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>>> line 61, in <module>
>>>>     df = sqlContext.createDataFrame(foo)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>>> line 299, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>> line 520, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>> line 360, in _createFromRDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>> line 331, in _inferSchema
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 1328, in first
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 1310, in take
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py",
>>>> line 941, in runJob
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2403, in _jrdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2336, in _wrap_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2315, in _prepare_for_python_RDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>>> line 428, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 657, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 107, in dump
>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>     self.save(obj)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>     save(tmp[0])
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 198, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 246, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>     self._batch_setitems(obj.iteritems())
>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>     save(v)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>     rv = reduce(self.proto)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>>> line 933, in __call__
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>>> line 63, in deco
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>>> line 316, in get_return_value
>>>> py4j.protocol.Py4JError: An error occurred while calling
>>>> o33.__getnewargs__. Trace:
>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>> at py4j.Gateway.invoke(Gateway.java:272)
>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
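For context on the Py4JError above: `Method __getnewargs__([]) does not exist` is typically the signature of a JVM-backed driver object (the SparkContext, SQLContext, an RDD or DataFrame) ending up inside data or a closure that PySpark tries to pickle for the executors; py4j proxies forward unknown attribute lookups, including pickle's internal probes, to the JVM, which answers that no such method exists. A toy reproduction with no Spark involved (`JavaProxy` is invented for illustration):

```python
import pickle

class JavaProxy(object):
    # Toy stand-in for a py4j-backed driver object such as SparkContext:
    # unknown attribute lookups are "forwarded to the JVM", which replies
    # that the method does not exist, much like the traceback above.
    def __getattr__(self, name):
        raise RuntimeError("Method %s([]) does not exist" % name)

sc_like = JavaProxy()

# Pickling anything that contains the proxy fails, because pickle probes
# the instance for __getnewargs__-style helpers and the probe is forwarded.
try:
    pickle.dumps((sc_like, [1, 2, 3]))
    print("pickled fine")
except Exception as exc:
    print("pickling failed: %s" % exc)
```

That a `git stash` later made the error disappear would be consistent with this: a stray local change that pulled a driver-side Spark object into the pickled closure or dataset.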
>>>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <
>>>> andrew.holway@otternetworks.de> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can anyone tell me what is causing this error?
>>>>> Spark 2.0.0
>>>>> Python 2.7.5
>>>>>
>>>>> df = sqlContext.createDataFrame(foo, schema)
>>>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>>>
>>>>> Traceback (most recent call last):
>>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>>>> line 61, in <module>
>>>>>     df = sqlContext.createDataFrame(foo, schema)
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>>>> line 299, in createDataFrame
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>>> line 523, in createDataFrame
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>>> line 2220, in _to_java_object_rdd
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>>> line 2403, in _jrdd
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>>> line 2336, in _wrap_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>>> line 2315, in _prepare_for_python_RDD
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>>>> line 428, in dumps
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 657, in dumps
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 107, in dump
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>>     self.save(obj)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>>     save(x)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>>     save(x)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 204, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 241, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>>     save(element)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>>     self._batch_appends(iter(obj))
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>>     save(tmp[0])
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 198, in save_function
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>>> line 246, in save_function_tuple
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>>     f(self, obj) # Call unbound method with explicit self
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>>     self._batch_setitems(obj.iteritems())
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>>     save(v)
>>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>>     rv = reduce(self.proto)
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>>>> line 933, in __call__
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>>>> line 63, in deco
>>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>>>> line 316, in get_return_value
>>>>> py4j.protocol.Py4JError: An error occurred while calling
>>>>> o33.__getnewargs__. Trace:
>>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>>> at py4j.Gateway.invoke(Gateway.java:272)
>>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Otter Networks UG
>>>>> http://otternetworks.de
>>>>> Gotenstraße 17
>>>>> 10829 Berlin
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>
>


-- 
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin
