Reading kafka and save to parquet problem


Reading kafka and save to parquet problem

JF Chen
I am struggling to read data from Kafka and save it as Parquet files on HDFS using Spark Streaming, following this post: https://stackoverflow.com/questions/45827664/read-from-kafka-and-write-to-hdfs-in-parquet

My code is similar to the following:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .write.parquet("hdfs://data.parquet")

The difference is that I am writing it in Java.

But in practice, this code just runs once and then exits gracefully. Although it produces the Parquet file successfully and no exception is thrown, it behaves like a normal batch Spark job rather than a Spark Streaming job.

What should I do if I want to keep reading from Kafka and saving the data to Parquet in batches?

Regards,
Junfeng Chen

Re: Reading kafka and save to parquet problem

naresh Goud
Change it to readStream instead of read, as below:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
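
Note that the write side must change as well: a streaming DataFrame cannot be saved with plain write; it needs writeStream.start() with a checkpoint location, otherwise the query will not run continuously. Below is a minimal end-to-end sketch; the servers, paths, and trigger interval are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("KafkaToParquet").getOrCreate()
import spark.implicits._

// Read side: a streaming DataFrame, not a one-shot batch read.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// Write side: writeStream with a checkpoint location (required by the file sink).
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .writeStream
  .format("parquet")
  .option("path", "hdfs://namenode:8020/data")                      // placeholder output dir
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints") // placeholder checkpoint dir
  .trigger(Trigger.ProcessingTime("30 seconds"))                    // placeholder micro-batch interval
  .start()

query.awaitTermination()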


Check if this is helpful:

https://github.com/ndulam/KafkaSparkStreams/blob/master/SampleStreamApp/src/main/scala/com/naresh/org/SensorDataSave.scala

Re: Reading kafka and save to parquet problem

JF Chen
I have tried using readStream and writeStream, but it throws a "Uri without authority: hdfs:/data/_spark_metadata" exception, which does not occur in normal read mode.
The Parquet path I specified is hdfs:///data.
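
A possible workaround (untested; the NameNode host and port below are placeholders, and df is the streaming DataFrame from the snippet above) is to fully qualify the HDFS URI instead of using the empty-authority form hdfs:///data, so that paths derived from it, such as _spark_metadata, keep an authority:

// Hypothetical fix: spell out the full HDFS authority in the sink paths.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs://namenode:8020/data")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/data")
  .start()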


Regards,
Junfeng Chen
