Count distinct and driver memory

Count distinct and driver memory

Lalwani, Jayesh

I have a Dataframe with around 6 billion rows and about 20 columns. First of all, I want to write this dataframe out to parquet. Then, out of the 20 columns, I have 3 columns of interest, and I want to find how many distinct values of those columns there are in the file. I don't need the actual distinct values; I just need the count. I know that there are around 10-16 million distinct values.

 

Before I write the data frame to parquet, I do df.cache. After writing the file out, I do df.countDistinct("a", "b", "c").collect()

 

When I run this, I see that the memory usage on my driver steadily increases until it starts getting future timeouts. I guess it's spending time in GC. Does countDistinct cause this behavior? Does Spark try to pull all 10 million distinct values into the driver? Is countDistinct not recommended for data frames with a large number of distinct values?

 

What's the solution? Should I use approx_count_distinct?
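For reference, a minimal PySpark sketch of the exact and the approximate approach (the input path and the 5% relative error are placeholders, not the original poster's code; the column names follow the post):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input")   # hypothetical source

    # Exact count of distinct (a, b, c) combinations.
    # The aggregation runs on the executors; only one row comes back.
    exact = df.select(F.countDistinct("a", "b", "c").alias("n_distinct"))

    # Approximate per-column counts via HyperLogLog++ with bounded memory;
    # rsd is the allowed relative standard deviation (here 5%).
    approx = df.select(
        F.approx_count_distinct("a", rsd=0.05).alias("a_distinct"),
        F.approx_count_distinct("b", rsd=0.05).alias("b_distinct"),
        F.approx_count_distinct("c", rsd=0.05).alias("c_distinct"),
    )

    exact.show()
    approx.show()

Either way, only a single summary row comes back to the driver; the heavy lifting stays on the executors.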

Re: Count distinct and driver memory

Gourav Sengupta
Hi,
6 billion rows is quite small; I can do it on my laptop with around 4 GB of RAM. What version of Spark are you using, and what is the effective memory you have per executor?
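A minimal sketch of how one might check both from a PySpark session (the 8g value is a placeholder, not a recommendation):

    from pyspark.sql import SparkSession

    # Executor memory can be set when the session is created; driver memory
    # usually has to be set on spark-submit instead, before the driver JVM starts.
    spark = (SparkSession.builder
             .config("spark.executor.memory", "8g")
             .getOrCreate())

    print(spark.version)                                   # Spark version
    print(spark.conf.get("spark.executor.memory", "1g"))   # effective setting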

Regards,
Gourav Sengupta 


Re: Count distinct and driver memory

Nicolas Paris-2
In reply to this post by Lalwani, Jayesh
> Before I write the data frame to parquet, I do df.cache. After writing
> the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
If you write the df to parquet, why would you also cache it? Caching loads data into memory by default, and that can affect later operations such as the collect. The GC you see can be explained by both the caching and the collect.
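A minimal sketch of that idea in PySpark (the paths are placeholders, not the original poster's code):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input")   # hypothetical source

    # Write once, then use the written parquet as the source for the second
    # action instead of cache(), so nothing is pinned in executor memory.
    df.write.mode("overwrite").parquet("/data/output")

    written = spark.read.parquet("/data/output")
    written.select(F.countDistinct("a", "b", "c").alias("n_distinct")).show()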




--
nicolas paris

Re: Count distinct and driver memory

Lalwani, Jayesh
I was caching it because I didn't want to re-execute the DAG when I ran the count query. If you have a Spark application with multiple actions, Spark re-executes the entire DAG for each action unless there is a cache in between, and I was trying to avoid reloading half a terabyte of data. Also, cache should use up executor memory, not driver memory.

As it turns out, cache was the problem. I expected cache to take executor memory and spill over to disk; I don't know why it's taking driver memory. The input data has millions of partitions, which results in millions of tasks. Perhaps the high memory usage is a side effect of caching the results of lots of tasks.
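If the millions of cached partitions are the culprit, one hedged mitigation is to cut the partition count before caching; a minimal sketch (the source path and the target of 2000 partitions are placeholders):

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input")   # hypothetical source

    # Fewer partitions means fewer tasks and fewer cached blocks for the
    # driver to track; coalesce() avoids a shuffle when only reducing
    # the partition count.
    df = df.coalesce(2000)
    df.persist(StorageLevel.MEMORY_AND_DISK)   # roughly what cache() does by default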

Re: Count distinct and driver memory

Mich Talebzadeh
Best to check this in the Spark UI under the Storage tab and see what is causing the issue.

HTH



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Re: Count distinct and driver memory

Nicolas Paris-2
In reply to this post by Lalwani, Jayesh
> I was caching it because I didn't want to re-execute the DAG when I
> ran the count query. If you have a spark application with multiple
> actions, Spark reexecutes the entire DAG for each action unless there
> is a cache in between. I was trying to avoid reloading 1/2 a terabyte
> of data.  Also, cache should use up executor memory, not driver
> memory.
Why not count from the parquet file instead? Writing and then reading a parquet file is more efficient than caching, in my experience. If you really need caching, you could choose a better storage level such as DISK_ONLY.
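A minimal sketch of the DISK_ONLY variant (the source path is a placeholder):

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/input")   # hypothetical source

    # Keep cached blocks on local disk only, so the executor heap (and the
    # GC pressure that comes with it) is not used for the cache.
    df.persist(StorageLevel.DISK_ONLY)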



--
nicolas paris

Re: Count distinct and driver memory

ayan guha
Do not do collect; that brings the results back to the driver. Instead, do the count distinct and write the result out.
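A minimal sketch of that pattern, assuming PySpark and placeholder paths:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/data/output")   # hypothetical path of the written data

    # Aggregate on the executors and write the single-row result out
    # instead of collect()-ing it back to the driver.
    (df.select(F.countDistinct("a", "b", "c").alias("n_distinct"))
       .write.mode("overwrite").json("/data/distinct_counts"))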


--
Best Regards,
Ayan Guha
Re: Count distinct and driver memory

Raghavendra Ganesh
In reply to this post by Lalwani, Jayesh
Spark provides multiple options for caching (including disk). Have you tried caching to disk?
--
Raghavendra

