spark-user mailing list archives

From Yuri Oleynikov (יורי אולייניקוב) <yur...@gmail.com>
Subject Re: RDD filter in for loop gave strange results
Date Wed, 20 Jan 2021 15:43:54 GMT
A. Global scope and global variables are a bad habit in Python (this
applies to the 'rdd' and 'i' variables used in the lambda).
B. Lambdas are often misused and abused in Python, especially when they
are used in global context. Ideally you would use pure functions, with
something like:
```
import functools

def my_rdd_filter(value, cur_elem):
    return cur_elem != value

rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
    # partial binds the current value of i immediately, not lazily
    func_filter = functools.partial(my_rdd_filter, i)
    rdd = rdd.filter(func_filter)
```
This is better, testable, Pythonic code: if you want to pass context to a
callable, use partial or create a callable object that receives the
context as an __init__ argument (BTW, this is what is done in Java).

Unfortunately, partials and callable objects are not supported in PySpark,
though they are considered the more Pythonic way.
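As a plain-Python illustration (no Spark needed; the class name is my own),
a callable object that captures its context in __init__ might look like
this — each object binds its own value at construction time:

```python
class NotEqualFilter:
    """Callable object: the value to filter out is bound in __init__."""

    def __init__(self, value):
        self.value = value  # context captured at construction time

    def __call__(self, elem):
        return elem != self.value

# Each object keeps its own bound value, so chaining works as intended.
data = [0, 1, 2]
for i in range(3):
    f = NotEqualFilter(i)
    data = [x for x in data if f(x)]
print(data)  # []
```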

Anyway, the following works as you expected:

```
def filter_rdd(j, my_rdd):
    # j lives in this local scope, so each lambda sees its own value
    print("RDD is ", my_rdd.collect())
    print("Filtered RDD is ", my_rdd.filter(lambda x: x != j).collect())
    my_rdd = my_rdd.filter(lambda x: x != j)
    print("Result is ", my_rdd.collect())
    print()
    return my_rdd

# this is global context
rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
    rdd = filter_rdd(i, rdd)
```
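The root cause of the original behaviour is that Python lambdas close over
the variable i itself, not its value at creation time, and Spark only
evaluates the filters lazily at collect(). The same pitfall can be
sketched in plain Python, without Spark:

```python
# Build three "filters" in a loop; each lambda closes over the variable i.
lazy_filters = []
for i in range(3):
    lazy_filters.append(lambda x: x != i)

# By the time the lambdas actually run, the loop has finished and i == 2,
# so every filter rejects 2 and accepts everything else.
results = [f(2) for f in lazy_filters]
print(results)  # [False, False, False]
```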



Anyway, running code in global context, other than calling main or setting
constants, is bad practice in Python.
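A minimal sketch of that convention (only constants and a main() entry
point at module level; the list here is just a stand-in for the RDD):

```python
NUM_VALUES = 3  # module-level constant: fine in global scope

def main():
    # stand-in for spark.sparkContext.parallelize([0, 1, 2])
    data = list(range(NUM_VALUES))
    for i in range(NUM_VALUES):
        data = [x for x in data if x != i]
    print("Result is", data)

if __name__ == "__main__":
    main()
```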

Hope this helps


Wed, 20 Jan 2021 at 15:08, Marco Wong <mckwxp@gmail.com>:

> Dear Spark users,
>
> I ran the Python code below on a simple RDD, but it gave strange results.
> The filtered RDD contains non-existent elements which were filtered away
> earlier. Any idea why this happened?
> ```
> rdd = spark.sparkContext.parallelize([0,1,2])
> for i in range(3):
>     print("RDD is ", rdd.collect())
>     print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
>     rdd = rdd.filter(lambda x:x!=i)
>     print("Result is ", rdd.collect())
>     print()
> ```
> which gave
> ```
> RDD is  [0, 1, 2]
> Filtered RDD is  [1, 2]
> Result is  [1, 2]
>
> RDD is  [1, 2]
> Filtered RDD is  [0, 2]
> Result is  [0, 2]
>
> RDD is  [0, 2]
> Filtered RDD is  [0, 1]
> Result is  [0, 1]
> ```
>
> Thanks,
>
> Marco
>
