Spark streaming with Confluent kafka

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Spark streaming with Confluent kafka

dwgw
Hi

I am trying to stream confluent kafka topic in the spark shell. For that i
have invoked spark shell using following command.

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
--conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
--driver-java-options
"-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
/home/spark/kafka_jaas.conf

kafka_jaas.conf
-----------------

KafkaClient {
                     
 org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="XXX"
                       password="XXX";
};

Readstream
-------------

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--------------

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:598)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:364)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:208)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.kafka.common.KafkaException:
java.lang.IllegalArgumentException: No serviceName defined in either JAAS or
Kafka config
at
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 43 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config
at
org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)

The error No serviceName defined in either JAAS or Kafka config is
misleading because, i am not using kerberos rather .PlainLoginModule And in
this case it is not required to have serviceName parameter in either JAAS or
Kafka config.

To make my point, i can run kafka avro console producer/consumer with the
following client properties (without servicename):

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=200000
retry.backoff.ms=5000
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required username="XXX"
password="XXX";
security.protocol=SASL_SSL

I am using:
Spark 3.0.0
Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
spark-sql-kafka-0-10_2.12:3.0.0

What am i doing wrong ?

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Spark streaming with Confluent kafka

Gabor Somogyi
The error is clear:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config

On Fri, 3 Jul 2020, 15:40 dwgw, <[hidden email]> wrote:
Hi

I am trying to stream confluent kafka topic in the spark shell. For that i
have invoked spark shell using following command.

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
--conf
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf"
--driver-java-options
"-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" --files
/home/spark/kafka_jaas.conf

kafka_jaas.conf
-----------------

KafkaClient {
                     
 org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="XXX"
                       password="XXX";
};

Readstream
-------------

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--------------

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at
org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at
org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:598)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:364)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:208)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.kafka.common.KafkaException:
java.lang.IllegalArgumentException: No serviceName defined in either JAAS or
Kafka config
at
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
at
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 43 more
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config
at
org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:301)

The error No serviceName defined in either JAAS or Kafka config is
misleading because, i am not using kerberos rather .PlainLoginModule And in
this case it is not required to have serviceName parameter in either JAAS or
Kafka config.

To make my point, i can run kafka avro console producer/consumer with the
following client properties (without servicename):

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=200000
retry.backoff.ms=5000
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
required username="XXX"
password="XXX";
security.protocol=SASL_SSL

I am using:
Spark 3.0.0
Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
spark-sql-kafka-0-10_2.12:3.0.0

What am i doing wrong ?

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]