spark-user mailing list archives

From Taylor Cox <Taylor....@microsoft.com.INVALID>
Subject RE: How to do sliding window operation on RDDs in Pyspark?
Date Tue, 02 Oct 2018 21:59:22 GMT
Have a look at this guide:
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

You should be able to send your sensor data to a Kafka topic, which Spark will subscribe to.
You may need to use an Input DStream to connect Kafka to Spark.

https://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/#read-parallelism-in-spark-streaming
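
Roughly, the Spark side could look like the sketch below. This is only an illustration: I'm assuming the spark-streaming-kafka-0-8 integration is on the classpath, and the broker address and the 'seismic' topic name are made up. The window() call gives you the sliding window from the subject line.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="SeismicWindows")
ssc = StreamingContext(sc, batchDuration=1)  # 1-second micro-batches

# Direct stream from the (made-up) 'seismic' topic; yields (key, value) pairs.
stream = KafkaUtils.createDirectStream(
    ssc, ["seismic"], {"metadata.broker.list": "localhost:9092"})

# Sliding window: the last 10 seconds of samples, recomputed every 2 seconds.
# Assumes each message value is a plain numeric string.
samples = stream.map(lambda kv: float(kv[1]))
windowed = samples.window(windowDuration=10, slideDuration=2)
windowed.pprint()

ssc.start()
ssc.awaitTermination()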

Taylor

-----Original Message-----
From: zakhavan <zakhavan@unm.edu> 
Sent: Tuesday, October 2, 2018 1:16 PM
To: user@spark.apache.org
Subject: RE: How to do sliding window operation on RDDs in Pyspark?

Thank you, Taylor, for your reply. The second solution doesn't work for my case since my text
files are updated every second. My input data is live: I'm receiving 2 streams of data from 2
seismic sensors and, for simplicity, writing them into 2 text files in real time, so the files
are continuously updated. It seems I need to change my data collection method and store the
data as 2 DStreams instead. I know Kafka would work, but I don't know how to set it up: I would
need a custom Kafka producer to publish the incoming sensor data to a topic that Spark can then
consume as a DStream (see the rough sketch after my code below).

The following code shows how I'm getting the data and writing it into 2 text files.

Do you have any idea how I can use Kafka in this case so that I have DStreams instead of RDDs?

from obspy.clients.seedlink.easyseedlink import create_client
from obspy import read
import numpy as np
import obspy
from obspy import UTCDateTime


def handle_data(trace):
    print('Received new data:')
    print(trace)
    print()


    # Traces from the IU network (station ANMO) go to the first pair of files.
    if trace.stats.network == "IU":
        trace.write("/home/zeinab/data1.mseed")
        st1 = obspy.read("/home/zeinab/data1.mseed")
        for i, el1 in enumerate(st1):
            f = open("%s_%d" % ("out_file1.txt", i), "a")
            f1 = open("%s_%d" % ("timestamp_file1.txt", i), "a")
            np.savetxt(f, el1.data, fmt="%f")
            np.savetxt(f1, el1.times("utcdatetime"), fmt="%s")
            f.close()
            f1.close()
    # Traces from the CU network (station ANWB) go to the second pair of files.
    if trace.stats.network == "CU":
        trace.write("/home/zeinab/data2.mseed")
        st2 = obspy.read("/home/zeinab/data2.mseed")
        for j, el2 in enumerate(st2):
            ff = open("%s_%d" % ("out_file2.txt", j), "a")
            ff1 = open("%s_%d" % ("timestamp_file2.txt", j), "a")
            np.savetxt(ff, el2.data, fmt="%f")
            np.savetxt(ff1, el2.times("utcdatetime"), fmt="%s")
            ff.close()
            ff1.close()


client = create_client('rtserve.iris.washington.edu:18000', handle_data)
client.select_stream('IU', 'ANMO', 'BHZ')
client.select_stream('CU', 'ANWB', 'BHZ')
client.run()
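
For example, I imagine replacing the file writes with something like this rough sketch (assuming
the kafka-python package; the broker address and topic names are made up), but I'm not sure it's
the right approach:

from kafka import KafkaProducer

# Publishes each sample to Kafka instead of appending to text files.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handle_data_kafka(trace):
    # One (made-up) topic per network, e.g. 'seismic-iu' and 'seismic-cu'.
    topic = 'seismic-' + trace.stats.network.lower()
    for t, value in zip(trace.times("utcdatetime"), trace.data):
        # Each message value is "timestamp,sample".
        producer.send(topic, ("%s,%f" % (t, value)).encode("utf-8"))
    producer.flush()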

Thank you,

Zeinab



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

