spark-user mailing list archives

From Taylor Cox <>
Subject RE: How to do sliding window operation on RDDs in Pyspark?
Date Tue, 02 Oct 2018 17:37:06 GMT
Hey Zeinab,

We may have to take a small step back here. The sliding window approach (i.e., the window() operation)
is specific to data stream mining, so it makes sense that window() is restricted to DStreams.

It looks like you're not using a stream mining approach. From what I can see in your code,
the files are being read in, and you are using the window() operation after you already have all the
data in RDDs.
Here's what can solve your problem:
1) Read the inputs into two DStreams and use window() as needed, or
2) You can always take a range of elements from a Spark RDD. Perhaps this will set you in the
right direction:
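To make option 2 concrete, here is a sketch of the grouping logic in plain Python (the helper names window_ids and sliding_windows are mine, not Spark API): each element is assigned, by position, to every window that covers it. The same assignment could be done on an RDD with zipWithIndex() plus flatMap() and groupByKey().

```python
from collections import defaultdict
from math import ceil, floor

def window_ids(i, length, slide):
    # Window w spans element indices [w*slide, w*slide + length);
    # return every w whose span contains index i.
    lo = max(0, ceil((i - length + 1) / slide))
    hi = floor(i / slide)
    return range(lo, hi + 1)

def sliding_windows(values, length, slide):
    # Emit each (index, value) pair once per window it falls into,
    # the way an RDD flatMap would, then collect per-window lists.
    buckets = defaultdict(list)
    for i, v in enumerate(values):
        for w in window_ids(i, length, slide):
            buckets[w].append(v)
    # Keep only complete windows.
    return [buckets[w] for w in sorted(buckets) if len(buckets[w]) == length]

xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
print(sliding_windows(xs, 3, 2))  # [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]
```

On an RDD, something along the lines of rdd.zipWithIndex().flatMap(lambda vi: [(w, vi[0]) for w in window_ids(vi[1], 3, 2)]).groupByKey() should produce the same per-window groups (untested sketch); Statistics.corr can then be applied to matching window pairs from the two RDDs.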

Let me know if this helps your issue,


-----Original Message-----
From: zakhavan <> 
Sent: Tuesday, October 2, 2018 9:30 AM
Subject: How to do sliding window operation on RDDs in Pyspark?


I have two text files in the following form, and my goal is to calculate the Pearson correlation
between them using a sliding window in PySpark:


First I read these two text files and store them as RDDs, and then I apply the window operation
on each RDD, but I keep getting this error:
AttributeError: 'PipelinedRDD' object has no attribute 'window'

Here is my code:

import sys
from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CrossCorrelation").getOrCreate()

    # DEFINE your input path
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]

    num_of_partitions = 4
    rdd1 = spark.sparkContext.textFile(input_path1, num_of_partitions) \
        .flatMap(lambda line1: line1.strip().split("\n")) \
        .map(lambda strelem1: float(strelem1))
    rdd2 = spark.sparkContext.textFile(input_path2, num_of_partitions) \
        .flatMap(lambda line2: line2.strip().split("\n")) \
        .map(lambda strelem2: float(strelem2))

    # This is where the error occurs
    windowedrdd1 = rdd1.window(3, 2)
    windowedrdd2 = rdd2.window(3, 2)

    # Correlation between sliding windows
    CrossCorr = Statistics.corr(windowedrdd1, windowedrdd2, method="pearson")

    if CrossCorr >= 0.7:
        print("rdd1 & rdd2 are correlated")

I know from the error that the window operation is only for DStreams, but since I have RDDs here,
how can I do a window operation on RDDs?

Thank you,



To unsubscribe e-mail:

