Hello Spark Team,
I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in the case of a write, how do I get the user-specified schema?
This is what I am trying to achieve:
val data = Seq(
  Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
  Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
)
val schema = new StructType()
  .add(StructField("name", StringType, true))
  .add(StructField("id", IntegerType, true))
  .add(StructField("flag", BooleanType, true))
  .add(StructField("salary", DoubleType, true))
  .add(StructField("phone", LongType, true))
  .add(StructField("dob", DateType, true))
  .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
  .add(StructField("time", TimestampType, true))
  .add(StructField("float", FloatType, true))
  .add(StructField("byte", ByteType, true))
  .add(StructField("short", ShortType, true))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
// Create a new manifest and add the entity to it
df.write.format("com.microsoft.cdm")
  .option("storage", storageAccountName)
  .option("container", outputContainer)
  .option("manifest", "/root/default.manifest.cdm.json")
  .option("entity", "TestEntity")
  .option("format", "parquet")
  .option("compression", "gzip")
  .option("appId", appid)
  .option("appKey", appkey)
  .option("tenantId", tenantid)
  .mode(SaveMode.Append)
  .save()
I have my custom DataSource implementation as follows:
class DefaultSource extends DataSourceRegister with TableProvider {

  override def shortName(): String = "spark-cdm"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = null

  override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
    getTable(null, null, options).partitioning
  }

  override def supportsExternalMetadata(): Boolean = true

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    println(schema)
    new MysqlTable(schema, properties)
  }
}
I get null here. I am not sure how I should get the StructType I created when calling df.write. Any help would be appreciated.
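For what it's worth, in the Spark 3.0 connector API the schema of the DataFrame being written is delivered later in the write path: Spark passes it to SupportsWrite.newWriteBuilder through LogicalWriteInfo, even when getTable received null. Below is a minimal sketch of a table that picks it up there; the original MysqlTable is not shown in this thread, so everything beyond its constructor signature is an assumption.

import java.util
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

class MysqlTable(providedSchema: StructType, properties: util.Map[String, String])
    extends SupportsWrite {

  override def name(): String = "MysqlTable"

  // May be null here, because the provider's inferSchema returned null.
  override def schema(): StructType = providedSchema

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // Spark fills LogicalWriteInfo with the schema of the input query,
    // regardless of what getTable received.
    val writeSchema: StructType = info.schema()
    new WriteBuilder {
      override def buildForBatch(): BatchWrite =
        ??? // a real implementation would build its writer around writeSchema
    }
  }
}

Whether a null Table.schema() is acceptable elsewhere in the plan is a separate question; the point is only where the input schema becomes available.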
Thanks and Regards,
Sricheta Ruj
Following up on this older thread.
Looking at the implementation of DataFrameWriter, it doesn't seem possible to pass the schema of the DataFrame itself to a V2 source when writing. In order to have the input schema handed to a DataSourceV2 implementation's getTable, a custom write DataSource would need to extend FileDataSourceV2. However, lookupV2Provider() returns None whenever the DataSource is a FileDataSourceV2, so for such a source maybeV2Provider.isDefined is always false and the V2 write path is never taken.
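For reference, lookupV2Provider() in Spark 3.0's DataFrameWriter reads roughly like this (paraphrased, not a verbatim quote):

private def lookupV2Provider(): Option[TableProvider] = {
  DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
    // TODO(SPARK-28396): File source v2 write path is currently broken.
    case Some(_: FileDataSourceV2) => None
    case other => other
  }
}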
Snippet of DataFrameWriter.save():
val maybeV2Provider = lookupV2Provider()
if (maybeV2Provider.isDefined) {
  val provider = maybeV2Provider.get
  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    provider, df.sparkSession.sessionState.conf)
  val options = sessionOptions ++ extraOptions
  val dsOptions = new CaseInsensitiveStringMap(options.asJava)

  def getTable: Table = {
    // For file source, it's expensive to infer schema/partition at each write. Here we pass
    // the schema of input query and the user-specified partitioning to `getTable`. If the
    // query schema is not compatible with the existing data, the write can still success but
    // following reads would fail.
    if (provider.isInstanceOf[FileDataSourceV2]) {
      provider.getTable(
        df.schema.asNullable,
        partitioningAsV2.toArray,
        dsOptions.asCaseSensitiveMap())
    } else {
      DataSourceV2Utils.getTableFromProvider(provider, dsOptions,
        userSpecifiedSchema = None)
    }
  }
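So for a non-file source, the else branch passes userSpecifiedSchema = None, which means the schema handed to getTable is whatever the provider's own inferSchema returns (null in the DefaultSource earlier in this thread). Paraphrasing DataSourceV2Utils.getTableFromProvider from Spark 3.0, again approximate rather than verbatim:

def getTableFromProvider(
    provider: TableProvider,
    options: CaseInsensitiveStringMap,
    userSpecifiedSchema: Option[StructType]): Table = {
  userSpecifiedSchema match {
    case Some(schema) =>
      if (provider.supportsExternalMetadata()) {
        // Only reached when an explicit user schema exists, e.g. from
        // spark.read.schema(...); DataFrameWriter.save() never supplies one.
        provider.getTable(schema, provider.inferPartitioning(options),
          options.asCaseSensitiveMap())
      } else {
        throw new UnsupportedOperationException(
          s"${provider.getClass.getSimpleName} source does not support user-specified schema")
      }
    case None =>
      // DataFrameWriter.save() lands here for non-file sources: the schema
      // argument to getTable comes from inferSchema, not from the DataFrame.
      provider.getTable(
        provider.inferSchema(options),
        provider.inferPartitioning(options),
        options.asCaseSensitiveMap())
  }
}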
Did you guys find a way to retrieve the schema while saving into an external database? I'm also stuck at the same place, without any clear path forward.
Thanks,
Rahul