spark-user mailing list archives

From "PAULI, KEVIN CHRISTIAN [AG-Contractor/1000]" <kevin.christian.pa...@monsanto.com>
Subject streaming and piping to R, sending all data in window to pipe()
Date Fri, 17 Jul 2015 16:21:22 GMT
Spark newbie here, using Spark 1.3.1.

I’m consuming a stream and trying to pipe the data from the entire window to R for analysis.
 The R algorithm needs the entire dataset from the stream (everything in the window) in order
to function properly; it can’t be broken up.

So I tried doing a coalesce(1) before calling pipe(), but it still seems to be breaking up
the data and invoking R multiple times with small pieces of data.  Is there some other
approach I should try?

Here’s a small snippet:

    val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER)
      .window(duration)
    inputs.foreachRDD {
      windowRdd => {
        if (windowRdd.count() > 0) processWindow(windowRdd)
      }
    }

...

  def processWindow(windowRdd: RDD[String]) = {
    // call R script to process data
    windowRdd.coalesce(1)
    val outputsRdd: RDD[String] = windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
    outputsRdd.cache()

    if (outputsRdd.count() > 0) processOutputs(outputsRdd)
  }

...
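
One thing worth checking in the snippet above: RDDs are immutable, so coalesce(1) returns a new RDD and leaves windowRdd itself untouched. Since pipe() launches the external command once per partition, calling it on the original windowRdd would still start R once for every partition in the window. A minimal sketch of chaining the two calls follows; the helper name pipeWholeWindow and the object WholeWindowPipe are hypothetical and not part of the original code, and whether this fully resolves the behavior described here is an assumption rather than a confirmed fix.

```scala
import org.apache.spark.rdd.RDD

object WholeWindowPipe {
  // Hypothetical helper (not from the original post): collapse the window
  // into a single partition, then pipe that partition to the command.
  def pipeWholeWindow(windowRdd: RDD[String], command: String): RDD[String] = {
    // coalesce returns a NEW RDD; windowRdd keeps its original partitioning,
    // so pipe() has to be called on the returned value.
    val singlePartition: RDD[String] = windowRdd.coalesce(1)
    // pipe() starts the command once per partition, so with one partition
    // the command receives every line of the window on its stdin.
    singlePartition.pipe(command)
  }
}
```

In processWindow this would replace the separate coalesce and pipe statements, passing the same SparkFiles.get(...) path as the command. Note that with a single partition the whole window is processed by one task, so it has to be small enough for one executor to handle.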

