Incorrect results in left_outer join in DSv2 implementation with filter pushdown - spark 2.3.2


Shubham Chaurasia
Hi,

Consider the following statements:

1)
scala> val df = spark.read.format("com.shubham.MyDataSource").load
scala> df.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
|  3| -3|
|  4| -4|
+---+---+
2)
scala> val df1 = df.filter("i < 3")
scala> df1.show
+---+---+
|  i|  j|
+---+---+
|  0|  0|
|  1| -1|
|  2| -2|
+---+---+
3)
scala> df.join(df1, Seq("i"), "left_outer").show
+---+---+---+
|  i|  j|  j|
+---+---+---+
|  1| -1| -1|
|  2| -2| -2|
|  0|  0|  0|
+---+---+---+

3) is not producing the right results for the left_outer join: the rows with i = 3 and i = 4 are missing from the output entirely.
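
For reference, the expected left_outer output (modulo row ordering) should retain all five rows of df, with nulls on the right side for the non-matching keys:

+---+---+----+
|  i|  j|   j|
+---+---+----+
|  0|  0|   0|
|  1| -1|  -1|
|  2| -2|  -2|
|  3| -3|null|
|  4| -4|null|
+---+---+----+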

Here is the minimal code.
-------------------------------------------------------------------
package com.shubham.reader;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownFilters {

  private Filter[] pushedFilters = new Filter[0];
  private boolean hasFilters = false;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
  }

  @Override
  public StructType readSchema() {
    return (new StructType())
        .add("i", "int")
        .add("j", "int");
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    hasFilters = true;
    pushedFilters = filters;
    // Return the filters that can't be pushed down. Returning an empty array
    // tells Spark that the source fully handles all of them, so Spark will not
    // re-apply them on top of the scan.
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    return pushedFilters;
  }

  @Override
  public List<DataReaderFactory<Row>> createDataReaderFactories() {
    System.out.println("=======MyDataSourceReader.createDataReaderFactories=======");
    int ltFilter = Integer.MAX_VALUE;
    if (hasFilters) {
      ltFilter = getLTFilter("i");
    }
    hasFilters = false;
    return Lists.newArrayList(new SimpleDataReaderFactory(0, 5, ltFilter));
  }

  // Extract the value of a pushed LessThan filter on the given attribute, if any.
  private int getLTFilter(String attributeName) {
    int filterValue = Integer.MAX_VALUE;
    for (Filter pushedFilter : pushedFilters) {
      if (pushedFilter instanceof LessThan) {
        LessThan lt = (LessThan) pushedFilter;
        if (lt.attribute().equals(attributeName)) {
          filterValue = (int) lt.value();
        }
      }
    }
    return filterValue;
  }
}
------------------------------------------------------------
package com.shubham.reader;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

public class SimpleDataReaderFactory implements DataReaderFactory<Row> {

  private final int start;
  private final int end;
  private final int iLTFilter;

  public SimpleDataReaderFactory(int start, int end, int iLTFilter) {
    this.start = start;
    this.end = end;
    this.iLTFilter = iLTFilter;
  }

  @Override
  public DataReader<Row> createDataReader() {
    return new SimpleDataReader(start, end, iLTFilter);
  }

  public static class SimpleDataReader implements DataReader<Row> {
    private final int start;
    private final int end;
    private int current;
    private final int iLTFilter;

    public SimpleDataReader(int start, int end, int iLTFilter) {
      this.start = start;
      this.end = end;
      this.current = start - 1;
      this.iLTFilter = iLTFilter;
    }

    @Override
    public boolean next() {
      // Advance and apply the pushed LessThan filter here in the reader,
      // since pushFilters() told Spark the source would handle it.
      current++;
      return current < end && current < iLTFilter;
    }

    @Override
    public Row get() {
      // Emit rows of the form (i, -i).
      return new GenericRow(new Object[]{current, -current});
    }

    @Override
    public void close() {
    }
  }
}
------------------------------------------------------------
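
The entry point class behind com.shubham.MyDataSource is not pasted above; it is essentially just the standard Spark 2.3 DataSourceV2/ReadSupport boilerplate, roughly along these lines:
------------------------------------------------------------
package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;

public class MyDataSource implements DataSourceV2, ReadSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    // Hand the options through to the reader shown above.
    return new MyDataSourceReader(options.asMap());
  }
}
------------------------------------------------------------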

It seems that Spark is somehow applying the filter (i < 3) after the left_outer join as well, which is why we see the incorrect results in 3). However, I don't see any Filter node after the join in the plan.

== Physical Plan ==
*(5) Project [i#136, j#137, j#228]
+- SortMergeJoin [i#136], [i#227], LeftOuter
   :- *(2) Sort [i#136 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i#136, 200)
   :     +- *(1) DataSourceV2Scan [i#136, j#137], com.shubham.reader.MyDataSourceReader@714bd7ad
   +- *(4) Sort [i#227 ASC NULLS FIRST], false, 0
      +- ReusedExchange [i#227, j#228], Exchange hashpartitioning(i#136, 200)
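
The only oddity I can spot is the ReusedExchange: the df1 side of the join reuses the exchange from the df side, which suggests both sides end up reading the same (already filtered) scan output rather than two independent scans.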

Any ideas what might be going wrong? 

Thanks,
Shubham