spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Spark streaming: missing classes when kafka consumer classes
Date Thu, 11 Dec 2014 16:31:41 GMT
Last time i did an sbt assembly and this is how i added the dependencies.


libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming_2.10" % "1.1.0" % "provided").
    exclude("org.eclipse.jetty.orbit", "javax.transaction").
    exclude("org.eclipse.jetty.orbit", "javax.mail").
    exclude("org.eclipse.jetty.orbit", "javax.activation").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-logging", "commons-logging").
    exclude("commons-collections", "commons-collections").
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
)

libraryDependencies ++= Seq(
  ("org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0").
    exclude("org.eclipse.jetty.orbit", "javax.transaction").
    exclude("org.eclipse.jetty.orbit", "javax.mail").
    exclude("org.eclipse.jetty.orbit", "javax.activation").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-beanutils", "commons-beanutils-core").
    exclude("commons-logging", "commons-logging").
    exclude("commons-collections", "commons-collections").
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
)


Those excluded were causing conflicts.

Thanks
Best Regards

On Thu, Dec 11, 2014 at 8:02 PM, Mario Pastorelli <
mario.pastorelli@teralytics.ch> wrote:

>  Thanks akhil for the answer.
>
> I am using sbt assembly and the build.sbt is in the first email. Do you
> know why those classes are included in that way?
>
>
> Thanks,
> Mario
>
>
> On 11.12.2014 14:51, Akhil Das wrote:
>
>  Yes. You can do/use *sbt assembly* and create a big fat jar with all
> dependencies bundled inside it.
>
>  Thanks
> Best Regards
>
> On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli <
> mario.pastorelli@teralytics.ch> wrote:
>
>>  In this way it works but it's not portable and the idea of having a fat
>> jar is to avoid exactly this. Is there any system to create a
>> self-contained portable fatJar?
>>
>>
>> On 11.12.2014 13:57, Akhil Das wrote:
>>
>>  Add these jars while creating the Context.
>>
>>         val sc = new SparkContext(conf)
>>
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/
>> *spark-streaming-kafka_2.10-1.1.0.jar*")
>>         sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/
>> *zkclient-0.3.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/
>> *metrics-core-2.2.0.jar*")
>>
>> sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/
>> *kafka_2.10-0.8.0.jar*")
>>
>>         val ssc = new StreamingContext(sc, Seconds(10))
>>
>>
>>  Thanks
>> Best Regards
>>
>> On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli <
>> mario.pastorelli@teralytics.ch> wrote:
>>
>>>  Hi,
>>>
>>> I'm trying to use spark-streaming with kafka but I get a strange error
>>> on class that are missing. I would like to ask if my way to build the fat
>>> jar is correct or no. My program is
>>>
>>> val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum,
>>> kafkaGroupId, kafkaTopicsWithThreads)
>>>                             .map(_._2)
>>>
>>> kafkaStream.foreachRDD((rdd,t) => rdd.foreachPartition {
>>> iter:Iterator[CellWithLAC] =>
>>>   println("time: " ++ t.toString ++ " #received: " ++
>>> iter.size.toString)
>>> })
>>>
>>> I use sbt to manage my project and my build.sbt (with assembly 0.12.0
>>> plugin) is
>>>
>>> name := "spark_example"
>>>
>>> version := "0.0.1"
>>>
>>> scalaVersion := "2.10.4"
>>>
>>> scalacOptions ++= Seq("-deprecation","-feature")
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.1.1",
>>>   "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
>>>   "joda-time" % "joda-time" % "2.6"
>>> )
>>>
>>> assemblyMergeStrategy in assembly := {
>>>   case p if p startsWith "com/esotericsoftware/minlog" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/commons/beanutils" =>
>>> MergeStrategy.first
>>>   case p if p startsWith "org/apache/" => MergeStrategy.last
>>>   case "plugin.properties" => MergeStrategy.discard
>>>   case p if p startsWith "META-INF" => MergeStrategy.discard
>>>   case x =>
>>>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>     oldStrategy(x)
>>> }
>>>
>>> I create the jar with sbt assembly and the run with
>>> $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main
>>> target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181
>>> test-consumer-group test1
>>>
>>> where master:7077 is the spark master, localhost:2181 is zookeeper,
>>> test-consumer-group is kafka groupid and test1 is the kafka topic. The
>>> program starts and keep running but I get an error and nothing is printed.
>>> In the log I found the following stack trace:
>>>
>>> 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection
>>> from [10.0.3.1/10.0.3.1:54325]
>>> 14/12/11 13:02:08 INFO network.SendingConnection: Initiating connection
>>> to [jpl-devvax/127.0.1.1:38767]
>>> 14/12/11 13:02:08 INFO network.SendingConnection: Connected to
>>> [jpl-devvax/127.0.1.1:38767], 1 messages pending
>>> 14/12/11 13:02:08 INFO storage.BlockManagerInfo: Added
>>> broadcast_2_piece0 in memory on jpl-devvax:38767 (size: 842.0 B, free:
>>> 265.4 MB)
>>> 14/12/11 13:02:08 INFO scheduler.ReceiverTracker: Registered receiver
>>> for stream 0 from akka.tcp://sparkExecutor@jpl-devvax:46602
>>> 14/12/11 13:02:08 ERROR scheduler.ReceiverTracker: Deregistered receiver
>>> for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues$1
>>>     at
>>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(Unknown
>>> Source)
>>>     at
>>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown
>>> Source)
>>>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>>>     at
>>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown
>>> Source)
>>>     at
>>> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown
>>> Source)
>>>     at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
>>>     at
>>> kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown
>>> Source)
>>>     at
>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:114)
>>>     at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>     at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>     at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>     at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>     at
>>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
>>>     at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>     at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> I have searched inside the fat jar and I found that that class is not in
>>> it:
>>>
>>> > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar  | grep
>>> "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector"
>>> >
>>>
>>> The problem is the double dollar before anonfun: if you put only one
>>> then the class is there:
>>>
>>> > jar -tf target/scala-2.10/rtstat_in_spark-assembly-0.0.1.jar  | grep
>>> "kafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener$anonfun$kafka$consumer$ZookeeperConsumerConnector"
>>> [...]
>>> kafka/consumer/ZookeeperConsumerConnector.class
>>> >
>>>
>>> I'm submitting my job to spark-1.1.1 compiled with hadoop2.4 downloaded
>>> from the spark website.
>>>
>>> My question is: how can I solve this problem? I guess the problem is my
>>> sbt script but I don't understand why.
>>>
>>>
>>> Thanks,
>>> Mario Pastorelli
>>>
>>>
>>
>>
>
>

Mime
View raw message