spark-user mailing list archives

From Taylor Cox <Taylor....@microsoft.com.INVALID>
Subject RE: How to do sliding window operation on RDDs in Pyspark?
Date Tue, 02 Oct 2018 17:37:06 GMT
Hey Zeinab,

We may have to take a small step back here. The sliding window approach (i.e., the window() operation)
is specific to data stream mining, so it makes sense that window() is restricted to DStream.
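For reference, here is roughly what window() looks like in its native habitat. This is a
minimal sketch, assuming a socket text source on localhost:9999 and a 5-second batch
interval (the source, host, port, and durations are all illustrative, not from your code):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="WindowSketch")
ssc = StreamingContext(sc, batchDuration=5)   # 5-second micro-batches

# One float per incoming line; socketTextStream is just a placeholder source
values = ssc.socketTextStream("localhost", 9999).map(float)

# window(windowDuration, slideDuration): keep 15 seconds of data,
# recomputed every 10 seconds (both multiples of the batch interval)
windowed = values.window(15, 10)
windowed.pprint()

ssc.start()
ssc.awaitTermination()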


It looks like you're not using a stream mining approach. From what I can see in your code,
the files are being read in, and you are using the window() operation after you have all the
information.

Here's what can solve your problem:
1) Read the inputs into two DStreams and use window() as needed, or
2) You can always take a range of elements from a Spark RDD; a sketch of that follows
below. This thread may also set you in the right direction:
https://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd
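For (2), here is a minimal sketch of emulating a sliding window over a plain RDD with
zipWithIndex(). sliding_windows, window_size, and step are names I'm introducing for
illustration (not a Spark API), and rdd1/rdd2 are the RDDs from your code:

from pyspark.mllib.stat import Statistics

def sliding_windows(rdd, window_size, step):
    """Yield one RDD per window of window_size consecutive elements,
    starting a new window every step elements."""
    # Pair every element with its position: (index, value)
    indexed = rdd.zipWithIndex().map(lambda kv: (kv[1], kv[0])).cache()
    n = indexed.count()
    for start in range(0, n - window_size + 1, step):
        # Bind start as a default argument so each lambda keeps its own
        # window boundaries when Spark serializes the closure
        yield (indexed
               .filter(lambda kv, s=start: s <= kv[0] < s + window_size)
               .values())

# Pearson correlation for each aligned pair of windows
for w1, w2 in zip(sliding_windows(rdd1, 3, 2), sliding_windows(rdd2, 3, 2)):
    print(Statistics.corr(w1, w2, method="pearson"))

One filter pass per window is the simplest thing that works; if you end up with many
windows, you'd instead flatMap each element into the ids of every window it falls in
and group by window id.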

Let me know if this helps your issue,

Taylor

-----Original Message-----
From: zakhavan <zakhavan@unm.edu> 
Sent: Tuesday, October 2, 2018 9:30 AM
To: user@spark.apache.org
Subject: How to do sliding window operation on RDDs in Pyspark?

Hello,

I have 2 text files in the following form, and my goal is to calculate the Pearson correlation
between them using a sliding window in PySpark:

123.00
-12.00
334.00
.
.
.

First I read these 2 text files into RDDs, and then I apply the window operation
on each RDD, but I keep getting this error:

AttributeError: 'PipelinedRDD' object has no attribute 'window'

Here is my code:

import sys

from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CrossCorrelation").getOrCreate()

    # Define your input paths
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]

    num_of_partitions = 4
    # textFile() already yields one element per line, so just strip and
    # convert; the original line.split("\n").strip() fails because
    # split() returns a list, which has no strip() method
    rdd1 = spark.sparkContext.textFile(input_path1, num_of_partitions) \
                             .map(lambda line: float(line.strip()))
    rdd2 = spark.sparkContext.textFile(input_path2, num_of_partitions) \
                             .map(lambda line: float(line.strip()))

    # Windowing: this is the step that raises the AttributeError,
    # since window() exists on DStream, not on RDD
    windowedrdd1 = rdd1.window(3, 2)
    windowedrdd2 = rdd2.window(3, 2)

    # Correlation between sliding windows
    CrossCorr = Statistics.corr(windowedrdd1, windowedrdd2, method="pearson")

    if CrossCorr >= 0.7:
        print("rdd1 & rdd2 are correlated")

I know from the error that the window operation is only for DStreams, but since I have RDDs
here, how can I do a window operation on RDDs?

Thank you,

Zeinab





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org



