Left Join at SQL query gets planned as inner join


Roland Johann
Hi All,


we are on vanilla Spark 2.4.4 and currently experience somewhat strange behavior of the query planner/optimizer, which gives us wrong results.

select
s.event_id as search_event_id,
s.query_string,
p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
s.year = 2020 and s.month = 4 and s.day = 29
and p.year = 2020 and p.month = 4 and p.day = 29
limit 1
This query leads to the following plan:
*(2) Project [event_id#12131 AS search_event_id#12118, query_string#12178, event_id#12209]
+- *(2) BroadcastHashJoin [event_id#12131], [source_event_id#12221], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
: +- *(1) Project [event_id#12131, query_string#12178]
: +- *(1) Filter isnotnull(event_id#12131)
: +- *(1) FileScan parquet s[event_id#12131,query_string#12178,year#12194,month#12195,day#12196] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/search/year=2020/month=4/day=29/..., PartitionCount: 1, PartitionFilters: [isnotnull(year#12194), isnotnull(month#12195), isnotnull(day#12196), (year#12194 = 2020), (month..., PushedFilters: [IsNotNull(event_id)], ReadSchema: struct<event_id:string,query_string:string>
+- *(2) Project [event_id#12209, source_event_id#12221]
+- *(2) Filter isnotnull(source_event_id#12221)
+- *(2) FileScan parquet s[event_id#12209,source_event_id#12221,year#12308,month#12309,day#12310] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://<some-path>/p/year=2020/month=4/day=2..., PartitionCount: 1, PartitionFilters: [isnotnull(day#12310), isnotnull(year#12308), isnotnull(month#12309), (year#12308 = 2020), (month..., PushedFilters: [IsNotNull(source_event_id)], ReadSchema: struct<event_id:string,source_event_id:string>
Without the partition-pruning predicates the join gets planned as LeftOuter with a SortMergeJoin, but we need partition pruning in this case to prevent full table scans and to benefit from the broadcast join...
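The equivalence the optimizer exploits can be sketched in plain Python (toy rows, hypothetical values): a left outer join followed by a NULL-rejecting predicate on the right-hand table produces exactly the inner-join result, which is why the planner is entitled to the rewrite.

```python
# Toy rows (hypothetical values) standing in for the two tables.
s = [{"event_id": "a", "query_string": "q1"},
     {"event_id": "b", "query_string": "q2"}]          # "b" has no match in p
p = [{"event_id": "x", "source_event_id": "a", "day": 29}]

def left_outer_join(s_rows, p_rows):
    """Left outer join: unmatched s-rows survive, paired with None (SQL NULL)."""
    out = []
    for s_row in s_rows:
        matches = [q for q in p_rows
                   if q["source_event_id"] == s_row["event_id"]]
        if matches:
            out.extend((s_row, q) for q in matches)
        else:
            out.append((s_row, None))
    return out

def inner_join(s_rows, p_rows):
    return [(s_row, q) for s_row in s_rows for q in p_rows
            if q["source_event_id"] == s_row["event_id"]]

# A WHERE predicate on p (here p.day = 29) rejects NULL, so it drops exactly
# the rows the outer join preserved -- leaving the inner-join result.
left_then_filter = [(s_row, q) for (s_row, q) in left_outer_join(s, p)
                    if q is not None and q["day"] == 29]
```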

As soon as we rewrite the query with the Scala DSL, the plan looks fine:
val s = spark.sql("select event_id, query_string from ssi_kpi.search where year = 2020 and month = 4 and day = 29")
val p = spark.sql("select event_id, source_event_id from ssi_kpi.pda_show where year = 2020 and month = 4 and day = 29")

s
.join(p, s("event_id") <=> p("source_event_id"), "left_outer")
.groupBy(s("query_string"))
.agg(count(s("query_string")), count(p("event_id")))
.show()


The second thing we saw is that conditions in the WHERE clause on joined tables get pushed down to the Parquet files and lead to wrong results, for example:
select
s.event_id as search_event_id,
s.query_string,
p.event_id
from s
left outer join p on s.event_id = p.source_event_id
where
s.year = 2020 and s.month = 4 and s.day = 29
and p.year = 2020 and p.month = 4 and p.day = 29
and p.event_id is null
Until now I assumed that string-based queries and the Scala DSL lead to the same execution plan. Can someone point me to docs about the internals of this part of Spark? The official docs about SQL in general are not that verbose.

Thanks in advance and stay safe!

Roland Johann

Re: Left Join at SQL query gets planned as inner join

randy clinton
Does it still plan an inner join if you remove a filter on both tables?

It seems like you are asking for a left join, but your filters demand the behavior of an inner join.

Maybe you could do the filters on the tables first and then join them.

Something roughly like..

s_DF = s_DF.filter("year = 2020 AND month = 4 AND day = 29")
p_DF = p_DF.filter("year = 2020 AND month = 4 AND day = 29 AND event_id IS NULL")

output = s_DF.join(p_DF, s_DF["event_id"] == p_DF["source_event_id"], "left")



--
I appreciate your time,

Re: Left Join at SQL query gets planned as inner join

Roland Johann
Thanks for the quick reply.

It plans the join as LeftOuter as soon as the filters on the second table are removed.

It seems like you are asking for a left join, but your filters demand the behavior of an inner join. 
Can you explain that?
The filters on the second table rely on partition pruning so that we don't have to do a full table scan just to get that one day of data from the second table. To be more precise: we want to left outer join records of the two tables from the same day on their id properties, where the second table must not contain records matching the join condition.

s_DF = s_DF.filter("year = 2020 AND month = 4 AND day = 29")
p_DF = p_DF.filter("year = 2020 AND month = 4 AND day = 29 AND event_id IS NULL")
That points to my second question: `event_id is null` should be applied after the join, to get records of the first table that don't match any records of the second table. The plan actually shows that this filter is pushed down to Parquet and not applied afterwards, so the entire result set is empty because of the additional inner join. The desired behavior can be achieved with a left anti join, but that's not the point: the WHERE condition behaves differently than one would expect.
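Semantically, a left anti join (in Spark: join type "left_anti") keeps exactly the left rows without a match on the right; a minimal Python sketch with hypothetical rows:

```python
# Hypothetical rows standing in for the search and pda_show tables.
searches = [{"event_id": "a", "query_string": "q1"},
            {"event_id": "b", "query_string": "q2"}]
pda_shows = [{"event_id": "x", "source_event_id": "a"}]

def left_anti_join(s_rows, p_rows):
    """Keep the s-rows with NO matching p-row -- the result the
    `p.event_id is null` predicate after a left outer join was meant
    to express."""
    return [s_row for s_row in s_rows
            if not any(q["source_event_id"] == s_row["event_id"]
                       for q in p_rows)]
```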

I hope it doesn't get too confusing that we discuss two different, but somewhat related, problems in a single thread.

Best
Roland




Re: Left Join at SQL query gets planned as inner join

Ryan C. Kleck
He’s saying you need to move the filters for the ‘p’ table in order to do what you want: they need to come before your WHERE, i.e. into the join clause. The order of operations in SQL applies your join-clause filters before the WHERE. The filters on your ‘s’ table can stay in the WHERE. OUTER JOINs are the only case where this ordering matters.

Intuitively, why would you do an OUTER JOIN with your query? Because you expect some columns from ‘p’ to be NULL after the join. But your filter in the WHERE gets rid of all the NULLs in ‘p’. You basically force it into an inner join with that filter, and the planner recognizes that.
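In SQL terms that means moving the p predicates from the WHERE into the ON clause (e.g. `... on s.event_id = p.source_event_id and p.year = 2020 and p.month = 4 and p.day = 29`). A minimal Python sketch (toy rows; a single `day` column stands in for the partition columns) of why filtering before the join preserves the outer join:

```python
# Toy rows with hypothetical values; `day` stands in for year/month/day.
s = [{"event_id": "a", "day": 29}, {"event_id": "b", "day": 29}]
p = [{"source_event_id": "a", "day": 29}, {"source_event_id": "c", "day": 28}]

# Filter each side BEFORE the join (the join-clause position)...
p_day = [q for q in p if q["day"] == 29]

# ...then the left outer join still preserves unmatched s-rows as None.
joined = [(s_row, next((q for q in p_day
                        if q["source_event_id"] == s_row["event_id"]), None))
          for s_row in s if s_row["day"] == 29]
```

The unmatched row for "b" survives with None on the right, exactly what the left join was asked for.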

Ryan Kleck
Software Developer IV
Customer Knowledge Platform

From: Roland Johann <[hidden email]>
Sent: Thursday, April 30, 2020 8:30:05 AM
To: randy clinton <[hidden email]>
Cc: Roland Johann <[hidden email]>; user <[hidden email]>
Subject: Re: Left Join at SQL query gets planned as inner join
 


Re: Left Join at SQL query gets planned as inner join

Roland Johann
Thank you, that’s absolutely right!
Getting the rows of `s` without matches in `p` is no longer a problem.

Have a nice day
Roland

