spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tobias Pfeiffer <>
Subject Distribute data from Kafka evenly on cluster
Date Sat, 28 Jun 2014 04:49:04 GMT

I have a number of questions using the Kafka receiver of Spark
Streaming. Maybe someone has some more experience with that and can
help me out.

I have set up an environment for getting to know Spark, consisting of
- a Mesos cluster with 3 only-slaves and 3 master-and-slaves,
- 2 Kafka nodes,
- 3 Zookeeper nodes providing service to both Kafka and Mesos.

My Kafka cluster has only one topic with one partition (replicated to
both nodes). When I start my Kafka receiver, it successfully connects
to Kafka and does the processing, but it seems as if the (expensive)
function in the final foreachRDD(...) is only executed on one node of
my cluster, which is not what I had in mind when setting up the
cluster ;-)

So first, I was wondering about the parameter `topics: Map[String,
Int]` to KafkaUtils.createStream(). Apparently it controls how many
connections are made from my cluster nodes to Kafka. The Kafka doc at says "each
message published to a topic is delivered to one consumer instance
within each subscribing consumer group" and "If all the consumer
instances have the same consumer group, then this works just like a
traditional queue balancing load over the consumers."

The Kafka docs *also* say: "Note however that there cannot be more
consumer instances than partitions." This seems to imply that with
only one partition, increasing the number in my Map should have no

However, if I increase the number of streams for my one topic in my
`topics` Map, I actually *do* see that the task in my foreachRDD(...)
call is now executed on multiple nodes. Maybe it's more of a Kafka
question than a Spark one, but can anyone explain this to me? Should I
always have more Kafka partitions than Mesos cluster nodes?

So, assuming that changing the number in that Map is not what I want
(although I don't know if it is), I tried to use
.repartition(numOfClusterNodes) (which doesn't seem right if I want to
add and remove Mesos nodes on demand). This *also* did spread the
foreachRDD(...) action evenly – however, the function never seems to
terminate, so I never get to process the next interval in the stream.
A similar behavior can be observed when running locally, not on the
cluster, then the program will not exit but instead hang after
everything else has shut down. Any hints concerning this issue?


View raw message