How to do sliding window operation on RDDs in Pyspark?

How to do sliding window operation on RDDs in Pyspark?

zakhavan
Hello,

I have two text files in the following form, and my goal is to calculate the
Pearson correlation between them using a sliding window in PySpark:

123.00
-12.00
334.00
.
.
.

First I read these two text files into RDDs, and then I apply the window
operation to each RDD, but I keep getting this error:

AttributeError: 'PipelinedRDD' object has no attribute 'window'

Here is my code:

import sys

from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CrossCorrelation").getOrCreate()

    # Input paths are passed on the command line
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]

    num_of_partitions = 4

    # Each record returned by textFile() is already a single line, so it only
    # needs to be stripped and converted to a float
    rdd1 = spark.sparkContext.textFile(input_path1, num_of_partitions) \
        .map(lambda line1: float(line1.strip()))
    rdd2 = spark.sparkContext.textFile(input_path2, num_of_partitions) \
        .map(lambda line2: float(line2.strip()))

    # Windowing (this is the call that fails: RDDs have no window())
    windowedrdd1 = rdd1.window(3, 2)
    windowedrdd2 = rdd2.window(3, 2)

    # Correlation between sliding windows
    CrossCorr = Statistics.corr(windowedrdd1, windowedrdd2, method="pearson")

    if CrossCorr >= 0.7:
        print("rdd1 & rdd2 are correlated")

I know from the error that the window operation is only available on DStreams,
but since I have RDDs here, how can I do a sliding window operation on RDDs?

Thank you,

Zeinab

RE: How to do sliding window operation on RDDs in Pyspark?

Taylor Cox
Hey Zeinab,

We may have to take a small step back here. The sliding window approach (i.e. the window() operation) is specific to data stream mining, so it makes sense that window() is restricted to DStreams.

It looks like you're not using a stream mining approach. From what I can see in your code, the files are being read in, and you are using the window() operation after you have all the information.

Here's what can solve your problem:
1) Read the inputs into two DStreams and use window() as needed, or
2) You can always take a range of elements from a Spark RDD (see the sketch below). Perhaps this will set you in the right direction:
https://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd
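For example, here's a rough sketch of option 2 (untested; the helper name window_slice and the toy values are just illustrations, while the window size 3 and step 2 come from your code): pair each element with its position via zipWithIndex(), filter out each window's index range, and run Statistics.corr() on the two slices.

from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName("RDDSlidingWindow").getOrCreate()
sc = spark.sparkContext

# toy data standing in for the contents of the two text files
rdd1 = sc.parallelize([123.0, -12.0, 334.0, 45.0, 67.0])
rdd2 = sc.parallelize([120.0, -10.0, 330.0, 50.0, 60.0])

window_size, step = 3, 2

def window_slice(rdd, start, size):
    # keep only the elements whose position falls inside [start, start + size)
    return (rdd.zipWithIndex()
               .filter(lambda kv: start <= kv[1] < start + size)
               .map(lambda kv: kv[0]))

n = rdd1.count()
for start in range(0, n - window_size + 1, step):
    w1 = window_slice(rdd1, start, window_size)
    w2 = window_slice(rdd2, start, window_size)
    corr = Statistics.corr(w1, w2, method="pearson")
    print("window starting at %d: Pearson corr = %.3f" % (start, corr))

Note that this loops on the driver, one Spark job per window, so it's only practical for a modest number of windows; for genuinely live data the DStream route in option 1 is the more natural fit.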

Let me know if this helps your issue,

Taylor

RE: How to do sliding window operation on RDDs in Pyspark?

zakhavan
Thank you, Taylor, for your reply. The second solution doesn't work for my
case, since my text files are being updated every second. My input data is
live: I'm receiving two streams of data from two seismic sensors and, for
simplicity, writing them into two text files in real time, so the files keep
getting updated. It seems I need to change my data collection method and store
the data as two DStreams instead. I know Kafka would work, but I don't know how
to do that, because I would need to implement a custom Kafka consumer that
takes the incoming data from the sensors and produces it as DStreams.

The following code is how I'm getting the data and writing it into the two
text files.

Do you have any idea how I can use Kafka in this case so that I have
DStreams instead of RDDs?

from obspy.clients.seedlink.easyseedlink import create_client
from obspy import read
import numpy as np
import obspy
from obspy import UTCDateTime


def handle_data(trace):
    # Callback invoked by the SeedLink client whenever a new trace arrives
    print('Received new data:')
    print(trace)
    print()

    # IU station: append samples and timestamps to the first pair of text files
    if trace.stats.network == "IU":
        trace.write("/home/zeinab/data1.mseed")
        st1 = obspy.read("/home/zeinab/data1.mseed")
        for i, el1 in enumerate(st1):
            f = open("%s_%d" % ("out_file1.txt", i), "a")
            f1 = open("%s_%d" % ("timestamp_file1.txt", i), "a")
            np.savetxt(f, el1.data, fmt="%f")
            np.savetxt(f1, el1.times("utcdatetime"), fmt="%s")
            f.close()
            f1.close()
    # CU station: same thing for the second pair of text files
    if trace.stats.network == "CU":
        trace.write("/home/zeinab/data2.mseed")
        st2 = obspy.read("/home/zeinab/data2.mseed")
        for j, el2 in enumerate(st2):
            ff = open("%s_%d" % ("out_file2.txt", j), "a")
            ff1 = open("%s_%d" % ("timestamp_file2.txt", j), "a")
            np.savetxt(ff, el2.data, fmt="%f")
            np.savetxt(ff1, el2.times("utcdatetime"), fmt="%s")
            ff.close()
            ff1.close()


# Connect to the IRIS SeedLink server and subscribe to the two stations
client = create_client('rtserve.iris.washington.edu:18000', handle_data)
client.select_stream('IU', 'ANMO', 'BHZ')
client.select_stream('CU', 'ANWB', 'BHZ')
client.run()

Thank you,

Zeinab

RE: How to do sliding window operation on RDDs in Pyspark?

Taylor Cox
Have a look at this guide:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

You should be able to send your sensor data to a Kafka topic, which Spark will subscribe to (a rough producer-side sketch is below). You may need to use an Input DStream to connect Kafka to Spark; the tutorial linked after the sketch covers that, and a consumer-side sketch follows it.
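Something like this might work for the producer side (a rough, untested sketch of how your handle_data callback could be adapted; it uses the kafka-python package, and the broker address and topic names below are placeholders you'd choose yourself):

from kafka import KafkaProducer

# placeholder broker address; point this at your Kafka deployment
producer = KafkaProducer(bootstrap_servers="localhost:9092")

def handle_data(trace):
    # route each sensor network to its own topic, mirroring your two output files
    if trace.stats.network == "IU":
        topic = "seismic-sensor1"
    elif trace.stats.network == "CU":
        topic = "seismic-sensor2"
    else:
        return
    # publish every sample as its own message, encoded as a UTF-8 string
    for sample in trace.data:
        producer.send(topic, str(float(sample)).encode("utf-8"))
    producer.flush()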

https://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/#read-parallelism-in-spark-streaming
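And a minimal sketch of the Spark side (again an untested outline: it assumes Spark 2.x with the spark-streaming-kafka-0-8 connector on the classpath, and reuses the placeholder broker address and topic names from the producer sketch):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="SeismicWindows")
ssc = StreamingContext(sc, 1)  # 1-second micro-batches

kafka_params = {"metadata.broker.list": "localhost:9092"}

# one input DStream per sensor topic; each Kafka record arrives as a (key, value) pair
stream1 = (KafkaUtils.createDirectStream(ssc, ["seismic-sensor1"], kafka_params)
                     .map(lambda kv: float(kv[1])))
stream2 = (KafkaUtils.createDirectStream(ssc, ["seismic-sensor2"], kafka_params)
                     .map(lambda kv: float(kv[1])))

# window() is available on DStreams: 3-second windows sliding every 2 seconds
windowed1 = stream1.window(3, 2)
windowed2 = stream2.window(3, 2)

windowed1.pprint()
windowed2.pprint()

ssc.start()
ssc.awaitTermination()

Computing the Pearson correlation per window would then happen inside a transform() or foreachRDD() over the two windowed streams; I haven't sketched that part.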

Taylor

RE: How to do sliding window operation on RDDs in Pyspark?

zakhavan
Thank you. It helps.

Zeinab