[Structured Streaming] More than 1 streaming in a code


[Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
Hi,

If I have more than one writeStream in my code, both operating on the same readStream data, why does only the first writeStream produce output? I want the second one to be printed to the console as well.

How can I do that?

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")

    df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

    df.createOrReplaceTempView("abcd")

    wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")

    # -----------------------#

    query1 = df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(processingTime='3 seconds') \
        .start()

    query1.awaitTermination()
    # -----------------------#

    query2 = wordCounts \
        .writeStream \
        .format("console") \
        .trigger(processingTime='3 seconds') \
        .start()

    query2.awaitTermination()

    # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py



Thanks,
Aakash.

Fwd: [Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
Any help?

I need urgent help. Could someone please clarify this doubt?



Re: [Structured Streaming] More than 1 streaming in a code

Panagiotis Garefalakis
Hello Aakash,

When you call query.awaitTermination you are blocking right there, waiting for the current query to stop or throw an exception, so in your case the second query never even starts.
What you could do instead is remove the per-query blocking calls and use spark.streams.awaitAnyTermination (which waits for either query1 or query2 to terminate). Make sure you call it after query2 has been started.
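
In code, the tail end of your script would then look roughly like this (a minimal sketch based on your snippet; only the termination handling changes):

query1 = df.writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='3 seconds') \
    .start()

query2 = wordCounts.writeStream \
    .format("console") \
    .trigger(processingTime='3 seconds') \
    .start()

# Block only once, after both queries are running; this returns
# when either query terminates or throws an exception.
spark.streams.awaitAnyTermination()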

I hope this helps.

Cheers,
Panagiotis



Re: [Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
Hi Panagiotis,

I did that, but it still prints only the result of the first query and then awaits new data; it doesn't even go on to the second one.

Data -

$ nc -lk 9998

1,2
3,4
5,6
7,8

Result -

-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+


Updated Code -
from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"), split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()


Thanks,
Aakash.



Re: [Structured Streaming] More than 1 streaming in a code

spark receiver
Hi Panagiotis,

I'm wondering whether you solved the problem or not, because I ran into the same issue today. I'd appreciate it very much if you could paste the code snippet if it's working.

Thanks.




Re: [Structured Streaming] More than 1 streaming in a code

JayeshLalwani

Note that what you are trying to do here is join a streaming data frame with an aggregated streaming data frame. As per the documentation, joining an aggregated streaming data frame with another streaming data frame is not supported.



Re: [Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
Hey Jayesh and others,

Is there, then, any other way to arrive at a solution for this use case?

Thanks,
Aakash.


Re: [Structured Streaming] More than 1 streaming in a code

JayeshLalwani

You could do it if you had a timestamp in your data. You can use windowed operations to divide a value by its own average over a window. However, in Structured Streaming you can only window by timestamp columns; you cannot do window aggregations on integers.
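
For illustration, a sketch of that windowed approach (assuming the rows carry an event-time column named ts; the column names and durations here are hypothetical):

from pyspark.sql import functions as F

# Average col2 over 5-minute event-time windows; a row's value can
# then be compared against the average of its own window.
windowed_avg = data \
    .withWatermark("ts", "10 minutes") \
    .groupBy(F.window("ts", "5 minutes")) \
    .agg(F.avg("col2").alias("aver"))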

 



Re: [Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
If I use timestamp-based windowing, then my average will not be a global average but one grouped per window, which is not my requirement. I want to recalculate the average of the entire column every time a new row (or rows) comes in, and divide the other column by the updated average.

Let me know in case you or anyone else has a solution for this.


Re: [Structured Streaming] More than 1 streaming in a code

JayeshLalwani

You could have a really large window.
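
For instance (a sketch continuing the earlier example; the ts column and the window length are assumptions):

from pyspark.sql import functions as F

# A single very long window approximates a global running average.
big_window_avg = data \
    .withWatermark("ts", "1 day") \
    .groupBy(F.window("ts", "365 days")) \
    .agg(F.avg("col2").alias("aver"))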

 


Re: [Structured Streaming] More than 1 streaming in a code

maasg

Aakash,

There are two issues here.
The issue with the code in the first question is that the first query blocks, so the code for the second never gets executed; Panagiotis pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way Structured Streaming works. As far as I remember, netcat only delivers data to the first network connection, and on the Structured Streaming side each query opens its own connection. This results in only the first query getting the data.
If you talked to a TCP server that supports multiple connected clients, you would see data in both queries.
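
To see that, a minimal multi-client TCP server can stand in for nc (a hypothetical test harness, not part of the original setup; it broadcasts each line typed on stdin to every connected client):

import socket
import sys
import threading

clients = []

def broadcast_stdin():
    # Send each line typed on stdin to every connected client.
    for line in sys.stdin:
        for c in list(clients):
            try:
                c.sendall(line.encode())
            except OSError:
                clients.remove(c)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9998))
server.listen(5)
threading.Thread(target=broadcast_stdin, daemon=True).start()

while True:
    conn, _ = server.accept()  # one connection per streaming query
    clients.append(conn)

With a server like this, each of the two socket-source queries receives its own copy of the input.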

If your actual source is Kafka, the original solution of using `spark.streams.awaitAnyTermination` should solve the problem.

-kr, Gerard.

Re: [Structured Streaming] More than 1 streaming in a code

Aakash Basu-2
Hi Gerard,

"If your actual source is Kafka, the original solution of using `spark.streams.awaitAnyTermination` should solve the problem."

I tried literally everything; nothing worked out.

1) Tried nc from two different ports for two different streams; still nothing worked.

2) Tried the same using Kafka with awaitAnyTermination; still no use, the first stream's write kept blocking the second... (And inner queries with aggregation don't seem to work in Structured Streaming, as each one expects a separate writeStream.start().)

Any insight (or a direct update to the code) would be helpful.

Thanks,
Aakash.

On Mon 16 Apr, 2018, 9:05 PM Gerard Maas, <[hidden email]> wrote:
Aakash,

There are two issues here.
The issue with the code in the first question is that the first query blocks, so the code for the second never gets executed. Panagiotis pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way Structured Streaming works. As far as I remember, netcat only delivers data to the first network connection.
On the Structured Streaming side, each query opens its own connection. This results in only the first query getting the data.
If you were talking to a TCP server that supports multiple connected clients, you would see data in both queries.
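For local experiments, a stand-in for nc that broadcasts each input line to every connected client would behave that way; a rough sketch (illustrative only, port 9998 assumed):

# broadcast_server.py - unlike nc, sends every stdin line to ALL clients.
import socket
import sys
import threading

clients = []
lock = threading.Lock()

def accept_loop(server):
    # Keep accepting new client connections in the background.
    while True:
        conn, _ = server.accept()
        with lock:
            clients.append(conn)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9998))
server.listen(5)
threading.Thread(target=accept_loop, args=(server,), daemon=True).start()

for line in sys.stdin:  # type lines here just as with nc -lk 9998
    with lock:
        for c in list(clients):
            try:
                c.sendall(line.encode())  # broadcast to every connected query
            except OSError:
                clients.remove(c)  # drop clients that disconnected

With something like this in place of nc, both streaming queries receive the data, since each query's socket connection gets its own copy of every line.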

If your actual source is Kafka, the original solution of using `spark.streams.awaitAnyTermination`  should solve the problem.

-kr, Gerard.
    


On Mon, Apr 16, 2018 at 10:52 AM, Aakash Basu <[hidden email]> wrote:
Hey Jayesh and Others,

Is there, then, any other way to solve this use case?

Thanks,
Aakash.

On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <[hidden email]> wrote:

Note that what you are trying to do here is join a streaming data frame with an aggregated streaming data frame. As per the documentation, joining an aggregated streaming data frame with another streaming data frame is not supported.
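For reference: on later Spark releases (2.4+), foreachBatch is one way around this limitation, since it hands you a plain batch DataFrame on every trigger, and a batch DataFrame may be joined with its own aggregate. A rough sketch, assuming the ID DataFrame from the original Kafka example (note the average here is per micro-batch, not the running average that the complete-mode query produced):

from pyspark.sql.functions import avg, col

def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame here, so this join is allowed.
    aver = batch_df.agg(avg(col("Col1").cast("double")).alias("aver"))
    batch_df.crossJoin(aver) \
        .withColumn("Col3", col("Col2").cast("double") / col("aver")) \
        .show(truncate=False)

query = ID.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='3 seconds') \
    .start()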

 

 

From: spark receiver <[hidden email]>
Date: Friday, April 13, 2018 at 11:49 PM
To: Aakash Basu <[hidden email]>
Cc: Panagiotis Garefalakis <[hidden email]>, user <[hidden email]>
Subject: Re: [Structured Streaming] More than 1 streaming in a code

 

Hi Panagiotis,

 

Wondering whether you solved the problem or not? I hit the same issue today. I'd appreciate it very much if you could paste the code snippet if it's working.

 

Thanks.

 



On Apr 6, 2018, at 7:40 AM, Aakash Basu <[hidden email]> wrote:

 

Hi Panagiotis,

I did that, but it still prints the result of the first query and waits for new data; it doesn't even get to the second one.

Data -

$ nc -lk 9998

1,2
3,4
5,6
7,8

Result -

-------------------------------------------
Batch: 0
-------------------------------------------
+----+
|aver|
+----+
| 3.0|
+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+
|aver|
+----+
| 4.0|
+----+



Updated Code -

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Socket source: reads lines from localhost:9998 (fed by nc -lk 9998);
# it only uses the host and port options.
data = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

# Split each "a,b" line into two columns.
id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

# col3 = col2 divided by the running average computed by the other query.
wordCounts = spark.sql("select col1, col2, col2/(select aver from abcd) col3 from ds")

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()

 

Thanks,

Aakash.

 
