spark-user mailing list archives

From Luis Ángel Vicente Sánchez <langel.gro...@gmail.com>
Subject Re: spark streaming, kafka, SPARK_CLASSPATH
Date Tue, 17 Jun 2014 14:38:24 GMT
After playing a bit, I have been able to create a fatjar this way:

lazy val rootDependencies = Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
  ("org.apache.spark" %% "spark-streaming-twitter" % "1.0.0")
    .exclude("org.apache.spark", "spark-core_2.10")
    .exclude("org.apache.spark", "spark-streaming_2.10")
)

By excluding those transitive dependencies, we get a fatjar of ~400 KB
instead of 40 MB.

My problem is not running the streaming job locally but submitting it
to a standalone cluster using spark-submit. Every time I run the following
command, my workers die:

~/development/tools/spark/1.0.0/bin/spark-submit \
--class "org.apache.spark.examples.streaming.TwitterPopularTags" \
--master "spark://int-spark-master:7077" \
--deploy-mode "cluster" \
file:///tmp/spark-test-0.1-SNAPSHOT.jar

I have copied my fatjar to the /tmp folder on my master.


2014-06-17 10:30 GMT+01:00 Michael Cutler <michael@tumra.com>:

> Admittedly getting Spark Streaming / Kafka working for the first time can
> be a bit tricky with the web of dependencies that get pulled in.  I've
> taken the KafkaWordCount example from the Spark project and set up a simple
> standalone SBT project that shows you how to get it working with
> spark-submit.
>
> https://github.com/cotdp/spark-example-kafka
>
> The key trick is to use sbt-assembly instead of relying on any of
> the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
> as provided, because they are part of the core spark-assembly already
> running on your cluster.  However, "spark-streaming-kafka" is not, so you need
> to package it in your 'fat JAR' while excluding all the mess that causes
> the build to break.
>
> build.sbt
> <https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt>:
>
> import AssemblyKeys._
>
> assemblySettings
>
> name := "spark-example-kafka"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
>
> jarName in assembly := "spark-example-kafka_2.10-1.0.jar"
>
> assemblyOption in assembly ~= { _.copy(includeScala = false) }
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>   "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
>   ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
>     exclude("commons-beanutils", "commons-beanutils").
>     exclude("commons-collections", "commons-collections").
>     exclude("com.esotericsoftware.minlog", "minlog")
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
>     case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>     case x => old(x)
>   }
> }
>
>
> You can see the "exclude()" has to go around the spark-streaming-kafka dependency,
> and I've used a MergeStrategy to solve the "deduplicate: different file
> contents found in the following" errors.
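>
> If the assembled JAR then fails with the "Invalid signature file digest
> for Manifest main attributes" error mentioned in the original post, one
> option is to drop the JAR signature files instead of keeping one of them.
> This is only a sketch, assuming the same sbt-assembly 0.11.x syntax as the
> build.sbt above; the list of extensions (.RSA, .SF, .DSA) is an assumption
> about which signature files are involved:
>
> ```scala
> // Sketch: discard signature files under META-INF (assumed to be *.RSA,
> // *.SF and *.DSA) so the merged JAR carries no stale signatures.
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
>     case x if x.startsWith("META-INF/") &&
>       (x.endsWith(".RSA") || x.endsWith(".SF") || x.endsWith(".DSA")) =>
>       MergeStrategy.discard
>     case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
>     case x if x.startsWith("plugin.properties") => MergeStrategy.last
>     case x => old(x) // fall back to the default strategy
>   }
> }
> ```
>
> This would take the place of the mergeStrategy block above rather than sit
> alongside it.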
>
> Build the JAR with sbt assembly and use the scripts in bin/ to run the
> examples.
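>
> The build.sbt above relies on the sbt-assembly plugin being declared for
> the build. A minimal project/plugins.sbt might look like the following; the
> version number is an assumption (a 0.11.x release matches the
> "import AssemblyKeys._" / "assemblySettings" style used above), so check
> the repository for the version it actually pins:
>
> ```scala
> // project/plugins.sbt (sketch; the 0.11.2 version is an assumption)
> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
> ```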
>
> I'm using this same approach to run my Spark Streaming jobs with
> spark-submit and have them managed using Mesos/Marathon
> <http://mesosphere.io/> to handle failures and restarts with long running
> processes.
>
> Good luck!
>
> MC
>
>  *Michael Cutler*
> Founder, CTO
>
> On 17 June 2014 02:51, Gino Bustelo <lbustelo@gmail.com> wrote:
>
>> +1 for this issue. The documentation for spark-submit is misleading. Among
>> many issues, the jar support is bad. HTTP URLs do not work. This is because
>> Spark is using Hadoop's FileSystem class. You have to specify the jars
>> twice to get things to work: once for the DriverWrapper to load your
>> classes and a second time in the Context to distribute them to the workers.
>>
>> I would like to see some contrib response to this issue.
>>
>> Gino B.
>>
>> On Jun 16, 2014, at 1:49 PM, Luis Ángel Vicente Sánchez <
>> langel.groups@gmail.com> wrote:
>>
>> Did you manage to make it work? I'm facing similar problems and this is a
>> serious blocker. spark-submit seems kind of broken to me if you can't
>> use it for spark-streaming.
>>
>> Regards,
>>
>> Luis
>>
>>
>> 2014-06-11 1:48 GMT+01:00 lannyripple <lanny.ripple@gmail.com>:
>>
>>> I am using Spark 1.0.0 compiled with Hadoop 1.2.1.
>>>
>>> I have a toy spark-streaming-kafka program.  It reads from a kafka queue and
>>> does
>>>
>>>     stream
>>>       .map {case (k, v) => (v, 1)}
>>>       .reduceByKey(_ + _)
>>>       .print()
>>>
>>> using a 1 second interval on the stream.
>>>
>>> The docs say to make Spark and Hadoop jars 'provided' but this breaks for
>>> spark-streaming.  Including spark-streaming (and spark-streaming-kafka) as
>>> 'compile' to sweep them into our assembly gives collisions on javax.*
>>> classes.  To work around this I modified
>>> $SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
>>> spark-streaming-kafka, and zkclient.  (Note that kafka is included as
>>> 'compile' in my project and picked up in the assembly.)
>>>
>>> I have set up conf/spark-env.sh as needed.  I have copied my assembly to
>>> /tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I am
>>> running spark-submit from my spark master.  I am guided by the information
>>> here https://spark.apache.org/docs/latest/submitting-applications.html
>>>
>>> Well at this point I was going to detail all the ways spark-submit fails to
>>> follow its own documentation.  If I do not invoke sparkContext.setJars()
>>> then it just fails to find the driver class.  This happens with various
>>> combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
>>> and local: prefixes on the application-jar and --jars arguments.
>>>
>>> If I invoke sparkContext.setJars() and include my assembly jar I get
>>> further.  At this point I get a failure from
>>> kafka.consumer.ConsumerConnector not being found.  I suspect this is because
>>> spark-streaming-kafka needs the Kafka dependency but my assembly jar comes
>>> too late in the classpath.
>>>
>>> At this point I try setting spark.files.userClassPathFirst to 'true' but
>>> this causes more things to blow up.
>>>
>>> I finally found something that works, namely setting the environment variable
>>> SPARK_CLASSPATH=/tmp/myjar.jar.  But silly me, this is deprecated and I'm
>>> helpfully informed to
>>>
>>>   Please instead use:
>>>    - ./spark-submit with --driver-class-path to augment the driver classpath
>>>    - spark.executor.extraClassPath to augment the executor classpath
>>>
>>> which when put into a file and introduced with --properties-file does not
>>> work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
>>> the kafka.consumer.ConsumerConnector error.
>>>
>>> At a guess what's going on is that using SPARK_CLASSPATH I have my assembly
>>> jar in the classpath at SparkSubmit invocation
>>>
>>>   Spark Command: java -cp
>>>
>>> /tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
>>> -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
>>> org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
>>> /tmp/myjar.jar
>>>
>>> but when using --properties-file the assembly is not available to
>>> SparkSubmit.
>>>
>>> I think the root cause is either spark-submit not handling the
>>> spark-streaming libraries so they can be 'provided' or the inclusion of
>>> org.eclipse.jetty.orbit in the streaming libraries, which causes
>>>
>>>   [error] (*:assembly) deduplicate: different file contents found in the
>>> following:
>>>   [error]
>>>
>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
>>>   [error]
>>>
>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
>>>   [error]
>>>
>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
>>>   [error]
>>>
>>> /Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA
>>>
>>> I've tried applying mergeStrategy in assembly in my assembly.sbt but then I
>>> get
>>>
>>>   Invalid signature file digest for Manifest main attributes
>>>
>>> If anyone knows the magic to get this working a reply would be greatly
>>> appreciated.
>>>
>>>
>>>
>>>
>>
>>
>
