RDD filter in for loop gave strange results

8 messages

RDD filter in for loop gave strange results

Marco Wong
Dear Spark users,

I ran the Python code below on a simple RDD, but it gave strange results: the filtered RDD contains elements that should have been filtered away in earlier iterations. Any idea why this happened?
```
rdd = spark.sparkContext.parallelize([0,1,2])
for i in range(3):
    print("RDD is ", rdd.collect())
    print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
    rdd = rdd.filter(lambda x:x!=i)
    print("Result is ", rdd.collect())
    print()
```
which gave
```
RDD is  [0, 1, 2]
Filtered RDD is  [1, 2]
Result is  [1, 2]

RDD is  [1, 2]
Filtered RDD is  [0, 2]
Result is  [0, 2]

RDD is  [0, 2]
Filtered RDD is  [0, 1]
Result is  [0, 1]
```

Thanks,

Marco

Re: RDD filter in for loop gave strange results

Jacek Laskowski
Hi Marco,

A Scala dev here.

In short: yet another reason against Python :)

Honestly, I've got no idea why the code gives this output. I ran it with 3.1.1-rc1 and got the very same results. Hoping the pyspark/python devs will chime in and shed more light on this.


Re: RDD filter in for loop gave strange results

srowen
In reply to this post by Marco Wong
That looks very odd indeed. Things like this work as expected:

```
rdd = spark.sparkContext.parallelize([0,1,2])

def my_filter(data, i):
  return data.filter(lambda x: x != i)

for i in range(3):
  rdd = my_filter(rdd, i)
rdd.collect()
```

... as does unrolling the loop.
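For reference, a minimal sketch of the unrolled form, with the loop values hard-coded:
```
rdd = spark.sparkContext.parallelize([0, 1, 2])
# Each lambda captures a literal, so there is nothing to look up later;
# every filter removes its own element and nothing survives.
result = rdd.filter(lambda x: x != 0) \
            .filter(lambda x: x != 1) \
            .filter(lambda x: x != 2)
print(result.collect())  # []
```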

But your example behaves as if only the final filter is applied. Is this some really obscure Python scoping thing with lambdas that I don't understand, like the lambda only binding i once? But then you'd expect it to filter out only the first number.

I also keep looking in the code to figure out if these are somehow being erroneously 'collapsed' as the same function, but the RDD APIs don't do that kind of thing. They get put into a chain of pipeline_funcs, but that still shouldn't be an issue. I wonder if this is some strange interaction between serialization of the lambda and scoping?

Really strange! Python people?


Re: RDD filter in for loop gave strange results

"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"
In reply to this post by Marco Wong
A. Global scope and global variables are bad habits in Python (this is about the 'rdd' and 'i' variables used in the lambda).
B. Lambdas are usually misused and abused in Python, especially when they are used in a global context. Ideally you'd use pure functions, with something like:
```
import functools

def my_rdd_filter(value, cur_elem):
    return cur_elem != value

rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
    func_filter = functools.partial(my_rdd_filter, i)
    rdd = rdd.filter(func_filter)
```
This is better, testable, Pythonic code: if you want to pass context to a callable, use partial or create a callable object that takes the context as an __init__ argument (BTW, this is what is done in Java).
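For illustration, a minimal sketch of that callable-object pattern (NotEqual is a hypothetical name; note the PySpark caveat below):
```
class NotEqual:
    # The context (the value to exclude) is bound once in __init__,
    # not looked up when the job runs.
    def __init__(self, value):
        self.value = value

    def __call__(self, x):
        return x != self.value

rdd = spark.sparkContext.parallelize([0, 1, 2])
for i in range(3):
    rdd = rdd.filter(NotEqual(i))
```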

Unfortunately, partials and callable objects are not supported in PySpark, though they are considered the more Pythonic way.

Anyway, the following works as you expected:
```
def filter_rdd(j, my_rdd):
    # this is a local context
    print("RDD is ", my_rdd.collect())
    print("Filtered RDD is ", my_rdd.filter(lambda x: x != j).collect())
    my_rdd = my_rdd.filter(lambda x: x != j)
    print("Result is ", my_rdd.collect())
    print()
    return my_rdd

# this is the global context
rdd = spark.sparkContext.parallelize([0, 1, 2])

for i in range(3):
    rdd = filter_rdd(i, rdd)
```


In any case, running code in the global context, other than calling main or setting constants, is bad practice in Python.

Hope this helps



Re: RDD filter in for loop gave strange results

Zhu Jingnan
In reply to this post by srowen
I thought that was the right result.

An RDD is evaluated lazily, so by the time rdd.collect() executes, i has been updated to its latest value, and only one element is filtered out.
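A minimal sketch of that effect, rebinding i between defining the filter and running it:
```
rdd = spark.sparkContext.parallelize([0, 1, 2])
i = 0
filtered = rdd.filter(lambda x: x != i)  # nothing executes yet
i = 2                                    # rebind before the action runs
print(filtered.collect())  # [0, 1] -- the lambda reads i at collect() time
```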

Regards

Jingnan


Re: RDD filter in for loop gave strange results

srowen
No, because the final rdd is really the result of chaining 3 filter operations. They should all execute. It _should_ work like "rdd.filter(...).filter(...).filter(...)".
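A sketch that counts predicate calls with an accumulator (count_and_compare is a hypothetical helper) shows all three filters do run, each seeing the final value of i:
```
sc = spark.sparkContext
rdd = sc.parallelize([0, 1, 2])
calls = sc.accumulator(0)

def count_and_compare(x, i):
    calls.add(1)  # count every predicate invocation
    return x != i

for i in range(3):
    rdd = rdd.filter(lambda x: count_and_compare(x, i))

print(rdd.collect())  # [0, 1] -- every filter saw i == 2
print(calls.value)    # 7 in a simple local run (3 + 2 + 2 calls)
```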




Re: RDD filter in for loop gave strange results

Marco Wong
Hmm, I think I got what Jingnan means. The lambda function is x != i, and i is not evaluated when the lambda function is defined. So the pipelined rdd is effectively rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i), with i looked up at run time, rather than having the values of i substituted in. Does that make sense to you, Sean?




Re: RDD filter in for loop gave strange results

srowen
Heh, that could make sense, but that definitely was not my mental model of how Python binds variables! It's definitely not how Scala works.
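If that's the explanation, a minimal sketch of the usual Python workaround is to freeze the current value of i as a default argument of the lambda:
```
rdd = spark.sparkContext.parallelize([0, 1, 2])
for i in range(3):
    # i=i captures the current value at definition time,
    # instead of looking the variable up when the job runs.
    rdd = rdd.filter(lambda x, i=i: x != i)
print(rdd.collect())  # []
```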
