spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Reason for Kafka topic existence check / "Does the topic exist?" error
Date Sat, 29 Oct 2016 14:48:01 GMT
I tested your claims that "it used to work that way", and was unable
to reproduce them.  As far as I can tell, streams have always failed
the very first time you start them in that situation.  As Chris and I
pointed out, there are good reasons for that.

If you don't want to operationalize topic creation, just start the
stream again after it fails the very first time you start it with a
new topic.  If you don't want to operationalize monitoring whether
streams actually started, especially when it fails within seconds, I
don't know what more I can say.
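The "just start it again" approach can be wrapped in a small shell helper at submit time. This is a sketch, not an actual job invocation: the spark-submit command line shown in the comment is a placeholder.

```shell
# run_with_retry CMD [ARGS...]: run the command; if the first attempt fails
# (e.g. the topic did not exist yet and has just been auto-created by the
# broker), wait briefly and try exactly once more.
run_with_retry() {
  "$@" && return 0
  echo "first attempt failed; retrying once..." >&2
  sleep 1
  "$@"
}

# Placeholder usage (class/jar names are hypothetical):
# run_with_retry spark-submit --class com.example.StreamingDriver app.jar
```

The single retry is deliberate: if the broker is auto-creating the topic, the second attempt finds its metadata; if the job fails twice, something else is wrong and the failure should surface.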

On Sat, Oct 29, 2016 at 8:52 AM, Dmitry Goldenberg
<dgoldenberg123@gmail.com> wrote:
> Cody,
>
> Thanks for your comments.
>
> The way I'm reading the Kafka documentation
> (https://kafka.apache.org/documentation) is that auto.create.topics.enable
> is set to true by default. Right now it's not set in our server.properties
> on the Kafka broker side so I would imagine that the first request to
> publish a document into topic X would cause X to be created, as
> auto.create.topics.enable is presumably defaulted to true.
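For reference, these are the broker-side settings in play, shown with their stock Kafka defaults (a summary for illustration, not an actual server.properties):

```properties
# Kafka broker config (server.properties), stock defaults:
auto.create.topics.enable=true   # create a topic on the first metadata/publish request for it
num.partitions=1                 # partition count used for auto-created topics
default.replication.factor=1     # replication factor used for auto-created topics
```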
>
> Basically, I used to be able to start a streaming Kafka job first, without
> the topic X already existing, then let the producer publish the first (and
> all subsequent) documents and the consumer would get the documents from that
> point.
>
> This mode is not working anymore. Despite auto.create.topics.enable
> presumably defaulting to true (?), I'm getting the "Does the topic exist"
> exception.
>
> Not a big problem, but it raises the question: when would the topic be
> "auto-created", if not on the first document being published to it?
>
> It was nice when it was working because we didn't have to operationalize
> topic creation. Not a big deal but now we'll have to make sure we execute
> the 'create-topics' type of task or shell script at install time.
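That install-time task can be an idempotent wrapper around kafka-topics.sh. This is a sketch: the topic name, partition/replication counts, and the ZooKeeper address are placeholders for your environment, and the flags are the 0.8/0.10-era kafka-topics.sh interface.

```shell
#!/bin/sh
# Install-time topic creation: create each topic only if it does not exist yet.
KAFKA_TOPICS=${KAFKA_TOPICS:-kafka-topics.sh}
ZOOKEEPER=${ZOOKEEPER:-localhost:2181}

ensure_topic() {
  topic=$1; partitions=$2; replication=$3
  if "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --list | grep -qx "$topic"; then
    echo "topic $topic already exists"
  else
    "$KAFKA_TOPICS" --zookeeper "$ZOOKEEPER" --create \
      --topic "$topic" --partitions "$partitions" --replication-factor "$replication"
  fi
}

# Placeholder usage:
# ensure_topic my-ingest-topic 4 2
```

Creating topics explicitly this way also sidesteps the auto-create defaults (1 partition, replication factor 1), which are rarely what you want in production.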
>
> This seems like a potential Kafka doc issue: the docs should explain exactly
> what one can expect from the auto.create.topics.enable flag.
>
> -Dmitry
>
>
> On Sat, Oct 8, 2016 at 1:26 PM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> So I just now retested this with 1.5.2, and 2.0.0, and the behavior is
>> exactly the same across spark versions.
>>
>> If the topic hasn't been created, you will get that error on startup,
>> because the topic doesn't exist and thus doesn't have metadata.
>>
>> If you have auto.create.topics.enable set to true on the broker
>> config, the request will fairly quickly lead to the topic being
>> created after the fact.
>>
>> All you have to do is hit up-arrow-enter and re-submit the spark job,
>> the second time around the topic will exist.  That seems pretty low
>> effort.
>>
>> I'd rather stick with having an early error, for those of us who prefer
>> to run with auto.create set to false (because it makes sure the topic is
>> actually set up the way you want, reduces the likelihood of spurious
>> topics being created, etc.).
>>
>>
>>
>> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
>> <dgoldenberg123@gmail.com> wrote:
>> > Hi,
>> >
>> > I am trying to start up a simple consumer that streams from a Kafka
>> > topic,
>> > using Spark 2.0.0:
>> >
>> > spark-streaming_2.11
>> > spark-streaming-kafka-0-8_2.11
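The two artifacts above correspond to these Maven coordinates (the version shown assumes the Spark 2.0.0 build mentioned in the message):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
```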
>> >
>> > I was getting the error below until I created the topic in Kafka. When
>> > integrating with Spark 1.5, I never hit this check; we were able to
>> > start all of our Spark Kafka consumers, then start the producers, and
>> > have Kafka automatically create the topics once the first message for a
>> > given topic was published.
>> >
>> > Is there something I might be doing that causes this topic existence
>> > check in KafkaCluster.scala to kick in? I'd much rather not have to
>> > pre-create the topics before starting the consumers. Any
>> > thoughts/comments would be appreciated.
>> >
>> > Thanks.
>> > - Dmitry
>> >
>> > ========================================================================
>> >
>> > Exception in thread "main" org.apache.spark.SparkException:
>> > java.nio.channels.ClosedChannelException
>> > java.nio.channels.ClosedChannelException
>> > org.apache.spark.SparkException: Error getting partition metadata for
>> > '<topic name>'. Does the topic exist?
>> >         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>> >         at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)
>> >         at scala.util.Either.fold(Either.scala:98)
>> >         at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)
>> >         at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>> >         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>> >         at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
>> >         at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)
>> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)
>> >         at com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)
>> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >         at java.lang.reflect.Method.invoke(Method.java:498)
>> >         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
>> >         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
>> >         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
>> >         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
>> >         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


