spark-user mailing list archives

From Matt Narrell <matt.narr...@gmail.com>
Subject Re: Multiple Kafka Receivers and Union
Date Wed, 24 Sep 2014 14:46:17 GMT
The part that works is the commented-out, single-receiver stream below the loop.  It seems
that when I call KafkaUtils.createStream more than once, I don’t receive any messages.

I’ll dig through the logs, but at first glance yesterday I didn’t see anything suspect.
I’ll have to look closer.

mn

On Sep 23, 2014, at 6:14 PM, Tim Smith <secsubs@gmail.com> wrote:

> Maybe post the before-code, i.e., what the code looked like before you added
> the loop (when it worked)? I've had similar situations where reviewing the
> code before (worked) and after (does not work) helped. The Scala REPL also
> helps, because I can see what object types are returned by each statement.
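> 
> For example (just a sketch; the host, group, and topic names are made up, and
> this assumes ssc is your StreamingContext with KafkaUtils imported), pasting a
> createStream call into spark-shell shows the returned type right away:
> 
>   scala> val s = KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp", Map("myTopic" -> 1))
>   s: org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, String)] = ...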
> 
> Other than code, in the driver logs, you should see events that say
> "Registered receiver for stream 0 from
> akka.tcp://spark@node5.acme.net:53135"
> 
> Now, if you go to "node5" and look at the Spark or YARN container logs
> (depending on which is doing resource management), you should be able to see
> whether the receiver hits any errors when trying to talk to Kafka.
> 
> 
> 
> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narrell@gmail.com> wrote:
>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach, but
>> this may cause waves for me upstream (e.g., non-Java).
>> 
>> Thanks for looking at this.  If anyone else can see a glaring issue in the Java
>> approach, that would be appreciated.
>> 
>> Thanks,
>> Matt
>> 
>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secsubs@gmail.com> wrote:
>> 
>>> Sorry, I'm almost Java-illiterate, but here's my Scala code that does the
>>> equivalent (which I've tested and know to work):
>>> 
>>> val kInStreams = (1 to 10).map { _ =>
>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>> } // Create 10 receivers across the cluster, potentially one per partition,
>>>   // but only as many receivers will be active as you have Kafka partitions
>>> 
>>> val kInMsg = ssc.union(kInStreams).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
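>>> 
>>> Downstream transformations then hang off the unioned stream as usual; a
>>> minimal sketch (word-counting the message values, purely as an illustration,
>>> and assuming the usual StreamingContext implicits are imported):
>>> 
>>> val counts = kInMsg.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
>>> counts.print()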
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narrell@gmail.com> wrote:
>>>> So, this is scrubbed some for confidentiality, but the meat of it is as follows.
>>>> Note that if I substitute the commented section for the loop, I receive messages
>>>> from the topic.
>>>> 
>>>> SparkConf sparkConf = new SparkConf();
>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>> sparkConf.set("spark.logConf", "true");
>>>> 
>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>> kafkaProps.put("group.id", groupId);
>>>> 
>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>> jsc.checkpoint("hdfs://<some_location>");
>>>> 
>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>> 
>>>> for (int i = 0; i < 5; i++) {
>>>>   streamList.add(KafkaUtils.createStream(jsc,
>>>>                                          String.class, ProtobufModel.class,
>>>>                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>>                                          kafkaProps,
>>>>                                          Collections.singletonMap(topic, 1),
>>>>                                          StorageLevel.MEMORY_ONLY_SER()));
>>>> }
>>>> 
>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>         jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
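>>>> // (union(first, rest) is the overload JavaStreamingContext provides, hence
>>>> // the get(0) / subList(1, n) split instead of passing the whole list.)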
>>>> 
>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>> //                  KafkaUtils.createStream(jsc,
>>>> //                                          String.class, ProtobufModel.class,
>>>> //                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>> //                                          kafkaProps,
>>>> //                                          Collections.singletonMap(topic, 5),
>>>> //                                          StorageLevel.MEMORY_ONLY_SER());
>>>> 
>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>       new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>           @Override
>>>>           public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>>>>               return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>           }
>>>>       });
>>>> 
>>>> … and further Spark functions …
>>>> 
>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secsubs@gmail.com> wrote:
>>>> 
>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>> 
>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narrell@gmail.com> wrote:
>>>>>> Hey,
>>>>>> 
>>>>>> Spark 1.1.0
>>>>>> Kafka 0.8.1.1
>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>> 
>>>>>> I have a five-partition Kafka topic.  I can create a single Kafka receiver
>>>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>>>> messages fine.  Sifting through the user list and Google, I see that it’s
>>>>>> possible to split the Kafka receivers among the Spark workers such that I can
>>>>>> have a receiver per partition, and have these distributed to the workers
>>>>>> rather than localized to the driver.  I’m following something like this:
>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>> but for Kafka, obviously.  From the Streaming Programming Guide: “Receiving
>>>>>> multiple data streams can therefore be achieved by creating multiple input
>>>>>> DStreams and configuring them to receive different partitions of the data
>>>>>> stream from the source(s).”
>>>>>> 
>>>>>> However, I’m not able to consume any messages from Kafka after I perform the
>>>>>> union operation.  Again, if I create a single, multi-threaded receiver I
>>>>>> can consume messages fine.  If I create 5 receivers in a loop and call
>>>>>> jssc.union(…) I get:
>>>>>> 
>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>> 
>>>>>> 
>>>>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>>>>> incorrectly?
>>>>>> 
>>>>>> Thanks in advance.
>>>>>> 
>>>>>> Matt
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 



