spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anupam Bagchi <anupam_bag...@rocketmail.com>
Subject Finding moving average using Spark and Scala
Date Mon, 13 Jul 2015 17:07:05 GMT
I have to do the following tasks on a dataset using Apache Spark with Scala as the programming
language:   
   - Read the dataset from HDFS. A few sample lines look like this:

deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
   
   - Group the data by device id. Thus we now have a map of deviceid => (bytes,eventdate)
   - For each device, sort the set by eventdate. We now have an ordered set of bytes based
on eventdate for each device.
   - Pick the last 30 days of bytes from this ordered set.
   - Find the moving average of bytes for the last date using a time period of 30.
   - Find the standard deviation of the bytes for the final date using a time period of 30.
   - Return two values in the result (mean - kstddev) and (mean + kstddev) [Assume k = 3]
I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run on a billion
rows finally.Here is the data structure for the dataset.package com.testing
case class DeviceAggregates (
                        device_id: Integer,
                        bytes: Long,
                        eventdate: Integer
                   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
    eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
    val c = logline.split(",")
    DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}The DeviceAnalyzer class looks like this:I have a very crude implementation that does the
job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are
quite basic. Here is what I have now:
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Device Analyzer")
    val sc = new SparkContext(sparkConf)

    val logFile = args(0)

    val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

    // Calculate statistics based on bytes
    val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

    deviceIdsMap.foreach(a => {
      val device_id = a._1  // This is the device ID
      val allaggregates = a._2  // This is an array of all device-aggregates for this device

      println(allaggregates)
      Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates
based on eventdate
      println(allaggregates) // This does not work - results are not sorted !!

      val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
      val count = byteValues.count(A => true)
      val sum = byteValues.sum
      val xbar = sum / count
      val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
      val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

      val vector: Vector = Vectors.dense(byteValues)
      println(vector)
      println(device_id + "," + xbar + "," + stddev)

      //val vector: Vector = Vectors.dense(byteValues)
      //println(vector)
      //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
    })

    sc.stop()
  }
}I would really appreciate if someone can suggests improvements for the following:   
   - The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong way.
   - I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate
the statistical values.
   - For that I would need to keep all my intermediate values as RDD so that I can directly
use the RDD methods to do the job.
   - At the end I also need to write the results to HDFS for which there is a method provided
on the RDD class to do so, which is another reason I would like to retain everything as RDD.

Thanks in advance for your help.
Anupam Bagchi 
Mime
View raw message