spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Feynman Liang <fli...@databricks.com>
Subject Re: Finding moving average using Spark and Scala
Date Tue, 14 Jul 2015 17:21:04 GMT
If your rows may have NAs in them, I would process each column individually
by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
the NAs, then running a summarizer over each column.

Even if you have many rows, after summarizing you will only have a vector
of length #columns.

On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi <anupam_bagchi@rocketmail.com
> wrote:

> Hello Feynman,
>
> Actually in my case, the vectors I am summarizing over will not have the
> same dimension since many devices will be inactive on some days. This is at
> best a sparse matrix where we take only the active days and attempt to fit
> a moving average over it.
>
> The reason I would like to save it to HDFS is that there are really
> several million (almost a billion) devices for which this data needs to be
> written. I am perhaps writing a very few columns, but the number of rows is
> pretty large.
>
> Given the above two cases, is using MultivariateOnlineSummarizer not a
> good idea then?
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fliang@databricks.com> wrote:
>
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>
> Make sure all the vectors you are summarizing over have the same dimension.
>
> Why would you want to write a MultivariateOnlineSummary object (which can
> be represented with a couple Double's) into a distributed filesystem like
> HDFS?
>
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <
> anupam_bagchi@rocketmail.com> wrote:
>
>> Thank you Feynman for the lead.
>>
>> I was able to modify the code using clues from the RegressionMetrics
>> example. Here is what I got now.
>>
>> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>> // Calculate statistics based on bytes-transferred
>> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> println(deviceIdsMap.collect().deep.mkString("\n"))
>>
>> val summary: MultivariateStatisticalSummary = {
>>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>>     case (deviceId, allaggregates) => Vectors.dense({
>>       val sortedAggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedAggregates)
>>       sortedAggregates.map(dda => dda.bytes.toDouble)
>>     })
>>   }.aggregate(new MultivariateOnlineSummarizer())(
>>       (summary, v) => summary.add(v),  // Not sure if this is really what I want,
it just came from the example
>>       (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>>     )
>>   summary
>> }
>>
>> It compiles fine. But I am now getting an exception as follows at Runtime.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
>> failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
>> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
>> when adding new sample. Expecting 8 but got 14.
>>         at scala.Predef$.require(Predef.scala:233)
>>         at
>> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>>         at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>>         at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>>         at
>> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>>         at
>> scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>>         at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>>         at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:722)
>>
>> Can’t tell where exactly I went wrong. Also, how do I take the
>> MultivariateOnlineSummary object and write it to HDFS? I have the
>> MultivariateOnlineSummary object with me, but I really need an RDD to call
>> saveAsTextFile() on it.
>>
>> Anupam Bagchi
>>
>>
>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fliang@databricks.com> wrote:
>>
>> A good example is RegressionMetrics
>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s
>> use of of OnlineMultivariateSummarizer to aggregate statistics across
>> labels and residuals; take a look at how aggregateByKey is used there.
>>
>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <
>> anupam_bagchi@rocketmail.com> wrote:
>>
>>> Thank you Feynman for your response. Since I am very new to Scala I may
>>> need a bit more hand-holding at this stage.
>>>
>>> I have been able to incorporate your suggestion about sorting - and it
>>> now works perfectly. Thanks again for that.
>>>
>>> I tried to use your suggestion of using MultiVariateOnlineSummarizer,
>>> but could not proceed further. For each deviceid (the key) my goal is to
>>> get a vector of doubles on which I can query the mean and standard
>>> deviation. Now because RDDs are immutable, I cannot use a foreach loop to
>>> interate through the groupby results and individually add the values in an
>>> RDD - Spark does not allow that. I need to apply the RDD functions directly
>>> on the entire set to achieve the transformations I need. This is where I am
>>> faltering since I am not used to the lambda expressions that Scala uses.
>>>
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     val logFile = args(0)
>>>
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>
>>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c
=> c_.2, true) // Anything wrong?
>>>
>>>     // All I need to do below is collect the vector of bytes for each device
and store it in the RDD
>>>
>>>     // The problem with the ‘foreach' approach below, is that it generates
the vector values one at a time, which I cannot
>>>
>>>     // add individually to an immutable RDD
>>>
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1  // This is the device ID
>>>       val allaggregates = a._2  // This is an array of all device-aggregates
for this device
>>>
>>>       val sortedaggregates = allaggregates.toArray      Sorting.quickSort(sortedaggregates)
>>>
>>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>>     })
>>>
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>
>>>
>>> sc.stop() } }
>>>
>>> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
>>> Thanks a lot for your help.
>>>
>>> Anupam Bagchi
>>>
>>>
>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fliang@databricks.com>
>>> wrote:
>>>
>>> The call to Sorting.quicksort is not working. Perhaps I am calling it
>>>> the wrong way.
>>>
>>> allaggregates.toArray allocates and creates a new array separate from
>>> allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
>>> val sortedAggregates = allaggregates.toArray
>>> Sorting.quickSort(sortedAggregates)
>>>
>>>> I would like to use the Spark mllib class
>>>> MultivariateStatisticalSummary to calculate the statistical values.
>>>
>>> MultivariateStatisticalSummary is a trait (similar to a Java interface);
>>> you probably want to use MultivariateOnlineSummarizer.
>>>
>>>> For that I would need to keep all my intermediate values as RDD so that
>>>> I can directly use the RDD methods to do the job.
>>>
>>> Correct; you would do an aggregate using the add and merge functions
>>> provided by MultivariateOnlineSummarizer
>>>
>>>> At the end I also need to write the results to HDFS for which there is
>>>> a method provided on the RDD class to do so, which is another reason I
>>>> would like to retain everything as RDD.
>>>
>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to
>>> HDFS, or you could unpack the relevant statistics from
>>> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
>>> and then write.
>>>
>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
>>> anupam_bagchi@rocketmail.com> wrote:
>>>
>>>> I have to do the following tasks on a dataset using Apache Spark with
>>>> Scala as the programming language:
>>>>
>>>>    1. Read the dataset from HDFS. A few sample lines look like this:
>>>>
>>>>  deviceid,bytes,eventdate15590657,246620,2015063014066921,1907,2015062114066921,1906,201506266522013,2349,201506266522013,2525,20150613
>>>>
>>>>
>>>>    1. Group the data by device id. Thus we now have a map of deviceid
>>>>    => (bytes,eventdate)
>>>>    2. For each device, sort the set by eventdate. We now have an
>>>>    ordered set of bytes based on eventdate for each device.
>>>>    3. Pick the last 30 days of bytes from this ordered set.
>>>>    4. Find the moving average of bytes for the last date using a time
>>>>    period of 30.
>>>>    5. Find the standard deviation of the bytes for the final date
>>>>    using a time period of 30.
>>>>    6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>>>>    [Assume k = 3]
>>>>
>>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has
>>>> to run on a billion rows finally.
>>>> Here is the data structure for the dataset.
>>>>
>>>> package com.testingcase class DeviceAggregates (
>>>>                         device_id: Integer,
>>>>                         bytes: Long,
>>>>                         eventdate: Integer
>>>>                    ) extends Ordered[DailyDeviceAggregates] {
>>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>>     eventdate - that.eventdate
>>>>   }}object DeviceAggregates {
>>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>>     val c = logline.split(",")
>>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>>   }}
>>>>
>>>> The DeviceAnalyzer class looks like this:
>>>> I have a very crude implementation that does the job, but it is not up
>>>> to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
>>>> basic. Here is what I have now:
>>>>
>>>> import com.testing.DailyDeviceAggregatesimport org.apache.spark.{SparkContext,
SparkConf}import org.apache.spark.mllib.linalg.Vectorimport org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,
Statistics}import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>> import scala.util.Sorting
>>>> object DeviceAnalyzer {
>>>>   def main(args: Array[String]) {
>>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>>     val sc = new SparkContext(sparkConf)
>>>>
>>>>     val logFile = args(0)
>>>>
>>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>>
>>>>     // Calculate statistics based on bytes
>>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>>
>>>>     deviceIdsMap.foreach(a => {
>>>>       val device_id = a._1  // This is the device ID
>>>>       val allaggregates = a._2  // This is an array of all device-aggregates
for this device
>>>>
>>>>       println(allaggregates)
>>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer
of DailyDeviceAggregates based on eventdate
>>>>       println(allaggregates) // This does not work - results are not sorted
!!
>>>>
>>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>>       val count = byteValues.count(A => true)
>>>>       val sum = byteValues.sum
>>>>       val xbar = sum / count
>>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
>>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>>
>>>>       val vector: Vector = Vectors.dense(byteValues)
>>>>       println(vector)
>>>>       println(device_id + "," + xbar + "," + stddev)
>>>>
>>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>>       //println(vector)
>>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>>     })
>>>>
>>>>     sc.stop()
>>>>   }}
>>>>
>>>> I would really appreciate if someone can suggests improvements for the
>>>> following:
>>>>
>>>>    1. The call to Sorting.quicksort is not working. Perhaps I am
>>>>    calling it the wrong way.
>>>>    2. I would like to use the Spark mllib class
>>>>    MultivariateStatisticalSummary to calculate the statistical values.
>>>>    3. For that I would need to keep all my intermediate values as RDD
>>>>    so that I can directly use the RDD methods to do the job.
>>>>    4. At the end I also need to write the results to HDFS for which
>>>>    there is a method provided on the RDD class to do so, which is another
>>>>    reason I would like to retain everything as RDD.
>>>>
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Anupam Bagchi
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>

Mime
View raw message