spark-user mailing list archives

From franco barrientos <franco.barrien...@exalitica.com>
Subject kafka + mysql filtering problem
Date Mon, 29 Feb 2016 14:00:23 GMT
Hi all,

I want to read some filtering rules from MySQL (via the MySQL JDBC driver). Specifically, each
rule is stored in a char column containing a field and a value, and I want to apply it to a
Kafka streaming input.

The main idea is to drive this from a web UI (Livy server).

Any suggestions or guidelines?

e.g., I have this:

object Streaming {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args
    var spc = SparkContext.getOrCreate()
    val ssc = new StreamingContext(spc, Seconds(3))
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topics -> 5)).map(_._2)

    /* TEST MYSQL */
    val sqlContext = new SQLContext(spc)
    val prop = new java.util.Properties
    val url = "jdbc:mysql://52.22.38.81:3306/tmp"
    val tbl_users = "santander_demo_users"
    val tbl_rules = "santander_demo_filters"
    val tbl_campaigns = "santander_demo_campaigns"
    prop.setProperty("user", "root")
    prop.setProperty("password", "Exalitica2014")
    val users = sqlContext.read.jdbc(url, tbl_users, prop)
    val rules = sqlContext.read.jdbc(url, tbl_rules, prop)
    val campaigns = sqlContext.read.jdbc(url, tbl_campaigns, prop)
    val toolbox = currentMirror.mkToolBox()
    val toRemove = "\"".toSet
    var mto = "0"

    def rule_apply(n: Int, t: String, rules: DataFrame): String = {
      // reading rules from mysql
      var r = (rules.filter(rules("CID") === n).select("FILTER_DSC").first())(0).toString()

      // using mkToolbox for pre-processing rules
      return toolbox.eval(toolbox.parse("""
        val mto = """ + t + """
        if(""" + r + """) {
          return "true"
        } else {
          return "false"
        }
      """)).toString()
    }
    /* TEST MYSQL */

    lines.map { x =>
      if (x.split(",").length > 1) {
        // reading from kafka input
        mto = spc.broadcast(x.split(",")(5).filterNot(toRemove))
      }
    }
    var msg = rule_apply(1, mto, rules)
    var word = lines.map(x => msg)
    word.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The problem is that the mto variable always goes back to "0" after mapping over the lines DStream.
I also tried calling rule_apply inside the map, but then I get a "not serializable" error for the
ToolBox created by mkToolBox.
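
For illustration, here is a minimal sketch of one possible way to evaluate a rule per record without the ToolBox. It assumes (this is only an assumption, since the real FILTER_DSC format is not shown above) that a rule looks like "mto > 1000", i.e. a field name, a comparison operator and a number. The names Rule and RuleParser are hypothetical. Case classes are serializable, so a parsed rule could in principle be referenced inside lines.map, which the ToolBox cannot.

```scala
// Hypothetical rule format: "<field> <op> <number>", e.g. "mto > 1000".
// A case class is Serializable, so an instance can be captured by a closure.
final case class Rule(field: String, op: String, threshold: Double) {
  // Returns true when the record has the field and the comparison holds.
  def matches(record: Map[String, Double]): Boolean =
    record.get(field).exists { v =>
      op match {
        case ">"  => v > threshold
        case ">=" => v >= threshold
        case "<"  => v < threshold
        case "<=" => v <= threshold
        case "==" => v == threshold
        case "!=" => v != threshold
        case _    => false
      }
    }
}

object RuleParser {
  // Two-character operators are listed first so ">=" is not read as ">".
  private val Pattern =
    """\s*(\w+)\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*""".r

  // Returns None when the text does not follow the assumed format.
  def parse(text: String): Option[Rule] = text match {
    case Pattern(field, op, value) => Some(Rule(field, op, value.toDouble))
    case _                         => None
  }
}
```

With something like this, the rule strings would be read from MySQL and parsed once on the driver, and each Kafka record would be turned into a field-to-value map and checked with matches inside the map itself, rather than assigning to a driver-side variable such as mto.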

Thanks in advance.

Franco Barrientos
Data Scientist

Málaga #115, Of. 1003, Las Condes.
Santiago, Chile.
(+562)-29699649
(+569)-76347893

franco.barrientos@exalitica.com 

www.exalitica.com


