Spark job's driver program consumes too much memory

Spark job's driver program consumes too much memory

James Starks
I have a Spark job that reads data from a database. By increasing the submit parameter '--driver-memory 25g' the job works without a problem locally, but not in the prod env, because the prod master does not have enough capacity.

So I have a few questions:

-  What functions, such as collect(), would cause the data to be sent back to the driver program?
  My job so far merely uses `as`, `filter`, `map`, and `filter`.

- Is it possible to write data (in Parquet format, for instance) to HDFS directly from the executors? If so, how can I do that (any code snippet, doc for reference, or keyword to search for? I can't find anything by e.g. `spark direct executor hdfs write`)?

Thanks



 

Re: Spark job's driver program consumes too much memory

Apostolos N. Papadopoulos
Dear James,

- Check the Spark documentation to see which actions return a lot of
data back to the driver. One of these actions is collect(); take(x) and
reduce() are also actions.

Before executing collect(), find out the size of your RDD/DF.

- I cannot understand the phrase "hdfs directly from the executor". You
can specify an HDFS file as your input, and you can also use HDFS to
store your output.
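As a minimal sketch (the paths and the threshold below are hypothetical), you can gauge the size first and only collect when the result is small:

```scala
// count() is an action, but it returns only a single Long to the driver.
val df = spark.read.parquet("hdfs://namenode/path/to/input") // hypothetical path
val n = df.count()

if (n < 100000L) {            // hypothetical threshold
  val rows = df.collect()     // small enough to hold on the driver
  rows.take(5).foreach(println)
} else {
  // Too large for the driver: let the executors write it out instead.
  df.write.mode("overwrite").parquet("hdfs://namenode/path/to/output")
}
```

This assumes an existing SparkSession named `spark`.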


regards,

Apostolos



--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: [hidden email]
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol


---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Spark job's driver program consumes too much memory

James Starks

Is df.write.mode(...).parquet("hdfs://..") also an action? Checking the doc shows that my Spark job doesn't use those action functions. But the saveXXXX functions resemble the df.write.mode(overwrite).parquet("hdfs://path/to/parquet-file") call that my job uses. Therefore I am thinking maybe that's the reason why my Spark job's driver consumes such an amount of memory.

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#actions

My Spark job's driver program consumes too much memory, so I want to prevent that by writing data to HDFS on the executor side, instead of waiting for that data to be sent back to the driver program (and then written to HDFS). This is because our worker servers have more memory than the one that runs the driver program. If I can write data to HDFS from the executors, then the driver memory for my Spark job can be reduced.

Otherwise, does Spark support streaming reads from a database (i.e. Spark Streaming + Spark SQL)?

Thanks for your reply.








Re: Spark job's driver program consumes too much memory

Apostolos N. Papadopoulos
You are putting it all together and this does not make sense. Writing data
to HDFS does not require that all data be transferred back to the
driver and THEN saved to HDFS.

This would be a disaster and it would never scale. I suggest checking
the documentation more carefully, because I believe you are a bit confused.
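For example, a plain write such as the sketch below (path hypothetical) is executed by the executor tasks themselves; each task writes its own part-file to HDFS, and the driver only coordinates the job and the final commit:

```scala
// Each executor task writes one partition file directly to HDFS;
// no row data is routed through the driver.
df.write
  .mode("overwrite")
  .parquet("hdfs://namenode/path/to/output") // hypothetical path
```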

regards,

Apostolos








Re: Spark job's driver program consumes too much memory

James Starks
Yes, I think I am confused, because originally my thought was that if the executors only require 10g, then the driver ideally should not need to consume more than 10g, or at least not more than 20g. But this is not the case. My configuration sets --driver-memory to 25g and --executor-memory to 10g. And my program basically only uses `filter`, `map`, and `write.mode().parquet`, as below (main logic):

    val df = spark.read.format("jdbc")...option("dbtable", "select * from mytable where fieldX <> ''")...load() /* SQL returns around 8MM records. */
    df.createOrReplaceTempView("newtable")
    val newdf = spark.sql("select field1, ..., fieldN from newtable" /* around 50 fields */).as[MyCaseClass].filter { ... }.map { ... }.filter { ... }
    newdf.write.mode(...).parquet(...)

So I don't understand why the driver program needs such huge memory. And I can't find a related doc explaining this, either on the Spark website or through Google (perhaps I missed it by using the wrong keywords). Any pointers to where this is explained?
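For reference, the JDBC source also accepts partitioning options that I have not tried yet; a minimal sketch of what I mean (the partition column, bounds, and connection details are all hypothetical):

```scala
// Sketch: split the JDBC read into parallel partitions across executors,
// so no single JVM has to buffer the whole 8MM-row result at once.
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://dbhost/mydb")  // hypothetical connection
  .option("dbtable", "mytable")
  .option("partitionColumn", "id")                 // hypothetical numeric column
  .option("lowerBound", "1")
  .option("upperBound", "8000000")
  .option("numPartitions", "16")
  .load()
```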

I appreciate your help.




