what is the right syntax for self joins in Spark 2.3.0 ?


what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
Hi All,

I have the following code

import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql(
  "select * from table as x inner join table as y on x.offset = y.offset")

resultdf.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()
and I get the following exception.

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
Any idea what's wrong here?

Thanks! 







Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
If I change it to this 






Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
If I change it to the code below, it works. However, I don't believe this is the solution I am looking for: I want to be able to do it in raw SQL. Moreover, if a user submits a long chained raw Spark SQL join query, I am not even sure how to make copies of the DataFrame to achieve the self-join. Is there any other way here?



import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()
val jdf1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1") // registered as "table1" (not "table") so the join below resolves

val resultdf = spark.sql(
  "select * from table inner join table1 on table.offset = table1.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()




Re: what is the right syntax for self joins in Spark 2.3.0 ?

Tathagata Das
Hey, 

Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.

TD





Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
Hi TD,

I pulled the commit listed on https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the following steps, and self-joins work after I cherry-picked your commit. Good job! I was hoping it would be part of 2.3.0, but it looks like it is targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
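
A quick smoke test of the resulting build (a sketch using the built-in rate source, so no Kafka broker is assumed):

// Self-join over the built-in "rate" source; resolves once the fix is in.
import org.apache.spark.sql.streaming.Trigger

val s = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
s.createOrReplaceTempView("t")

spark.sql("select * from t as x inner join t as y on x.value = y.value")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()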



Re: what is the right syntax for self joins in Spark 2.3.0 ?

Tathagata Das
I thought about it. 
I am not 100% sure whether this fix should go into 2.3.1. 

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending MultiInstanceRelation.
  - This is safe to backport into the 2.3 branch, as it does not touch production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch plan is spliced into the streaming plan.
  - This touches core production code paths and therefore may not be safe to backport.

Part 1 enables self-joins in all but a small fraction of self-join queries. That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can give wrong results in some cases. I think that is strictly worse than no fix.
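
(For contrast, a minimal batch sketch: batch self-joins already work because the analyzer deduplicates attribute IDs on one leg of the join, which is the behavior part 1 restores for streaming relations.)

// Batch self-join: resolves because the analyzer dedups attribute IDs.
val df = spark.range(5).toDF("offset")
df.createOrReplaceTempView("t")
spark.sql("select * from t as x inner join t as y on x.offset = y.offset").show()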

TD





Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
Hi TD,

I agree; I think we are better off with either a full fix or no fix. I am OK with the complete fix being available in master or some branch. I guess the solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer joins and update mode, say for Spark 2.3. I wonder how hard it is to implement both of these? Update mode and full outer joins turn out to be very useful, and required in my case, so I'm just asking.

Thanks!



Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
Sorry, I meant Spark 2.4 in my previous email.

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <[hidden email]> wrote:
Hi TD,

I agree I think we are better off either with a full fix or no fix. I am ok with the complete fix being available in master or some branch. I guess the solution for me is to just build from the source. 

On a similar note, I am not finding any JIRA tickets related to full outer joins and update mode for maybe say Spark 2.3. I wonder how hard is it two implement both of these? It turns out the update mode and full outer join is very useful and required in my case, therefore, I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <[hidden email]> wrote:
I thought about it. 
I am not 100% sure whether this fix should go into 2.3.1. 

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending MultInstanceRelation 
  - This is safe to be backported into the 2.3 branch as it does not touch production code paths. 

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch plan is spliced into the streaming plan. 
  - This touches core production code paths and therefore, may not safe to backport. 

Part 1 enables self-joins in all but a small fraction of self-join queries. That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can give wrong results in some cases. I think that is strictly worse than no fix.

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <[hidden email]> wrote:
Hi TD,

I pulled your commit that is listed on this ticket https://issues.apache.org/jira/browse/SPARK-23406 specifically I did the following steps and self joins work after I cherry-pick your commit! Good Job! I was hoping it will be part of 2.3.0 but looks like it is targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn

On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <[hidden email]> wrote:
Hey, 

Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <[hidden email]> wrote:
if I change it to the below code it works. However, I don't believe it is the solution I am looking for. I want to be able to do it in raw SQL and moreover, If a user gives a big chained raw spark SQL join query I am not even sure how to make copies of the dataframe to achieve the self-join. Is there any other way here?



import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()

On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <[hidden email]> wrote:
If I change it to this 




On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <[hidden email]> wrote:
Hi All,

I have the following code

import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load();

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

resultdf.writeStream.outputMode("update").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
and I get the following exception.

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
any idea whats wrong here?

Thanks! 














Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
It looks to me that the StateStore described in this doc actually has a full outer join, and every other join is a filter of that. Also, the doc talks about update mode, but it looks like Spark 2.3 ended up with append mode? Anyway, the moment it is in master I am ready to test, so JIRA tickets on this would help me keep track. Please let me know.

Thanks!



Re: what is the right syntax for self joins in Spark 2.3.0 ?

Gourav Sengupta
Super interesting.



Re: what is the right syntax for self joins in Spark 2.3.0 ?

Tathagata Das
This doc is unrelated to the stream-stream join we added in Structured Streaming. :)

That said, we added append mode first because it is easier to reason about the semantics of append mode, especially in the context of outer joins: you output a row only when you know it won't ever be changed. The semantics of update mode in outer joins are trickier to reason about and to expose through the APIs. Consider a left outer join. As soon as we get a left-side record with a key K that does not have a match, do we output (K, leftValue, null)? And if we do so, and then later get 2 matches from the right side, we have to output (K, leftValue, rightValue1) and (K, leftValue, rightValue2). But how do we convey that rightValue1 and rightValue2 together replace the earlier null, rather than rightValue2 replacing rightValue1 replacing null?

We will figure these out in future releases. For now, we have released append mode, which allows quite a large range of use cases, including multiple cascading joins.
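
For reference, a minimal sketch of the append-mode stream-stream left outer join that did ship in 2.3, using the built-in rate source (the column names are illustrative); outer joins require watermarks on both sides plus a time-range join condition:

import org.apache.spark.sql.functions.expr

val left = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .withWatermark("timestamp", "10 seconds")
  .selectExpr("value AS k", "timestamp AS leftTime")

val right = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .withWatermark("timestamp", "10 seconds")
  .selectExpr("value AS k2", "timestamp AS rightTime")

// Unmatched left rows are emitted with nulls only after the watermark
// passes the end of the time range, i.e. once no match can still arrive.
left.join(
    right,
    expr("k = k2 AND rightTime BETWEEN leftTime AND leftTime + interval 10 seconds"),
    "leftOuter")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()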

TD





Re: what is the right syntax for self joins in Spark 2.3.0 ?

kant kodali
I will attempt to answer this.

Since rightValue1 and rightValue2 have the same key "K" (two matches), why would it ever be the case that rightValue2 replaces rightValue1, which replaces null? Moreover, why does the user need to care?

The result in this case (after getting 2 matches) should be

(K, leftValue, rightValue1)
(K, leftValue, rightValue2)

This basically means only one of them replaced the earlier null. Which one of the two? Whichever arrived first. In other words, the nulls are replaced by the first matching row; subsequently, a new matching row just becomes another row with the same key in the result table, and a new unmatched row gets nulls for the unmatched fields.

From a user perspective, I believe spitting out nulls on every trigger until there is a match, and spitting out the joined rows once there is one, should suffice, shouldn't it?

Sorry if my thoughts are too naive!
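
(A toy model of the semantics proposed above, in plain Scala rather than any Spark API, purely for concreteness:)

// Toy model: the first match overwrites the null placeholder for key K;
// later matches append as additional rows under the same key.
case class Row(k: String, left: String, right: Option[String])

def update(table: Vector[Row], matched: Row): Vector[Row] = {
  val i = table.indexWhere(r => r.k == matched.k && r.right.isEmpty)
  if (i >= 0) table.updated(i, matched) // replace the unmatched placeholder
  else table :+ matched                 // another match: extra row, same key
}

val t0 = Vector(Row("K", "leftValue", None))                    // unmatched yet
val t1 = update(t0, Row("K", "leftValue", Some("rightValue1"))) // null replaced
val t2 = update(t1, Row("K", "leftValue", Some("rightValue2"))) // additional row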












Re: what is the right syntax for self joins in Spark 2.3.0 ?

Tathagata Das
You have understood the problem right. However, note that your interpretation of the output (K, leftValue, null), (K, leftValue, rightValue1), (K, leftValue, rightValue2) relies on knowing the semantics of the join: if you are processing the output rows *manually*, you are aware that the operator is a join, so you can apply the interpretation "null replaced by first match, then all further matches are just additional rows". This is not a general solution for any sink and any operator, however. We need a way to expose these semantics through the APIs so that a sink can use them without knowing exactly which operator in the query is writing to it. Therefore we still need some work before we can support joins in update mode cleanly.

Hope that makes it clear. :)

On Sat, Mar 10, 2018 at 12:14 AM, kant kodali <[hidden email]> wrote:
I will give an attempt to answer this.

since rightValue1 and rightValue2 have the same key "K"(two matches) why would it ever be the case rightValue2 replacing rightValue1 replacing null? Moreover, why does user need to care?

The result in this case (after getting 2 matches) should be

(K, leftValue, rightValue1)
(K, leftValue, rightValue2)

This basically means only one of them replaced the earlier null. Which one of the two? Whichever arrived first. In other words, the nulls are replaced by the first matching row; after that, each new matching row is just another row with the same key in the result table, and each new unmatched row gets nulls for its unmatched fields.
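
Concretely, a hypothetical trigger-by-trigger timeline under that proposal:

trigger 1: left (K, leftValue) arrives, no match yet -> emit (K, leftValue, null)
trigger 2: right (K, rightValue1) arrives            -> emit (K, leftValue, rightValue1), replacing the null row
trigger 3: right (K, rightValue2) arrives            -> emit (K, leftValue, rightValue2) as an additional row for K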

From a user perspective, I believe just spitting out nulls for every trigger until there is a match, and spitting out the joined rows once there is a match, should suffice, shouldn't it?

Sorry if my thoughts are too naive!

On Thu, Mar 8, 2018 at 6:14 PM, Tathagata Das <[hidden email]> wrote:
This doc is unrelated to the stream-stream join we added in Structured Streaming. :)

That said, we added append mode first because it is easier to reason about the semantics of append mode, especially in the context of outer joins: you output a row only when you know it won't ever change. The semantics of update mode in outer joins are trickier to reason about and expose through the APIs. Consider a left outer join. As soon as we get a left-side record with a key K that does not have a match, do we output (K, leftValue, null)? And if we do, and later get 2 matches from the right side, we have to output (K, leftValue, rightValue1) and (K, leftValue, rightValue2). But how do we convey that rightValue1 and rightValue2 together replace the earlier null, rather than rightValue2 replacing rightValue1 replacing null?

We will figure these out in future releases. For now, we have released append mode, which allows quite a large range of use cases, including multiple cascading joins.
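
For readers following along, a minimal sketch of what the released append-mode form looks like; leftStream/rightStream, the key columns, and the time bounds here are illustrative, not from this thread:

import org.apache.spark.sql.functions.expr

// Watermarks bound how much join state is kept; for a left outer join they
// also determine when an unmatched left row can finally be emitted with nulls.
val left = leftStream.withWatermark("leftTime", "10 minutes")
val right = rightStream.withWatermark("rightTime", "5 minutes")

// Stream-stream outer joins additionally require an event-time constraint
// in the join condition.
val joined = left.join(
  right,
  expr("leftKey = rightKey AND rightTime >= leftTime AND rightTime <= leftTime + interval 1 hour"),
  "leftOuter")

joined.writeStream.outputMode("append").format("console").start()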

TD



On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <[hidden email]> wrote:
super interesting.

On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <[hidden email]> wrote:
It looks to me that the StateStore described in this doc actually has a full outer join, and every other join is a filter of that. Also, the doc talks about update mode, but it looks like Spark 2.3 ended up with append mode? Anyway, the moment it is in master I am ready to test, so JIRA tickets on this would help to keep track. Please let me know.

Thanks!

On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <[hidden email]> wrote:
Sorry I meant Spark 2.4 in my previous email

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <[hidden email]> wrote:
Hi TD,

I agree; I think we are better off either with a full fix or no fix. I am OK with the complete fix being available in master or some branch. I guess the solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer joins and update mode for, say, Spark 2.3. I wonder how hard it is to implement both of these? It turns out update mode and full outer joins are very useful and required in my case, therefore I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <[hidden email]> wrote:
I thought about it. 
I am not 100% sure whether this fix should go into 2.3.1. 

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending MultiInstanceRelation
  - This is safe to backport into the 2.3 branch, as it does not touch production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch plan is spliced into the streaming plan. 
  - This touches core production code paths and therefore may not be safe to backport.

Part 1 enables self-joins in all but a small fraction of self-join queries. That small fraction can produce incorrect results, and part 2 avoids that.
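
For anyone curious what part 1 amounts to, here is a rough sketch of the pattern (not the actual Spark patch): MultiInstanceRelation lets the analyzer stamp out a fresh copy of a leaf node, with fresh attribute IDs, each time the same relation appears in one plan.

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Illustrative leaf node, not Spark's StreamingRelation itself.
case class MyStreamingRelation(output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
  // Fresh expression IDs so the two sides of a self-join no longer clash.
  override def newInstance(): MyStreamingRelation =
    copy(output = output.map(_.newInstance()))
}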

So for 2.3.1, we can enable self-joins by merging only part 1, but it can give wrong results in some cases. I think that is strictly worse than no fix.

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <[hidden email]> wrote:
Hi TD,

I pulled your commit that is listed on this ticket https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did the following steps, and self-joins work after I cherry-pick your commit! Good job! I was hoping it would be part of 2.3.0, but it looks like it is targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
# apply the self-join fix on top of the 2.3 branch
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
# package a runnable distribution with the same profiles as a release build
./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
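
(If the build succeeds, make-distribution.sh should leave a .tgz distribution tarball, named after the --name flag, at the top of the repository.)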

On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <[hidden email]> wrote:
Hey, 

Thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <[hidden email]> wrote:
if I change it to the code below it works. However, I don't believe it is the solution I am looking for. I want to be able to do it in raw SQL, and moreover, if a user gives a big chained raw Spark SQL join query, I am not even sure how to make copies of the DataFrame to achieve the self-join. Is there any other way here?



import org.apache.spark.sql.streaming.Trigger 

val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()
val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "join_test").option("startingOffsets", "earliest").load()

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console").option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
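
One alternative worth noting for readers: on a build that includes the SPARK-23406 fix, the original single-view SQL should work as written, and the DataFrame-API equivalent of that self-join would look roughly like this (a sketch, assuming such a build):

import org.apache.spark.sql.functions.expr

// Self-join of a single streaming DataFrame via aliases; fails to resolve
// on stock 2.3.0, which is exactly the bug discussed in this thread.
val selfJoined = jdf.as("x").join(jdf.as("y"), expr("x.offset = y.offset"))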
