Implementing TableProvider in Spark 3.0


Implementing TableProvider in Spark 3.0

Sricheta Ruj

Hello Spark Team

 

I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in the case of a write, how do I get the user-specified schema?

 

This is what I am trying to achieve:

 

val data = Seq(
  Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
  Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)

val schema = new StructType()
  .add(StructField("name", StringType, true))
  .add(StructField("id", IntegerType, true))
  .add(StructField("flag", BooleanType, true))
  .add(StructField("salary", DoubleType, true))
  .add(StructField("phone", LongType, true))
  .add(StructField("dob", DateType, true))
  .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
  .add(StructField("time", TimestampType, true))
  .add(StructField("float", FloatType, true))
  .add(StructField("byte", ByteType, true))
  .add(StructField("short", ShortType, true))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("container", outputContainer)
  .option("manifest", "/root/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("format", "parquet")
  .option("compression", "gzip")
  .option("appId", appid)
  .option("appKey", appkey)
  .option("tenantId", tenantid)
  .mode(SaveMode.Append)
  .save()

 

I have my custom DataSource implementation as follows:

 

class DefaultSource extends DataSourceRegister with TableProvider {

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata = true

  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
}

 

I get null here. I am not sure how I should get the StructType I created when df.write is called. Any help would be appreciated.

 

Thanks and Regards,

Sricheta Ruj.

 

 



Re: Implementing TableProvider in Spark 3.0

Richard Xin-2
 Saw


Sent from Yahoo Mail for iPhone



Re: Implementing TableProvider in Spark 3.0

tbisson
Following up on this older thread.

Looking at the implementation of DataFrameWriter, it doesn't seem possible to
use the schema from the DataFrame itself when writing through a V2 interface.

In order to have the DataFrame schema passed to a DataSourceV2 implementation, a
custom write DataSource needs to extend FileDataSourceV2. However,
lookupV2Provider() returns None when the DataSource is a FileDataSourceV2, so
maybeV2Provider.isDefined is always false in that case, as shown in the sketch below.
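
For reference, lookupV2Provider() in DataFrameWriter looks roughly like this (a simplified sketch of the Spark 3.0 code, not a verbatim copy):

private def lookupV2Provider(): Option[TableProvider] = {
  DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
    // File source V2 providers are filtered out here, so a DataSource that
    // extends FileDataSourceV2 never reaches the branch below that passes
    // df.schema to getTable.
    case Some(_: FileDataSourceV2) => None
    case other => other
  }
}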


Snippet of DataFrameWriter.save():
val maybeV2Provider = lookupV2Provider()
if (maybeV2Provider.isDefined) {
  val provider = maybeV2Provider.get
  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    provider, df.sparkSession.sessionState.conf)
  val options = sessionOptions ++ extraOptions
  val dsOptions = new CaseInsensitiveStringMap(options.asJava)

  def getTable: Table = {
    // For file source, it's expensive to infer schema/partition at each write. Here we pass
    // the schema of input query and the user-specified partitioning to `getTable`. If the
    // query schema is not compatible with the existing data, the write can still success but
    // following reads would fail.
    if (provider.isInstanceOf[FileDataSourceV2]) {
      provider.getTable(
        df.schema.asNullable,
        partitioningAsV2.toArray,
        dsOptions.asCaseSensitiveMap())
    } else {
      DataSourceV2Utils.getTableFromProvider(provider, dsOptions,
        userSpecifiedSchema = None)
    }
  }
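
There does seem to be a way to see the input query's schema even without FileDataSourceV2: on the V2 write path the query schema is handed to the Table through LogicalWriteInfo, so a table that implements SupportsWrite can read it in newWriteBuilder(). A minimal sketch, assuming a MysqlTable-style table like the one in the first post (the WriteBuilder body is omitted, and the null fallback is just an illustration):

import java.util
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

class MysqlTable(providedSchema: StructType, properties: util.Map[String, String])
  extends SupportsWrite {

  override def name(): String = "MysqlTable"

  // getTable() may have been called with a null schema by the provider;
  // fall back to an empty StructType in that case.
  override def schema(): StructType =
    if (providedSchema != null) providedSchema else new StructType()

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // info.schema() carries the schema of the DataFrame passed to df.write,
    // even though inferSchema()/getTable() never saw it.
    val writeSchema: StructType = info.schema()
    println(s"schema seen at write time: $writeSchema")
    new WriteBuilder {
      // buildForBatch() would construct the real BatchWrite from writeSchema;
      // omitted in this sketch.
    }
  }
}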







