[SS] Any way to optimize memory consumption of SS?


[SS] Any way to optimize memory consumption of SS?

KevinZwx
Hi,

I'm using Structured Streaming to count unique visits to our website. I run Spark in yarn mode with 4 executor instances, and I have tried from 2 cores * 5g memory up to 4 cores * 10g memory per executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the application gets blocked and finally crashes with an OOM. That seems unreasonable, so is there any suggestion for optimizing the memory consumption of Structured Streaming? Thanks.
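
For reference, the resource setup is roughly the following (the app name and exact config keys below are my approximation of the description above, not the real submit command):

  import org.apache.spark.sql.SparkSession

  // Rough sketch of the session configuration described above (values approximate)
  val spark = SparkSession.builder()
    .appName("unique-visit-count")             // hypothetical name
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "4")       // tried 2 to 4 cores per executor
    .config("spark.executor.memory", "10g")    // tried 5g to 10g per executor
    .getOrCreate()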

Re: [SS] Any way to optimize memory consumption of SS?

Michael Armbrust
Can you show the full query you are running?

Re: [SS] Any way to optimize memory consumption of SS?

KevinZwx
Yes, my code is shown below (I also posted it in another mail):
/**
    * input
    */
  val logs = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BROKER_SERVER)
    .option("subscribe", TOPIC)
    .option("startingOffset", "latest")
    .load()

  /**
    * process
    */
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

  val events = logValues
    .map(parseFunction)
    .select(
      $"_1".alias("date").cast("timestamp"),
      $"_2".alias("uuid").cast("string")
    )

  val results = events
    .withWatermark("date", "1 day")
    .dropDuplicates("uuid", "date")
    .groupBy($"date")
    .count()

  /**
    * output
    */
  val query = results
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()

  query.awaitTermination()

and I use Play JSON to parse the input logs from Kafka; the parse function looks like this:

  def parseFunction(str: String): (Long, String) = {
    val json = Json.parse(str)
    val timestamp = (json \ "time").get.toString().toLong
    // truncate the epoch-second timestamp to the start of the day, shifted back 8 hours (UTC+8 day boundary)
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    val uuid = (json \ "uuid").get.toString()
    (date, uuid)
  }
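
One Play JSON subtlety that may or may not matter here: toString() on a JsString keeps the surrounding JSON quotes, so the uuid values carry through as "abc" with quotes rather than abc; extracting with as[String] avoids that. A quick illustration (the sample record is made up):

  import play.api.libs.json.Json

  // Quoting behaviour of Play JSON (sample record is hypothetical)
  val json = Json.parse("""{"time": 1505260800, "uuid": "abc"}""")
  (json \ "uuid").get.toString()  // "\"abc\"" -- JSON quotes are kept
  (json \ "uuid").as[String]      // "abc"     -- plain string value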

and the Java heap usage looks like this (I've increased the executor memory to 15g):

[attachment: image.png -- heap usage screenshot]

Re: [SS] Any way to optimize memory consumption of SS?

Michael Armbrust
How many UUIDs do you expect to have in a day?  That is likely where all the memory is being used.  Does it work without that?
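
That is, as a baseline, the same aggregation with the deduplication step removed, roughly like this (with `events` as defined in your code):

  // Baseline: no dropDuplicates, so no per-uuid deduplication state is kept
  val baseline = events
    .withWatermark("date", "1 day")
    .groupBy($"date")
    .count()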

Re: [SS] Any way to optimize memory consumption of SS?

KevinZwx
There are expected to be about 5 million UUIDs in a day. I need this field to drop duplicate records and count unique visits. If I simply count records without using dropDuplicates, the job occupies less than 1g of memory, so I believe most of the memory is taken by the state store keeping the state for dropDuplicates. But I cannot find a way to alleviate the problem.
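
One direction I'm considering, if an approximate count turns out to be acceptable (just a sketch, not verified on this workload): approx_count_distinct keeps a small fixed-size sketch per group instead of one state entry per uuid for the whole watermark period, e.g.:

  import org.apache.spark.sql.functions.approx_count_distinct

  // Approximate unique visits per day; state per group is a fixed-size sketch
  // rather than one entry per (uuid, date). Sketch only -- not verified here.
  val approxResults = events
    .withWatermark("date", "1 day")
    .groupBy($"date")
    .agg(approx_count_distinct($"uuid").alias("uv"))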
