spark-user mailing list archives

From "A.K.M. Ashrafuzzaman" <ashrafuzzaman...@gmail.com>
Subject Re: Having problem with Spark streaming with Kinesis
Date Thu, 04 Dec 2014 04:05:03 GMT
Guys,
On my local machine the application consumes a Kinesis stream with 3 shards, but on EC2 it
did not consume from the stream. We later found that the EC2 machine had 2 cores while my local
machine has 4. I am using a single machine in Spark standalone mode. We then got a larger
machine from EC2, and now the Kinesis stream is being consumed.

4 cores Single machine -> works
2 cores Single machine -> does not work
2 cores 2 workers -> does not work

So my question is: do we need a cluster of (#KinesisShards + 1) workers to be able to
consume from Kinesis?
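
The two single-machine results above fit the rule discussed in this thread: each receiver occupies one core, and at least one more core is needed to process the received batches. A minimal sketch of that arithmetic follows, assuming one receiver per shard as in the sample code. The object and method names (`KinesisCoreCheck`, `minCores`, `canProcess`) are hypothetical helpers for illustration and are not part of Spark.

```scala
object KinesisCoreCheck {
  // Assumption: every receiver occupies one core for as long as the
  // application runs, so one extra core is needed to process batches.
  def minCores(numReceivers: Int): Int = numReceivers + 1

  // True when the cores available to the application cover all receivers
  // plus at least one core for processing.
  def canProcess(numReceivers: Int, totalCores: Int): Boolean =
    totalCores >= minCores(numReceivers)

  def main(args: Array[String]): Unit = {
    val numShards = 3            // shards in the stream described above
    val numReceivers = numShards // the sample code creates one receiver per shard

    println(s"minimum cores: ${minCores(numReceivers)}")
    println(s"4 cores sufficient: ${canProcess(numReceivers, totalCores = 4)}")
    println(s"2 cores sufficient: ${canProcess(numReceivers, totalCores = 2)}")
  }
}
```

The sketch counts cores available to the application rather than worker processes, so it only models the two single-machine cases listed above.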


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:

> Did you set the Spark master as local[*]? If so, the number of executors is equal to the
> number of cores on the machine. Perhaps your Mac has more cores (certainly more than the
> number of Kinesis shards + 1).
> 
> Try explicitly setting the master as local[N], where N is the number of Kinesis shards + 1.
> It should then work on both machines.
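
The local[N] suggestion can be sketched as a small helper that builds the master string from the shard count. This is an illustration only: `LocalMaster` and `localMaster` are hypothetical names, and the resulting string is assumed to be passed to `SparkConf.setMaster` in the application.

```scala
object LocalMaster {
  // Builds a master URL with one thread per shard (one receiver each)
  // plus one extra thread for processing the received data.
  def localMaster(numShards: Int): String = {
    require(numShards > 0, "numShards must be positive")
    s"local[${numShards + 1}]"
  }

  def main(args: Array[String]): Unit = {
    // For the 3-shard stream in this thread:
    val master = localMaster(3)
    println(master)
    // In the application this would be used roughly as:
    //   new SparkConf().setAppName("KinesisWordCount").setMaster(master)
  }
}
```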
> 
> On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman <ashrafuzzaman.g2@gmail.com> wrote:
> I was trying on one machine with just sbt run.
> 
> And it works in my Mac environment with the same configuration.
> 
> I used the sample code from https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
> 
> 
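> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import com.amazonaws.services.kinesis.AmazonKinesisClient
> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
> import org.apache.spark.SparkConf
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Milliseconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
> import org.apache.spark.streaming.kinesis.KinesisUtils
> 
> /* streamName and endpointUrl are assumed to be defined earlier, as in the linked sample. */
> val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())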
> kinesisClient.setEndpoint(endpointUrl)
> val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>   .size()
> 
> /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
> 
> /* Set up the SparkConf and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
> 
> /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
> 
> /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>       InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
> 
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
> 
> /* Convert each line of Array[Byte] to String, split into words, and count them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray)
>   .split(" "))
> 
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
> 
> /* Print the first 10 wordCounts */
> wordCounts.print()
> 
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
> 
> 
> 
> 
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar <aniket.bhatnagar@gmail.com> wrote:
> What's your cluster size? For streaming to work, it needs shards + 1 executors.
> 
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman <ashrafuzzaman.g2@gmail.com> wrote:
> Hi guys,
> When we use Kinesis with 1 shard, it works fine. But when we use more than 1 shard, it
> falls into an infinite loop and no data is processed by Spark Streaming. In the Kinesis
> DynamoDB table, I can see that it keeps increasing the leaseCounter, but it does not start
> processing.
> 
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
> 
> 
> 

