spark-user mailing list archives

From zakhavan <zakha...@unm.edu>
Subject RE: How to do sliding window operation on RDDs in Pyspark?
Date Tue, 02 Oct 2018 20:15:42 GMT
Thank you, Taylor, for your reply. The second solution doesn't work in my
case because my text files are updated every second. My input data is live:
I receive two streams of data from two seismic sensors and, for simplicity,
write them to two text files in real time, so the files are continuously
updated. It seems I need to change my data-collection method and store the
data as two DStreams instead. I know Kafka would work, but I don't know how
to do it, because I would need to implement a custom Kafka consumer to take
the incoming data from the sensors and expose it as DStreams.
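For reference, the sliding-window behaviour I'm after is what DStream.window provides in Spark Streaming. As a sketch only, the same semantics can be written in plain Python over a finite sequence of samples (the function name sliding_windows is just illustrative, and window_size/slide are counted in samples here, not seconds):

```python
from collections import deque


def sliding_windows(samples, window_size, slide):
    """Yield successive windows of `window_size` samples, advancing by `slide`."""
    buf = deque(maxlen=window_size)  # keeps only the newest window_size samples
    for i, s in enumerate(samples, 1):
        buf.append(s)
        # emit a window once the buffer is full and we are on a slide boundary
        if len(buf) == window_size and (i - window_size) % slide == 0:
            yield list(buf)


# e.g. a 3-sample window sliding by 1 over the samples 0..4:
# [0, 1, 2], [1, 2, 3], [2, 3, 4]
```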

The following code is how I get the data and write it into the two text
files.

Do you have any idea how I can use Kafka in this case so that I have
DStreams instead of RDDs?

from obspy.clients.seedlink.easyseedlink import create_client
import numpy as np
import obspy


def dump_trace(trace, mseed_path, data_prefix, ts_prefix):
    """Persist a trace to MiniSEED, then append its samples and timestamps
    to per-trace text files."""
    trace.write(mseed_path)
    for i, tr in enumerate(obspy.read(mseed_path)):
        with open("%s_%d" % (data_prefix, i), "a") as f:
            np.savetxt(f, tr.data, fmt="%f")
        with open("%s_%d" % (ts_prefix, i), "a") as f:
            np.savetxt(f, tr.times("utcdatetime"), fmt="%s")


def handle_data(trace):
    print('Received new data:')
    print(trace)
    print()

    if trace.stats.network == "IU":
        dump_trace(trace, "/home/zeinab/data1.mseed",
                   "out_file1.txt", "timestamp_file1.txt")
    elif trace.stats.network == "CU":
        dump_trace(trace, "/home/zeinab/data2.mseed",
                   "out_file2.txt", "timestamp_file2.txt")


client = create_client('rtserve.iris.washington.edu:18000', handle_data)
client.select_stream('IU', 'ANMO', 'BHZ')
client.select_stream('CU', 'ANWB', 'BHZ')
client.run()
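Here is a rough sketch of what I imagine the Kafka route could look like. It assumes the kafka-python package, a broker at localhost:9092, and Spark's pyspark.streaming.kafka DStream API (deprecated in newer Spark releases); the topic names "iu-anmo" and "cu-anwb" are made up:

```python
def encode_sample(timestamp, value):
    """Serialize one (UTC timestamp, amplitude) pair as a Kafka message body."""
    return ("%s,%f" % (timestamp, value)).encode("utf-8")


def decode_sample(text):
    """Inverse of encode_sample, operating on the already-decoded message string."""
    ts, val = text.split(",")
    return ts, float(val)


def publish_trace(producer, trace):
    """Producer side: send every sample of an ObsPy trace to a per-network topic."""
    topic = "iu-anmo" if trace.stats.network == "IU" else "cu-anwb"
    for t, v in zip(trace.times("utcdatetime"), trace.data):
        producer.send(topic, encode_sample(str(t), float(v)))


def build_windowed_stream():
    """Consumer side: expose the topics as a DStream and apply a sliding window."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="SeismicWindows")
    ssc = StreamingContext(sc, 1)  # 1-second micro-batches
    stream = KafkaUtils.createDirectStream(
        ssc, ["iu-anmo", "cu-anwb"],
        {"metadata.broker.list": "localhost:9092"})
    # records arrive as (key, value) pairs; keep only the amplitude
    values = stream.map(lambda kv: decode_sample(kv[1])[1])
    windowed = values.window(10, 1)  # 10-second window, sliding every second
    return ssc, windowed
```

In handle_data I would then call publish_trace(producer, trace) with a producer created once via kafka-python's KafkaProducer(bootstrap_servers="localhost:9092"), instead of writing the text files.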

Thank you,

Zeinab




