DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Shubham Chaurasia
Hi All,

--Spark built with tags/v2.4.0-rc2

Consider the following DataSourceReader implementation:

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    // more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    // some logic to discover the schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}
1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the instance variable schema.
2) Then, when planBatchInputPartitions() is called, it is invoked on a different instance of MyDataSourceReader, and hence I am not getting the value of schema in planBatchInputPartitions().

How can I get the value of schema that was set in readSchema() in planBatchInputPartitions()?
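One workaround (a sketch, not something confirmed by this thread; it assumes schema discovery is cheap and deterministic, and the helper name schemaOrDiscover is hypothetical) is to memoize the schema lazily, so that every instance can produce it on demand regardless of whether readSchema() was ever called on it:

  private StructType schema;

  // Hypothetical helper: lazily discover and cache the schema, so the result
  // no longer depends on readSchema() having been called on this instance.
  private StructType schemaOrDiscover() {
    if (schema == null) {
      schema = new StructType()
          .add("col1", "int")
          .add("col2", "string");
    }
    return schema;
  }

  @Override
  public StructType readSchema() {
    return schemaOrDiscover();
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    StructType s = schemaOrDiscover();  // never null, even on a fresh instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + s);
    return new ArrayList<>();  // partition planning would use s here
  }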

Console Logs:

scala> mysource.executeQuery("select * from movie").show

MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null

Thanks,
Shubham

RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

AssafMendelson

Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…

Thanks,

        Assaf

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Shubham Chaurasia
Thanks Assaf. Did you try with tags/v2.4.0-rc2?

Full Code:

MyDataSource is the entry point, which simply creates the Reader and Writer:

public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {

  @Override public DataSourceReader createReader(DataSourceOptions options) {
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String jobId, StructType schema,
      SaveMode mode, DataSourceOptions options) {
    // creates a dataSourcewriter here..
    return Optional.of(dataSourcewriter);
  }

  @Override public String keyPrefix() {
    return "myprefix";
  }

}

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  StructType schema = null;
  Map<String, String> options;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    //variable this.schema is null here since readSchema() was called on a different instance
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    //more logic......
    return null;
  }

  @Override
  public StructType readSchema() {
    //some logic to discover schema
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }
}

Thanks,
Shubham


RE: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

AssafMendelson

I am using v2.4.0-RC2.

The code as is wouldn’t run (e.g. planBatchInputPartitions returns null). How are you calling it?

When I do:

val df = spark.read.format(mypackage).load()
df.show()

I am getting a single creation. How are you creating the reader?

Thanks,

        Assaf

Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Shubham Chaurasia
Alright, so it is a big project which uses a SQL store underneath.
I extracted the minimal code into a smaller project, and it still creates multiple instances.

Here is my project:

├── my-datasource.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── com
│   │   │       └── shubham
│   │   │           ├── MyDataSource.java
│   │   │           └── reader
│   │   │               └── MyDataSourceReader.java


MyDataSource.java
-------------------------------------------------
package com.shubham;

import com.shubham.reader.MyDataSourceReader;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Optional;

public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {

  @Override
  public DataSourceReader createReader(DataSourceOptions options) {
    System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
    return new MyDataSourceReader(options.asMap());
  }

  @Override
  public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
    return Optional.empty();
  }
}

MyDataSourceReader.java
-------------------------------------------------
package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
    this.options = options;
  }

  @Override
  public StructType readSchema() {
    this.schema = (new StructType())
        .add("col1", "int")
        .add("col2", "string");
    System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
    return this.schema;
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
    return new ArrayList<>();
  }
}

----------------------------------------
spark-shell output
----------------------------------------
scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show

MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
MyDataSource.createReader: Going to create a new MyDataSourceReader
MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
+----+----+
|col1|col2|
+----+----+
+----+----+


Here two instances of the reader, MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449, are created. Consequently, schema is null in MyDataSourceReader@3095c449.

Am I not doing it the correct way?
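One stopgap (a sketch, assuming readSchema() is idempotent, as it is in the example above) is to let planBatchInputPartitions() fall back to this instance's own readSchema() instead of relying on state that was only ever set on the other instance:

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    // Recompute the schema locally if this instance never saw readSchema().
    StructType s = (this.schema != null) ? this.schema : readSchema();
    System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + s);
    return new ArrayList<>();
  }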

Thanks,
Shubham


Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state

Jörn Franke
Generally, please avoid System.out.println and use a logger instead, even for examples. People may take these examples from here and put them in their production code.
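For instance, a minimal sketch of the reader's constructor rewritten with SLF4J (which Spark applications already have on the classpath); this fragment shows only the logging change, the rest of the class is as above:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private static final Logger LOG = LoggerFactory.getLogger(MyDataSourceReader.class);

  private Map<String, String> options;
  private StructType schema;

  public MyDataSourceReader(Map<String, String> options) {
    // Parameterized logging: the message is only built if INFO is enabled.
    LOG.info("Instantiated.... {}", this);
    this.options = options;
  }

  // readSchema() and planBatchInputPartitions() as before, with
  // LOG.info(...) in place of System.out.println(...)
}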
