CBO not working?


CBO not working?

Aelur Sadgod

Hi,

 

I’m using Spark 2.4.4 (on EMR) to test the CBO on a partitioned external Hive table; the files are saved as Parquet.

 

I’ve set up the required configuration:

    .config("spark.sql.cbo.enabled","true")\

    .config("spark.sql.cbo.joinreorder.enabled","true")\

 

After running the ANALYZE command for all columns:

spark.sql(f"""

ANALYZE TABLE mytable PARTITION(year,month,date)

COMPUTE STATISTICS FOR COLUMNS {','.join(spark.table('mytable').columns)}

""")

I then run DESCRIBE EXTENDED:

spark.sql("DESCRIBE EXTENDED mytable").show(truncate=False, n=400)

 

and can only see table-level stats, not column-level:


col_name  data_type                                         comment

Statistics   698171162 bytes, 19001020 rows
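(A way to check a single column’s stats directly: DESCRIBE on the column prints min/max/num_nulls/distinct_count when column statistics exist for it. A sketch, using one of the example column names:)

# Per-column statistics; shows min, max, num_nulls, distinct_count,
# avg_col_len and max_col_len if they were collected.
spark.sql("DESCRIBE EXTENDED mytable col").show(truncate=False)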

 

Running a sample query shows no usage of statistics whatsoever:

spark.sql("""

select * from mytable a, mytable b

where a.col = b.col and a.anothercol = 3462

""").explain(True)

 

== Parsed Logical Plan ==

'Project [*]

+- 'Filter (('a.col = 'b.col) && ('a.anothercol = 3462))

   +- 'Join Inner

      :- 'SubqueryAlias `a`

      :  +- 'UnresolvedRelation `mytable`

      +- 'SubqueryAlias `b`

         +- 'UnresolvedRelation `mytable`

 

== Analyzed Logical Plan ==

col: bigint, anothercol: bigint, ... 37 more fields

Project [col#477L, anothercol#479L, ... 37 more fields]

+- Filter ((col#477L = col#497L) && (anothercol#479L = cast(3462 as bigint)))

   +- Join Inner

      :- SubqueryAlias `a`

      :  +- SubqueryAlias `mydb`.`mytable`

      :     +- Relation[col#477L,anothercol#479L...] parquet

      +- SubqueryAlias `b`

         +- SubqueryAlias `mydb`.`mytable`

            +- Relation[col#497L,anothercol#499L,...] parquet

 

== Optimized Logical Plan ==

Join Inner, (col#477L = col#497L)

:- Filter ((isnotnull(anothercol#479L) && (anothercol#479L = 3462)) && isnotnull(col#477L))

:  +- Relation[col#477L,anothercol#479L,...] parquet

+- Filter isnotnull(col#497L)

   +- Relation[col#497L,anothercol#499L,...] parquet

 

== Physical Plan ==

*(5) SortMergeJoin [col#477L], [col#497L], Inner

:- *(2) Sort [col#477L ASC NULLS FIRST], false, 0

:  +- Exchange hashpartitioning(col#477L, 1000)

:     +- *(1) Project [col#477L, anothercol#479L, ...]

:        +- *(1) Filter ((isnotnull(anothercol#479L) && (anothercol#479L = 3462)) && isnotnull(col#477L))

:           +- *(1) FileScan parquet mydb.mytable[col#477L,anothercol#479L...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://bucket/prefix..., PartitionCount: 91, PartitionFilters: [], PushedFilters: [IsNotNull(anothercol), EqualTo(anothercol,3462), IsNotNull(col)], ReadSchema: struct<col:bigint,anothercol:bigint,...

+- *(4) Sort [col#497L ASC NULLS FIRST], false, 0

   +- Exchange hashpartitioning(col#497L, 1000)

      +- *(3) Project [col#497L, anothercol#499L, ...]

         +- *(3) Filter isnotnull(col#497L)

            +- *(3) FileScan parquet mydb.mytable[col#497L,anothercol#499L,...] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://bucket/prefix..., PartitionCount: 91, PartitionFilters: [], PushedFilters: [IsNotNull(col)], ReadSchema: struct<col:bigint,anothercol:bigint,...
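(To double-check what statistics the optimizer actually has, one can also peek through the internal queryExecution API via py4j; a sketch, and not a stable public API:)

# Spark internals via py4j; not a public API and may change between versions.
# optimizedPlan().stats() returns the Statistics the optimizer would use.
stats = spark.table("mytable")._jdf.queryExecution().optimizedPlan().stats()
print(stats.toString())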

 

I found the following JIRA tickets, but neither makes it clear whether this behaviour is a bug:

https://issues.apache.org/jira/browse/SPARK-29335

 

https://issues.apache.org/jira/browse/SPARK-25185

 

Is this a bug or is there something I’m missing?

 

Thanks!

Re: CBO not working?

Aelur Sadgod
Hi, 

I've continued my investigation and found that the issue is PySpark's explain() not showing cost information. Prefixing the queries with EXPLAIN COST does show rowCount information when the CBO is turned on.
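For anyone hitting the same thing, a minimal sketch of what worked for me:

# EXPLAIN COST prints the optimized logical plan annotated with
# sizeInBytes and rowCount once the CBO is enabled.
spark.sql("""
EXPLAIN COST
SELECT * FROM mytable a, mytable b
WHERE a.col = b.col AND a.anothercol = 3462
""").show(truncate=False)

(In Spark 3.x the same view is available directly as df.explain(mode="cost").)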

Thanks

