Why is Spark 3.0.x faster than Spark 3.1.x


Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
Hi,

I have a simple piece of code that does a groupBy, agg count, sort, etc. This code
finishes within 5 minutes on Spark 3.1.x. However, the same code, same
dataset, and same SparkSession (configs) on Spark 3.0.2 finishes within a
minute. That is more than a 5x difference.

My SparkSession (the same settings when passed via --conf):

import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession
    .builder()
    .appName("test")
    .master("local[*]")
    .config("spark.driver.memory", "16G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.kryoserializer.buffer.max", "200M")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

Environments in which I tested both 3.1.1 and 3.0.2:
- IntelliJ
- spark-shell
- pyspark shell
- pure Python with PyPI pyspark

The code, dataset, and initial report for reproducibility:
https://github.com/JohnSnowLabs/spark-nlp/issues/2739#issuecomment-815635930

I have observed that in Spark 3.1.1 only 2 tasks are doing the majority of
the processing, and the work is not evenly distributed as one would expect in a
12-partition DataFrame:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8277/114009725-af969e00-9863-11eb-8e5b-07ce53e8f5f3.png>

However, without any change in any line of code or environment, Spark 3.0.2
will evenly distribute the tasks at the same time and everything runs in
parallel:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8277/114009712-ac9bad80-9863-11eb-9e55-c797833bdbba.png>
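For reference, here is how I check whether the rows are actually spread evenly across the partitions (a minimal PySpark sketch; the same idea works in Scala, and `df` is just a placeholder for the 12-partition DataFrame from the linked repro):

from pyspark.sql.functions import spark_partition_id

# 'df' is a placeholder for the 12-partition DataFrame from the repro above
print(df.rdd.getNumPartitions())  # expect 12

# rows per partition; heavy skew here would match the "2 busy tasks" symptom
df.groupBy(spark_partition_id().alias("partition")) \
  .count() \
  .orderBy("partition") \
  .show(12)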

Is there a new feature in Spark 3.1.1, a new config, or something else that causes
this unbalanced task execution which wasn't there in Spark 2.4.x and
3.0.x? (I have read the migration guide but could not find anything
relevant:
https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-30-to-31)




Re: Why is Spark 3.0.x faster than Spark 3.1.x

Mich Talebzadeh
Hi,

I just looked through your code.

  1. I assume you are testing this against Spark 3.1.1?
  2. You are testing this set-up in local mode in a single JVM, so it is not really distributed. I doubt whether a meaningful performance conclusion can be drawn here.
  3. It is accepted that newer versions of a product offer more capabilities and are therefore expected to be more resource hungry, but I agree that obviously does not explain being 5 times slower.
  4. You are reading a gzipped CSV. A .gz file is not splittable, so Spark needs to read the whole file with a single core, which will slow things down (it is CPU intensive). After the read is done, the data can be shuffled to increase parallelism (see the sketch after this list).
  5. IntelliJ, PyCharm, etc. run in local mode anyway.
  6. Have you tried $SPARK_HOME/bin/spark-submit --master local[something] xyz.py?
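For point 4, a minimal PySpark sketch of the idea (the file name and partition count are only illustrative):

df = (spark.read
          .option("header", "true")
          .csv("reviews.csv.gz")   # gzip is not splittable, so a single task reads the whole file
          .repartition(12))        # shuffle the rows so downstream stages can run in parallel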

FYI, I have used both Spark 3.0.1 and 3.1.1, in local mode (spark-submit --master local) and in distributed mode (spark-submit --master yarn --deploy-mode client ...), and do not see this behaviour.

HTH




Re: Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
Hi Mich,

Thanks for the reply.

I have tried to minimize as much as possible the effect of other factors
between pyspark==3.0.2 and pyspark==3.1.1, including not reading the CSV or gz
and just reading the Parquet.

Here is a snippet purely in pyspark (nothing else included); it finishes
within 47 seconds in pyspark 3.1.1 and 15 seconds in pyspark 3.0.2 (still
a very large performance hit!):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, count, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .getOrCreate()

Toys = spark.read.parquet('./toys-cleaned').repartition(12)

# tokenize the text
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="all_words", pattern="\\W")
toys_with_words = regexTokenizer.transform(Toys)

# remove stop words
remover = StopWordsRemover(inputCol="all_words", outputCol="words")
toys_with_tokens = remover.transform(toys_with_words).drop("all_words")

all_words = toys_with_tokens.select(explode("words").alias("word"))

# group by, sort, and limit to 50k
top50k = all_words.groupBy("word") \
    .agg(count("*").alias("total")) \
    .sort(col("total").desc()) \
    .limit(50000)

top50k.show()

This is a local test, just two different conda environments, one for
pyspark==3.0.2 and one for pyspark==3.1.1, with the same dataset, same code, and
same session. I think this is a very easy way to reproduce the issue without
including any third-party libraries. The two screenshots actually pinpoint
this issue: 3.0.2 runs 12 tasks in parallel, while 3.1.1
has 12 tasks but 10 of them finish immediately and the other 2 keep
processing. (Also, the CPU usage in 3.0.2 is at full capacity while in 3.1.1 it is very
minimal.)

Something is different in spark/pyspark 3.1.1; I am not sure whether it's the
partitions, groupBy, limit, or just a conf being enabled or disabled in
3.1.1, but it results in these performance differences.
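To narrow it down, I can dump the settings that seem most likely to differ and diff them between the two sessions; a minimal sketch (this list of keys is only my guess at the relevant ones):

for key in ("spark.sql.adaptive.enabled",
            "spark.sql.adaptive.coalescePartitions.enabled",
            "spark.sql.shuffle.partitions"):
    print(key, "=", spark.conf.get(key, "<unset>"))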




Re: Why is Spark 3.0.x faster than Spark 3.1.x

Mich Talebzadeh
OK, you need to assess where the version change has the biggest impact in terms of timings.

From the Spark GUI, for each run, look under the Stages tab at Completed Stages: how long the Duration was for each task, and how it differs for identical tasks in the two Spark versions.

Example: [inline screenshot of stage durations from the Spark UI]

In our case the biggest impact comes from writing to Google BigQuery from on-premise, with variable timing because for the network we are using Cloud VPN (over the public internet) as opposed to Cloud Interconnect (a dedicated network).

So in your case, which stage is the most time consuming?
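If clicking through the UI is tedious, the same stage timings can be pulled from the UI's REST API; a rough sketch, assuming the default local UI address http://localhost:4040 and the field names of the standard monitoring API:

import requests

base = "http://localhost:4040/api/v1"
app_id = requests.get(base + "/applications").json()[0]["id"]
for stage in requests.get(base + "/applications/" + app_id + "/stages").json():
    print(stage["stageId"], stage["status"], stage.get("executorRunTime"), stage["name"][:60])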

HTH




Re: Why is Spark 3.0.x faster than Spark 3.1.x

Russell Spitzer
Try disabling Adaptive Query Execution: https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
That would explain the different number of tasks post-shuffle.
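For example, a minimal PySpark sketch of the comparison (build the same session with AQE off and rerun the job):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .config("spark.sql.adaptive.enabled", "false")  # turn AQE off for the comparison
         .getOrCreate())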


Re: Why is Spark 3.0.x faster than Spark 3.1.x

Russell Spitzer
Actually, AQE only defaults to true in master... so that may not be it...


Re: Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
In reply to this post by Mich Talebzadeh
So this is what I have in my Spark UI for 3.0.2 and 3.1.1:

For pyspark==3.0.2 (stage "showString at NativeMethodAccessorImpl.java:0"): finished in 10 seconds.
For pyspark==3.1.1 (same stage "showString at NativeMethodAccessorImpl.java:0"): finished the same stage in 39 seconds.

As you can see, everything is literally the same between 3.0.2 and 3.1.1 (number of stages, number of tasks, Input, Output, Shuffle Read, Shuffle Write), except that 3.0.2 runs all 12 tasks together while 3.1.1 finishes 10 of 12 immediately and the other 2 do the actual processing, as I shared previously: [screenshots for 3.1.1 and 3.0.2]

PS: I have just run the same test in Databricks with 1 worker, comparing runtime 8.1 (includes Apache Spark 3.1.1, Scala 2.12) against 7.6 (includes Apache Spark 3.0.1, Scala 2.12). There is still a difference of over 20 seconds, which is a big bump when the whole process finishes within a minute. Not sure what it is, but until further notice I will advise our users not to use Spark/PySpark 3.1.1 locally or in Databricks. (There may be other optimizations at play whose benefit isn't noticeable here, but this is such simple code and it can quickly become a bottleneck in larger pipelines.)


Re: Why is Spark 3.0.x faster than Spark 3.1.x

srowen
Right, you already established a few times that the difference is the number of partitions. Russell answered with what is almost surely the correct answer, that it's AQE. In toy cases it isn't always a win. 
Disable it if you need to. It's not a problem per se in 3.1; AQE speeds up more realistic workloads in general.


Re: Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
Thanks Sean, 

I have already tried adding that and the result is absolutely the same.

The reason that config cannot be the cause (at least not alone) is that my comparison is between Spark 3.0.2 and Spark 3.1.1. This config has been set to true since the beginning of 3.0.0 and hasn't changed:

So it can't be a good thing for 3.0.2 and a bad thing for 3.1.1; unfortunately, the issue is somewhere else.


Re: Why is Spark 3.0.x faster than Spark 3.1.x

Mich Talebzadeh
Spark 3.1.1:

I enabled the parameter

spark_session.conf.set("spark.sql.adaptive.enabled", "true")

to see its effects, on YARN, i.e. spark-submit --master yarn --deploy-mode client.

With 4 executors it crashed the cluster. I then reduced the number of executors to 2 and this time it ran OK, but the performance was worse.

I assume it adds some overhead?




Re: Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
I personally added the following to my SparkSession in 3.1.1 and the result was exactly the same as before (local master). 3.1.1 is still 4-5 times slower than 3.0.2, at least for that piece of code. I will do more investigation to see how it behaves with other workloads, especially anything without .transform or Spark ML related functions, but the small piece of code I provided, on any dataset big enough to take a minute to finish, will show you the 4-5x difference going from 3.0.2 to 3.1.1:

.config("spark.sql.adaptive.coalescePartitions.enabled", "false")
.config("spark.sql.adaptive.enabled", "false")



Re: Why is Spark 3.0.x faster than Spark 3.1.x

Mich Talebzadeh
Well, the normal course of action (considering the law of diminishing returns) is that your mileage may vary:

Spark 3.0.1 is pretty stable and good enough. Unless there is an overriding reason why you have to use 3.1.1, you can set it aside and try it when you have other use cases. For now I guess you can carry on with 3.0.1 as BAU.

HTH




Re: Why is Spark 3.0.x faster than Spark 3.1.x

maziyar
Thanks Mich, I will ask all of our users to use pyspark 3.0.x and will change all the notebooks/scripts to switch back from 3.1.1 to 3.0.2.

That being said, I won't be able to defend this request by telling Spark users that the previous major release was and still is more stable than the latest major release, the one everything now defaults to (pyspark, downloads, etc.).

I'll see if I can open a ticket for this as well.


Re: Why is Spark 3.0.x faster than Spark 3.1.x

Mich Talebzadeh

Hi,

Regarding your point:

.... I won't be able to defend this request by telling Spark users the previous major release was and still is more stable than the latest major release ...

Version 3.1.1 was released only recently, so from a practical point of view the question of stability does not really come into it yet. That is perhaps the reason why some vendors like Cloudera stay a few releases behind the latest version. In production what matters most is predictability and stability. You are not doing anything wrong by rolling it back and awaiting further clarification and resolution of the issue.

HTH


