spark-user mailing list archives

From Chester At Work <ches...@alpinenow.com>
Subject Re: Launching Spark app in client mode for standalone cluster
Date Mon, 05 Jan 2015 04:31:23 GMT
Just a guess here, may not be correct.

Spray needs to start an Akka actor system, and the SparkContext also creates an Akka actor system.
Is it possible that there is some conflict between the two?




On Jan 4, 2015, at 7:42 PM, Boromir Widas <vcsubsvc@gmail.com> wrote:

> Hello,
> 
> I am trying to launch a Spark app (client mode for a standalone cluster) from a Spray server, using the following code.
> 
> When I run it as 
> 
> $> java -cp <class paths> SprayServer    
> 
> the SimpleApp.getA() call from SprayService returns -1 (which means it sees the logData RDD as null for HTTP requests), but the statements within SimpleAppLoader.run() get correct values from SimpleApp.getA().
> 
> Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time but am not getting anywhere. Any pointers will be greatly appreciated.
> 
> Thanks.
> 
> //////////////////////// BEGIN SPRAY SERVER
>
> import akka.actor.{ActorSystem, Props}
> import akka.io.IO
> import spray.can.Http
>
> import akka.actor._
> import spray.routing.HttpService
> import scala.concurrent.ops
>
> object SprayServer {
>   def main(args: Array[String]) {
>     // we need an ActorSystem to host our service
>     implicit val system = ActorSystem()
>
>     //create our service actor
>     val service = system.actorOf(Props[SprayServiceActor], "test-service")
>
>     //bind our actor to an HTTP port
>     IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)
>
>     ops.spawn {
>       SimpleAppLoader.run()
>     }
>   }
> }
>
> class SprayServiceActor extends SprayService with Actor {
>   // the HttpService trait (which SprayService will extend) defines
>   // only one abstract member, which connects the services environment
>   // to the enclosing actor or test.
>   def actorRefFactory = context
>
>   def receive = runRoute(rootRoute)
> }
>
> trait SprayService extends HttpService {
>
>   def default = path("") {
>     println("handling default route")
>     val numAs = SimpleApp.getA()   // DOES NOT WORK
>     get { complete(s"num A: $numAs") }
>   }
>
>   def pingRoute = path("ping") {
>     get { complete("pong!") }
>   }
>
>   def pongRoute = path("pong") {
>     get { complete("pong!?") }
>   }
>
>   def rootRoute = pingRoute ~ pongRoute ~ default
> }
>
> ////////// END SPRAY, BEGIN SPARK
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.deploy.SparkSubmit
> import org.apache.spark.rdd.RDD
>
> object SimpleApp {
>   var resultString: String = "Data not assigned"
>   var logData: RDD[String] = null
>   def main(args: Array[String]) {
>     val logFile = "/home/ovik/src/spark/README.md" // Should be some file on your system
>     val conf = new SparkConf().setAppName("Simple Application")
>     val sc = new SparkContext(conf)
>     logData = sc.textFile(logFile, 2).cache()
>     val numAs = logData.filter(line => line.contains("a")).count()
>     val numBs = logData.filter(line => line.contains("b")).count()
>     resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
>     println(resultString)
>   }
>   def getA(): Int = {
>     println(resultString)
>     if(null == logData) {
>       println("**** logData is null!")
>       -1
>     } else {
>       val numAs = logData.filter(line => line.contains("a")).count().toInt
>       println(s"**** numAs: $numAs")
>       numAs
>     }
>   }
> }
>
> object SimpleAppLoader {
>   def main(args: Array[String]) {
>     run()
>   }
>
>   def run() {
>
>     val clArgs = Array(
>       "--deploy-mode", "client"
>       , "--total-executor-cores", "2"
>       , "--class", "SimpleApp"
>       , "--conf", "spark.shuffle.spill=false"
>       , "--conf", "spark.master=spark://troika:7077"
>       , "--conf", "spark.driver.memory=128m"
>       , "--conf", "spark.executor.memory=128m"
>       , "--conf", "spark.eventLog.enabled=true"
>       , "--conf", "spark.eventLog.dir=/home/ovik/logs"
>       , SparkContext.jarOfClass(this.getClass).get)
>
>     SparkSubmit.main(clArgs)
>
>     val numAs = SimpleApp.getA()    // WORKS
>
>     println(s"numAs is $numAs")
>   }
> }
