Questions about count() performance with dataframes and parquet files

Questions about count() performance with dataframes and parquet files

WranglingData
Hi,

I am currently working on an app using PySpark to produce a daily insert and update delta capture, output as Parquet.  This is running on an 8-core, 32 GB Linux server in standalone mode (set to 6 worker cores of 2 GB memory each) running Spark 2.4.3.

This is achieved by reading data from a TSQL database into a dataframe, appending a hash of every record to it, and comparing it to a dataframe built from yesterday's data (which has also been saved as parquet).

As part of the monitoring and logging, I am trying to count the number of records for the respective actions.  Example code:
df_source = spark_session.read.format('jdbc').....
df_reference = sql_context.read.parquet('/path/to/reference.parquet')

df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', *df_source.columns))) \
            .cache()

df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
                    .select(lit('Insert').alias('_action'), *df_source_hashed) \
                    .dropDuplicates() \
                    .cache()
inserts_count = df_inserts.count()

df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="inner") \
                        .select(lit('Update').alias('_action'), *df_source_hashed) \
                        .where(col('a.hashkey') != col('b.hashkey')) \
                        .dropDuplicates() \
                        .cache()
updates_count = df_updates.count()

df_output = df_inserts.union(df_updates)

df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
The above code runs as two concurrent occurrences via Python threading.Thread (this is to try to overcome the network bottleneck connecting to the database server).
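For illustration only, a minimal sketch of how two such captures might be launched concurrently with threading.Thread (the wrapper function and table names below are hypothetical, not part of the original job):

import threading

def run_delta_capture(source_table):
    # Hypothetical wrapper around the read / hash / compare / write logic
    # shown above, parameterised by source table.
    print(f"capturing deltas for {source_table}")

# Two captures run concurrently so the slow JDBC reads can overlap,
# rather than queueing behind each other on the network.
threads = [threading.Thread(target=run_delta_capture, args=(name,))
           for name in ("table_a", "table_b")]
for t in threads:
    t.start()
for t in threads:
    t.join()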

What I am finding is that I am getting some very inconsistent behavior with the counts.  Occasionally it appears to freeze up on a count operation for a few minutes, and quite often that specific data frame will have zero records in it.  According to the DAG (which I am not 100% sure how to read), the processing flow is:

Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [75]count at NativeMethodAccessorImpl.java:0
  => InMemoryTableScan/MapPartitionsRDD [78]count at NativeMethodAccessorImpl.java:0
  => MapPartitionsRDD [79]count at NativeMethodAccessorImpl.java:0
  => WholeStageCodegen/MapPartitionsRDD [80]count at NativeMethodAccessorImpl.java:0
  => Exchange/MapPartitionsRDD [81]count at NativeMethodAccessorImpl.java:0

The other observation I have found is that if I remove the counts from the data frame operations and instead open the outputted parquet file and count it using a `sql_context.read.load('/path/to/output.parquet').filter(col("_action") == "Insert").count()` command, I reduce my run times by around 20 to 30%.  In my feeble mind, opening up the outputs and re-reading them seems counter-intuitive.

Is anyone able to give me some guidance on why or how to ensure that I am doing the above as efficiently as possible?

Best Regards
Ashley

Re: Questions about count() performance with dataframes and parquet files

David Edwards
Hi Ashley, 

I'm not an expert, but I think this is because Spark uses lazy execution and doesn't actually perform any work until you trigger an action such as a write, count or other operation on the dataframe.

If you remove the count steps, it will work out a more efficient execution plan, reducing the number of task steps.

If you can do the count as a final step, I would do that.  I think you may also not need the .cache() statements, and you might want to experiment with reducing spark.sql.shuffle.partitions too.
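As a rough sketch (assuming the SparkSession is named spark_session, as in the original code), lowering the shuffle partition count to roughly the number of worker cores would look like this:

# Spark defaults to 200 shuffle partitions; with 6 worker cores and small
# outputs, far fewer partitions means far fewer tiny tasks per shuffle.
spark_session.conf.set("spark.sql.shuffle.partitions", "6")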

Thanks
Dave

Re: Questions about count() performance with dataframes and parquet files

WranglingData
Thanks David,

I did experiment with the .cache() call and have to admit I didn't see any marked improvement on the sample that I was running, so yes, I am a bit apprehensive about including it (not even sure why I actually left it in).

When you say "do the count as the final step", are you referring to getting the counts of the individual data frames, or from the already outputted parquet?

Thanks and I appreciate your reply


Re: Questions about count() performance with dataframes and parquet files

David Edwards
Hi Ashley,

Apologies, I'm reading this on my phone as my work laptop doesn't let me access personal email.

Are you actually doing anything with the counts (printing to a log, writing to a table)?

If you're not doing anything with them get rid of them and the caches entirely. 

If you do want to do something with the counts you could try removing the individual counts and caches. 

Put a single cache on df_output:

df_output = df_inserts.union(df_updates).cache()
Then output a count grouped by action type on this df before writing out the parquet.
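A minimal sketch of that grouped count (re-using the _action column from the original code):

# A single action materialises the cached union and returns both counts at once.
action_counts = {row['_action']: row['count']
                 for row in df_output.groupBy('_action').count().collect()}
inserts_count = action_counts.get('Insert', 0)
updates_count = action_counts.get('Update', 0)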
Hope that helps
Dave


Re: Questions about count() performance with dataframes and parquet files

Enrico Minack
Ashley,

I want to suggest a few optimizations. The problem might go away but at least performance should improve.
The freeze problem could have many causes; the Spark UI SQL pages and stage detail pages would be useful. You can send them privately, if you wish.

1. The repartition(1) should be replaced by coalesce(1). The former will shuffle all data, while the latter will read in the existing partitions and not shuffle them again.
2. Repartitioning to a single partition is discouraged, unless you can guarantee the data fit into one worker's memory.
3. You can compute Insert and Update in one go, so that you don't have to join with df_reference twice.

df_actions = df_source_hashed.alias('a').join(df_reference.alias('b'), pk_list, how="left") \
                             .withColumn('_action', when(col('b.hashkey').isNull(), 'Insert')
                                                    .when(col('a.hashkey') != col('b.hashkey'), 'Update')) \
                             .select(col('_action'), *df_source_hashed) \
                             .dropDuplicates() \
                             .cache()
Since df_actions is cached, you can count inserts and updates quickly with only that one join in df_actions:
inserts_count = df_actions.where(col('_action') == 'Insert').count()
updates_count = df_actions.where(col('_action') == 'Update').count()
And you can get rid of the union:
df_output = df_actions.where(col('_action').isNotNull())
If you have to write that output to parquet anyway, then you can get the count quickly from the parquet file if it is partitioned by the _action column (Spark then only looks into parquet's metadata to get the count, it does not read any row):
df_output.repartition(1).write.partitionBy('_action').format('parquet').mode('overwrite').save('/path/to/output.parquet')
df_output = sql_context.read.parquet('/path/to/output.parquet')
inserts_count = df_output.where(col('_action') == 'Insert').count()
updates_count = df_output.where(col('_action') == 'Update').count()
These are all just sketches, but I am sure you get the idea.

Enrico

Re: Questions about count() performance with dataframes and parquet files

WranglingData
Hi,

Thank you both for your suggestions!  These have been eye-openers for me.

Just to clarify, I need the counts for logging and auditing purposes, otherwise I would exclude the step.  I should have also mentioned that while I am processing around 30 GB of raw data, the individual outputs are relatively small - around 30 MB in total across all files.

Firstly, I have identified the issue and it has nothing to do with Spark.  As part of my testing regime, I do a large file copy operation to stage data (copying around 6 GB from one directory to the next).  Once that is done, I kick off my process.  Looking at Cockpit, I noticed that after the copy command had completed, there was a good 60+ seconds of intensive disk IO.  In previous testing, this IO was still occurring when the first stages of my script were running.  I've now extensively re-tested after letting this IO drop off and I am no longer getting these freezes.  In reality there isn't any benefit in the real world (i.e., waiting 90 seconds to save 60 seconds), but at least I can explain why.

David, your suggestion of reducing the shuffle partitions has made by far the biggest difference, absolutely smashing the run times out of the park.  I've reduced the shuffles down to the configured number of workers (in this case 6) and I am seeing another 20% come off my run times.  I have now hit a firm bottleneck around the network and getting the data from the database server (there was not much difference between 30 shuffle partitions and 6).

Enrico, I have tried your suggestions and I can see some wins as well, though I will have to re-design and rebuild some of my solution to get them to work.  When this project was started, I was asked to provide single-partition parquet files (in the same sort of way you would see them output by Pandas), and so my solution has been built around that.  Partitioning on a field means that I can't deliver in this way.  Regardless, reading the parquet back in at the end, even with a filter clause in the statement, appears to be much quicker than counting from the data frames.  (I now need to try and convince the other stakeholders in the project that delivering files the way Spark intended is the correct method.)

Thank you both for your input.

Best regards
Ashley

Re: Questions about count() performance with dataframes and parquet files

Nicolas Paris-2

> .dropDuplicates() \ .cache()
> Since df_actions is cached, you can count inserts and updates quickly
> with only that one join in df_actions:

Hi Enrico. I am wondering if this is OK for very large tables? Is
caching faster than recomputing both insert/update?

Thanks

--
nicolas paris



Re: Questions about count() performance with dataframes and parquet files

Enrico Minack
It is not about very large or small, it is about how large your cluster
is w.r.t. your data. Caching is only useful if you have the respective
memory available across your executors. Otherwise you could either
materialize the Dataframe on HDFS (e.g. parquet or checkpoint) or indeed
have to do the join twice. It's a memory-over-CPU trade-off.

Enrico
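For reference, a minimal sketch of the checkpoint option mentioned above (assuming a SparkSession named spark_session as in the original code; the checkpoint directory path is illustrative):

# Checkpointing writes the DataFrame to the checkpoint directory and truncates
# its lineage, so later actions re-read the materialized data instead of
# recomputing the joins.
spark_session.sparkContext.setCheckpointDir('/path/to/checkpoints')
df_actions = df_actions.checkpoint()  # eager by default, so this triggers a job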




Re: Questions about count() performance with dataframes and parquet files

Nicolas Paris-2
> either materialize the Dataframe on HDFS (e.g. parquet or checkpoint)

I wonder if Avro is a better candidate for this: because it is row
oriented, it should be faster to write/read for such a task. I had never
heard about checkpoint.

--
nicolas paris
