DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2

Shubham Chaurasia
Hi,

I am using Spark v2.3.2 and have an implementation of DataSourceV2 (DSV2). Here is what is happening:

1) Obtained a dataframe using MyDataSource

scala> val df1 = spark.read.format("com.shubham.MyDataSource").load
MyDataSource.MyDataSource
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@2b85edc7
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
df1: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 1 more field]

2) show() on df1 
scala> df1.show
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
=======MyDataSourceReader.createBatchDataReaderFactories=======
prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
pushedFilters = [] 
=======MyDataSourceReader.createBatchDataReaderFactories=======
+---+---+---+
| c1| c2| c3|
+---+---+---+
+---+---+---+

3) val df2 = df1.filter($"c3" > 1)

scala> df2.show
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushedFilters: []
MyDataSourceReader.pushFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
=======MyDataSourceReader.createBatchDataReaderFactories=======
prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]
=======MyDataSourceReader.createBatchDataReaderFactories=======
+---+---+---+
| c1| c2| c3|
+---+---+---+
+---+---+---+

4) df1.show() again <=== Since df2 is derived from df1 (and shares the same MyDataSourceReader instance), step 3 has modified pushedFilters for df1 as well
scala> df1.show
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pruneColumns: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@2b85edc7 baseSchema: StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
MyDataSourceReader.pushedFilters: [IsNotNull(c3), GreaterThan(c3,1)]
=======MyDataSourceReader.createBatchDataReaderFactories=======
prunedSchema = StructType(StructField(c1,IntegerType,true), StructField(c2,IntegerType,true), StructField(c3,IntegerType,true))
pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)]

=======MyDataSourceReader.createBatchDataReaderFactories=======
+---+---+---+
| c1| c2| c3|
+---+---+---+
+---+---+---+

pushedFilters = [IsNotNull(c3), GreaterThan(c3,1)] is not correct in step 4), as no filters were specified for df1.

This happens because I maintain a pushedFilters variable in MyDataSourceReader, and it gets modified by df2.filter().show().

Questions:
Q1: How should this state be maintained in DataSourceReader implementations?
Q2: Shouldn't Spark call the pushFilters() method every time an action is invoked (regardless of whether .filter() is present), in the same manner as it calls pruneColumns()?

I understand that pushFilters() is only invoked when a .filter() is present on the dataframe, but as seen in the scenario above, this leaves the state of MyDataSourceReader inconsistent - hence question Q2.
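
Note that the sharing happens because df1 and df2 both come from the same load() call, and hence from the single MyDataSourceReader created there. Judging from the createReader log line in step 1), a separate load() gets its own reader, so pushing filters through a freshly loaded DataFrame should not touch df1's state, e.g. (untested sketch, df3 is just an illustrative name):

scala> val df3 = spark.read.format("com.shubham.MyDataSource").load  // expected to print createReader and instantiate a new reader
scala> df3.filter($"c3" > 1).show                                    // filters would be pushed to df3's reader, not df1's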

Minimal Code:

import java.util.Optional;

// Imports below assume the Spark 2.3.x DataSourceV2 API packages.
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

public class MyDataSource implements DataSourceRegister, DataSourceV2, ReadSupport, WriteSupport {

  public MyDataSource() {
    System.out.println("MyDataSource.MyDataSource");
  }

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
    System.out.println("MyDataSource.createWriter: Going to create a new MyDataSourceWriter");
    return Optional.of(new MyDataSourceWriter(schema));
  }

  @Override
  public String shortName() {
    return "com.shubham.MyDataSource";
  }
}

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Imports below assume the Spark 2.3.x DataSourceV2 API packages.
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyDataSourceReader implements DataSourceReader, SupportsPushDownRequiredColumns, SupportsScanColumnarBatch, SupportsPushDownFilters {

  private Map<String, String> options;
  private StructType baseSchema;
  private StructType prunedSchema;
  private Filter[] pushedFilters = new Filter[0];

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.baseSchema = (new StructType())
        .add("c1", "int")
        .add("c2", "int")
        .add("c3", "int");
    System.out.println("MyDataSourceReader.readSchema: " + this + " baseSchema: " + this.baseSchema);
    return this.baseSchema;
  }

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    System.out.println("MyDataSourceReader.pushFilters: " + Arrays.toString(filters));
    // Filters that can be pushed down; for this example, assume all of them can be.
    this.pushedFilters = filters;

    // Filters that cannot be pushed down (none here).
    return new Filter[0];
  }

  @Override
  public Filter[] pushedFilters() {
    //System.out.println("MyDataSourceReader.pushedFilters: " + Arrays.toString(pushedFilters));
    return this.pushedFilters;
  }

  @Override
  public void pruneColumns(StructType requiredSchema) {
    System.out.println("MyDataSourceReader.pruneColumns: " + requiredSchema);
    this.prunedSchema = requiredSchema;
  }

  @Override
  public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
    System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");
    // Do the actual work here with baseSchema, prunedSchema and pushedFilters.

    System.out.println("prunedSchema = " + prunedSchema);
    System.out.println("pushedFilters = " + Arrays.toString(pushedFilters));

    System.out.println("=======MyDataSourceReader.createBatchDataReaderFactories=======");

    // Returning no factories, i.e. an empty scan (which is why show() prints no rows).
    return new ArrayList<>();
  }
}

Thanks,
Shubham




Re: DataSourceV2: pushFilters() is not invoked for each read call - spark 2.3.2

Hyukjin Kwon
I believe this issue was fixed in Spark 2.4.

Spark DataSource V2 is still being radically developed; it is not complete yet.
So I think the feasible options to get through this at the moment are:
  1. upgrade to a higher Spark version
  2. disable filter push down in your DataSource V2 implementation (a minimal sketch follows below)
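
For option 2, here is a minimal sketch of one way it could look in a reader like MyDataSourceReader above (an illustration only, assuming the Spark 2.3 SupportsPushDownFilters interface): report every filter as not pushed, so the reader holds no filter state and Spark keeps evaluating all filters itself.

  // Sketch only: one way to disable filter pushdown in the reader.
  @Override
  public Filter[] pushFilters(Filter[] filters) {
    // Report every filter as "not pushed"; Spark then applies them all itself,
    // and the reader keeps no filter state that could leak between queries.
    return filters;
  }

  @Override
  public Filter[] pushedFilters() {
    // Nothing has been pushed down.
    return new Filter[0];
  }

An even simpler variant is to drop SupportsPushDownFilters from the implements clause entirely, so the pushdown hooks are never called.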

I don't think the Spark community will backport or fix this in branch-2.3, which will be an EOL release soon.
For each branch, DataSource V2 has totally different code.
Fixing this specifically in each branch would bring considerable overhead.
I believe that's usually the case for internal Spark forks as well.


