spark-user mailing list archives

From Marco Mistroni <mmistr...@gmail.com>
Subject Re: createDataFrame causing a strange error.
Date Mon, 28 Nov 2016 21:50:30 GMT
Hi Andrew,
sorry, but to me it seems S3 is the culprit.
I downloaded your JSON file and stored it locally, then wrote this simple
app (a subset of what you have in your GitHub; sorry, I'm a little bit rusty
on how to create a new column out of existing ones), which basically reads
the JSON file.
It's in Scala, but the Python equivalent shouldn't be difficult.
I noticed that in your schema you forgot the timezone column. Was that
intentional?
Anyway, the code is below. I ran it with Spark 2.0 and similarly with 1.6
and found no issues reading the data. If I have some time I'll try to store
your JSON in one of my S3 buckets and read it via Spark from EC2.


def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.types._
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // No schema: let Spark infer it from the JSON records
    val jsonContentNoSchema =
      sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
    jsonContentNoSchema.printSchema()
    println(s"The JSON content with no schema has ${jsonContentNoSchema.count()} rows")

    // With an explicit schema, including the timezone column
    val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")

    val schema = (new StructType).add("hour", StringType).add("month", StringType)
                  .add("second", StringType).add("year", StringType)
                  .add("timezone", StringType).add("day", StringType)
                  .add("minute", StringType)

    // read.schema(...).json(rdd) is used here in place of jsonRDD,
    // which has been deprecated since Spark 1.4
    val jsonContentWithSchema = sqlContext.read.schema(schema).json(jsonRdd)
    println(s"----- And the JSON with schema has ${jsonContentWithSchema.count()} rows")
}
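
The same sanity check can also be done without Spark at all, with a
plain-Python pass over the file. The sketch below is only illustrative: it
assumes the file holds one JSON object per line (as the splitlines() /
ujson.loads code quoted below suggests) and that every row should carry the
seven fields from the schema above. The function name check_json_lines is
made up for this example.

```python
import json

# The seven field names from the schema above.
EXPECTED_FIELDS = {"hour", "month", "second", "year", "timezone", "day", "minute"}


def check_json_lines(path):
    """Parse a file that holds one JSON object per line.

    Returns (row_count, bad_lines), where bad_lines lists the 1-based
    numbers of lines that do not parse as a JSON object or that lack
    one of the expected fields.
    """
    row_count = 0
    bad_lines = []
    with open(path) as handle:
        for line_number, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue  # blank lines are ignored, not counted as rows
            row_count += 1
            try:
                record = json.loads(line)
            except ValueError:
                # Not valid JSON at all
                bad_lines.append(line_number)
                continue
            if not isinstance(record, dict) or not EXPECTED_FIELDS <= set(record):
                # Valid JSON, but not an object with all expected fields
                bad_lines.append(line_number)
    return row_count, bad_lines
```

If the local copy matches the output quoted below, calling
check_json_lines on it should report 86400 rows and an empty list of bad
lines. Anything else would point at the data rather than at Spark.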

hope this helps
kr
marco



On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <
andrew.holway@otternetworks.de> wrote:

> I extracted out the boto bits and tested in vanilla python on the nodes. I
> am pretty sure that the data from S3 is ok. I've applied a public policy to
> the bucket s3://time-waits-for-no-man. There is a publicly available object
> here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11
>
> I'm using boto because using proxies with spark and hadoop in general is a
> bit of a black art.
>
>
> [centos@hadoop002 ~]$ python s3_test.py
> object key
> 1973-01-11
> Length of List
> 86400
> First row
> {u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
> Last row
> {u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59',
> u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
> [centos@hadoop002 ~]$ cat s3_test.py
> import boto3
> import ujson
> import arrow
> import sys
> import os
> import getpass
>
> os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'
>
> def add_timestamp(dict):
>      dict['timestamp'] = arrow.get(
>                          int(dict['year']),
>                          int(dict['month']),
>                          int(dict['day']),
>                          int(dict['hour']),
>                          int(dict['minute']),
>                          int(dict['second'])
>                          ).timestamp
>      return dict
>
> s3_list = []
> s3 = boto3.resource('s3')
> my_bucket = s3.Bucket('time-waits-for-no-man')
> for object in my_bucket.objects.filter(Prefix='1973-01-11'):
>      s3_list.append(object.key)
>
> print("object key")
> print (s3_list[0])
>
> s3obj = boto3.resource('s3').Object(bucket_name='time-waits-for-no-man',
> key=s3_list[0])
> contents = s3obj.get()['Body'].read().decode()
> meow = contents.splitlines()
> result_wo_timestamp = map(ujson.loads, meow)
> result_wo_timestamp[0]
> result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
>
> print("Length of List")
> print(len(result_wi_timestamp))
> print("First row")
> print(result_wi_timestamp[0])
> print("Last row")
> print(result_wi_timestamp[86399])
>
>
>
>
> On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistroni@gmail.com>
> wrote:
>
>> Hi
>>
>> Pickle errors normally point to a serialisation issue. I suspect
>> something is wrong with your S3 data, but it is just a wild guess...
>>
>> Is your s3 object publicly available?
>>
>> A few suggestions to nail down the problem:
>>
>> 1 - Try to see if you can read your object from S3 using the boto3 library
>> 'offline', meaning outside of Spark code.
>>
>> 2 - Try to replace your distributedJsonRead: instead of reading from S3,
>> generate a string out of a snippet of your JSON object.
>>
>> 3 - Spark can read data from S3 as well, just do a
>> sc.textFile('s3://....') ==>
>> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark
>> Try to use Spark entirely to read and process the data, rather than going
>> via boto3. It adds extra complexity which you don't need.
>>
>> If you send a snippet of your JSON content, then everyone on the list can
>> run the code and try to reproduce.
>>
>>
>> hth
>>
>>  Marco
>>
>>
>> On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.holway@otternetworks.de>
>> wrote:
>>
>>> I get a slightly different error when not specifying a schema:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>> line 61, in <module>
>>>     df = sqlContext.createDataFrame(foo)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>> line 299, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 520, in createDataFrame
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 360, in _createFromRDD
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>> line 331, in _inferSchema
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 1328, in first
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 1310, in take
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py",
>>> line 941, in runJob
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2403, in _jrdd
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2336, in _wrap_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>> line 2315, in _prepare_for_python_RDD
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 428, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 657, in dumps
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 107, in dump
>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>     self.save(obj)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>     save(x)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 204, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 241, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>     save(element)
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>     self._batch_appends(iter(obj))
>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>     save(tmp[0])
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 198, in save_function
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>> line 246, in save_function_tuple
>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>     f(self, obj) # Call unbound method with explicit self
>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>     self._batch_setitems(obj.iteritems())
>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>     save(v)
>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>     rv = reduce(self.proto)
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>> line 933, in __call__
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 63, in deco
>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>> line 316, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> o33.__getnewargs__. Trace:
>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>> at py4j.Gateway.invoke(Gateway.java:272)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <
>>> andrew.holway@otternetworks.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> Can anyone tell me what is causing this error?
>>>> Spark 2.0.0
>>>> Python 2.7.5
>>>>
>>>> df = sqlContext.createDataFrame(foo, schema)
>>>> https://gist.github.com/mooperd/368e3453c29694c8b2c038d6b7b4413a
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py",
>>>> line 61, in <module>
>>>>     df = sqlContext.createDataFrame(foo, schema)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py",
>>>> line 299, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py",
>>>> line 523, in createDataFrame
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2220, in _to_java_object_rdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2403, in _jrdd
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2336, in _wrap_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py",
>>>> line 2315, in _prepare_for_python_RDD
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>>>> line 428, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 657, in dumps
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 107, in dump
>>>>   File "/usr/lib64/python2.7/pickle.py", line 224, in dump
>>>>     self.save(obj)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
>>>>     save(x)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 204, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 241, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
>>>>     save(element)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
>>>>     self._batch_appends(iter(obj))
>>>>   File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
>>>>     save(tmp[0])
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 198, in save_function
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py",
>>>> line 246, in save_function_tuple
>>>>   File "/usr/lib64/python2.7/pickle.py", line 286, in save
>>>>     f(self, obj) # Call unbound method with explicit self
>>>>   File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
>>>>     self._batch_setitems(obj.iteritems())
>>>>   File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
>>>>     save(v)
>>>>   File "/usr/lib64/python2.7/pickle.py", line 306, in save
>>>>     rv = reduce(self.proto)
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py",
>>>> line 933, in __call__
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>>> line 63, in deco
>>>>   File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py",
>>>> line 316, in get_return_value
>>>> py4j.protocol.Py4JError: An error occurred while calling
>>>> o33.__getnewargs__. Trace:
>>>> py4j.Py4JException: Method __getnewargs__([]) does not exist
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
>>>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
>>>> at py4j.Gateway.invoke(Gateway.java:272)
>>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>> at py4j.GatewayConnection.run(GatewayConnection.java:211)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>>
>>>> --
>>>> Otter Networks UG
>>>> http://otternetworks.de
>>>> Gotenstraße 17
>>>> 10829 Berlin
>>>>
>>>
>>>
>>
>
>
