spark-user mailing list archives

From Du Li <l...@yahoo-inc.com.INVALID>
Subject Re: how to use rdd.countApprox
Date Fri, 15 May 2015 18:24:44 GMT
Hi TD,
Just to let you know: the job group and cancellation worked after I switched to Spark 1.3.1. I
set a group id for rdd.countApprox() and cancel it, then set another group id for the remaining
job of the foreachRDD but let that one run to completion. As a by-product, I use the group id to indicate what
each job does. :-)
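Roughly, the pattern looks like this (a sketch only; the group ids and the 5000 ms timeout are illustrative, not the exact values from my app):

dstream.foreachRDD { rdd =>
  // First job group: the approximate count, cancelled once the timed-out value is in hand.
  val approxGroup = s"approx-count-${System.currentTimeMillis()}"
  rdd.sparkContext.setJobGroup(approxGroup, "approximate count", interruptOnCancel = true)
  val approx = rdd.countApprox(5000).initialValue
  rdd.sparkContext.cancelJobGroup(approxGroup)

  // Second job group: the remaining work of the batch, left to run to completion.
  rdd.sparkContext.setJobGroup("batch-processing", "remaining work for this batch")
  // ... rest of the foreachRDD body ...
}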
Thanks,
Du


     On Wednesday, May 13, 2015 4:23 PM, Tathagata Das <tdas@databricks.com> wrote:
   

You might get stage information through SparkListener, but I am not sure whether you can
use that information to easily kill stages. Though I highly recommend using Spark 1.3.1 (or
even Spark master). Things move really fast between releases; 1.1.1 feels really old to me
:P
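A minimal sketch of the listener idea, in case it helps (the class name is made up; double-check against the SparkListener API of your Spark version):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Records the id of the most recently submitted stage.
class StageIdListener extends SparkListener {
  @volatile var lastStageId: Int = -1
  override def onStageSubmitted(submitted: SparkListenerStageSubmitted): Unit = {
    lastStageId = submitted.stageInfo.stageId
  }
}

// Register it before running jobs:
//   sc.addSparkListener(new StageIdListener())
// SparkContext also has a developer-API cancelStage(stageId) that might be usable with this.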
TD
On Wed, May 13, 2015 at 1:25 PM, Du Li <lidu@yahoo-inc.com> wrote:

I call rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD {...}.
After calling cancelJobGroup(), the SparkContext seems to be no longer valid, which crashes subsequent
jobs.
My Spark version is 1.1.1. I will investigate this issue further, perhaps after upgrading
to 1.3.1, and then file a JIRA if it persists.
Is there a way to get the stage or task ids of a particular transformation or action on an RDD and
then selectively kill that stage or those tasks? It would be necessary and useful in situations
like countApprox.
Thanks,
Du


     On Wednesday, May 13, 2015 1:12 PM, Tathagata Das <tdas@databricks.com> wrote:
   

That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would
be good to file a JIRA. This may be worth debugging.
On Wed, May 13, 2015 at 12:13 PM, Du Li <lidu@yahoo-inc.com> wrote:

Actually, I tried that before asking. However, it killed the SparkContext. :-)
Du


     On Wednesday, May 13, 2015 12:02 PM, Tathagata Das <tdas@databricks.com> wrote:
   

That is a good question. I don't see a direct way to do that.
You could try the following:

val jobGroupId = <group-id-based-on-current-time>
rdd.sparkContext.setJobGroup(jobGroupId)
val approxCount = rdd.countApprox().initialValue   // job launched under the job group set above
rdd.sparkContext.cancelJobGroup(jobGroupId)        // cancel the job
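A slightly fleshed-out version of that sketch, assuming the Spark 1.3.x API (the group id scheme and the 5000 ms timeout are made up for illustration; setJobGroup also takes a description, and interruptOnCancel = true asks Spark to interrupt running tasks on cancellation):

val jobGroupId = s"count-approx-${System.currentTimeMillis()}"   // illustrative id scheme
rdd.sparkContext.setJobGroup(jobGroupId, "approximate count", interruptOnCancel = true)
val approxCount = rdd.countApprox(5000).initialValue   // blocks for at most ~5000 ms
rdd.sparkContext.cancelJobGroup(jobGroupId)            // stop the still-running count job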


On Wed, May 13, 2015 at 11:24 AM, Du Li <lidu@yahoo-inc.com> wrote:

Hi TD,
Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise they
keep running to completion, producing results that are never used while still consuming resources.
Thanks,
Du


     On Wednesday, May 13, 2015 10:33 AM, Du Li <lidu@yahoo-inc.com.INVALID> wrote:
   

  Hi TD,
Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands
a much better chance of completing the processing of each batch within the batch interval.
Du


     On Tuesday, May 12, 2015 10:31 PM, Tathagata Das <tdas@databricks.com> wrote:
   

From the code it seems that as soon as rdd.countApprox(5000) returns, you can call
pResult.initialValue to get the approximate count at that point in time (that is, after the
timeout). Calling pResult.getFinalValue() will block further until the job is over, and
give the final correct value that you would have received from rdd.count().
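In code, the two access patterns would look roughly like this (a sketch; the 5000 ms timeout is just the value used in this thread):

val pResult = rdd.countApprox(5000)   // blocks for at most ~5000 ms
val quick = pResult.initialValue      // the approximate BoundedDouble available at the timeout
val exact = pResult.getFinalValue()   // blocks until the job finishes, like rdd.count()
println(s"approx=${quick.mean.toLong}, exact=${exact.mean.toLong}")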
On Tue, May 12, 2015 at 5:03 PM, Du Li <lidu@yahoo-inc.com.invalid> wrote:

Hi,
I tested the following in my streaming app, hoping to get an approximate count within 5
seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after the job
finished completely, just like rdd.count(), which often took more than 5 seconds. The values for low, mean,
and high were all the same.
val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")
Can any expert here help explain the right way to use it?
Thanks,
Du

     On Wednesday, May 6, 2015 7:55 AM, Du Li <lidu@yahoo-inc.com.INVALID> wrote:
   

I have to count RDDs in a Spark Streaming app. When the data gets large, count() becomes expensive.
Has anybody had experience using countApprox()? How accurate/reliable is it?
The documentation is pretty sparse. I suppose the timeout parameter is in milliseconds. Can
I retrieve the count value by calling getFinalValue()? Does it block and return only after
the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from
the partial results?
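(For concreteness, I imagine the handler style would look something like the following, though I have not verified it:)

val pResult = rdd.countApprox(5000)
pResult.onComplete { bounded =>
  logInfo(s"final count: ${bounded.mean.toLong}")   // BoundedDouble with low/mean/high
}
pResult.onFail { e =>
  logError("countApprox failed", e)
}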
Thanks,
Du