spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: kafka + mysql filtering problem
Date Mon, 29 Feb 2016 15:32:14 GMT
You're getting confused about what code is running on the driver vs what
code is running on the executor.  Read

http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
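For example, here's a minimal, untested sketch of one way to restructure it,
assuming the rules table is small enough to collect on the driver (rulesById is
a name I'm introducing; spc, lines, rules and toRemove are as in your code):
read the rules once on the driver, broadcast them, and do all per-record work
inside the DStream transformation instead of assigning to a driver-side var
from inside map.

// Driver side: read the small rules table once and broadcast it to the executors.
// (rulesById is a hypothetical name; the column types here are a guess.)
val rulesById = spc.broadcast(
  rules.select("CID", "FILTER_DSC")
       .collect()
       .map(r => r.getInt(0) -> r.getString(1))
       .toMap)

// Executor side: the body of this map runs on the executors against a
// serialized copy of the closure, so look the rule up in the broadcast value
// instead of assigning to a driver-side var like mto.
val results = lines.map { x =>
  val fields = x.split(",")
  if (fields.length > 5) {
    val mto  = fields(5).filterNot(toRemove)
    val rule = rulesById.value.getOrElse(1, "false")
    // evaluate `rule` against `mto` with plain, serializable logic here
    s"$mto -> $rule"
  } else x
}
results.print()

An assignment like mto = ... inside lines.map only changes the executor's copy
of mto; the driver's copy stays "0", which is exactly what you're seeing.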



On Mon, Feb 29, 2016 at 8:00 AM, franco barrientos <
franco.barrientos@exalitica.com> wrote:

> Hi all,
>
> I want to read some filtering rules from MySQL (via the JDBC MySQL driver).
> Specifically, it is a char-type column containing a field name and a value to
> apply to a Kafka streaming input.
>
> The main idea is to drive this from a web UI (Livy server).
>
> Any suggestions or guidelines?
>
> e.g., I have this:
>
> object Streaming {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>     val Array(zkQuorum, group, topics, numThreads) = args
>     var spc = SparkContext.getOrCreate()
>     val ssc = new StreamingContext(spc, Seconds(3))
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topics -> 5)).map(_._2)
>     /* TEST MYSQL */
>     val sqlContext = new SQLContext(spc)
>     val prop = new java.util.Properties
>     val url = "jdbc:mysql://52.22.38.81:3306/tmp"
>     val tbl_users = "santander_demo_users"
>     val tbl_rules = "santander_demo_filters"
>     val tbl_campaigns = "santander_demo_campaigns"
>     prop.setProperty("user", "root")
>     prop.setProperty("password", "Exalitica2014")
>     val users = sqlContext.read.jdbc(url, tbl_users, prop)
>     val rules = sqlContext.read.jdbc(url, tbl_rules, prop)
>     val campaigns = sqlContext.read.jdbc(url, tbl_campaigns, prop)
>     val toolbox = currentMirror.mkToolBox()
>     val toRemove = "\"".toSet
>     var mto = "0"
>
>     def rule_apply(n: Int, t: String, rules: DataFrame): String = {
>       // reading rules from mysql
>       var r = (rules.filter(rules("CID") === n).select("FILTER_DSC").first())(0).toString()
>
>       // using mkToolBox for pre-processing rules
>       return toolbox.eval(toolbox.parse("""
>         val mto = """ + t + """
>         if (""" + r + """) {
>           return "true"
>         } else {
>           return "false"
>         }
>       """)).toString()
>     }
>     /* TEST MYSQL */
>
>     lines.map { x =>
>       if (x.split(",").length > 1) {
>         // reading from kafka input
>         mto = spc.broadcast(x.split(",")(5).filterNot(toRemove))
>       }
>     }
>     var msg = rule_apply(1, mto, rules)
>     var word = lines.map(x => msg)
>     word.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> The problem is that the *mto* variable always reverts to "0" after mapping
> the lines DStream. I tried calling *rule_apply* inside the map instead, but
> then I get a not-serializable error for the mkToolBox class.
>
> Thanks in advance.
>
> *Franco Barrientos*
> Data Scientist
>
> Málaga #115, Of. 1003, Las Condes.
> Santiago, Chile.
> (+562)-29699649
> (+569)-76347893
>
> franco.barrientos@exalitica.com
>
> www.exalitica.com
>
>
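On the not-serializable toolbox error: if you really do need runtime evaluation
of the rule strings, one pattern (again just a sketch, untested, and it only
works if scala-compiler is available on the executors) is to capture only the
rule text, which is a plain serializable String, and build the ToolBox itself
on the executors, e.g. once per partition with mapPartitions, so it is never
shipped inside the closure:

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

// r is just a String (the FILTER_DSC text read from MySQL on the driver), so
// it can safely be captured by the closure; the toolbox cannot, so create it
// per partition on the executor instead. lines, rules and toRemove are as in
// your code.
val r = rules.filter(rules("CID") === 1).select("FILTER_DSC").first().getString(0)

val evaluated = lines.mapPartitions { iter =>
  val toolbox = currentMirror.mkToolBox()   // built on the executor, never serialized
  iter.map { x =>
    val fields = x.split(",")
    val mto = if (fields.length > 5) fields(5).filterNot(toRemove) else "0"
    toolbox.eval(toolbox.parse(
      s"""val mto = "$mto"; if ($r) "true" else "false" """)).toString
  }
}
evaluated.print()

That said, if the rules are simple field/value comparisons, parsing them into a
plain predicate (no toolbox at all) will be both simpler and faster than
compiling Scala code per record.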
