spark-user mailing list archives

From zakhavan <>
Subject How to do sliding window operation on RDDs in Pyspark?
Date Tue, 02 Oct 2018 16:30:27 GMT

I have two text files of the following form, and my goal is to compute the
Pearson correlation between them using a sliding window in PySpark:


First I read the two text files into RDDs and then apply a window operation
to each RDD, but I keep getting this error:

AttributeError: 'PipelinedRDD' object has no attribute 'window'

Here is my code:

import sys

from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CrossCorrelation").getOrCreate()

    # DEFINE your input path
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]

    num_of_partitions = 4
    # textFile already yields one element per line, so each line only
    # needs stripping before conversion to float
    rdd1 = (spark.sparkContext.textFile(input_path1, num_of_partitions)
            .map(lambda line1: line1.strip())
            .map(lambda strelem1: float(strelem1)))
    rdd2 = (spark.sparkContext.textFile(input_path2, num_of_partitions)
            .map(lambda line2: line2.strip())
            .map(lambda strelem2: float(strelem2)))

    # These two calls raise the AttributeError above: window() is defined
    # on DStreams, not on RDDs
    windowedrdd1 = rdd1.window(3, 2)
    windowedrdd2 = rdd2.window(3, 2)

    # Pearson correlation between the sliding windows
    CrossCorr = Statistics.corr(windowedrdd1, windowedrdd2, method="pearson")

    if CrossCorr >= 0.7:
        print("rdd1 & rdd2 are correlated")

I know from the error that the window operation is only defined for DStreams,
but since I have RDDs here, how can I do a sliding window operation on RDDs?
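For what it's worth, one way to emulate a size-`w` / stride-`s` sliding window
on a plain RDD is to index the elements with `zipWithIndex`, fan each element
out to every window it falls in, and regroup by window id. This is only a
sketch; `window_keys` and `sliding_windows` are hypothetical helper names, not
part of the Spark API:

```python
def window_keys(i, size, step):
    # Window k covers indices [k*step, k*step + size); element i belongs to
    # every k with k*step <= i < k*step + size.
    k_min = max(0, (i - size + step) // step)   # first window containing i
    k_max = i // step                           # last window containing i
    return list(range(k_min, k_max + 1))

def sliding_windows(rdd, size, step):
    # zipWithIndex -> (value, index); fan out to (window_id, (value, index)),
    # regroup per window id, and restore in-window order by index.
    return (rdd.zipWithIndex()
               .flatMap(lambda vi: [(k, vi)
                                    for k in window_keys(vi[1], size, step)])
               .groupByKey()
               .mapValues(lambda vis: [v for v, _ in
                                       sorted(vis, key=lambda p: p[1])]))

# e.g. sliding_windows(sc.parallelize([1.0, 2.0, 3.0, 4.0, 5.0]), 3, 2)
# yields windows keyed 0, 1, 2: [1.0, 2.0, 3.0], [3.0, 4.0, 5.0], [5.0]
# (trailing partial windows included); each window's values can then be
# parallelized back into an RDD pair and fed to Statistics.corr.
```

Note that `groupByKey` shuffles the data, so for long series a DataFrame
window function or a streaming DStream may be a better fit; as far as I know,
the mllib `sliding` helper (`RDDFunctions.sliding`) is only exposed in the
Scala API, not in PySpark.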

Thank you,

