Re: Implementing TableProvider in Spark 3.0

Timothy Bisson

Following up on this older thread.

Looking at the implementation of DataFrameWriter, it doesn't seem possible to use the schema of the DataFrame itself when writing through the V2 interface?

In order to pass the DataFrame schema to a DataSourceV2 implementation, a custom write data source needs to extend FileDataSourceV2. However, lookupV2Provider() returns None when the data source is a FileDataSourceV2, so maybeV2Provider.isDefined is always false. The result is that saveToV1Source() is always called.
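To make the control flow concrete, here is a minimal sketch of the lookup as I read it. The types below are simplified stand-ins written for this example, not the real Spark classes, and `writePath` is a hypothetical helper that only names which branch of save() would run.

```scala
// Simplified stand-ins for illustration; these are NOT the real Spark classes.
trait TableProvider
class FileDataSourceV2 extends TableProvider // models any file-based V2 source
class CustomProvider extends TableProvider   // models a plain TableProvider

object LookupSketch {
  // Models the filtering described above: a file-based V2 source is dropped,
  // any other result of the lookup is passed through unchanged.
  def lookupV2Provider(found: Option[TableProvider]): Option[TableProvider] =
    found match {
      case Some(_: FileDataSourceV2) => None
      case other                     => other
    }

  // Hypothetical helper: names the branch that save() would take.
  def writePath(found: Option[TableProvider]): String =
    if (lookupV2Provider(found).isDefined) "v2" else "saveToV1Source"
}
```

Under these assumptions, a provider that extends FileDataSourceV2 never reaches the V2 branch, while a plain TableProvider does.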

Snippet of DataFrameWriter.save():
    val maybeV2Provider = lookupV2Provider()
    if (maybeV2Provider.isDefined) {
      val provider = maybeV2Provider.get
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        provider, df.sparkSession.sessionState.conf)
      val options = sessionOptions ++ extraOptions
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)

      def getTable: Table = {
        // For file source, it's expensive to infer schema/partition at each write. Here we pass
        // the schema of input query and the user-specified partitioning to `getTable`. If the
        // query schema is not compatible with the existing data, the write can still success but
        // following reads would fail.
        if (provider.isInstanceOf[FileDataSourceV2]) {
        } else {
          DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
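For a provider that is not a FileDataSourceV2, the else branch is called with userSpecifiedSchema = None. My understanding is that the schema handed to getTable then comes from the provider's own inferSchema rather than from the DataFrame. The sketch below models that with simplified stand-in types (`Schema`, `Table`, `Provider` and `NullInferringProvider` are all made up for this example and do not match the real Spark signatures).

```scala
// Simplified stand-ins for illustration; NOT the real Spark classes or signatures.
final case class Schema(fields: List[String])
final case class Table(schema: Schema)

trait Provider {
  def inferSchema(options: Map[String, String]): Schema
  def getTable(schema: Schema, options: Map[String, String]): Table
}

object GetTableSketch {
  // Models the non-file branch: with no user-specified schema, the schema
  // passed to getTable is whatever inferSchema returns.
  def getTableFromProvider(
      provider: Provider,
      options: Map[String, String],
      userSpecifiedSchema: Option[Schema]): Table =
    userSpecifiedSchema match {
      case Some(schema) => provider.getTable(schema, options)
      case None         => provider.getTable(provider.inferSchema(options), options)
    }
}

// A provider whose inferSchema returns null, like the DefaultSource quoted below.
object NullInferringProvider extends Provider {
  def inferSchema(options: Map[String, String]): Schema = null
  def getTable(schema: Schema, options: Map[String, String]): Table = Table(schema)
}
```

If this model matches the real behaviour, a provider that returns null from inferSchema will see null in getTable on the write path, regardless of the schema of the DataFrame being written.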


Richard Xin-2 wrote:




On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj wrote:


Hello Spark Team


I am trying to use the DataSourceV2 API from Spark 3.0. On the write path, how do I get the user-specified schema?


This is what I am trying to achieve:

// Two sample rows; date, timestamp, byteVal and shortVal are not shown here.
val data = Seq(
   Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
   Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

// Schema matching the rows above, field by field.
val schema = new StructType()
   .add(StructField("name", StringType, true))
   .add(StructField("id", IntegerType, true))
   .add(StructField("flag", BooleanType, true))
   .add(StructField("salary", DoubleType, true))
   .add(StructField("phone", LongType, true))
   .add(StructField("dob", DateType, true))
   .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
   .add(StructField("time", TimestampType, true))
   .add(StructField("float", FloatType, true))
   .add(StructField("byte", ByteType, true))
   .add(StructField("short", ShortType, true))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data),



// Create a new manifest and add the entity to it

I have my custom DataSource implementation as follows:


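import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DefaultSource extends DataSourceRegister with TableProvider {

  override def shortName(): String = "spark-cdm"

  // No schema is inferred up front.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  // `schema` is the argument in question.
  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
    new MysqlTable(schema, properties)
  }
}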




The schema I get here is null. I am not sure how to get the StructType I created for the DataFrame when calling df.write. Any help would be appreciated.


Thanks and regards,

Sricheta Ruj.



