spark-user mailing list archives

From: Mario Pastorelli <mario.pastore...@teralytics.ch>
Subject: Re: Spark streaming: missing classes when kafka consumer classes
Date: Fri, 12 Dec 2014 14:56:57 GMT
Hi,

I asked on SO and got an answer about this:
http://stackoverflow.com/questions/27444512/missing-classes-from-the-assembly-file-created-by-sbt-assembly

Adding fullClasspath in assembly := (fullClasspath in Compile).value
at the end of my build.sbt solved the problem, apparently.
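
For reference, a minimal sketch of how the relevant part of my build.sbt
looks with that line added (dependency versions as in my first email; the
merge strategy is left out here and is project-specific):

  // build.sbt (sbt 0.13.x with sbt-assembly 0.12.0) -- minimal sketch
  name := "spark_example"

  scalaVersion := "2.10.4"

  libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
    "joda-time" % "joda-time" % "2.6"
  )

  // Build the assembly from the full Compile classpath, so that transitive
  // dependencies such as kafka_2.10 and zkclient end up inside the fat jar.
  fullClasspath in assembly := (fullClasspath in Compile).value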

Best,
Mario

On 11.12.2014 20:04, Flávio Santos wrote:
> Hi Mario,
>
> Try including this in your libraryDependencies (in your sbt file):
>
>   "org.apache.kafka" % "kafka_2.10" % "0.8.0"
>     exclude("javax.jms", "jms")
>     exclude("com.sun.jdmk", "jmxtools")
>     exclude("com.sun.jmx", "jmxri")
>     exclude("org.slf4j", "slf4j-simple")
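>
>   As a sketch of where this sits, assuming the build.sbt from the first
>   email (versions as quoted there; written with .exclude so the chained
>   calls are unambiguous):
>
>     libraryDependencies ++= Seq(
>       "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>       "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>       // same kafka dependency as above, with the problematic transitive deps excluded
>       ("org.apache.kafka" % "kafka_2.10" % "0.8.0")
>         .exclude("javax.jms", "jms")
>         .exclude("com.sun.jdmk", "jmxtools")
>         .exclude("com.sun.jmx", "jmxri")
>         .exclude("org.slf4j", "slf4j-simple"),
>       "joda-time" % "joda-time" % "2.6"
>     )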
>
> Regards,
>
> --
> Flávio R. Santos
>
> Chaordic | Platform
> www.chaordic.com.br
> +55 48 3232.3200
>
> On Thu, Dec 11, 2014 at 12:32 PM, Mario Pastorelli 
> <mario.pastorelli@teralytics.ch> wrote:
>
>     Thanks Akhil for the answer.
>
>     I am using sbt assembly and the build.sbt is in the first email.
>     Do you know why those classes are included in that way?
>
>
>     Thanks,
>     Mario
>
>
>     On 11.12.2014 14:51, Akhil Das wrote:
>>     Yes. You can use sbt assembly to create a big fat jar with
>>     all dependencies bundled inside it.
>>
>>     Thanks
>>     Best Regards
>>
>>     On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli
>>     <mario.pastorelli@teralytics.ch> wrote:
>>
>>         That way it works, but it's not portable, and the whole point
>>         of a fat jar is to avoid exactly this. Is there any way to
>>         create a self-contained, portable fat jar?
>>
>>
>>         On 11.12.2014 13:57, Akhil Das wrote:
>>>         Add these jars while creating the Context.
>>>
>>>             val sc = new SparkContext(conf)
>>>             sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
>>>             sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
>>>             sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
>>>             sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
>>>             val ssc = new StreamingContext(sc, Seconds(10))
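>>>
>>>         A variant of the same idea, sketched with placeholder jar paths
>>>         (the real paths are the ivy cache locations above): the jars can
>>>         also be set on the SparkConf before the context is created, and
>>>         SparkConf.setJars ships them to the executors just like sc.addJar.
>>>
>>>             // Placeholder paths -- substitute the actual jar locations.
>>>             val extraJars = Seq(
>>>               "/path/to/spark-streaming-kafka_2.10-1.1.0.jar",
>>>               "/path/to/zkclient-0.3.jar",
>>>               "/path/to/metrics-core-2.2.0.jar",
>>>               "/path/to/kafka_2.10-0.8.0.jar")
>>>             // setJars registers the jars to be shipped to every executor.
>>>             val sc = new SparkContext(conf.setJars(extraJars))
>>>             val ssc = new StreamingContext(sc, Seconds(10))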
>>>
>>>
>>>         Thanks
>>>         Best Regards
>>>
>>>         On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli
>>>         <mario.pastorelli@teralytics.ch> wrote:
>>>
>>>             Hi,
>>>
>>>             I'm trying to use spark-streaming with kafka, but I get a
>>>             strange error about classes that are missing. I would like
>>>             to ask whether my way of building the fat jar is correct or
>>>             not. My program is
>>>
>>>             val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
>>>                 kafkaGroupId, kafkaTopicsWithThreads).map(_._2)
>>>
>>>             kafkaStream.foreachRDD((rdd, t) => rdd.foreachPartition {
>>>               iter: Iterator[CellWithLAC] =>
>>>                 println("time: " ++ t.toString ++ " #received: " ++ iter.size.toString)
>>>             })
>>>
>>>             I use sbt to manage my project and my build.sbt (with the
>>>             sbt-assembly 0.12.0 plugin) is
>>>
>>>             name := "spark_example"
>>>
>>>             version := "0.0.1"
>>>
>>>             scalaVersion := "2.10.4"
>>>
>>>             scalacOptions ++= Seq("-deprecation","-feature")
>>>
>>>             libraryDependencies ++= Seq(
>>>               "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>>>               "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>>>               "joda-time" % "joda-time" % "2.6"
>>>             )
>>>
>>>             assemblyMergeStrategy in assembly := {
>>>               case p if p startsWith "com/esotericsoftware/minlog" => MergeStrategy.first
>>>               case p if p startsWith "org/apache/commons/beanutils" => MergeStrategy.first
>>>               case p if p startsWith "org/apache/" => MergeStrategy.last
>>>               case "plugin.properties" => MergeStrategy.discard
>>>               case p if p startsWith "META-INF" => MergeStrategy.discard
>>>               case x =>
>>>                 val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>                 oldStrategy(x)
>>>             }
>>>
>>>             I create the jar with sbt assembly and then run it with
>>>
>>>             $SPARK_HOME/bin/spark-submit --master spark://master:7077 \
>>>               --class Main target/scala-2.10/spark_example-assembly-0.0.1.jar \
>>>               localhost:2181 test-consumer-group test1
>>>
>>>             where master:7077 is the spark master, localhost:2181 is
>>>             zookeeper, test-consumer-group is the kafka group id and
>>>             test1 is the kafka topic. The program starts and keeps
>>>             running, but I get an error and nothing is printed. In the
>>>             log I found the following stack trace:
>>>
>>>             14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325]
>>>             14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection to [jpl-devvax/127.0.1.1:38767]
>>>             14/12/11 13:02:08 INFO network.SendingConnection: Connected to [jpl-devvax/127.0.1.1:38767], 1 messages pending
>>>             14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free: 265.4 MB)
>>>             14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
>>>             14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>>>                 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown Source)
>>>                 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
>>>                 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>>>                 at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
>>>                 at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
>>>                 at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
>>>                 at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
>>>                 at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>>>                 at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>                 at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>                 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>                 at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>                 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>                 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>                 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>                 at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>                 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>                 at java.lang.Thread.run(Thread.java:745)
>>>
>>>             I searched inside the fat jar and found that that class is
>>>             not in it:
>>>
>>>             > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
>>>             >
>>>
>>>             The problem is the double dollar before anonfun: if you put
>>>             only one, then the class is there:
>>>
>>>             > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
>>>             [...]
>>>             kafka/consumer/ZookeeperConsumerConnector.class
>>>             >
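>>>
>>>             (One caveat with the two grep commands above: inside double
>>>             quotes the shell expands $$ to its own process id, so the
>>>             first pattern does not actually contain a literal "$$". A
>>>             single-quoted, fixed-string search, for example
>>>
>>>             > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar | grep -F 'ZKRebalancerListener$$anonfun'
>>>
>>>             tests for the exact name without that expansion.)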
>>>
>>>             I'm submitting my job to spark-1.1.1 compiled with
>>>             Hadoop 2.4, downloaded from the Spark website.
>>>
>>>             My question is: how can I solve this problem? I guess the
>>>             problem is in my sbt build, but I don't understand why.
>>>
>>>
>>>             Thanks,
>>>             Mario Pastorelli
>>>
>>>
>>
>>
>
>

