spark-user mailing list archives

From Gino Bustelo <lbust...@gmail.com>
Subject Re: spark streaming, kafka, SPARK_CLASSPATH
Date Tue, 17 Jun 2014 18:35:34 GMT
Luis' experience validates what I'm seeing. You still have to set the properties in the SparkConf
for the context to work. For example, the master URL and jars are specified again in the app.


Gino B.

> On Jun 17, 2014, at 12:05 PM, Luis Ángel Vicente Sánchez <langel.groups@gmail.com> wrote:
> 
> I have been able to submit a job successfully, but I had to configure my Spark job this way:
> 
>   val sparkConf: SparkConf =
>     new SparkConf()
>       .setAppName("TwitterPopularTags")
>       .setMaster("spark://int-spark-master:7077")
>       .setSparkHome("/opt/spark")
>       .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))
> 
> Now I'm getting this error on my worker:
> 
> 14/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any resources;
> check your cluster UI to ensure that workers are registered and have sufficient memory
> 
> 
> 
> 2014-06-17 15:38 GMT+01:00 Luis Ángel Vicente Sánchez <langel.groups@gmail.com>:
>> After playing a bit, I have been able to create a fatjar this way:
>> 
>> lazy val rootDependencies = Seq(
>>   "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
>>   "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark", "spark-streaming_2.10")
>> )
>> 
>> Excluding those transitive dependencies, we can create a fatjar of ~400 KB instead of 40 MB.
>> 
>> My problem is not running the streaming job locally but submitting it to a standalone cluster
>> using spark-submit. Every time I ran the following command, my workers died:
>> 
>> ~/development/tools/spark/1.0.0/bin/spark-submit \
>> --class "org.apache.spark.examples.streaming.TwitterPopularTags" \
>> --master "spark://int-spark-master:7077" \
>> --deploy-mode "cluster" \
>> file:///tmp/spark-test-0.1-SNAPSHOT.jar
>> 
>> I have copied my fatjar to my master /tmp folder.
>> 
>> 
>> 2014-06-17 10:30 GMT+01:00 Michael Cutler <michael@tumra.com>:
>> 
>>> Admittedly, getting Spark Streaming / Kafka working for the first time can be a bit tricky
>>> with the web of dependencies that get pulled in.  I've taken the KafkaWordCount example from
>>> the Spark project and set up a simple standalone SBT project that shows you how to get it
>>> working using spark-submit.
>>> 
>>> https://github.com/cotdp/spark-example-kafka
>>> 
>>> The key trick is to use sbt-assembly instead of relying on any of the "add jars"
>>> functionality.  You mark "spark-core" and "spark-streaming" as provided, because they are
>>> part of the core spark-assembly already running on your cluster.  However,
>>> "spark-streaming-kafka" is not, so you need to package it in your 'fat JAR' while excluding
>>> all the mess that causes the build to break.
>>> 
>>> build.sbt:
>>> 
>>> import AssemblyKeys._
>>> 
>>> assemblySettings
>>> 
>>> name := "spark-example-kafka"
>>> 
>>> version := "1.0"
>>> 
>>> scalaVersion := "2.10.4"
>>> 
>>> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>>> 
>>> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>>> 
>>> libraryDependencies ++= Seq(
>>>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>>>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>>>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>>>     exclude("commons-beanutils", "commons-beanutils").
>>>     exclude("commons-collections", "commons-collections").
>>>     exclude("com.esotericsoftware.minlog", "minlog")
>>> )
>>> 
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>   {
>>>     case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>>>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>>     case x => old(x)
>>>   }
>>> }
>>> 
>>> 
>>> You can see the "exclude()" has to go around the spark-streaming-kafka dependency, and I've
>>> used a MergeStrategy to solve the "deduplicate: different file contents found in the
>>> following" errors.
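>>> 
>>> If the assembled JAR then fails at runtime with "Invalid signature file digest for Manifest
>>> main attributes", one possible variation is to discard the JAR signature files instead of
>>> keeping the last copy. This is only a sketch, not taken from the linked project: it assumes
>>> the same sbt-assembly setup (import AssemblyKeys._ and assemblySettings) as the build.sbt
>>> above and would replace the mergeStrategy setting there.
>>> 
>>> ```scala
>>> // Assumption: same legacy sbt-assembly syntax as the build.sbt above.
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>   {
>>>     // Drop signature files; they describe the original JARs, not the merged one.
>>>     case x if x.startsWith("META-INF/") &&
>>>       (x.endsWith(".RSA") || x.endsWith(".SF") || x.endsWith(".DSA")) => MergeStrategy.discard
>>>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>>>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>>>     // Fall back to the default strategy for everything else.
>>>     case x => old(x)
>>>   }
>>> }
>>> ```
>>> 
>>> The idea is that a signature file signs the contents of a single original JAR, so it
>>> generally stops being valid once several JARs are merged into one.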
>>> 
>>> Build the JAR with sbt assembly and use the scripts in bin/ to run the examples.
>>> 
>>> I'm using this same approach to run my Spark Streaming jobs with spark-submit and have them
>>> managed using Mesos/Marathon to handle failures and restarts with long-running processes.
>>> 
>>> Good luck!
>>> 
>>> MC
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Michael Cutler
>>> Founder, CTO
>>> 
>>>> On 17 June 2014 02:51, Gino Bustelo <lbustelo@gmail.com> wrote:
>>>> +1 for this issue. The documentation for spark-submit is misleading. Among many issues, the
>>>> jar support is bad. HTTP URLs do not work. This is because Spark is using Hadoop's FileSystem
>>>> class. You have to specify the jars twice to get things to work: once for the DriverWrapper
>>>> to load your classes, and a second time in the Context to distribute them to workers.
>>>> 
>>>> I would like to see some contrib response to this issue. 
>>>> 
>>>> Gino B.
>>>> 
>>>>> On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez <langel.groups@gmail.com> wrote:
>>>>> 
>>>>> Did you manage to make it work? I'm facing similar problems and this is a serious blocker.
>>>>> spark-submit seems kind of broken to me if you can't use it for spark-streaming.
>>>>> 
>>>>> Regards,
>>>>> 
>>>>> Luis
>>>>> 
>>>>> 
>>>>> 2014-06-11 1:48 GMT+01:00 lannyripple <lanny.ripple@gmail.com>:
>>>>>> I am using Spark 1.0.0 compiled with Hadoop 1.2.1.
>>>>>> 
>>>>>> I have a toy spark-streaming-kafka program.  It reads from a kafka queue and does
>>>>>> 
>>>>>>     stream
>>>>>>       .map {case (k, v) => (v, 1)}
>>>>>>       .reduceByKey(_ + _)
>>>>>>       .print()
>>>>>> 
>>>>>> using a 1 second interval on the stream.
>>>>>> 
>>>>>> The docs say to make Spark and Hadoop jars 'provided' but this breaks for
>>>>>> spark-streaming.  Including spark-streaming (and spark-streaming-kafka) as
>>>>>> 'compile' to sweep them into our assembly gives collisions on javax.*
>>>>>> classes.  To work around this I modified
>>>>>> $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
>>>>>> spark-streaming-kafka, and zkclient.  (Note that kafka is included as
>>>>>> 'compile' in my project and picked up in the assembly.)
>>>>>> 
>>>>>> I have set up conf/spark-env.sh as needed.  I have copied my assembly to
>>>>>> /tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I am
>>>>>> running spark-submit from my spark master.  I am guided by the information
>>>>>> here https://spark.apache.org/docs/latest/submitting-applications.html
>>>>>> 
>>>>>> Well at this point I was going to detail all the ways spark-submit fails to
>>>>>> follow its own documentation.  If I do not invoke sparkContext.setJars()
>>>>>> then it just fails to find the driver class.  This is using various
>>>>>> combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
>>>>>> and local: prefixes on the application-jar and --jars arguments.
>>>>>> 
>>>>>> If I invoke sparkContext.setJars() and include my assembly jar I get
>>>>>> further.  At this point I get a failure from
>>>>>> kafka.consumer.ConsumerConnector not being found.  I suspect this is because
>>>>>> spark-streaming-kafka needs the Kafka dependency but my assembly jar is
>>>>>> too late in the classpath.
>>>>>> 
>>>>>> At this point I try setting spark.files.userClassPathFirst to 'true' but
>>>>>> this causes more things to blow up.
>>>>>> 
>>>>>> I finally found something that works.  Namely setting environment variable
>>>>>> SPARK_CLASSPATH=/tmp/myjar.jar  But silly me, this is deprecated and I'm
>>>>>> helpfully informed to
>>>>>> 
>>>>>>   Please instead use:
>>>>>>    - ./spark-submit with --driver-class-path to augment the driver classpath
>>>>>>    - spark.executor.extraClassPath to augment the executor classpath
>>>>>> 
>>>>>> which when put into a file and introduced with --properties-file does not
>>>>>> work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
>>>>>> the kafka.consumer.ConsumerConnector error.
>>>>>> 
>>>>>> At a guess what's going on is that using SPARK_CLASSPATH I have my assembly
>>>>>> jar in the classpath at SparkSubmit invocation
>>>>>> 
>>>>>>   Spark Command: java -cp
>>>>>> /tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
>>>>>> -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
>>>>>> org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
>>>>>> /tmp/myjar.jar
>>>>>> 
>>>>>> but using --properties-file then the assembly is not available for
>>>>>> SparkSubmit.
>>>>>> 
>>>>>> I think the root cause is either spark-submit not handling the
>>>>>> spark-streaming libraries so they can be 'provided' or the inclusion of
>>>>>> org.eclipse.jetty.orbit in the streaming libraries which causes
>>>>>> 
>>>>>>   [error] (*:assembly) deduplicate: different file contents found in the following:
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
>>>>>>   [error]
>>>>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
>>>>>> 
>>>>>> I've tried applying mergeStrategy in assembly for my assembly.sbt but then I
>>>>>> get
>>>>>> 
>>>>>>   Invalid signature file digest for Manifest main attributes
>>>>>> 
>>>>>> If anyone knows the magic to get this working a reply would be greatly
>>>>>> appreciated.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
