ConcurrentModificationExceptions with CachedKafkaConsumers

ConcurrentModificationExceptions with CachedKafkaConsumers

Bryan Jeffrey
Hello, Spark Users.

We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We have a Spark streaming job reading a reasonable amount of data from Kafka (roughly 40 GB / minute).  We would like to move to the Kafka 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers to convert message formats.

We've run into https://issues.apache.org/jira/browse/SPARK-19185, 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried to work around it as follows:

1. Disabled consumer caching.  This increased the total job time from ~1 minute per batch to ~1.8 minutes per batch, a performance penalty that is unacceptable for our use case. We also saw some partitions stop receiving data for an extended period of time, though I was unable to get a simple repro for that effect.
2. Disabled speculation and multiple-job concurrency, and added caching for the stream directly after reading from Kafka along with caching offsets (the relevant settings are sketched after this list).  This approach works well for simple examples (read from a Kafka topic, write to another topic). However, when we move through more complex logic we continue to see this type of error, despite only creating the stream for a given topic a single time.  We validated that we create the stream for a given topic / partition a single time by logging on stream creation, caching the stream, and (eventually) calling 'runJob' to actually fetch the data. Nonetheless, with multiple outputs we see the ConcurrentModificationException.
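For reference, a minimal sketch of the settings those two workarounds correspond to (the SparkConf wiring here is illustrative rather than our exact code; the configuration keys are standard Spark settings):

import org.apache.spark.SparkConf

// Illustrative only: the settings toggled for workarounds 1 and 2 above.
val conf = new SparkConf()
  .setAppName("kafka-direct-stream-job")                        // hypothetical application name
  .set("spark.streaming.kafka.consumer.cache.enabled", "false") // workaround 1: disable consumer caching
  .set("spark.speculation", "false")                            // workaround 2: disable speculation
  .set("spark.streaming.concurrentJobs", "1")                   // workaround 2: no concurrent jobs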

I've included some code down below.  I would be happy if anyone had debugging tips for the workaround.  However, my main concern is to ensure that the 2.4 release will include a bug fix that works for Spark Streaming jobs in which multiple input topics map data to multiple outputs. I would also like to understand whether the fix (https://github.com/apache/spark/pull/20997) will be backported to Spark 2.3.x.

In our code, read looks like the following:

case class StreamLookupKey(topic: Set[String], brokers: String)

private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()

// Given inputs return a direct stream.
def createDirectStream(ssc: StreamingContext,
                       additionalKafkaParameters: Map[String, String],
                       brokersToUse: Array[String], // broker1,broker2|broker3,broker4
                       topicsToUse: Array[String],
                       applicationName: String,
                       persist: Option[PersistenceManager],
                       useOldestOffsets: Boolean,
                       maxRatePerPartition: Long,
                       batchSeconds: Int
                      ): DStream[DecodedData] = {
  val streams: Array[DStream[DecodedData]] =
    brokersToUse.map(brokers => {
      val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
      val kafkaParameters: Map[String, String] = getKafkaParameters(brokers, useOldestOffsets, groupId) ++ additionalKafkaParameters
      logger.info(s"Kafka Params: ${kafkaParameters}")
      val topics = topicsToUse.toSet
      logger.info(s"Creating Kafka direct connection - ${kafkaParameters.mkString(GeneralConstants.comma)} " +
        s"topics: ${topics.mkString(GeneralConstants.comma)} w/ applicationGroup: ${groupId}")

      // Reuse an existing stream for this (topics, brokers) pair if one was already created.
      streamMap.getOrElse(StreamLookupKey(topics, brokers),
        createKafkaStream(ssc, applicationName, topics, brokers, maxRatePerPartition, batchSeconds, kafkaParameters))
    })

  ssc.union(streams)
}

private def createKafkaStream(ssc: StreamingContext, applicationName: String, topics: Set[String], brokers: String,
                              maxRatePerPartition: Long, batchSeconds: Int, kafkaParameters: Map[String, String]): DStream[DecodedData] = {
  logger.info(s"Creating a stream from Kafka for application ${applicationName} w/ topic ${topics} and " +
    s"brokers: ${brokers.split(',').head} with parameters: ${kafkaParameters.mkString("|")}")
  try {
    val consumerStrategy = ConsumerStrategies.Subscribe[String, DecodedData](topics.toSeq, kafkaParameters)
    val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
      KafkaUtils.createDirectStream(ssc, locationStrategy = LocationStrategies.PreferBrokers, consumerStrategy = consumerStrategy)

    KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream, maxRatePerPartition, batchSeconds)
    val result = stream.map(addConsumerRecordMetadata).persist(GeneralConstants.defaultPersistenceLevel)
    streamMap += StreamLookupKey(topics, brokers) -> result
    // Force materialization of the persisted stream so downstream outputs reuse the cached blocks.
    result.foreachRDD(rdd => rdd.context.runJob(rdd, (iterator: Iterator[_]) => {}))
    result
  } catch ErrorHandling.safelyCatch {
    case e: Exception =>
      logger.error("Unable to create direct stream:")
      e.printStackTrace()
      throw KafkaDirectStreamException(topics.toArray, brokers, e)
  }
}

def getKafkaParameters(brokers: String, useOldestOffsets: Boolean, applicationName: String): Map[String, String] =
  Map[String, String](
    "auto.offset.reset" -> (if (useOldestOffsets) "earliest" else "latest"),
    "enable.auto.commit" -> false.toString, // we'll commit these manually
    "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[Decoders.MixedDecoder].getCanonicalName,
    "partition.assignment.strategy" -> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
    "bootstrap.servers" -> brokers,
    "group.id" -> applicationName,
    "session.timeout.ms" -> 240000.toString,
    "request.timeout.ms" -> 300000.toString
  )
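For context, a hypothetical invocation of the read path above (broker lists, topics, and rates are placeholders, not our production values):

// Illustrative usage only; values are placeholders.
val stream: DStream[DecodedData] = createDirectStream(
  ssc,
  additionalKafkaParameters = Map.empty,
  brokersToUse = Array("broker1:9092,broker2:9092", "broker3:9092,broker4:9092"), // one entry per cluster
  topicsToUse = Array("topicA", "topicB"),
  applicationName = "exampleApplication",
  persist = None,
  useOldestOffsets = false,
  maxRatePerPartition = 10000L,
  batchSeconds = 60)
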
Write code looks like the following:
def write[T, A](rdd: RDD[T], topic: String, brokers: Array[String], conv: (T) => Array[Byte], numPartitions: Int): Unit = {
  val rddToWrite =
    if (numPartitions > 0) {
      rdd.repartition(numPartitions)
    } else {
      rdd
    }

  // Get the SparkSession associated with the current thread.
  val session = SparkSession.builder().getOrCreate()
  val df = session.createDataFrame(rddToWrite.map(x => Row(conv(x))), StructType(Array(StructField("value", BinaryType))))
  df.selectExpr("CAST('' AS STRING)", "value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", getBrokersToUse(brokers))
    .option("compression.type", "gzip")
    .option("retries", "3")
    .option("topic", topic)
    .save()
}
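To show the shape of the problem, here is a sketch (illustrative only; extractA / extractB and the serialization functions are stand-ins for our real code) of the pattern that triggers the exception for us - a single cached input stream feeding multiple Kafka outputs:

// Illustrative only: one cached input stream driving two separate Kafka outputs.
stream.foreachRDD { rdd =>
  write(rdd.map(extractA), topic = "outputTopicA", brokers = brokers, conv = serializeA, numPartitions = 0)
  write(rdd.map(extractB), topic = "outputTopicB", brokers = brokers, conv = serializeB, numPartitions = 0)
}
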
Regards,

Bryan Jeffrey

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

Cody Koeninger
I doubt that fix will get backported to 2.3.x

Are you able to test against master?  2.4 with the fix you linked to
is likely to hit code freeze soon.

From a quick look at your code, I'm not sure why you're mapping over
an array of brokers.  It seems like that would result in different
streams with the same group id, because broker isn't part of your
group id string.
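
For illustration, that would mean folding the broker string into the group id, e.g. something like:

// Illustrative sketch only: give each cluster's stream a distinct consumer group.
val groupId = s"${applicationName}~${brokers}~${topicsToUse.mkString("~")}"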

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

Bryan Jeffrey
Cody,

We are connecting to multiple clusters for each topic.  I did experiment this morning with both adding a cluster identifier to the group id and simply moving to use only a single one of our clusters.  Neither of these was successful.  I am not able to run a test against master right now.

Regards,

Bryan Jeffrey




Re: ConcurrentModificationExceptions with CachedKafkaConsumers

Cody Koeninger
Just to be 100% sure, when you're logging the group id in
createDirectStream, you no longer see any duplicates?

Regarding testing master, is the blocker that your spark cluster is on
2.3?  There's at least a reasonable chance that building an
application assembly jar that uses the master version just for the
spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster.
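
For illustration, in an sbt build that could look something like the following (version strings are placeholders, not tested):

// build.sbt sketch: keep Spark at 2.3.0 but pull the Kafka integration from a locally built snapshot.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0-SNAPSHOT" // hypothetical locally built version
)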

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

Bryan Jeffrey
Cody,

Yes - I was able to verify that I am not seeing duplicate calls to createDirectStream.  If the spark-streaming-kafka-0-10 artifact from master will work on a 2.3 cluster, I can go ahead and give that a shot.

Regards,

Bryan Jeffrey
