Spark-Streaming-Kafka10_2.11 on Spark 2.3


Spark-Streaming-Kafka10_2.11 on Spark 2.3

Bryan Jeffrey
Hello.

I am running Spark 2.3.0, connecting to Kafka and reading data with the spark-streaming-kafka10_2.11 library.  The library appears to work well, but I am seeing some odd scheduling behavior.  Our job reads from three separate Kafka clusters and unions the resulting DStreams together.
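For context, a minimal sketch of that read-and-union pattern follows; the broker lists, topic name, batch interval, and simplified Kafka parameters below are placeholders, not the actual job's values:

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  val conf = new SparkConf().setAppName("kafka-union-sketch")
  val ssc = new StreamingContext(conf, Seconds(30)) // placeholder batch interval

  // Placeholder broker lists for the three Kafka clusters and a placeholder topic.
  val clusterBrokers = Seq("clusterA:9092", "clusterB:9092", "clusterC:9092")
  val topics = Seq("events")

  // Simplified parameters using String deserializers; the real job uses the
  // getKafkaParameters helper shown further down.
  def kafkaParams(brokers: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "group.id" -> "kafka-union-sketch",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer]
  )

  // One direct stream per cluster, all reading the same topic.
  val streams = clusterBrokers.map { brokers =>
    KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams(brokers)))
  }

  // Union the per-cluster DStreams into a single DStream for downstream processing.
  val unioned = ssc.union(streams)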



The result is a single stage with 322 tasks reading a substantial amount of data from Kafka.  The summary metrics look healthy, with a median task time of 0.5 seconds and a max task time of 4 seconds.

However, the scheduler delay metric shows a delay of 1.9 minutes.  That is a huge scheduler delay: even though the longest task took only 4 seconds, the total time to read from Kafka is much longer.  I am using the following Kafka parameters:
  def getKafkaParameters(brokers: String, useOldestOffsets: Boolean, applicationName: String): Map[String, String] =
    Map[String, String](
      "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
      "enable.auto.commit" -> false.toString, // we'll commit these manually
      "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
      "value.deserializer" -> classOf[OinkSpark.Decoders.MixedDecoder].getCanonicalName,
      "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
      "bootstrap.servers" -> brokers,
      "group.id" -> applicationName,
      // Timeouts were raised after seeing commit failures caused by group rebalances (see below).
      "session.timeout.ms" -> 240000.toString, // 4 minutes
      "request.timeout.ms" -> 300000.toString  // 5 minutes
    )

It's possible that I need to change 'session.timeout.ms' and 'request.timeout.ms' so that the client times out faster. I increased those settings after seeing failures to write offsets to Kafka with the following message: "Failed to write offsets w/ error Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member"
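For reference, the manual-commit pattern documented for the 0-10 integration looks roughly like the sketch below; whether this job commits exactly this way isn't shown here, and `stream` stands for one of the per-cluster input streams returned by createDirectStream:

  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

  stream.foreachRDD { rdd =>
    // Capture the offset ranges while the RDD still carries Kafka partition info.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // ... process the batch ...

    // Commit asynchronously back to Kafka. Note that CanCommitOffsets is only
    // implemented by the original input stream, not by a unioned or transformed DStream.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }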

At this point I see no errors related to scheduling or Kafka in the logs, but there is sometimes a massive scheduler delay when communicating with Kafka.

Does anyone have thoughts on what might cause this?

Thank you,

Bryan Jeffrey



Re: Spark-Streaming-Kafka10_2.11 on Spark 2.3

Bryan Jeffrey
Quick note:

We have speculation enabled, so we've disabled 'spark.streaming.kafka.consumer.cache.enabled' to work around SPARK-19185.  Looking at our jobs, it appears that we only see this latency when there are not enough cores to connect to every partition we're reading from at once.  Adding a small number of executor cores seems to drastically reduce the time to complete a batch.  Perhaps there is significant overhead when an executor completes a task and has to disconnect from and reconnect to Kafka?
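For reference, a sketch of the settings being described; the executor counts are placeholders chosen only to illustrate covering all 322 partitions at once:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Speculative execution is enabled, which is why the Kafka consumer cache is
    // turned off as the SPARK-19185 workaround.
    .set("spark.speculation", "true")
    .set("spark.streaming.kafka.consumer.cache.enabled", "false")
    // Placeholder sizing: executors * cores >= 322 partitions, so every Kafka
    // partition can be read at once without waiting for a free core.
    .set("spark.executor.instances", "41")
    .set("spark.executor.cores", "8")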





Re: Spark-Streaming-Kafka10_2.11 on Spark 2.3

Cody Koeninger
Regarding this message:

 "Failed to write offsets w/ error Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member"

How are you committing offsets to Kafka?  Specifically, are you committing using a group id that has multiple consumers, i.e. are you re-using the same group id for multiple streams or multiple applications?






Re: Spark-Streaming-Kafka10_2.11 on Spark 2.3

Bryan Jeffrey
Cody,

No. The group ID is built from the application name and the topic, so it isn't shared across streams or applications.
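For illustration, a minimal sketch of that naming scheme; the helper name and exact format are assumptions, not the actual code:

  // Hypothetical helper: each application/topic pair gets its own consumer group,
  // so no two streams or applications share a group id.
  def groupIdFor(applicationName: String, topic: String): String =
    s"$applicationName-$topic"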

