Question of spark streaming


Question of spark streaming

uutkarshsingh

I am following the book Spark: The Definitive Guide. The following code is executed locally using spark-shell.

Procedure: Started the spark-shell without any other options

val static = spark.read.json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1).json("/part-00079-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json")

val activityCounts = streaming.groupBy("gt").count()

val activityQuery  = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

activityQuery.awaitTermination()

The book says that "After this code is executed the streaming computation will have started in the background" ... "Now that this stream is running, we can experiment with the result by querying".

MY OBSERVATION:

When this code is executed, it does not free the shell for me to type in commands such as spark.streams.active.

Hence I cannot query this stream.

MY RESEARCH:

I tried opening a new spark-shell, but querying in that shell does not return any results. Are the streams started from one shell accessible from another instance of the shell?

I want the table in memory so that I can query it using:

for (i <- 1 to 5) {
  spark.sql("SELECT * FROM activity_counts").show()
  Thread.sleep(1000)
}

Re: Question of spark streaming

Arun Mahadevan

“activityQuery.awaitTermination()” is a blocking call.

 

You can just skip this line and run other commands in the same shell to query the stream.
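For reference, a minimal sketch of that workflow, reusing the variable and query names from the post above; the only change is where (or whether) awaitTermination() is called:

// start() already launches the stream in the background; the shell prompt stays free
val activityQuery = activityCounts.writeStream.queryName("activity_counts").format("memory").outputMode("complete").start()

// Poll the in-memory table from the same shell while the stream runs
for (i <- 1 to 5) {
  spark.sql("SELECT * FROM activity_counts").show()
  Thread.sleep(1000)
}

// Block only when no further interactive commands are needed, or stop the query instead
activityQuery.stop()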

 

Running the query from a different shell won’t help, since the memory sink where the results are stored is not shared between the two shells.
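As a quick check, run the following in the shell that started the query; spark.streams.active lists the queries running in the current session:

spark.streams.active.foreach(q => println(s"${q.name}: isActive=${q.isActive}"))

// A second spark-shell is a separate application, so this array is empty there
// and the temporary view "activity_counts" registered by the memory sink does not exist.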

 

Thanks,

Arun

 
