Hi,
I have a simple piece of code that does a groupBy, agg count, sort, etc. It finishes in about 5 minutes on Spark 3.1.x, while the same code, same dataset, and same SparkSession (configs) on Spark 3.0.2 finishes within a minute. That is more than a 5x difference.
My SparkSession (the same configs are used when passed with --conf):
val spark: SparkSession = SparkSession
  .builder()
  .appName("test")
  .master("local[*]")
  .config("spark.driver.memory", "16G")
  .config("spark.driver.maxResultSize", "0")
  .config("spark.kryoserializer.buffer.max", "200M")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
Environments in which I tested both 3.1.1 and 3.0.2:
- Intellij
- spark-shell
- pyspark shell
- pure Python with PyPI pyspark
The code, dataset, and initial report for reproducibility:
https://github.com/JohnSnowLabs/spark-nlp/issues/2739#issuecomment-815635930
I have observed that in Spark 3.1.1, only 2 tasks do the majority of the processing, and the work is not evenly distributed as one would expect in a 12-partition DataFrame:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8277/114009725-af969e00-9863-11eb-8e5b-07ce53e8f5f3.png>
However, without changing a single line of code or the environment, Spark 3.0.2 distributes the tasks evenly and everything runs in parallel:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8277/114009712-ac9bad80-9863-11eb-9e55-c797833bdbba.png>
Is there a new feature or a new config in Spark 3.1.1 that causes this unbalanced task execution, something that wasn't there in Spark 2.4.x and 3.0.x? (I have read the migration guide but could not find anything relevant:
https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-30-to-31)
Hi,
I just looked through your code:
- I assume you are testing this against Spark 3.1.1?
- You are testing this set-up in local mode in a single JVM, so it is not really distributed. I doubt a meaningful performance conclusion can be drawn here.
- It is accepted that newer versions of a product offer more capabilities and are therefore expected to be more resource hungry, but I concur that it obviously should not be 5 times slower.
- You are reading a gzipped CSV. A .gz file is not splittable, so Spark has to read the whole file with a single core, which slows things down (CPU intensive). Once the read is done, the data can be repartitioned to increase parallelism (a minimal sketch follows this list).
- IntelliJ, PyCharm etc. run in local mode anyway.
- Have you tried $SPARK_HOME/bin/spark-submit --master local[something] xyz.py?
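As an illustration of the gzip point above, a minimal PySpark sketch (the file name, header option, and partition count are placeholders, not taken from the original code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# gzip is not splittable, so this single .gz file is read by one task
df = spark.read.csv("reviews.csv.gz", header=True)

# repartition right after the read so the later, expensive stages use all cores
df = df.repartition(12)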
FYI, I have used both Spark 3.0.1 and 3.1.1, both in local mode (spark-submit --master local) and in distributed mode (spark-submit --master yarn --deploy-mode client ..), and I do not see this behaviour.
HTH

Hi Mich,
Thanks for the reply.
I have tried to minimize as much as possible the effect of other factors between pyspark==3.0.2 and pyspark==3.1.1, including not reading CSV or gz at all and just reading the Parquet.
Here is some code purely in PySpark (nothing else included); it finishes in 47 seconds on pyspark 3.1.1 and in 15 seconds on pyspark 3.0.2 (the performance hit is still very large!):
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, count, col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .getOrCreate()

Toys = spark.read \
    .parquet('./toys-cleaned').repartition(12)

# tokenize the text
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="all_words", pattern="\\W")
toys_with_words = regexTokenizer.transform(Toys)

# remove stop words
remover = StopWordsRemover(inputCol="all_words", outputCol="words")
toys_with_tokens = remover.transform(toys_with_words).drop("all_words")

all_words = toys_with_tokens.select(explode("words").alias("word"))

# group by, sort and limit to 50k
top50k = all_words.groupBy("word").agg(count("*").alias("total")) \
    .sort(col("total").desc()).limit(50000)

top50k.show()
This is a local test with two different conda environments, one for pyspark==3.0.2 and one for pyspark==3.1.1, with the same dataset, same code, and same sessions. I think this is a very easy way to reproduce the issue without involving any third-party libraries. The two screenshots pinpoint the issue: 3.0.2 runs 12 tasks in parallel, while 3.1.1 also has 12 tasks but 10 of them finish immediately and the other 2 keep processing. (Also, CPU usage in 3.0.2 is at full capacity, while in 3.1.1 it is very low.)
Something is different in Spark/PySpark 3.1.1. I am not sure whether it is the partitions, the groupBy, the limit, or just a conf being enabled or disabled in 3.1.1, but it results in these performance differences.
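On the question of a conf being enabled or disabled between the versions, one hedged way to check is to dump every effective SQL config from each environment and diff the two files (the file name pattern is just an example, and this reuses the spark session built above):

import pyspark

# Run once in the 3.0.2 environment and once in the 3.1.1 environment,
# then diff the two files to spot defaults that changed between versions.
confs = spark.sql("SET -v").select("key", "value").collect()
with open("sql-confs-" + pyspark.__version__ + ".txt", "w") as f:
    for row in sorted(confs, key=lambda r: r["key"]):
        f.write(row["key"] + "=" + str(row["value"]) + "\n")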
OK, you need to assess where the versions have the biggest impact in terms of timings.
From the Spark GUI for each run, under the Stages tab and Completed Stages, look at the Duration of each task and how it differs for identical tasks in these two Spark versions.
Example:
In our case the main impact is writing to Google BigQuery from on-premises, with variable timing because of the network: we are using Cloud VPN (over the public internet) as opposed to Cloud Interconnect (a dedicated network).
So in your case, which stage is the most time consuming?
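As an aside, those per-stage timings can also be pulled programmatically from the Spark UI's REST API instead of reading them off the page; a small sketch, assuming a local driver on the default port 4040 and the requests package installed:

import requests

base = "http://localhost:4040/api/v1"
app_id = requests.get(base + "/applications").json()[0]["id"]

# List completed stages, slowest first, to see where the time goes
stages = requests.get(base + "/applications/" + app_id + "/stages?status=complete").json()
for s in sorted(stages, key=lambda s: s.get("executorRunTime", 0), reverse=True):
    print(s["stageId"], s["name"], s["executorRunTime"], "ms executor run time")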
HTH

Actually that only defaults to true in master ... so that may not be it ...
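(For reference, one can check what a running session actually resolves for these flags; spark.conf.get falls back to the version's built-in default when the flag was never set explicitly:)

print(spark.conf.get("spark.sql.adaptive.enabled"))                     # false by default in 3.0.x and 3.1.x
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))  # true by default, but only takes effect when AQE is on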
So this is what I have in my Spark UI for 3.0.2 and 3.1.1:
For pyspark==3.0.2 (stage "showString at NativeMethodAccessorImpl.java:0"):

Finished in 10 seconds
For pyspark==3.1.1 (same stage "showString at NativeMethodAccessorImpl.java:0"):

Finished the same stage in 39 seconds
As you can see, everything is literally the same between 3.0.2 and 3.1.1: number of stages, number of tasks, Input, Output, Shuffle Read, Shuffle Write. The difference is that 3.0.2 runs all 12 tasks together, while 3.1.1 finishes 10 of the 12 immediately and the remaining 2 do the actual processing, as I shared previously:
3.1.1

3.0.2

PS: I have just run the same test in Databricks with 1 worker.
8.1 (includes Apache Spark 3.1.1, Scala 2.12):

7.6 (includes Apache Spark 3.0.1, Scala 2.12)

There is still a difference of over 20 seconds, which is a big bump when the whole process finishes within a minute. I am not sure what it is, but until further notice I will advise our users not to use Spark/PySpark 3.1.1 locally or in Databricks. (There may be other optimizations that make it less noticeable, but this is such simple code that it can quickly become a bottleneck in larger pipelines.)
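One way to check whether those 2 long-running tasks correspond to genuinely skewed partitions, rather than AQE coalescing the shuffle, is to count the rows landing in each partition of the repartitioned DataFrame from the earlier snippet:

from pyspark.sql.functions import spark_partition_id

# Rows per partition right after repartition(12); a round-robin
# repartition should spread the rows roughly evenly across all 12.
Toys.withColumn("pid", spark_partition_id()) \
    .groupBy("pid").count().orderBy("pid").show(12)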
Right, you already established a few times that the difference is the number of partitions. Russell answered with what is almost surely the correct answer, that it's AQE. In toy cases it isn't always a win. Disable it if you need to. It's not a problem per se in 3.1; AQE speeds up more realistic workloads in general.
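A minimal way to test that hypothesis in the 3.1.1 session, using standard Spark SQL configs set before the job runs:

# Turn AQE and its shuffle-partition coalescing off for this session,
# then rerun the job; if the 12-way parallelism returns, AQE is the cause.
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

# With AQE on, the physical plan is wrapped in an AdaptiveSparkPlan node:
top50k.explain()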
Thanks Sean,
I have already tried adding that, and the result is exactly the same.
The reason that config cannot be the cause (at least not alone) is that my comparison is between Spark 3.0.2 and Spark 3.1.1, and this config has defaulted to true since the beginning of 3.0.0 and hasn't changed.
So it can't be good for 3.0.2 and bad for 3.1.1; unfortunately, the issue is somewhere else.
Spark 3.1.1:
I enabled the parameter
spark_session.conf.set("spark.sql.adaptive.enabled", "true")
to see its effects, on YARN, i.e. spark-submit --master yarn --deploy-mode client.
With 4 executors it crashed the cluster.
I then reduced the number of executors to 2, and this time it ran OK, but the performance was worse.
I assume it adds some overhead?
I personally added the following to my SparkSession in 3.1.1 and the result was exactly the same as before (local master). 3.1.1 is still 4-5 times slower than 3.0.2, at least for that piece of code. I will investigate further to see how it behaves with other things, especially anything without .transform or Spark ML related functions, but the small code I provided, on any dataset big enough to take a minute to finish, will show you a 4-5x slowdown going from 3.0.2 to 3.1.1:
.config("spark.sql.adaptive.coalescePartitions.enabled", "false") .config("spark.sql.adaptive.enabled", "false")
Well, the normal course of action (considering the law of diminishing returns) is that your mileage may vary:
Spark 3.0.1 is pretty stable and good enough. Unless there is an overriding reason why you have to use 3.1.1, you can set it aside and try it when you have other use cases. For now, I guess you can carry on with 3.0.1 as BAU.
HTH
Thanks Mich, I will ask all of our users to use pyspark 3.0.x and will change all the notebooks/scripts to switch back from 3.1.1 to 3.0.2.
That being said, I won't be able to defend this request by telling Spark users that the previous release was, and still is, more stable than the latest release, the one that everything now defaults to (pyspark, downloads, etc.).
I'll see if I can open a ticket for this as well.
Hi,
Regarding your point:
.... I won't be able to defend this request by telling Spark users that the previous release was, and still is, more stable than the latest release ...
With the benefit of hindsight, version 3.1.1 was released recently, and the notion of stability (from a practical point of view) does not apply to it yet. That is perhaps the reason why some vendors like Cloudera stay a few releases behind the latest version. In production, what matters most is predictability and stability. You are not doing anything wrong by rolling back and awaiting further clarification and resolution of the issue.
HTH
