Hi Andrew,
Sorry, but to me it seems S3 is the culprit.
I downloaded your JSON file and stored it locally, then wrote this simple app (a subset of what you have in your GitHub repo; sorry, I'm a little rusty on creating a new column out of existing ones), which basically reads the JSON file.
It's in Scala, but the Python equivalent shouldn't be difficult.
I noticed that in your schema you left out the timezone column... was that intentional?
Anyway, the code is below. I ran it with Spark 2.0 and, similarly, with 1.6, and found no issues reading the data. If I have some time I'll try to store your JSON in one of my S3 buckets and read it via Spark from EC2.


object SimpleApp {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // no schema: let Spark infer it
    val jsonContentNoSchema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
    jsonContentNoSchema.printSchema()
    println(s"The JSON content with no schema has ${jsonContentNoSchema.count()} rows")

    // with an explicit schema (all columns as strings)
    val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")

    val schema = (new StructType).add("hour", StringType).add("month", StringType)
      .add("second", StringType).add("year", StringType)
      .add("timezone", StringType).add("day", StringType)
      .add("minute", StringType)

    // read.schema(...).json(rdd) works on both 1.6 and 2.0;
    // the older sqlContext.jsonRDD(jsonRdd, schema) is deprecated
    val jsonContentWithSchema = sqlContext.read.schema(schema).json(jsonRdd)
    println(s"----- And the JSON with schema has ${jsonContentWithSchema.count()} rows")
  }
}
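
For reference, a rough PySpark equivalent (an untested sketch, assuming the same local path):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

conf = SparkConf().setAppName("Simple Application").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# no schema: let Spark infer it
json_no_schema = sqlContext.read.json("file:///c:/tmp/1973-01-11.json")
json_no_schema.printSchema()
print("The JSON content with no schema has %d rows" % json_no_schema.count())

# with an explicit schema (the same seven string columns)
schema = StructType([
    StructField(name, StringType())
    for name in ["hour", "month", "second", "year", "timezone", "day", "minute"]
])
json_with_schema = sqlContext.read.schema(schema) \
    .json(sc.textFile("file:///c:/tmp/1973-01-11.json"))
print("----- And the JSON with schema has %d rows" % json_with_schema.count())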
 
Hope this helps.
kr
Marco



On Mon, Nov 28, 2016 at 2:48 PM, Andrew Holway <andrew.holway@otternetworks.de> wrote:
I extracted the boto bits and tested them in vanilla Python on the nodes. I'm pretty sure the data coming from S3 is OK. I've applied a public policy to the bucket s3://time-waits-for-no-man. There is a publicly available object here: https://s3-eu-west-1.amazonaws.com/time-waits-for-no-man/1973-01-11

I'm using boto because using proxies with Spark, and Hadoop in general, is a bit of a black art.
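
For reference, the s3a connector does have proxy settings (fs.s3a.proxy.host / fs.s3a.proxy.port in hadoop-aws); a hedged sketch, untested behind this particular proxy:

# sketch: point Spark's s3a connector at the HTTPS proxy so that
# sc.textFile("s3a://...") can reach S3 from behind the webproxy;
# fs.s3a.proxy.* are standard hadoop-aws settings, adjust host/port
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.proxy.host", "webproxy")
hadoop_conf.set("fs.s3a.proxy.port", "8080")
rdd = sc.textFile("s3a://time-waits-for-no-man/1973-01-11")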
 

[centos@hadoop002 ~]$ python s3_test.py
object key
1973-01-11
Length of List
86400
First row
{u'hour': u'00', 'timestamp': 95558400, u'month': u'01', u'second': u'00', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'00'}
Last row
{u'hour': u'23', 'timestamp': 95644799, u'month': u'01', u'second': u'59', u'year': u'1973', u'timezone': u'-00:00', u'day': u'11', u'minute': u'59'}
[centos@hadoop002 ~]$ cat s3_test.py
import boto3
import ujson
import arrow
import os

os.environ['HTTPS_PROXY'] = 'https://webproxy:8080'

def add_timestamp(row):
    # derive an epoch timestamp from the separate date/time fields
    row['timestamp'] = arrow.get(
        int(row['year']),
        int(row['month']),
        int(row['day']),
        int(row['hour']),
        int(row['minute']),
        int(row['second'])
    ).timestamp
    return row

# list the matching keys in the bucket
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for obj in my_bucket.objects.filter(Prefix='1973-01-11'):
    s3_list.append(obj.key)

print("object key")
print(s3_list[0])

# fetch the object body and parse one JSON document per line
s3obj = s3.Object(bucket_name='time-waits-for-no-man', key=s3_list[0])
contents = s3obj.get()['Body'].read().decode()
lines = contents.splitlines()
result_wo_timestamp = map(ujson.loads, lines)  # a list under Python 2
result_wi_timestamp = map(add_timestamp, result_wo_timestamp)

print("Length of List")
print(len(result_wi_timestamp))
print("First row")
print(result_wi_timestamp[0])
print("Last row")
print(result_wi_timestamp[86399])




On Sun, Nov 27, 2016 at 7:11 PM, Marco Mistroni <mmistroni@gmail.com> wrote:

Hi

Pickle errors normally point to a serialisation issue. I suspect something wrong with your S3 data, but that's just a wild guess...
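
For example (a hypothetical sketch, not necessarily your code), the classic trigger for the "Method __getnewargs__([]) does not exist" Py4JError below is referencing the SparkContext or SQLContext inside a function that Spark has to pickle and ship to the executors:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="pickle-demo")
sqlContext = SQLContext(sc)

# ANTI-PATTERN: sqlContext wraps a py4j Java object; when Spark pickles
# this closure for the executors, pickle calls __getnewargs__ on the
# Java object and py4j raises "Method __getnewargs__([]) does not exist"
def parse(line):
    return sqlContext.read.json(line)  # driver-side object inside a task

sc.parallelize(["{}"]).map(parse).count()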

Is your S3 object publicly available?

A few suggestions to nail down the problem:

1 - Try to see if you can read your object from S3 using the boto3 library 'offline', i.e. not inside Spark code.

2 - Try to replace your distributedJsonRead: instead of reading from S3, generate a string out of a snippet of your JSON object.

3 - Spark can read data from S3 as well: just do sc.textFile('s3://....') ==> http://www.sparktutorials.net/reading-and-writing-s3-data-with-apache-spark (see the sketch below). Try to use Spark entirely to read and process the data, rather than going via boto3, which adds extra complexity you don't need.
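
A minimal sketch of option 3 (untested; assuming the S3 connector is configured on your cluster, and using the bucket/key from this thread):

import ujson
from pyspark import SparkContext

sc = SparkContext(appName="s3-json-read")

# read the JSON-lines object straight from S3; depending on your Hadoop
# build the scheme may need to be s3a:// or s3n:// rather than s3://
raw = sc.textFile("s3a://time-waits-for-no-man/1973-01-11")
rows = raw.map(ujson.loads)
print(rows.count())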

If you send a snippet of your JSON content, everyone on the list can run the code and try to reproduce the problem.


hth

 Marco


On 27 Nov 2016 7:33 pm, "Andrew Holway" <andrew.holway@otternetworks.de> wrote:
I get a slightly different error when not specifying a schema:

Traceback (most recent call last):
  File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
    df = sqlContext.createDataFrame(foo)
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 520, in createDataFrame
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 360, in _createFromRDD
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 331, in _inferSchema
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1328, in first
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1310, in take
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/context.py", line 941, in runJob
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)


On Sun, Nov 27, 2016 at 8:32 PM, Andrew Holway <andrew.holway@otternetworks.de> wrote:
Hi,

Can anyone tell me what is causing this error?
Spark 2.0.0
Python 2.7.5

df = sqlContext.createDataFrame(foo, schema)

Traceback (most recent call last):
  File "/home/centos/fun-functions/spark-parrallel-read-from-s3/tick.py", line 61, in <module>
    df = sqlContext.createDataFrame(foo, schema)
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/context.py", line 299, in createDataFrame
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 523, in createDataFrame
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2220, in _to_java_object_rdd
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2403, in _jrdd
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2336, in _wrap_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2315, in _prepare_for_python_RDD
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 657, in dumps
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 204, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 198, in save_function
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 246, in save_function_tuple
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/hdp/2.5.0.0-1245/spark2/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 316, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o33.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)



--
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin



--
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin



--
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin