spark-dev mailing list archives

From Chris Fregly <ch...@fregly.com>
Subject Re: Spark + Kinesis
Date Sun, 10 May 2015 04:14:34 GMT
hey vadim-

sorry for the delay.

if you're interested in trying to get Kinesis working one-on-one, shoot me
a direct email and we'll get it going off-list.

we can circle back and summarize our findings here.

lots of people are using Spark Streaming+Kinesis successfully.

would love to help you through this - albeit a month later!  the goal is to
have this working out of the box, so i'd like to implement anything i can
do to make that happen.

lemme know.

btw, Spark 1.4 will have some improvements to the Kinesis Spark Streaming integration.

TD and I have been working together on this.

thanks!

-chris

On Tue, Apr 7, 2015 at 6:17 PM, Vadim Bichutskiy <vadim.bichutskiy@gmail.com> wrote:

> Hey y'all,
>
> While I haven't been able to get Spark + Kinesis integration working, I
> pivoted to plan B: I now push data to S3 where I set up a DStream to
> monitor an S3 bucket with textFileStream, and that works great.
>
> I <3 Spark!
>
> Best,
> Vadim
>
> On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <
> vadim.bichutskiy@gmail.com> wrote:
>
>> Hi all,
>>
>> I am wondering, has anyone on this list been able to successfully
>> implement Spark on top of Kinesis?
>>
>> Best,
>> Vadim
>>
>> On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <
>> vadim.bichutskiy@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Below is the output that I am getting. My Kinesis stream has 1 shard,
>>> and my Spark cluster on EC2 has 2 slaves (I think that's fine?).
>>> I should mention that my Kinesis producer is written in Python where I
>>> followed the example
>>> http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
>>>
>>> I also wrote a Python consumer, again using the example at the above
>>> link, that works fine. But I am unable to display output from my Spark
>>> consumer.
>>>
>>> I'd appreciate any help.
>>>
>>> Thanks,
>>> Vadim
>>>
>>> -------------------------------------------
>>> Time: 1428254090000 ms
>>> -------------------------------------------
>>>
>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
>>> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>>> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>>> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>>> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
>>>
>>> ***********
>>>
>>> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)
>>>
>>> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <
>>> vadim.bichutskiy@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> More good news! I was able to use mergeStrategy to assemble my
>>>> Kinesis consumer into an "uber jar".
>>>>
>>>> Here's what I added to build.sbt:
>>>>
>>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>>   {
>>>>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>>>>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>>>>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>>>>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>>>>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>>>>     case x => old(x)
>>>>   }
>>>> }
>>>>
>>>> Everything appears to be working fine. Right now my producer is pushing
>>>> simple strings through Kinesis,
>>>> which my consumer is trying to print (using Spark's print() method for
>>>> now).
>>>>
>>>> However, instead of displaying my strings, I get the following:
>>>>
>>>> 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)
>>>>
>>>> Any idea on what might be going on?
>>>>
>>>> Thanks,
>>>>
>>>> Vadim
>>>>
>>>> Here's my consumer code (adapted from the WordCount example):
>>>>
>>>> private object MyConsumer extends Logging {
>>>>   def main(args: Array[String]) {
>>>>     /* Check that all required args were passed in. */
>>>>     if (args.length < 2) {
>>>>       System.err.println(
>>>>         """
>>>>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>>>>           |    <stream-name> is the name of the Kinesis stream
>>>>           |    <endpoint-url> is the endpoint of the Kinesis service
>>>>           |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
>>>>         """.stripMargin)
>>>>       System.exit(1)
>>>>     }
>>>>
>>>>     /* Populate the appropriate variables from the given args */
>>>>     val Array(streamName, endpointUrl) = args
>>>>
>>>>     /* Determine the number of shards from the stream */
>>>>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>>>     kinesisClient.setEndpoint(endpointUrl)
>>>>     val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>>>>       .size()
>>>>     System.out.println("Num shards: " + numShards)
>>>>
>>>>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>>>>     val numStreams = numShards
>>>>
>>>>     /* Set up the SparkConf and StreamingContext */
>>>>     /* Spark Streaming batch interval */
>>>>     val batchInterval = Milliseconds(2000)
>>>>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>>>>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>>>>
>>>>     /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
>>>>     val kinesisCheckpointInterval = batchInterval
>>>>
>>>>     /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
>>>>     val kinesisStreams = (0 until numStreams).map { i =>
>>>>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>>>>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>>>>     }
>>>>
>>>>     /* Union all the streams */
>>>>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>>>>     unionStreams.print()
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>> }
>>>>
>>>>
>>>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <tdas@databricks.com>
>>>> wrote:
>>>>
>>>>> Just remove "provided" for spark-streaming-kinesis-asl
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %%
>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>
>>>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>>>>> vadim.bichutskiy@gmail.com> wrote:
>>>>>
>>>>>> Thanks. So how do I fix it?
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonathak@amazon.com>
>>>>>> wrote:
>>>>>>
>>>>>>>   spark-streaming-kinesis-asl is not part of the Spark distribution
>>>>>>> on your cluster, so you cannot have it be just a "provided" dependency.
>>>>>>> This is also why the KCL and its dependencies were not included in the
>>>>>>> assembly (but yes, they should be).
>>>>>>>
>>>>>>>
>>>>>>>  ~ Jonathan Kelly
>>>>>>>
>>>>>>>   From: Vadim Bichutskiy <vadim.bichutskiy@gmail.com>
>>>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>>>> To: Jonathan Kelly <jonathak@amazon.com>
>>>>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>>>>> Subject: Re: Spark + Kinesis
>>>>>>>
>>>>>>>   Hi all,
>>>>>>>
>>>>>>>  Good news! I was able to create a Kinesis consumer and assemble it
>>>>>>> into an "uber jar" following
>>>>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>>>> and example
>>>>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>>>>
>>>>>>>  However when I try to spark-submit it I get the following
>>>>>>> exception:
>>>>>>>
>>>>>>>  Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>>> com/amazonaws/auth/AWSCredentialsProvider
>>>>>>>
>>>>>>>  Do I need to include the KCL dependency in build.sbt? Here's what it
>>>>>>> looks like currently:
>>>>>>>
>>>>>>>  import AssemblyKeys._
>>>>>>> name := "Kinesis Consumer"
>>>>>>> version := "1.0"
>>>>>>> organization := "com.myconsumer"
>>>>>>> scalaVersion := "2.11.5"
>>>>>>>
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>>>
>>>>>>> assemblySettings
>>>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>>
>>>>>>>  Any help appreciated.
>>>>>>>
>>>>>>>  Thanks,
>>>>>>> Vadim
>>>>>>>
>>>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonathak@amazon.com> wrote:
>>>>>>>
>>>>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>>>>> going to cause some problems.  If you really want to use Scala 2.11.5,
>>>>>>>> you must also use Spark package versions built for Scala 2.11 rather
>>>>>>>> than 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>>>>>> dependencies in build.sbt.  Instead of placing the Scala version after
>>>>>>>> the artifactId (like "spark-core_2.10"), what you actually want is to
>>>>>>>> use just "spark-core" with two percent signs before it.  Using two
>>>>>>>> percent signs will make it use the version of Scala that matches your
>>>>>>>> declared scalaVersion.  For example:
>>>>>>>>
>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>>>>
>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>>>>
>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>>>
>>>>>>>>  I think that may get you a little closer, though I think you're
>>>>>>>> probably going to run into the same problems I ran into in this thread:
>>>>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>>>>>> I never really got an answer for that, and I temporarily moved on to
>>>>>>>> other things for now.
>>>>>>>>
>>>>>>>>
>>>>>>>>  ~ Jonathan Kelly
>>>>>>>>
>>>>>>>>   From: 'Vadim Bichutskiy' <vadim.bichutskiy@gmail.com>
>>>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>>>>> Subject: Spark + Kinesis
>>>>>>>>
>>>>>>>>   Hi all,
>>>>>>>>
>>>>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>>>> processes data in the Kinesis stream. Is this the correct way to
>>>>>>>> specify build.sbt:
>>>>>>>>
>>>>>>>>  -------
>>>>>>>> import AssemblyKeys._
>>>>>>>> name := "Kinesis Consumer"
>>>>>>>> version := "1.0"
>>>>>>>> organization := "com.myconsumer"
>>>>>>>> scalaVersion := "2.11.5"
>>>>>>>>
>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0"
>>>>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>>>>
>>>>>>>> assemblySettings
>>>>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>>> --------
>>>>>>>>
>>>>>>>>  In project/assembly.sbt I have only the following line:
>>>>>>>>
>>>>>>>>  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>>>>
>>>>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning
>>>>>>>> Spark book.
>>>>>>>>
>>>>>>>>  Thanks,
>>>>>>>> Vadim
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
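
Pulling together the build advice quoted above from Jonathan Kelly and
Tathagata Das, the dependency section of build.sbt would look roughly like
the fragment below. This is a sketch, not a tested build: the scalaVersion
value is an assumption (use whichever Scala version your cluster's Spark was
built with), and the assembly settings stay as in the quoted build file.

```scala
// build.sbt (dependency section only): sketch based on the quoted replies

// Assumption: Spark on the cluster was built for Scala 2.10; adjust to match.
scalaVersion := "2.10.4"

// %% appends the Scala binary version (e.g. _2.10) to the artifact name,
// so the artifacts always follow scalaVersion.
// spark-core and spark-streaming ship with the cluster, hence "provided".
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

// Not "provided": kinesis-asl is not part of the Spark distribution on the
// cluster, so it and its dependencies must be packaged into the uber jar.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
```

With the Kinesis artifact in the assembly, duplicate-file conflicts are to be
expected, which is where the mergeStrategy block quoted above comes in.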

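To see why the first build.sbt in the thread mixed Scala versions, it can
help to spell out what %% does to an artifact name. The following is a
plain-Scala illustration, not sbt's implementation: binaryVersion and
crossName are hypothetical helpers, and the major.minor rule is assumed to
hold for stable Scala 2.10+ releases.

```scala
object CrossVersionSketch {
  // Scala binary version, e.g. "2.11.5" -> "2.11"
  // (assumption: stable 2.10+ releases, where binary version = major.minor)
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split('.').take(2).mkString(".")

  // Roughly what "group" %% "artifact" resolves to for a given scalaVersion
  def crossName(artifact: String, scalaVersion: String): String =
    artifact + "_" + binaryVersion(scalaVersion)

  def main(args: Array[String]): Unit = {
    // With scalaVersion 2.11.5, %% asks for the _2.11 artifact ...
    println(crossName("spark-core", "2.11.5")) // spark-core_2.11
    // ... whereas the original build hard-coded the _2.10 suffix.
    println(crossName("spark-core", "2.10.4")) // spark-core_2.10
  }
}
```

Hard-coding "spark-core_2.10" while declaring scalaVersion 2.11.5 therefore
asks for a different binary version than the one the project compiles with.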