Kafka Topic to Parquet HDFS with Structured Streaming

Chetan Khatri
Hello Dear Spark Users,

I am trying to write data from a Kafka topic to Parquet on HDFS with Structured Streaming, but I am getting failures. Please help.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val spark: SparkSession = SparkSession.builder().appName("DemoSparkKafka").getOrCreate()
import spark.implicits._
val dataFromTopicDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

logger.info("DemoSparkKafka - Printing Topic Messages in Key - Value pairs.")
val topicQuery = dataFromTopicDF.writeStream
.format("console")
.option("truncate", false)
.option("checkpointLocation", "/tmp/checkpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()

topicQuery.awaitTermination()
topicQuery.stop()

The above code works well, but when I try to write to Parquet on HDFS I get exceptions.

logger.info("DemoSparkKafka - Writing Topic Messages to Parquet at HDFS")

val parquetQuery = dataFromTopicDF.writeStream
.format("parquet")
.option("startingOffsets", "earliest")
.option("checkpointLocation", "/tmp/checkpoint")
.option("path", "/sample-topic")
.start()

parquetQuery.awaitTermination()
parquetQuery.stop()

Exception Details:

Exception in thread "main" java.io.IOException: mkdir of /sample-topic/_spark_metadata failed
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1067)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:378)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:66)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:46)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:85)
at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:98)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:293)
at com.dynasty.poc.DemoSparkKafka$.delayedEndpoint$com$dynasty$poc$DemoSparkKafka$1(DemoSparkKafka.scala:35)
at com.dynasty.poc.DemoSparkKafka$delayedInit$body.apply(DemoSparkKafka.scala:7)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.dynasty.poc.DemoSparkKafka$.main(DemoSparkKafka.scala:7)
at com.dynasty.poc.DemoSparkKafka.main(DemoSparkKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks

Re: Kafka Topic to Parquet HDFS with Structured Streaming

Deng Ching-Mallete
Hi Chetan,

Best to check whether the user account you're using to run the job has permission to write to the path in HDFS. I would suggest writing the Parquet files to a different path, perhaps a project space or your user home, rather than the root directory.

HTH,
Deng
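
[Editor's note: Deng's suggestion can be sketched from the HDFS CLI as below. The usernames and paths are illustrative, not from the thread; replace "chetan" with the account that actually runs spark-submit.]

```shell
# Inspect ownership and permissions at the root, where the job tried to write
hdfs dfs -ls /

# Create the sink directory under the submitting user's home instead
hdfs dfs -mkdir -p /user/chetan/sample-topic
hdfs dfs -chown chetan:chetan /user/chetan/sample-topic

# Verify the directory now exists and is owned by that user
hdfs dfs -ls /user/chetan
```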



Re: Kafka Topic to Parquet HDFS with Structured Streaming

Chetan Khatri
Hello Deng, thank you for your email.
The issue was with my Spark and Hadoop/HDFS configuration settings.

Thanks
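
[Editor's note: the thread does not record the exact configuration change, so the following is only a hedged sketch combining Deng's permission advice with cleanup of the original snippet. The paths are hypothetical; the key points are that the sink and checkpoint live under a directory the submitting user owns, each query gets its own checkpoint location, and "startingOffsets" is a source option with no effect on the sink.]

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val spark = SparkSession.builder().appName("DemoSparkKafka").getOrCreate()

// Read from Kafka; "startingOffsets" belongs here, on the source
val dataFromTopicDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Write under the user's home directory (illustrative path), with a
// checkpoint location dedicated to this query rather than the shared
// /tmp/checkpoint used for the console query
val parquetQuery = dataFromTopicDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "/user/chetan/checkpoints/sample-topic")
  .option("path", "/user/chetan/sample-topic")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()

parquetQuery.awaitTermination()
```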



Re: Kafka Topic to Parquet HDFS with Structured Streaming

AlbertoMarq
Hi Chetan,
I'm having the exact same issue with Spark Structured Streaming and Kafka,
trying to write to HDFS.
Can you please tell me how you fixed it?
I'm using Spark 3.0.1 and Hadoop 3.3.0.

Thanks!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
