Thanks, Feynman, for pointing me in the right direction.

I was able to solve this problem by calling the Spark API from Java.

Here is a code snippet that may help others who face the same challenge.

        if (args.length > 2) {
            earliestEventDate = Integer.parseInt(args[2]);
        } else {
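            // Default: consider events from the last 30 days (today minus 30 days, formatted as yyyyMMdd)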
            Date now = Calendar.getInstance().getTime();
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
            earliestEventDate = Integer.parseInt(dateFormat.format(new Date(now.getTime()-30L*AnalyticsConstants.ONE_DAY_IN_MILLISECONDS)));
        }
        System.out.println("Filtering out dates earlier than: " + earliestEventDate);
        JavaRDD<String> logLines = sc.textFile(inputFile);

        // Parse the text log lines into DailyDeviceAggregates objects, keep only events on or after
        // earliestEventDate, and cache the result for reuse
        JavaRDD<DailyDeviceAggregates> accessLogs = logLines
                .map(Functions.PARSE_DEVICE_AGGREGATE_LINE)
                .filter(new Function<DailyDeviceAggregates, Boolean>() {
                    @Override
                    public Boolean call(DailyDeviceAggregates value) {
                        return (value.getEventdate() >= earliestEventDate);
                    }
                })
                .cache();
        // accessLogs.saveAsTextFile("accessLogs.saved");

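        // Group the daily aggregates by device id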
        JavaPairRDD<Object, Iterable<DailyDeviceAggregates>> groupMap = accessLogs.groupBy(new Function<DailyDeviceAggregates, Object>() {
            @Override
            public Object call(DailyDeviceAggregates agg) throws Exception {
                return agg.getDevice_id();
            }
        });
        // groupMap.saveAsTextFile("groupedAccessLogs.saved");

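        // Collapse each device's daily aggregates into a single DailyDeviceSummary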
        JavaPairRDD<Object, DailyDeviceSummary> deviceCharacteristics = groupMap.mapValues(new Function<Iterable<DailyDeviceAggregates>, DailyDeviceSummary>() {
            @Override
            public DailyDeviceSummary call(Iterable<DailyDeviceAggregates> allDeviceDataForMonth) throws Exception {
                // First task is to sort the input values by eventdate
                ArrayList<DailyDeviceAggregates> arr = new ArrayList<DailyDeviceAggregates>();
                for (DailyDeviceAggregates agg: allDeviceDataForMonth) {
                    arr.add(agg);
                }
                Collections.sort(arr);
                // Done sorting

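                // Copy the per-day byte counts into arrays, in eventdate order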
                double[] bytesTransferred = new double[arr.size()];
                double[] bytesIn = new double[arr.size()];
                double[] bytesOut = new double[arr.size()];

                DailyDeviceAggregates lastAggregate = null;
                int index = 0;
                for (DailyDeviceAggregates aggregate : arr) {
                    // System.out.println(aggregate);
                    bytesIn[index] = aggregate.getBytes_in();
                    bytesOut[index] = aggregate.getBytes_out();
                    bytesTransferred[index] = aggregate.getBytes_transferred();
                    index++;
                    lastAggregate = aggregate;
                }
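                // Compute Bollinger bands (mean, standard deviation, upper/lower thresholds) over each series, period 30, multiplier 2.0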
                BollingerBands bollingerBytesTransferred = new BollingerBands(bytesTransferred, 30, 2.0);
                BollingerBands bollingerBytesIn = new BollingerBands(bytesIn, 30, 2.0);
                BollingerBands bollingerBytesOut = new BollingerBands(bytesOut, 30, 2.0);

                return new DailyDeviceSummary(lastAggregate.getAccount_id(), lastAggregate.getDevice_id(), index,
                        bollingerBytesIn.getLastMean(), bollingerBytesOut.getLastMean(), bollingerBytesTransferred.getLastMean(),
                        bollingerBytesIn.getLastStandardDeviation(), bollingerBytesOut.getLastStandardDeviation(), bollingerBytesTransferred.getLastStandardDeviation(),
                        (long)bollingerBytesIn.getLastUpperThreshold(), (long)bollingerBytesOut.getLastUpperThreshold(), (long)bollingerBytesTransferred.getLastUpperThreshold(),
                        (long)bollingerBytesIn.getLastLowerThreshold(), (long)bollingerBytesOut.getLastLowerThreshold(), (long)bollingerBytesTransferred.getLastLowerThreshold());
            }
        });

        deviceCharacteristics.values().saveAsTextFile(outputFile);

Anupam Bagchi


On Jul 14, 2015, at 10:21 AM, Feynman Liang <fliang@databricks.com> wrote:

If your rows may have NAs in them, I would process each column individually: first project the column (map(x => x.nameOfColumn)), filter out the NAs, then run a summarizer over that column.

Even if you have many rows, after summarizing you will only have a vector of length #columns.
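
For example, a minimal sketch of that per-column approach (assuming a hypothetical case class Record whose columns are Option[Double], and an RDD[Record] named records):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

    // Hypothetical row type; substitute your own schema.
    case class Record(bytesIn: Option[Double], bytesOut: Option[Double])

    // Summarize one column: the NAs have already been dropped, and each value is added as a 1-dimensional sample.
    def summarizeColumn(column: RDD[Double]): MultivariateOnlineSummarizer =
      column.aggregate(new MultivariateOnlineSummarizer())(
        (summarizer, x) => summarizer.add(Vectors.dense(x)),
        (s1, s2) => s1.merge(s2)
      )

    // records: RDD[Record] is assumed to exist; flatMap over the Option drops the None (NA) entries.
    val bytesInSummary = summarizeColumn(records.flatMap(_.bytesIn))
    println(bytesInSummary.mean(0) + ", " + math.sqrt(bytesInSummary.variance(0)))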

On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com> wrote:
Hello Feynman,

Actually, in my case the vectors I am summarizing over will not have the same dimension, since many devices will be inactive on some days. This is at best a sparse matrix, where we take only the active days and attempt to fit a moving average over them.

The reason I would like to save it to HDFS is that there are really several million (almost a billion) devices for which this data needs to be written. I am writing perhaps only a few columns, but the number of rows is quite large.

Given the above two cases, is using MultivariateOnlineSummarizer not a good idea then?

Anupam Bagchi


On Jul 13, 2015, at 7:06 PM, Feynman Liang <fliang@databricks.com> wrote:

Dimensions mismatch when adding new sample. Expecting 8 but got 14.

Make sure all the vectors you are summarizing over have the same dimension.

Why would you want to write a MultivariateOnlineSummarizer object (which can be represented with a couple of Doubles) into a distributed filesystem like HDFS?

On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com> wrote:
Thank you Feynman for the lead.

I was able to modify the code using clues from the RegressionMetrics example. Here is what I have now:

val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes-transferred
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
println(deviceIdsMap.collect().deep.mkString("\n"))

val summary: MultivariateStatisticalSummary = {
  val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
    case (deviceId, allaggregates) => Vectors.dense({
      val sortedAggregates = allaggregates.toArray
      Sorting.quickSort(sortedAggregates)
      sortedAggregates.map(dda => dda.bytes.toDouble)
    })
  }.aggregate(new MultivariateOnlineSummarizer())(
    (summary, v) => summary.add(v), // Not sure if this is really what I want, it just came from the example
    (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
  )
  summary
}

It compiles fine, but I am now getting the following exception at runtime:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
        at scala.Predef$.require(Predef.scala:233)
        at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
        at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

I can't tell exactly where I went wrong. Also, how do I take the resulting MultivariateOnlineSummarizer object and write it to HDFS? I have the summarizer object, but I really need an RDD to call saveAsTextFile() on.

Anupam Bagchi


On Jul 13, 2015, at 4:52 PM, Feynman Liang <fliang@databricks.com> wrote:

A good example is RegressionMetrics's use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregateByKey is used there.
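
For your data, a rough (untested) sketch of that pattern, assuming deviceAggregateLogs: RDD[DailyDeviceAggregates] as in your code, could look like:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

    // One summarizer per device: each day's byte count is added as a 1-dimensional sample.
    val perDeviceSummaries = deviceAggregateLogs
      .map(dda => (dda.device_id, Vectors.dense(dda.bytes.toDouble)))
      .aggregateByKey(new MultivariateOnlineSummarizer())(
        (summarizer, v) => summarizer.add(v), // fold one day's sample into the device's summary
        (s1, s2) => s1.merge(s2)              // merge partial summaries across partitions
      )
    // perDeviceSummaries: RDD[(Integer, MultivariateOnlineSummarizer)]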

On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com> wrote:
Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.

I have been able to incorporate your suggestion about sorting - and it now works perfectly. Thanks again for that.

I tried to use your suggestion of MultivariateOnlineSummarizer, but could not proceed further. For each device_id (the key), my goal is to get a vector of doubles on which I can query the mean and standard deviation. Because RDDs are immutable, I cannot use a foreach loop to iterate through the groupBy results and add the values to an RDD one at a time; Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering, since I am not used to the lambda expressions that Scala uses.

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything wrong?
    // All I need to do below is collect the vector of bytes for each device and store it in the RDD
    // The problem with the 'foreach' approach below is that it generates the vector values one at a time, which I cannot
    // add individually to an immutable RDD
    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      val sortedaggregates = allaggregates.toArray
      Sorting.quickSort(sortedaggregates)

      val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

    })
      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)

    sc.stop()
  }
}
Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks a lot for your help.

Anupam Bagchi


On Jul 13, 2015, at 12:21 PM, Feynman Liang <fliang@databricks.com> wrote:

  • The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
  • allaggregates.toArray allocates a new array separate from allaggregates, and it is that copy which Sorting.quickSort sorts; allaggregates itself is left unsorted. Try:
    val sortedAggregates = allaggregates.toArray
    Sorting.quickSort(sortedAggregates)
  • I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
  • MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer. 
  • For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
  • Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer 
  • At the end I also need to write the results to HDFS, for which the RDD class provides a method; this is another reason I would like to retain everything as RDDs.
  • You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write (see the sketch after this list).
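
  A rough (untested) sketch of that second option, assuming a hypothetical perDeviceSummaries: RDD[(device_id, MultivariateOnlineSummarizer)] built with aggregateByKey, and a placeholder output path:

    // Unpack mean, standard deviation and a 3-sigma band (k = 3 from your problem statement) per device,
    // then write one CSV line per device.
    val deviceStats = perDeviceSummaries.mapValues { s =>
      val mean   = s.mean(0)                  // a single "bytes" column, so index 0
      val stddev = math.sqrt(s.variance(0))
      (mean, stddev, mean - 3 * stddev, mean + 3 * stddev)
    }
    deviceStats
      .map { case (id, (mean, stddev, lower, upper)) => s"$id,$mean,$stddev,$lower,$upper" }
      .saveAsTextFile("hdfs:///path/to/output")  // placeholder path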

    On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bagchi@rocketmail.com> wrote:
    I have to do the following tasks on a dataset using Apache Spark with Scala as the programming language:
    1. Read the dataset from HDFS. A few sample lines look like this:
    deviceid,bytes,eventdate
    15590657,246620,20150630
    14066921,1907,20150621
    14066921,1906,20150626
    6522013,2349,20150626
    6522013,2525,20150613
    2. Group the data by device id. Thus we now have a map of deviceid => (bytes, eventdate).
    3. For each device, sort the set by eventdate. We now have an ordered set of bytes based on eventdate for each device.
    4. Pick the last 30 days of bytes from this ordered set.
    5. Find the moving average of bytes for the last date using a time period of 30.
    6. Find the standard deviation of the bytes for the final date using a time period of 30.
    7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev) [assume k = 3]. (A tiny plain-Scala sketch of steps 4-7 follows this list.)
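
    For reference, a tiny plain-Scala sketch of steps 4-7 for a single device (assuming a hypothetical sequence holding that device's byte counts already ordered by eventdate):

        // Hypothetical helper: given one device's byte counts ordered by eventdate,
        // return the (mean - k*stddev, mean + k*stddev) band computed over the last 30 days.
        def band(sortedBytes: Seq[Long], k: Double = 3.0): (Double, Double) = {
          val window = sortedBytes.takeRight(30).map(_.toDouble)                              // step 4: last 30 days
          val mean   = window.sum / window.size                                               // step 5: moving average at the last date
          val stddev = math.sqrt(window.map(x => (x - mean) * (x - mean)).sum / window.size)  // step 6: standard deviation
          (mean - k * stddev, mean + k * stddev)                                              // step 7: the two band values
        }
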
    I am using Apache Spark 1.3.0. The actual dataset is wider, and it ultimately has to run on a billion rows.
    Here is the data structure for the dataset.
    package com.testing
    case class DailyDeviceAggregates (
                            device_id: Integer,
                            bytes: Long,
                            eventdate: Integer
                       ) extends Ordered[DailyDeviceAggregates] {
      def compare(that: DailyDeviceAggregates): Int = {
        eventdate - that.eventdate
      }
    }
    object DailyDeviceAggregates {
      def parseLogLine(logline: String): DailyDeviceAggregates = {
        val c = logline.split(",")
        DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
      }
    }
    I have a very crude implementation of the DeviceAnalyzer object that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:

    import com.testing.DailyDeviceAggregates
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    
    import scala.util.Sorting
    
    object DeviceAnalyzer {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Device Analyzer")
        val sc = new SparkContext(sparkConf)
    
        val logFile = args(0)
    
        val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
    
        // Calculate statistics based on bytes
        val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
    
        deviceIdsMap.foreach(a => {
          val device_id = a._1  // This is the device ID
          val allaggregates = a._2  // This is an array of all device-aggregates for this device
    
          println(allaggregates)
          Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
          println(allaggregates) // This does not work - results are not sorted !!
    
          val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
          val count = byteValues.count(A => true)
          val sum = byteValues.sum
          val xbar = sum / count
          val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
          val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
    
          val vector: Vector = Vectors.dense(byteValues)
          println(vector)
          println(device_id + "," + xbar + "," + stddev)
    
          //val vector: Vector = Vectors.dense(byteValues)
          //println(vector)
          //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
        })
    
        sc.stop()
      }
    }
    I would really appreciate it if someone could suggest improvements for the following:
    1. The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
    2. I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
    3. For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
    4. At the end I also need to write the results to HDFS, for which the RDD class provides a method; this is another reason I would like to retain everything as RDDs.

    Thanks in advance for your help.

    Anupam Bagchi