spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Handling worker batch processing during driver shutdown
Date Fri, 13 Mar 2015 02:52:44 GMT
What version of Spark are you using? You may be hitting a known but
already-fixed bug where the receivers would not get the stop signal, so
stop(stopGracefully = true) would wait indefinitely for them to stop. Try
setting stopGracefully to false and see if that works.
This bug was fixed in Spark 1.2.1:

https://issues.apache.org/jira/browse/SPARK-5035
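
If you do need to shut down without waiting on the receivers, the non-graceful stop might look like this (a sketch against the Spark 1.2 Scala API; `ssc` is assumed to be your StreamingContext):

```scala
// Stop processing immediately: don't wait for receivers or in-flight
// batches (stopGracefully = false), and tear down the SparkContext too.
ssc.stop(stopSparkContext = true, stopGracefully = false)
```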

TD

On Thu, Mar 12, 2015 at 7:48 PM, Jose Fernandez <jfernandez@sdl.com> wrote:

>  Thanks for the reply!
>
>
>
> Theoretically I should be able to do as you suggest, since I follow the pool
> design pattern from the documentation, but I don’t seem to be able to run
> any code after .stop() is called.
>
>
>
>   override def main(args: Array[String]) {
>     // setup
>     val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
>     val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
>     val messages = ssc.union(inputStreams)
>
>     messages.foreachRDD { rdd =>
>       rdd.foreachPartition { p =>
>         // Borrow a batch indexer from the per-executor pool
>         val indexer = Indexer.getInstance()
>         p.foreach { msg =>
>           Indexer.process(msg) match {
>             case Some(entry) => indexer.index(entry)
>             case None =>
>           }
>         }
>         Indexer.returnInstance(indexer)
>       }
>     }
>
>     messages.print()
>
>     sys.ShutdownHookThread {
>       logInfo("****************** Shutdown hook triggered ******************")
>       // Graceful stop of streaming only (stopSparkContext = false, stopGracefully = true)
>       ssc.stop(false, true)
>       logInfo("****************** Shutdown finished ******************")
>       ssc.stop(true) // now stop the SparkContext as well
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
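
For anyone following along, the getInstance / returnInstance calls above follow the lazily-created pool pattern from the Spark Streaming programming guide. A minimal, self-contained sketch of such a pool (all names here are hypothetical stand-ins, not Jose's actual Indexer):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the batch indexer discussed in this thread:
// buffers documents and flushes them in bulk.
class BatchIndexer {
  private val buffer = ArrayBuffer[String]()
  def index(doc: String): Unit = buffer += doc
  // Flush the buffered documents (e.g. as one bulk request); returns the count.
  def flush(): Int = { val n = buffer.size; buffer.clear(); n }
}

// One pool per executor JVM: each partition borrows an indexer, uses it,
// and returns it so later partitions can reuse the same instance.
object IndexerPool {
  private val pool = new ConcurrentLinkedQueue[BatchIndexer]()

  // Borrow a pooled indexer, creating one lazily if the pool is empty.
  def getInstance(): BatchIndexer =
    Option(pool.poll()).getOrElse(new BatchIndexer)

  // Return the indexer for reuse by later partitions on this executor.
  def returnInstance(indexer: BatchIndexer): Unit = { pool.offer(indexer); () }
}
```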
>
>
>
> The first shutdown log message is always displayed, but the second one never
> is. I’ve tried multiple permutations of the stop function calls and even
> wrapped them in try/catch. I’m running in yarn-cluster mode with Spark 1.2 on
> CDH 5.3, and I stop the application with yarn application -kill <appID>.
>
>
>
>
>
> *From:* Tathagata Das [mailto:tdas@databricks.com]
> *Sent:* Thursday, March 12, 2015 1:29 PM
> *To:* Jose Fernandez
> *Cc:* user@spark.apache.org
> *Subject:* Re: Handling worker batch processing during driver shutdown
>
>
>
> Can you access the batcher directly? That is, is there a handle to get at
> the batchers on the executors by running a task on each executor? If so,
> after the StreamingContext has been stopped (but not the SparkContext),
> you can use `sc.makeRDD()` to run a dummy task like this.
>
>
>
> sc.makeRDD(1 to 1000, 1000).foreach { x =>
>   Batcher.get().flush()
> }
>
>
>
> With a large number of tasks and no other jobs running in the system, at
> least one task will run on each executor and will therefore flush the
> batcher.
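
Putting the two steps together, the driver-side shutdown could be sketched like this (assumes the Spark 1.2 Scala API; `Batcher` is the hypothetical per-executor batching singleton from this thread, and `ssc`/`sc` are the streaming and Spark contexts):

```scala
// 1. Stop streaming gracefully but keep the SparkContext alive.
ssc.stop(stopSparkContext = false, stopGracefully = true)

// 2. Run many tiny tasks so at least one lands on every executor,
//    flushing that executor's local batch buffer.
sc.makeRDD(1 to 1000, 1000).foreach { _ =>
  Batcher.get().flush()
}

// 3. Only now is it safe to tear down the SparkContext.
sc.stop()
```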
>
>
>
> TD
>
>
>
> On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez <jfernandez@sdl.com>
> wrote:
>
> Hi folks,
>
>
>
> I have a shutdown hook in my driver which stops the streaming context
> cleanly. This is great as workers can finish their current processing unit
> before shutting down. Unfortunately each worker contains a batch processor
> which only flushes every X entries. We’re indexing to different indices in
> elasticsearch and using the bulk index request for performance. As far as
> Spark is concerned, once data is added to the batcher it is considered
> processed, so our workers are being shut down with data still in the
> batcher.
>
>
>
> Is there any way to coordinate the shutdown with the workers? I haven’t
> had any luck searching for a solution online. I would appreciate any
> suggestions you may have.
>
>
>
> Thanks :)
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
>
