Hi Team,

Could you please respond to my request below?

Regards,
Rajesh



On Thu, Sep 25, 2014 at 11:38 PM, Madabhattula Rajesh Kumar <mrajaforu@gmail.com> wrote:
Hi Team,

Can I use actors in Spark Streaming, dispatching on the event type? Could you please review the test program below and let me know if there is anything I need to change with respect to best practices?

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

// Message type matched by the first branch of Events.receive; carries the RDD of
// strings to be counted.
// NOTE(review): Scala convention is UpperCamelCase for class names (e.g. `One`);
// renaming would also require updating every use site.
case class one(r: org.apache.spark.rdd.RDD[String])
// Message type matched by the second branch of Events.receive; carries the RDD of
// strings to be counted.
case class two(s: org.apache.spark.rdd.RDD[String])

/**
 * Actor that reacts to `one` and `two` messages by running `count` on the RDD
 * each message carries and printing the result to standard output.
 *
 * Only these two message types are matched by `receive`.
 */
class Events extends Actor {

  def receive: Receive = {
    // Dispatch on the event type; each branch is the hook for that event's handler.
    case one(oneRdd) =>
      val oneTotal = oneRdd.count
      println("ONE COUNT" + oneTotal)
    case two(twoRdd) =>
      val twoTotal = twoRdd.count
      println("TWO COUNT" + twoTotal)
  }
}

/**
 * Test driver that forwards every RDD produced by a Spark Streaming text-file
 * stream to an `Events` actor, once wrapped as `one` and once wrapped as `two`.
 */
object Test {

  /**
   * Creates the actor system and the `Events` actor, builds a streaming context
   * with a 30-second batch interval over the HDFS events directory, wires each
   * batch RDD to the actor, then starts the stream and blocks until it terminates.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("System")
    val event: ActorRef = system.actorOf(Props[Events], "events")

    val sparkConf = new SparkConf()
      .setAppName("AlertsLinesCount")
      .setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val lines = ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")

    // `!` is fire-and-forget: both messages are handed to the actor and this
    // callback returns without waiting for the counts to be printed.
    lines.foreachRDD { rdd =>
      event ! one(rdd)
      event ! two(rdd)
    }

    // Side-effecting calls keep their parentheses.
    ssc.start()
    ssc.awaitTermination()
    // NOTE(review): `system` is never shut down here; confirm whether the actor
    // system should be stopped once awaitTermination returns.
  }
}

Regards,
Rajesh