[Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

[Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

Sethupathi T
Hi Team,

We have a secured Kafka cluster (it only allows consuming from pre-configured, authorized consumer groups), and we want to use Spark Streaming to consume from it, so we decided to use the spark-streaming-kafka-0-10 API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I deployed the application in cluster mode, I realized that the actual group id had been prefixed with "spark-executor-" in the executor configuration (the executor tries to connect to Kafka with "spark-executor-" + the actual group id, which does not exist, and gets an exception).

Here is the code where the executor constructs the executor-specific group id (paraphrased below from KafkaUtils.fixKafkaParams in the spark-streaming-kafka-0-10 source; exact lines may differ between Spark versions):
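
    // Paraphrase of KafkaUtils.fixKafkaParams (spark-streaming-kafka-0-10);
    // exact wording may differ between Spark versions.
    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    // driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding ${ConsumerConfig.GROUP_ID_CONFIG} to $groupId for executor")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)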


Here are my questions:

#1 What was the specific reason for prefixing the group id on the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10 library with the group id prefix removed?

#3 KafkaUtils.scala is marked as @Experimental; what does that mean? Is it advisable to use it in production?

Here is my Spark Streaming code snippet:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Constants, Message, MessageDeserializer, subscribers, topicsSet and ssc
// are defined elsewhere in our application.
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)
---
Thanks in Advance,
Sethupathi.T

Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

Gabor Somogyi
Hi,

Let me share a part of the Spark 3.0 documentation (it covers Structured Streaming rather than the DStreams API you mentioned, but it is still relevant):

kafka.group.id (string, default: none, applies to: streaming and batch)
The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and therefore can read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id. However, do this with extreme caution as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started/restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to be very small. When this is set, option "groupIdPrefix" will be ignored.
I think it answers your questions.

As a general suggestion, it may be worth revisiting Spark 3.0, because Structured Streaming has another interesting feature:

groupIdPrefix (string, default: spark-kafka-source, applies to: streaming and batch)
Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries. If "kafka.group.id" is set, this option will be ignored.
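
For illustration, a minimal Structured Streaming read that uses a pre-authorized group id could look like the sketch below (the broker, topic and group names are placeholders, not anything from your setup):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("secured-kafka-read").getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9093") // placeholder broker
  .option("subscribe", "my-topic")                   // placeholder topic
  // Spark 3.0+: read with the exact, pre-authorized consumer group.
  .option("kafka.group.id", "my-authorized-group")
  // Keep the session timeout small to limit interference across restarts,
  // as the documentation above suggests.
  .option("kafka.session.timeout.ms", "10000")
  .load()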

BR,
G



Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

Sethupathi T
Gabor,

Thanks for the quick response and for sharing details about Spark 3.0. We need to use Spark Streaming (KafkaUtils.createDirectStream) rather than Structured Streaming, following this document: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html. To re-iterate the issue for better understanding: the spark-streaming-kafka-0-10 connector prefixes "spark-executor-" to the group.id on executors, while the driver uses the original group id.

The executor-specific group id is constructed in KafkaUtils.fixKafkaParams (see the snippet in my first message).

It would be great if you could provide explanations for the following questions.

#1 What was the specific reason for prefixing the group id on the executor?

#2 Will it be harmful if I build a custom spark-streaming-kafka-0-10 library with the group id prefix removed (at line # 212 in KafkaUtils.scala)?

#3 KafkaUtils.scala is marked as @Experimental; what does that mean? Is it advisable to use it in production?

---
Thanks in Advance,
Sethupathi.T


Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

Gabor Somogyi
Sethupathi,

Let me extract the important parts of what I shared:

1. "This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer" (in other words, the prefix keeps the executors' consumer group separate from the driver's, so they cannot interfere with each other).
2. Consumers may eat data from each other, and offset calculation may give back wrong results (that's why "extreme caution" is recommended in the Structured Streaming doc, and it still applies here).
3. Yes.
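
To make point 2 concrete, here is a minimal plain-Kafka sketch (the broker and topic names are made-up placeholders): two consumers that join the same group are assigned disjoint subsets of the topic's partitions, so neither sees all of the data.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

def consumerIn(group: String): KafkaConsumer[String, String] = {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9093") // placeholder broker
  props.put("group.id", group)
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List("my-topic").asJava) // placeholder topic
  consumer
}

// Both consumers join the SAME group, so Kafka splits the partitions
// between them and each one reads only part of the topic. The
// "spark-executor-" prefix keeps driver and executors out of exactly
// this situation by default.
val first = consumerIn("shared-group")
val second = consumerIn("shared-group")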

BR,
G



Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations

Sethupathi T
Gabor,

Thanks for the clarification.

Thanks 
