Insertable records in Datasource v2.

Rahul Kumar
I'm implementing a DataSource V2 connector for a custom data source.

I'm trying to insert records into a temp view in the following fashion:

    insertDFWithSchema.createOrReplaceTempView(sqlView)
    spark.sql(s"insert into $sqlView values (20000, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")

where insertDFWithSchema is a DataFrame loaded from the custom data source.


I end up getting the following exception:

    org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false;;
    'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false
    +- LocalRelation [col1#88, col2#89, col3#90, col4#91]
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
      at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
      at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
      at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
      at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
      at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
      at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
      at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
      at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
      at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
      ... 40 elided


In my V1 data source implementation, I mixed the InsertableRelation trait into my BaseRelation. In V2, I'm not sure how to achieve the same thing. I have also tried implementing InsertableRelation in DefaultSource. Any input would be extremely helpful.
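For reference, a minimal sketch of that V1 pattern, assuming the Spark 2.4 org.apache.spark.sql.sources API: a BaseRelation that also mixes in InsertableRelation, which (as far as I can tell in 2.4) is what lets an INSERT INTO on a temp view over the relation be routed to insert(). The names MyDbRelation, MyDbV1Source and InMemoryStore are hypothetical, the column types are assumed (the thread only shows column names), and the driver-side buffer merely stands in for the real backend.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical driver-side buffer standing in for the real "mydb" backend.
object InMemoryStore {
  val rows = new ConcurrentLinkedQueue[Row]()
}

// Hypothetical V1 relation: readable via TableScan, writable via InsertableRelation.
class MyDbRelation(override val sqlContext: SQLContext)
    extends BaseRelation with TableScan with InsertableRelation {

  // Assumed column types; only the names come from the stack trace.
  override def schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("salary", IntegerType)))

  // Reads whatever is currently in the buffer.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(InMemoryStore.rows.asScala.toList)

  // Invoked for INSERT INTO (overwrite = false) and INSERT OVERWRITE (overwrite = true).
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (overwrite) InMemoryStore.rows.clear()
    // collect() pulls all rows to the driver; acceptable only for a toy example.
    data.collect().foreach(row => InMemoryStore.rows.add(row))
  }
}

// Hypothetical provider, referenced by class name in spark.read.format(...).
class MyDbV1Source extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
    new MyDbRelation(sqlContext)
}
```

With these compiled as top-level classes, spark.read.format(classOf[MyDbV1Source].getName).load().createOrReplaceTempView(...) followed by the INSERT INTO statement above should append to InMemoryStore.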

Thanks,
Rahul



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: Insertable records in Datasource v2.

Jacek Laskowski
Hi Rahul,

I think it's not supported and will not be. You should report an issue in JIRA at https://issues.apache.org/jira/projects/SPARK, but don't expect a solution for temporary views.

I did manage to reproduce the "AnalysisException: unresolved operator", and even though INSERT INTO a view is not allowed (according to ResolveRelations), that check did not get triggered.

Just like you, I thought I could get past this exception with InsertableRelation, but alas, that didn't work either.

The temporary view based on the custom data source is indeed resolved, but the InsertIntoTable logical operator is not, and that leads to the exception you've been facing:

unresolved operator 'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2] (Options: [paths=[]]), false, false;;
'InsertIntoTable RelationV2 custom[id#0L, name#1, v#2] (Options: [paths=[]]), false, false
+- Project [_1#56 AS id#60, _2#57 AS name#61, _3#58 AS v#62]
   +- LocalRelation [_1#56, _2#57, _3#58]

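By the way, this insert could also be expressed using the Scala API as follows:

    import spark.implicits._ // for Seq#toDF

    // One tuple per row, matching the view's four columns
    Seq((20000, "insert_record1", 200, 23000), (20001, "insert_record2", 201, 23001))
      .toDF(insertDFWithSchema.schema.names: _*)
      .write
      .insertInto(sqlView)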
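That fails the same way, since it goes through the same view. If the goal is only to append rows to the underlying source rather than to the view, one possible route in Spark 2.4 (the version the stack traces suggest) is to implement WriteSupport from the V2 API and write with DataFrameWriter.save(), bypassing the temp view entirely. This is a minimal sketch assuming the Spark 2.4 org.apache.spark.sql.sources.v2 interfaces (they were replaced in 3.0); MyDbSourceV2, MyDbV2Store and the other class names are hypothetical, and shipping every row back to the driver in commit messages is only acceptable for a toy example.

```scala
import java.util.Optional
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// Hypothetical driver-side buffer standing in for the real "mydb" backend.
object MyDbV2Store {
  val rows = new ConcurrentLinkedQueue[InternalRow]()
}

// Carries the rows one task buffered back to the driver (toy approach).
case class MyDbCommitMessage(rows: Seq[InternalRow]) extends WriterCommitMessage

// Runs on executors: one instance per write task.
class MyDbDataWriter extends DataWriter[InternalRow] {
  private val buffer = ArrayBuffer.empty[InternalRow]
  // Spark may reuse the row object between calls, so copy before buffering.
  override def write(record: InternalRow): Unit = buffer += record.copy()
  override def commit(): WriterCommitMessage = MyDbCommitMessage(buffer.toList)
  override def abort(): Unit = buffer.clear()
}

// Serialized and shipped to executors to create one writer per task.
class MyDbWriterFactory extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new MyDbDataWriter
}

// Runs on the driver; commit() is called only after all tasks have committed.
class MyDbSourceWriter(mode: SaveMode) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] = new MyDbWriterFactory

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    if (mode == SaveMode.Overwrite) MyDbV2Store.rows.clear()
    messages.foreach {
      case MyDbCommitMessage(rows) => rows.foreach(row => MyDbV2Store.rows.add(row))
      case _ => ()
    }
  }

  // Nothing has been published to the store yet, so there is nothing to roll back.
  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}

// Hypothetical V2 entry point; only the write side is shown.
class MyDbSourceV2 extends DataSourceV2 with WriteSupport {
  override def createWriter(
      writeUUID: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] =
    Optional.of(new MyDbSourceWriter(mode))
}
```

Usage would then be along the lines of newRowsDF.write.format(classOf[MyDbSourceV2].getName).mode("append").save(), where newRowsDF is a hypothetical DataFrame holding the rows to add. Publishing only in the driver-side commit() means nothing becomes visible unless every task succeeded.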

In summary, you should report this in JIRA, but don't expect it to get fixed other than by catching this case in ResolveRelations so that it throws its "Inserting into a view is not allowed" exception.

Unless I'm mistaken...

(In reply to Rahul Kumar's message of Thu, Jan 14, 2021 at 3:52 AM.)

Re: Insertable records in Datasource v2.

Rahul Kumar
Thank you, Jacek, for trying it out and clarifying. I appreciate it.

Best,
Rahul


