Is there another way to create topics from Spark? Is there any reason the above code snippet would still produce this error? I've dumbly inserted waits and retries for testing, but that still doesn't consistently work, even after waiting several minutes.

On Fri, Jan 29, 2016 at 8:29 AM, Cody Koeninger <> wrote:
The kafka direct stream doesn't do any explicit caching.  I haven't looked through the underlying simple consumer code in the kafka project in detail, but I doubt it does either.

Honestly, I'd recommend not using auto created topics (it makes it too easy to pollute your topics if someone fat-fingers something when interacting with kafka), and instead explicitly creating topics before using them.

If you're trying to create the topic in your spark job right before using it with direct stream, I can see how there might possibly be a race condition - you're using the ZK api, but the direct stream is talking only to the broker api.
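One way to sidestep that race might be to wait until the brokers themselves (not just ZooKeeper) report the topic before calling createDirectStream. Below is a minimal sketch of a generic polling helper, assuming you supply the readiness check yourself. `TopicWait` and `waitUntil` are hypothetical names, and the check (for example, a broker metadata request showing a leader for every partition) is not implemented here.

```scala
import scala.annotation.tailrec

object TopicWait {
  /** Polls `isReady` every `intervalMs` milliseconds until it returns true
    * or `timeoutMs` has elapsed. Returns whether the condition was met.
    * `isReady` is a caller-supplied (hypothetical) check, e.g. "the broker
    * metadata for this topic has a leader for every partition". */
  def waitUntil(timeoutMs: Long, intervalMs: Long)(isReady: () => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Boolean =
      if (isReady()) true                                    // condition met
      else if (System.currentTimeMillis() >= deadline) false // gave up
      else {
        Thread.sleep(intervalMs) // back off before checking again
        loop()
      }

    loop()
  }
}
```

Usage would look something like `TopicWait.waitUntil(30000, 500)(() => brokerSeesTopic(topic))`, where `brokerSeesTopic` is a hypothetical function you would write against the broker API. Polling the broker rather than ZooKeeper matters because the direct stream only talks to the brokers.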

On Thu, Jan 28, 2016 at 6:07 PM, asdf zxcv <> wrote:
Does Spark cache which kafka topics exist? A service incorrectly assumes all the relevant topics exist, even if they are empty, causing it to fail. Fortunately the service is automatically restarted and by default, kafka creates the topic after it is requested.

I'm trying to create the topic if it doesn't exist using AdminUtils.createTopic:

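      import java.util.Properties, org.I0Itec.zkclient.ZkClient
      import kafka.admin.AdminUtils, kafka.utils.ZKStringSerializer
      val zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer)
      if (!AdminUtils.topicExists(zkClient, topic)) {  // only create if ZK has no entry yet
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())  // 1 partition, replication factor 1
      }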

But I still get "Error getting partition metadata for 'topic-name'. Does the topic exist?" when I execute KafkaUtils.createDirectStream.

I've also tried to implement a retry with a wait such that the retry should occur after Kafka has created the requested topic with auto.create.topics.enable = true, but this still doesn't work consistently.
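A retry with exponential backoff around the stream creation could be sketched as follows, assuming the metadata failure surfaces as an exception thrown from the wrapped call. `Retry` and `withBackoff` are hypothetical names. Note that this only papers over the timing; it doesn't remove the underlying race between the ZooKeeper write and the brokers picking up the new topic.

```scala
import scala.util.{Failure, Success, Try}

object Retry {
  /** Runs `op`, trying at most `maxAttempts` times in total. After each
    * failure it sleeps, doubling the delay starting from `initialDelayMs`.
    * Rethrows the last exception if every attempt fails. */
  def withBackoff[T](maxAttempts: Int, initialDelayMs: Long)(op: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    def attempt(n: Int, delayMs: Long): T = Try(op) match {
      case Success(value) => value
      case Failure(e) if n >= maxAttempts => throw e // out of attempts
      case Failure(_) =>
        Thread.sleep(delayMs)           // wait before the next attempt
        attempt(n + 1, delayMs * 2)     // exponential backoff
    }

    attempt(1, initialDelayMs)
  }
}
```

For example, `Retry.withBackoff(5, 1000) { KafkaUtils.createDirectStream[...](ssc, kafkaParams, Set(topic)) }` (type parameters elided) would try up to five times, waiting 1 s, 2 s, 4 s and 8 s between attempts.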

This is also frustrating to debug, since the topic is successfully created only about 50% of the time; the rest of the time I get the message "Does the topic exist?". My thinking is that Spark may be caching the list of existing Kafka topics and ignoring the one I've just added. Is this the case? Am I missing something?