spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sam Stoelinga <sammiest...@gmail.com>
Subject Re: Spark Python with SequenceFile containing numpy deserialized data in str form
Date Tue, 09 Jun 2015 04:37:05 GMT
Update: I've done a workaround to use saveAsPickleFile instead which
handles everything correctly. It stays in byte format. Noticed python got
messy with str and byte being the same in Python 2.7, wondering whether
using Python 3 would have the same problem.

I would still like to use a cross language usable SequenceFile instead of
using Picklefile though, so if anybody has pointers would appreciate that :)

On Tue, Jun 9, 2015 at 11:35 AM, Sam Stoelinga <sammiestoel@gmail.com>
wrote:

> Update: Using bytearray before storing to RDD is not a solution either.
> This happens when trying to read the RDD when the value was stored as
> python bytearray:
>
> Traceback (most recent call last):
>
>         [0/9120]
>   File "/vagrant/python/kmeans.py", line 24, in <module>
>     features = sc.sequenceFile(feature_sequencefile_path)
>   File "/usr/local/spark/python/pyspark/context.py", line 490, in
> sequenceFile
>     keyConverter, valueConverter, minSplits, batchSize)
>   File
> "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
> line 538, in __call__
>   File
> "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
> 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.sequenceFile.
> : org.apache.hadoop.mapred.InvalidInputException: Input path does not
> exist: hdfs://localhost:9000/tmp/feature-bytearray
>         at
> org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
>         at
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
>         at
> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:45)
>         at
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
>         at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:1156)
>         at
> org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:205)
>         at
> org.apache.spark.api.python.PythonRDD$.sequenceFile(PythonRDD.scala:447)
>         at
> org.apache.spark.api.python.PythonRDD.sequenceFile(PythonRDD.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>         at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>         at py4j.Gateway.invoke(Gateway.java:259)
>         at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> On Tue, Jun 9, 2015 at 11:04 AM, Sam Stoelinga <sammiestoel@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I'm storing an rdd as sequencefile with the following content:
>> key=filename(string) value=python str from numpy.savez(not unicode)
>>
>> In order to make sure the whole numpy array get's stored I have to first
>> serialize it with:
>> def serialize_numpy_array(numpy_array):
>>     output = io.BytesIO()
>>     np.savez_compressed(output, x=numpy_array)
>>     return output.getvalue()
>>
>> >> type(output.getvalue())
>> str
>>
>> The deserialization returns a python str, *not unicode object*. After
>> deserialization I call
>>
>> my_dersialized_numpy_rdd.saveAsSequenceFile(path)
>>
>> all works well and the RDD get stored successfully. Now the problem
>> starts I want to read the sequencefile again:
>>
>> >> my_dersialized_numpy_rdd = sc.sequenceFile(path)
>> >> first = my_dersialized_numpy_rdd.first()
>> >> type(first[1])
>> unicode
>>
>> The previous str became a unicode object after we stored it to a
>> sequencefile and read it again. Trying to convert it back with
>> first[1].decode("ascii") fails with UnicodeEncodeError: 'ascii' codec can't
>> encode characters in position 1-3: ordinal not in range(128)
>>
>> My expectation was that I would get the data back as how I stored it for
>> example in str format and not in unicode format. Anybody suggestion how I
>> can read back the original data. Will try converting the str to bytearray
>> before storing it to a seqeencefile.
>>
>> Thanks,
>> Sam Stoelinga
>>
>>
>

Mime
View raw message