Re: Implementing TableProvider in Spark 3.0


Re: Implementing TableProvider in Spark 3.0

Timothy Bisson

Following up on this older thread.

Looking at the implementation of DataFrameWriter, it doesn't seem possible to use the schema from the DataFrame itself when writing through a V2 interface.

In order to pass the DataFrame schema to a DataSourceV2 implementation, a custom write data source needs to extend FileDataSourceV2. However, lookupV2Provider() returns None when the data source is a FileDataSourceV2, so maybeV2Provider.isDefined is always false in that case. The result is that saveToV1Source() is always called.
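
For reference, lookupV2Provider() in Spark 3.0 is roughly the following (a paraphrase from memory, not a verbatim copy of DataFrameWriter): file-based V2 providers are deliberately mapped to None, which is why isDefined comes out false for them.

  private def lookupV2Provider(): Option[TableProvider] = {
    DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
      // File source V2 providers are filtered out here, so for them
      // maybeV2Provider.isDefined is false and save() falls back to saveToV1Source().
      case Some(_: FileDataSourceV2) => None
      case other => other
    }
  }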


Snippet of DataFrameWriter.save():
    val maybeV2Provider = lookupV2Provider()
    if (maybeV2Provider.isDefined) {
      val provider = maybeV2Provider.get
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        provider, df.sparkSession.sessionState.conf)
      val options = sessionOptions ++ extraOptions
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)

      def getTable: Table = {
        // For file source, it's expensive to infer schema/partition at each write. Here we pass
        // the schema of input query and the user-specified partitioning to `getTable`. If the
        // query schema is not compatible with the existing data, the write can still success but
        // following reads would fail.
        if (provider.isInstanceOf[FileDataSourceV2]) {
          provider.getTable(
            df.schema.asNullable,
            partitioningAsV2.toArray,
            dsOptions.asCaseSensitiveMap())
        } else {
          DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
        }
      }
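
For any provider that is not a FileDataSourceV2, the else branch above calls DataSourceV2Utils.getTableFromProvider with userSpecifiedSchema = None. Roughly sketched (paraphrased, not verbatim Spark source; the real method also checks supportsExternalMetadata), that means the provider's own inferSchema()/inferPartitioning() results are what reach getTable(), and df.schema never does on this path:

  def getTableFromProvider(
      provider: TableProvider,
      options: CaseInsensitiveStringMap,
      userSpecifiedSchema: Option[StructType]): Table = {
    userSpecifiedSchema match {
      case Some(schema) =>
        // Only reachable when a schema was explicitly supplied up front.
        provider.getTable(schema, Array.empty[Transform], options.asCaseSensitiveMap())
      case None =>
        // DataFrameWriter.save() passes None here, so a custom (non-file) provider
        // sees whatever its own inferSchema() returns -- null in the example quoted below.
        provider.getTable(
          provider.inferSchema(options),
          provider.inferPartitioning(options),
          options.asCaseSensitiveMap())
    }
  }

That matches what is reported in the quoted message below: inferSchema returns null, and that null is exactly what getTable receives.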

 

<quote author="Richard Xin-2">

Saw

Sent from Yahoo Mail for iPhone

On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj <[hidden email]> wrote:

 


Hello Spark Team

  

I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in the case of a write, how do I get the user-specified schema?

  

This is what I am trying to achieve:

 

  

val data = Seq(
  Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
  Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

val schema = new StructType()
  .add(StructField("name", StringType, true))
  .add(StructField("id", IntegerType, true))
  .add(StructField("flag", BooleanType, true))
  .add(StructField("salary", DoubleType, true))
  .add(StructField("phone", LongType, true))
  .add(StructField("dob", DateType, true))
  .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
  .add(StructField("time", TimestampType, true))
  .add(StructField("float", FloatType, true))
  .add(StructField("byte", ByteType, true))
  .add(StructField("short", ShortType, true))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("container", outputContainer)
  .option("manifest", "/root/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("format", "parquet")
  .option("compression", "gzip")
  .option("appId", appid).option("appKey", appkey).option("tenantId", tenantid)
  .mode(SaveMode.Append)
  .save()

  

I have my custom DataSource implementation as follows:

  

class DefaultSource extends DataSourceRegister with TableProvider {

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
}

  

I get null here. I am not sure how I should get the StructType I created when calling df.write. Any help would be appreciated.

  

Thanks and Regards,

Sricheta Ruj.

  

  

Thanks,

Sricheta Ruj

</quote>