spark-user mailing list archives

From godraude <eduga...@gmail.com>
Subject parsing json in spark streaming
Date Wed, 03 Sep 2014 11:52:27 GMT
Hello everyone. I'm receiving a DStream of JSON messages from a Kafka topic
and I want to parse the content of each message. Each message looks
something like this:

{"type":"position","ident":"IBE32JZ","air_ground":"A","alt":"34000","clock":"1409733420","id":"IBE32JZ-1409715361-ed-0002:0","gs":"446","heading":"71","lat":"44.50987","lon":"2.98972","reg":"ECJRE","squawk":"1004","updateType":"A","altChange":" "}

I'm trying to extract the ident field only, at least for now. My program
looks like this:

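import net.liftweb.json._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ScalaExample {
	val kafkaHost = "localhost"
	val kafkaPort = 9092
	val zookeeperHost = "localhost"
	val zookeeperPort = 2181

	implicit val formats = DefaultFormats
	case class PlaneInfo(ident: String)

	// Parse one JSON message and return its "ident" field.
	def parser(json: String): String = {
		val parsedJson = parse(json)
		val m = parsedJson.extract[PlaneInfo]
		m.ident
	}

	def main(args: Array[String]): Unit = {
		val zkQuorum = "localhost:2181"
		val group = "myGroup"
		val topic = Map("flightStatus" -> 1)
		val sparkContext = new SparkContext("local[4]", "KafkaConsumer")
		val ssc = new StreamingContext(sparkContext, Seconds(10))

		// Each element is a (key, message) pair; only the message is needed.
		val json = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
		val id = json.map(_._2).map(parser)

		id.print()

		ssc.start()
		// Keep the driver alive while the stream is being processed.
		ssc.awaitTermination()
	}
}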

But it throws the exception below:

java.lang.NoClassDefFoundError: scala/reflect/ClassManifest
	at net.liftweb.json.JsonAST$JValue.extract(JsonAST.scala:300)
	at aero.catec.stratio.ScalaExample$.parser(ScalaExample.scala:33)
	at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
	at aero.catec.stratio.ScalaExample$$anonfun$2.apply(ScalaExample.scala:48)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at scala.collection.AbstractIterator.to(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
	at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
	at org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
	at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
	at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:575)
	at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:560)
Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
	at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)


The thing is that if I run the same code without Spark (reading from a
file), it works perfectly. The problem only appears when I put it in a Spark
program. Also, if I change the parser function to something like this:

	def parser(json: String): JValue = {
		val parsedJson = parse(json)
		parsedJson \\ "ident"
	}

it also works. But when I try to get the actual String, I get the same
error.
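
For comparison, the Spark-free check mentioned above could be sketched roughly as follows. This is only a minimal illustration, not the exact program that was run: the object name StandaloneCheck, the helper identOf, and the default file name positions.json are placeholders, and the file is assumed to hold one JSON message per line. It uses the same lift-json calls as the program above (parse and extract with DefaultFormats).

```scala
import scala.io.Source
import net.liftweb.json._

// Only the field of interest is modelled here.
case class PlaneInfo(ident: String)

object StandaloneCheck {
  implicit val formats: Formats = DefaultFormats

  // Same logic as parser above: parse the JSON, extract the case class,
  // and return its ident field.
  def identOf(json: String): String = parse(json).extract[PlaneInfo].ident

  def main(args: Array[String]): Unit = {
    // Hypothetical input file, one JSON message per line.
    val path = args.headOption.getOrElse("positions.json")
    val source = Source.fromFile(path)
    try {
      source.getLines().filter(_.trim.nonEmpty).map(identOf).foreach(println)
    } finally {
      source.close()
    }
  }
}
```

Running this with the same lift-json jar and the same Scala version that the Spark job uses may help narrow down whether the failure depends on the Spark classpath.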

Thank you for your help. I hope I have explained it well.



