spark-dev mailing list archives

From Vadim Bichutskiy <vadim.bichuts...@gmail.com>
Subject Re: Spark + Kinesis
Date Wed, 08 Apr 2015 01:17:27 GMT
Hey y'all,

While I haven't been able to get Spark + Kinesis integration working, I
pivoted to plan B: I now push data to S3 where I set up a DStream to
monitor an S3 bucket with textFileStream, and that works great.
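
A minimal sketch of that plan B, for anyone who wants to try the same route. The bucket path, app name, and batch interval are hypothetical placeholders, and it assumes S3 credentials are already configured for Hadoop's s3n filesystem on the cluster:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3TextStream {
  def main(args: Array[String]): Unit = {
    // Hypothetical location; replace with a real bucket and prefix.
    val inputDir = "s3n://my-bucket/incoming/"

    // Assumed app name and batch interval, chosen only for illustration.
    val conf = new SparkConf().setAppName("S3TextStream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // textFileStream monitors the directory and reads newly arrived
    // files as lines of text, one DStream[String] batch per interval.
    val lines = ssc.textFileStream(inputDir)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

As far as I understand it, textFileStream only picks up files that show up in the directory after the stream has started, so anything already sitting in the bucket is ignored.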

I <3 Spark!

Best,
Vadim


On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <
vadim.bichutskiy@gmail.com> wrote:

> Hi all,
>
> I am wondering, has anyone on this list been able to successfully
> implement Spark on top of Kinesis?
>
> Best,
> Vadim
>
> On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <
> vadim.bichutskiy@gmail.com> wrote:
>
>> Hi all,
>>
>> Below is the output that I am getting. My Kinesis stream has 1 shard, and
>> my Spark cluster on EC2 has 2 slaves (I think that's fine?).
>> I should mention that my Kinesis producer is written in Python where I
>> followed the example
>> http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
>>
>> I also wrote a Python consumer, again using the example at the above
>> link, that works fine. But I am unable to display output from my Spark
>> consumer.
>>
>> I'd appreciate any help.
>>
>> Thanks,
>> Vadim
>>
>> -------------------------------------------
>> Time: 1428254090000 ms
>> -------------------------------------------
>>
>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
>> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
>>
>> ***********
>>
>> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)
>>
>> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <
>> vadim.bichutskiy@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> More good news! I was able to use mergeStrategy to assemble my
>>> Kinesis consumer into an "uber jar".
>>>
>>> Here's what I added to build.sbt:
>>>
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>   {
>>>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>>>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>>>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>>>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>>>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>>>     case x => old(x)
>>>   }
>>> }
>>>
>>> Everything appears to be working fine. Right now my producer is pushing
>>> simple strings through Kinesis,
>>> which my consumer is trying to print (using Spark's print() method for
>>> now).
>>>
>>> However, instead of displaying my strings, I get the following:
>>>
>>> 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)
>>>
>>> Any idea on what might be going on?
>>>
>>> Thanks,
>>>
>>> Vadim
>>>
>>> Here's my consumer code (adapted from the WordCount example):
>>>
>>> private object MyConsumer extends Logging {
>>>   def main(args: Array[String]) {
>>>     /* Check that all required args were passed in. */
>>>     if (args.length < 2) {
>>>       System.err.println(
>>>         """
>>>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>>>           |    <stream-name> is the name of the Kinesis stream
>>>           |    <endpoint-url> is the endpoint of the Kinesis service
>>>           |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
>>>         """.stripMargin)
>>>       System.exit(1)
>>>     }
>>>
>>>     /* Populate the appropriate variables from the given args */
>>>     val Array(streamName, endpointUrl) = args
>>>
>>>     /* Determine the number of shards from the stream */
>>>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>>     kinesisClient.setEndpoint(endpointUrl)
>>>     val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>>>       .size()
>>>     System.out.println("Num shards: " + numShards)
>>>
>>>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>>>     val numStreams = numShards
>>>
>>>     /* Set up the SparkConfig and StreamingContext */
>>>     /* Spark Streaming batch interval */
>>>     val batchInterval = Milliseconds(2000)
>>>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>>>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>>>
>>>     /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
>>>     val kinesisCheckpointInterval = batchInterval
>>>
>>>     /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
>>>     val kinesisStreams = (0 until numStreams).map { i =>
>>>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>>>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>>>     }
>>>
>>>     /* Union all the streams */
>>>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>>>     unionStreams.print()
>>>
>>>     ssc.start()
>>>     ssc.awaitTermination()
>>>   }
>>> }
>>>
>>>
>>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> Just remove "provided" for spark-streaming-kinesis-asl
>>>>
>>>> libraryDependencies += "org.apache.spark" %%
>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>
>>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>>>> vadim.bichutskiy@gmail.com> wrote:
>>>>
>>>>> Thanks. So how do I fix it?
>>>>>
>>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonathak@amazon.com>
>>>>> wrote:
>>>>>
>>>>>>   spark-streaming-kinesis-asl is not part of the Spark distribution
>>>>>> on your cluster, so you cannot have it be just a "provided" dependency.
>>>>>> This is also why the KCL and its dependencies were not included in the
>>>>>> assembly (but yes, they should be).
>>>>>>
>>>>>>
>>>>>>  ~ Jonathan Kelly
>>>>>>
>>>>>>   From: Vadim Bichutskiy <vadim.bichutskiy@gmail.com>
>>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>>> To: Jonathan Kelly <jonathak@amazon.com>
>>>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>>>> Subject: Re: Spark + Kinesis
>>>>>>
>>>>>>   Hi all,
>>>>>>
>>>>>>  Good news! I was able to create a Kinesis consumer and assemble it
>>>>>> into an "uber jar" following
>>>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>>> and the example
>>>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>>>
>>>>>>  However, when I try to spark-submit it I get the following exception:
>>>>>>
>>>>>>  Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
>>>>>>
>>>>>>  Do I need to include the KCL dependency in build.sbt? Here's what it
>>>>>> looks like currently:
>>>>>>
>>>>>>  import AssemblyKeys._
>>>>>> name := "Kinesis Consumer"
>>>>>> version := "1.0"
>>>>>> organization := "com.myconsumer"
>>>>>> scalaVersion := "2.11.5"
>>>>>>
>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>>
>>>>>>  assemblySettings
>>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>
>>>>>>  Any help appreciated.
>>>>>>
>>>>>>  Thanks,
>>>>>> Vadim
>>>>>>
>>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonathak@amazon.com>
>>>>>> wrote:
>>>>>>
>>>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>>>> going to cause some problems.  If you really want to use Scala 2.11.5,
>>>>>>> you must also use Spark package versions built for Scala 2.11 rather
>>>>>>> than 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>>>>> dependencies in build.sbt.  Instead of placing the Scala version after
>>>>>>> the artifactId (like "spark-core_2.10"), what you actually want is to
>>>>>>> use just "spark-core" with two percent signs before it.  Using two
>>>>>>> percent signs will make it use the version of Scala that matches your
>>>>>>> declared scalaVersion.  For example:
>>>>>>>
>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>>>
>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>>>
>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>>
>>>>>>>  I think that may get you a little closer, though I think you're
>>>>>>> probably going to run into the same problems I ran into in this thread:
>>>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>>>>> I never really got an answer for that, and I temporarily moved on to
>>>>>>> other things for now.
>>>>>>>
>>>>>>>
>>>>>>>  ~ Jonathan Kelly
>>>>>>>
>>>>>>>   From: 'Vadim Bichutskiy' <vadim.bichutskiy@gmail.com>
>>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>>>> Subject: Spark + Kinesis
>>>>>>>
>>>>>>>   Hi all,
>>>>>>>
>>>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>>> processes data in the
>>>>>>> Kinesis stream. Is this the correct way to specify *build.sbt*:
>>>>>>>
>>>>>>>  -------
>>>>>>> import AssemblyKeys._
>>>>>>> name := "Kinesis Consumer"
>>>>>>> version := "1.0"
>>>>>>> organization := "com.myconsumer"
>>>>>>> scalaVersion := "2.11.5"
>>>>>>>
>>>>>>> libraryDependencies ++= Seq(
>>>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>>>
>>>>>>> assemblySettings
>>>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>> --------
>>>>>>>
>>>>>>>  In project/assembly.sbt I have only the following line:
>>>>>>>
>>>>>>>  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>>>
>>>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
>>>>>>> book.
>>>>>>>
>>>>>>>  Thanks,
>>>>>>> Vadim
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
