Queries with streaming sources must be executed with writeStream.start()


Queries with streaming sources must be executed with writeStream.start()

kant kodali
Hi All,

I have the following code and I am not sure what's wrong with it. Why can I not call dataset.toJSON() (which returns a Dataset<String>)? I am using Spark 2.2.0, so I am wondering if there is any workaround?

 Dataset<String> ds = newDS.toJSON()
     .map((MapFunction<String, String>) s -> transform(s), Encoders.STRING()); // transform: some function that returns a string
 StreamingQuery query = ds.writeStream().start();
 query.awaitTermination();

Re: Queries with streaming sources must be executed with writeStream.start()

Felix Cheung
What is newDS?
If it is a streaming Dataset/DataFrame (since you have writeStream there), then there seems to be an issue preventing toJSON from working.


From: kant kodali <[hidden email]>
Sent: Saturday, September 9, 2017 4:04:33 PM
To: user @spark
Subject: Queries with streaming sources must be executed with writeStream.start()
 

Re: Queries with streaming sources must be executed with writeStream.start()

kant kodali
Yes, it is a streaming Dataset. So what is the problem with the following code?

Dataset<String> ds = dataset.toJSON()
    .map((MapFunction<String, String>) s -> transform(s), Encoders.STRING()); // transform: some function that returns a string
StreamingQuery query = ds.writeStream().start();
query.awaitTermination();



Re: Queries with streaming sources must be executed with writeStream.start()

Shixiong (Ryan) Zhu
It's because "toJSON" doesn't support Structured Streaming. The current implementation will convert the Dataset to an RDD, which is not supported by streaming queries.
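A minimal sketch of the failure mode described here (the local session and the "rate" test source are placeholder assumptions, not from the thread; the original code read from Kafka): because toJSON in Spark 2.2 is built on the RDD API, merely calling it on a streaming Dataset forces the batch-only execution path and raises the exception in the thread title.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ToJsonOnStream {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

            // A streaming Dataset (source details are illustrative).
            Dataset<Row> stream = spark.readStream()
                    .format("rate")   // built-in test source; Kafka in the original thread
                    .load();

            // toJSON internally converts the plan to an RDD, which streaming
            // queries forbid, so this line throws an AnalysisException:
            //   "Queries with streaming sources must be executed with writeStream.start()"
            Dataset<String> json = stream.toJSON();
        }
    }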




Re: Queries with streaming sources must be executed with writeStream.start()

kant kodali
Thanks Ryan! In this case I will have a Dataset<Row>, so is there a way to convert a Row to a JSON string?

Thanks



Re: Queries with streaming sources must be executed with writeStream.start()

Michael Armbrust
The following will convert the whole row to JSON.

import static org.apache.spark.sql.functions.*;
df.select(to_json(struct(col("*"))));
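To get a Dataset<String> back in Java, that expression needs a String encoder on top; a sketch (getKafkaStream is a placeholder for however the streaming Dataset<Row> is built, and the console sink is just for inspection):

    import static org.apache.spark.sql.functions.*;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    Dataset<Row> ds = getKafkaStream(); // placeholder: any streaming Dataset<Row>

    // Pack every column into a struct, serialize that struct to one JSON
    // string column, then unwrap it with a String encoder.
    Dataset<String> json = ds
        .select(to_json(struct(col("*"))).alias("value"))
        .as(Encoders.STRING());

    StreamingQuery query = json.writeStream().format("console").start();
    query.awaitTermination();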



Re: Queries with streaming sources must be executed with writeStream.start()

kant kodali
Hi Michael,

Interestingly, that doesn't seem to quite work for me for some reason. Here is what I have.

Dataset:

name | id | country
-------------------------
kant   | 1  | usa
john   | 2  | usa

And here is my code:

Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
StreamingQuery query = ds.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query.awaitTermination();

This works completely fine, and I can see the rows on my console. Now if I change it to this:

Dataset<Row> ds = getKafkaStream(); // This dataset represents the one above
Dataset<String> jsonDS = ds.select(to_json(struct(ds.col("*")))).as(Encoders.STRING());
StreamingQuery query2 = jsonDS.writeStream().trigger(Trigger.ProcessingTime(1000)).format("console").start();
query2.awaitTermination();

I don't see any rows on my console, and I made sure I waited for a while. The moment I change it back to the code above, it works again.


Re: Queries with streaming sources must be executed with writeStream.start()

kant kodali
I have about 100 fields in my dataset, and some of them have null in them. Does to_json fail to convert if that is the case?

Thanks!
