spark-user mailing list archives

From Akhil Das
Subject Re: streaming and piping to R, sending all data in window to pipe()
Date Sun, 19 Jul 2015 15:15:18 GMT
Did you try inputs.repartition(1).foreachRDD(..)?
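
In the quoted snippet, the result of windowRdd.coalesce(1) is discarded. RDDs are immutable, so coalesce returns a new RDD and leaves windowRdd with its original partitioning. Since pipe() starts one external process per partition, R ends up being invoked once per partition. A minimal sketch of the fix, assuming the script path is passed in as a plain string (rScriptPath is a hypothetical parameter, not from the original code):

```scala
import org.apache.spark.rdd.RDD

// rScriptPath is a hypothetical parameter: the command or local script path
// to run on the executor (the original resolves it via SparkFiles.get).
def processWindow(windowRdd: RDD[String], rScriptPath: String): RDD[String] = {
  // coalesce does not modify windowRdd; keep the RDD it returns.
  val singlePartition: RDD[String] = windowRdd.coalesce(1)
  // pipe launches one process per partition, so a single partition
  // means the whole window goes to a single R invocation.
  singlePartition.pipe(rScriptPath)
}
```

Calling inputs.repartition(1) on the DStream should have a similar effect, at the cost of a shuffle. Either way, the whole window has to fit through a single task.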

Best Regards

On Fri, Jul 17, 2015 at 9:51 PM, PAULI, KEVIN CHRISTIAN
[AG-Contractor/1000] wrote:

>  Spark newbie here, using Spark 1.3.1.
>  I’m consuming a stream and trying to pipe the data from the entire
> window to R for analysis.  The R algorithm needs the entire dataset from
> the stream (everything in the window) in order to function properly; it
> can’t be broken up.
>  So I tried doing a coalesce(1) before calling pipe(), but it still seems
> to be breaking up the data and invoking R multiple times with small pieces
> of data.  Is there some other approach I should try?
>  Here’s a small snippet:
>      val inputs: DStream[String] = MQTTUtils.createStream(ssc,
> mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
>       .window(duration)
>     inputs.foreachRDD {
>       windowRdd => {
>         if (windowRdd.count() > 0) processWindow(windowRdd)
>       }
>     }
>  ...
>    def processWindow(windowRdd: RDD[String]) = {
>     // call R script to process data
>     windowRdd.coalesce(1)
>     val outputsRdd: RDD[String] =
> windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
>     outputsRdd.cache()
>      if (outputsRdd.count() > 0) processOutputs(outputsRdd)
>   }
>  ...
