spark-user mailing list archives

From Rok Roskar <>
Subject Re: PySpark, numpy arrays and binary data
Date Thu, 07 Aug 2014 07:06:48 GMT
thanks for the quick answer!

> numpy arrays can only hold basic types, so we cannot use them during collect()
> by default.

sure, but if you knew that a numpy array went in on one end, you could safely use it on the
other end, no? Perhaps it would require extending the RDD class and overriding collect().
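One way to get numpy arrays back from collect() without subclassing the RDD is to pack each partition into a single array with mapPartitions() and concatenate the pieces on the driver. This is only a sketch: it assumes every RDD element is a numeric row of the same length (e.g. a 3-vector), and the helper names are hypothetical. It cuts the per-element Python object overhead, but each partition's array is still pickled on its way to the driver, so it does not remove the serialization step itself.

```python
import numpy as np

def partition_to_array(rows):
    """Pack all rows of one partition into a single numpy array.

    Intended for rdd.mapPartitions(); yields at most one array, so collect()
    returns one array per non-empty partition instead of one object per row.
    """
    rows = list(rows)
    if rows:  # skip empty partitions so the shapes still concatenate
        yield np.asarray(rows)

def concat_partitions(arrays):
    """Join the per-partition arrays returned by collect() on the driver."""
    return np.concatenate(arrays) if arrays else np.empty((0,))

def collect_as_array(rdd):
    # Assumption: rdd holds equal-length numeric rows.
    return concat_partitions(rdd.mapPartitions(partition_to_array).collect())
```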

> Could you give a short example about how numpy array is used in your project?

sure -- basically our main data structure is a container class (acts like a dictionary) that
holds various arrays that represent particle data. Each particle has various properties, position,
velocity, mass etc. you get at these individual properties by calling something like 

s['pos']

where 's' is the container object and 'pos' is the name of the array. A really common use
case then is to select particles based on their properties and do some plotting, or take a
slice of the particles, e.g. you might do 

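import numpy as np
from matplotlib.pyplot import plot
r = np.sqrt((s['pos']**2).sum(axis=1))  # radial distance of each particle
ind = np.where(r < 5)                   # indices of particles with r < 5
plot(s[ind]['x'], s[ind]['y'])          # plot y against x for the selected particles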
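A minimal sketch of the kind of dict-like container described above, assuming each named array has one entry per particle along axis 0. `ParticleContainer` is a hypothetical name, and deriving 'x'/'y'/'z' from the columns of 'pos' is an assumption made so that the `s[ind]['x']` example works.

```python
import numpy as np

class ParticleContainer:
    """Hypothetical dict-like container holding one array per particle property."""

    def __init__(self, **arrays):
        self._arrays = {name: np.asarray(a) for name, a in arrays.items()}
        if len({len(a) for a in self._arrays.values()}) > 1:
            raise ValueError("every array needs one entry per particle")

    def __getitem__(self, key):
        if isinstance(key, str):
            if key in self._arrays:
                return self._arrays[key]  # e.g. s['pos'] -> (N, 3) array
            if key in ("x", "y", "z") and "pos" in self._arrays:
                # Assumption: x/y/z are the columns of 'pos'.
                return self._arrays["pos"][:, "xyz".index(key)]
            raise KeyError(key)
        # Any other key (slice, index array, np.where result) selects particles.
        return ParticleContainer(**{n: a[key] for n, a in self._arrays.items()})

    def __len__(self):
        return len(next(iter(self._arrays.values()), []))
```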

Internally, the various arrays are kept in a dictionary -- I'm hoping to write a class that
keeps them in an RDD instead. To the user, this would have to be transparent, i.e. if the
user wants to get at the data for specific particles, she would just have to do 


for example, and the data would be fetched for her from the RDD just like it would be if she
were simply using the usual single-machine version. This is why writing to/from files
when retrieving data from the RDD really is a no-go -- can you recommend how this can be circumvented?
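An RDD-backed version of the container described above could keep each chunk of particles as one RDD element (a dict of numpy arrays) and fetch a field on demand. The class name and chunk layout are assumptions, and only whole-field access is shown; selecting individual particles would need an extra filtering step on the workers. `rdd.map()` and `collect()` are standard PySpark calls, and `collect()` returns elements in partition order, so the concatenated array keeps the original particle order.

```python
import numpy as np

class RDDParticleContainer:
    """Hypothetical container whose arrays live in an RDD of chunk dicts.

    Each RDD element is assumed to be a dict such as
    {'pos': ndarray, 'mass': ndarray} covering one contiguous chunk of
    particles, so collect() returns one array per chunk rather than one
    Python object per particle.
    """

    def __init__(self, rdd):
        self._rdd = rdd

    def __getitem__(self, name):
        # Ship only the requested field to the driver, then stitch the chunks
        # together in partition order.
        parts = self._rdd.map(lambda chunk: chunk[name]).collect()
        return np.concatenate(parts)
```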

>> * is there a preferred way to read binary data off a local disk directly
>> into an RDD? Our I/O routines are built to read data in chunks and each
>> chunk could be read by a different process/RDD, but it's not clear to me how
>> to accomplish this with the existing API. Since the idea is to process data
>> sets that don't fit into a single node's memory, reading first and then
>> distributing via sc.parallelize is obviously not an option.
> If you already know how to partition the data, then you could use sc.parallelize()
> to distribute the description of your data, and then read the data in parallel
> according to those descriptions.
> For example, you can partition your data into (path, start, length) tuples:
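> partitions = [(path1, start1, length), (path1, start2, length), ...]
> def read_chunk(part):
>     path, start, length = part
>     with open(path, 'rb') as f:
>         f.seek(start)
>         data = f.read(length)
>     # process the data; flatMap expects an iterable of records back
>     return [data]
> rdd = sc.parallelize(partitions, len(partitions)).flatMap(read_chunk)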

right... this is totally obvious in retrospect!  Thanks!
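The suggestion in the reply can be made concrete for a flat binary file of fixed-size records. This sketch assumes the file holds raw float64 values with no header, and that it is reachable at the same path from every worker (e.g. on a shared filesystem); `make_partitions` and this `read_chunk` are hypothetical helpers. Chunk boundaries are kept on whole-record boundaries so no value is split across two chunks.

```python
import os
import numpy as np

def make_partitions(path, itemsize, items_per_chunk):
    """Split a flat binary file into (path, start, length) byte ranges."""
    total = os.path.getsize(path)
    step = itemsize * items_per_chunk  # keep chunks aligned to whole records
    return [(path, start, min(step, total - start))
            for start in range(0, total, step)]

def read_chunk(desc, dtype=np.float64):
    """Read one (path, start, length) range into a numpy array."""
    path, start, length = desc
    with open(path, "rb") as f:
        f.seek(start)
        buf = f.read(length)
    # frombuffer returns a read-only view of the bytes; copy() makes it writable
    return np.frombuffer(buf, dtype=dtype).copy()
```

On a cluster the descriptors would then be distributed as in the reply, e.g. `sc.parallelize(parts, len(parts)).map(read_chunk)`; using map rather than flatMap keeps each chunk as a single numpy array per RDD element.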


>> * related to the first question -- when an RDD is created by parallelizing a
>> numpy array, the array gets serialized and distributed. I see in the source
>> that it actually gets written into a file first (!?) -- but surely the Py4J
>> bottleneck for python array types (mentioned in the source comment) doesn't
>> really apply to numpy arrays? Is it really necessary to dump the data onto
>> disk first? Conversely, the collect() seems really slow and I suspect that
>> this is due to the combination of disk I/O and python list creation. Are
>> there any ways of getting around this if numpy arrays are being used?
>> I'd be curious about any other best-practices tips anyone might have for
>> running pyspark with numpy data...!
>> Thanks!
>> Rok
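On the parallelize side, one workaround is to split a large local array into a few contiguous blocks and parallelize the list of blocks, so each RDD element is a whole numpy array rather than one row. This is a sketch under that assumption: `to_blocks` and `parallelize_blocks` are hypothetical helpers, and `sc` is assumed to be an existing SparkContext. It reduces the number of Python objects that get serialized, but it does not change how PySpark ships the data to the JVM.

```python
import numpy as np

def to_blocks(arr, num_slices):
    """Split arr along axis 0 into num_slices contiguous, nearly equal blocks."""
    return np.array_split(arr, num_slices)

def parallelize_blocks(sc, arr, num_slices):
    # One block per slice: each RDD element is a numpy array, not a single row.
    return sc.parallelize(to_blocks(arr, num_slices), num_slices)
```

Rows can still be recovered with `rdd.flatMap(lambda block: block)` if a per-row view is needed, and the whole array with `np.concatenate(rdd.collect())`.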
