spark-user mailing list archives

From Dmitry Goldenberg <dgoldenberg...@gmail.com>
Subject Re: Reason for Kafka topic existence check / "Does the topic exist?" error
Date Sat, 29 Oct 2016 13:52:56 GMT
Cody,

Thanks for your comments.

The way I'm reading the Kafka documentation (
https://kafka.apache.org/documentation) is that auto.create.topics.enable
defaults to true. Right now it's not set in our server.properties on the
Kafka broker side, so I would imagine that the first request to publish a
document to topic X would cause X to be created, since the flag presumably
defaults to true.
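(For reference, a quick way to verify this on the broker is to check whether the property appears in server.properties at all; a minimal sketch, where the config path is an assumption about your installation:)

```shell
# Hypothetical broker config location -- adjust for your installation.
KAFKA_CONF=${KAFKA_CONF:-/opt/kafka/config/server.properties}

# If the property is absent from server.properties, the broker falls back
# to its compiled-in default, which the Kafka docs list as true.
if grep '^auto.create.topics.enable' "$KAFKA_CONF" 2>/dev/null; then
  :
else
  echo "auto.create.topics.enable not set; broker default (true) applies"
fi
```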

Basically, I used to be able to start a streaming Kafka job first, without
the topic X already existing, then let the producer publish the first (and
all subsequent) documents and the consumer would get the documents from
that point.

This mode is not working anymore. Despite auto.create.topics.enable
presumably defaulting to true (?), I'm getting the "Does the topic exist"
exception.

Not a big problem, but it raises the question: when would the topic be
auto-created, if not when the first document is published to it?

It was nice when it was working because we didn't have to operationalize
topic creation. Not a big deal, but now we'll have to make sure we run a
'create-topics' task or shell script at install time.
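(Something like the following could serve as that install-time step; a sketch assuming the ZooKeeper-based kafka-topics.sh tooling of this Kafka era, with paths, topic names, and partition counts as placeholders:)

```shell
# Placeholders: adjust KAFKA_TOPICS and ZK for your installation.
KAFKA_TOPICS=${KAFKA_TOPICS:-/opt/kafka/bin/kafka-topics.sh}
ZK=${ZK:-localhost:2181}

create_topic() {
  # $1 = topic name, $2 = partition count, $3 = replication factor
  "$KAFKA_TOPICS" --create --zookeeper "$ZK" \
    --topic "$1" --partitions "$2" --replication-factor "$3"
}

# Only attempt creation when the Kafka tooling is actually present.
if [ -x "$KAFKA_TOPICS" ]; then
  create_topic my-ingest-topic 4 1
fi
```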

This potentially seems like a Kafka documentation issue: it should explain
exactly what one can expect from the auto.create.topics.enable flag.

-Dmitry


On Sat, Oct 8, 2016 at 1:26 PM, Cody Koeninger <cody@koeninger.org> wrote:

> So I just now retested this with 1.5.2 and 2.0.0, and the behavior is
> exactly the same across Spark versions.
>
> If the topic hasn't been created, you will get that error on startup,
> because the topic doesn't exist and thus doesn't have metadata.
>
> If you have auto.create.topics.enable set to true on the broker
> config, the request will fairly quickly lead to the topic being
> created after the fact.
>
> All you have to do is hit up-arrow-enter and re-submit the Spark job;
> the second time around the topic will exist.  That seems pretty low
> effort.
>
> I'd rather stick with having an early error for those of us that
> prefer to run with auto.create set to false (because it makes sure the
> topic is actually set up the way you want, reduces the likelihood of
> spurious topics being created, etc).
>
>
>
> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
> <dgoldenberg123@gmail.com> wrote:
> > Hi,
> >
> > I am trying to start up a simple consumer that streams from a Kafka topic,
> > using Spark 2.0.0:
> >
> > spark-streaming_2.11
> > spark-streaming-kafka-0-8_2.11
> >
> > I was getting an error as below until I created the topic in Kafka. When
> > integrating with Spark 1.5, I never hit this check; we were able to start
> > all of our Spark Kafka consumers, then start the producers, and have Kafka
> > automatically create the topics once the first message for a given topic
> > was published.
> >
> > Is there something I might be doing to cause this topic existence check in
> > KafkaCluster.scala to kick in? I'd much rather not have to pre-create the
> > topics before I start the consumers. Any thoughts/comments would be
> > appreciated.
> >
> > Thanks.
> > - Dmitry
> >
> > ========================================================================
> >
> > Exception in thread "main" org.apache.spark.SparkException:
> > java.nio.channels.ClosedChannelException
> >
> > java.nio.channels.ClosedChannelException
> >
> > org.apache.spark.SparkException: Error getting partition metadata for
> > '<topic name>'. Does the topic exist?
> >         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
> >         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
> >         at scala.util.Either.fold(Either.scala:98)
> >         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
> >         at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
> >         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> >         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
> >         at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498)
> >         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> >         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
> >         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
> >         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
> >         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
