spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: class KafkaCluster related errors
Date Thu, 17 Jun 2021 16:39:30 GMT
This is interesting because I am using PySpark, but I need these jar files
for Spark 3.1.1 and Kafka 2.7.0 to work:

kafka-clients-2.7.0.jar
commons-pool2-2.9.0.jar
spark-streaming_2.12-3.1.1.jar
spark-sql-kafka-0-10_2.12-3.1.0.jar

Do you have the equivalents of these artifacts in your POM file?
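
For reference, a rough POM equivalent of those jars would be something like the
below (versions are simply taken from the jar names above, so adjust them to
whatever Spark/Kafka build you actually run):

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.1.0</version>
        </dependency>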

HTH




   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 17 Jun 2021 at 17:30, Kiran Biswal <biswalkiran@gmail.com> wrote:

> Hello Mich
>
> I added kafka-clients back, and I am back to seeing the earlier error about
> streaming-start.
>
> +        <dependency>
> +            <groupId>org.apache.kafka</groupId>
> +            <artifactId>kafka-clients</artifactId>
> +            <version>0.10.0.0</version>
> +        </dependency>
>
> Anything you can think of?
>
> Exception in thread "streaming-start" java.lang.NoClassDefFoundError: org/apache/kafka/common/security/JaasContext
>     at org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:155)
>     at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
>     at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
>     at org.apache.spark.streaming.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:64)
>     at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:91)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>     at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>     at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>     at scala.collection.Iterator.foreach(Iterator.scala:941)
>     at scala.collection.Iterator.foreach$(Iterator.scala:941)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>     at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>     at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>     at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>     at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>     at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>     at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>     at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>     at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>     at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.security.JaasContext
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>     ... 27 more
>
> Regards
> Kiran
>
> On Thu, Jun 17, 2021 at 7:30 AM Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> Hi Kiran,
>>
>> You need kafka-clients for the version of Kafka you are using, so if it is
>> the correct version, keep it.
>>
>> Try running and see what the error says.
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 17 Jun 2021 at 15:10, Kiran Biswal <biswalkiran@gmail.com> wrote:
>>
>>> Hello Mich
>>>
>>> Thanks for the pointers. I believe they helped. I removed kafka-streams
>>> and kafka-clients. Is kafka-clients needed?
>>>
>>> The streaming application runs in a Kubernetes environment. I see that the
>>> driver starts, and subsequently the executors start, but they crash and restart
>>> in a loop. I have attached some logs from the Spark driver pod (see
>>> spark_driver_pod.txt).
>>>
>>> Any thoughts on what's causing the crash? Do these warnings look serious?
>>>
>>> Thanks
>>> Kiran
>>>
>>> On Sun, Jun 13, 2021 at 12:48 AM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Hi Kiran
>>>>
>>>> Looking at your POM file I see the below:
>>>>
>>>>      <dependency>
>>>>          <groupId>org.apache.kafka</groupId>
>>>>          <artifactId>kafka-streams</artifactId>
>>>>          <version>0.10.0.0</version>
>>>>      </dependency>
>>>>
>>>>
>>>> Why do you need kafka-streams? I don't think Spark uses it, and it may get
>>>> the libraries confused.
>>>>
>>>> HTH
>>>>
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, 13 Jun 2021 at 05:17, Kiran Biswal <biswalkiran@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Mich
>>>>>
>>>>> Thanks a lot for all your help. As per this document, Spark 3.0.1 and
>>>>> Kafka 0.10 is a supported combination, which is what I have:
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/3.0.1/streaming-kafka-0-10-integration.html
>>>>>
>>>>> I have attached the pom.xml file (I use a Maven environment). Would you
>>>>> kindly take a look to see if something needs to change in terms of
>>>>> versioning, or whether there is anything additional I need to include to get
>>>>> streaming working?
>>>>>
>>>>> Look forward to your response
>>>>>
>>>>> Regards
>>>>> Kiran
>>>>>
>>>>> On Fri, Jun 11, 2021 at 1:09 AM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Hi Kiran. I was using
>>>>>>
>>>>>>    - Kafka Cluster 2.12-1.1.0
>>>>>>    - Spark Streaming 2.3, Spark SQL 2.3
>>>>>>    - Scala 2.11.8
>>>>>>
>>>>>> Your Kafka version 0.10 seems to be pretty old. That may be the issue
>>>>>> here. Try upgrading Kafka in a test environment to see if it helps.
>>>>>>
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, 11 Jun 2021 at 08:55, Kiran Biswal <biswalkiran@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Mich
>>>>>>>
>>>>>>> When you were using dstream, what versions of Kafka, Spark and Scala
>>>>>>> were you using?
>>>>>>>
>>>>>>> I am using Kafka 0.10 with Spark 3.0.1 and Scala 2.12. Do you feel
>>>>>>> this combination can reliably work from a streaming point of view?
>>>>>>>
>>>>>>> I get the below error when I invoke createDirectStream. Any
>>>>>>> suggestions on how to move forward here?
>>>>>>>
>>>>>>> Thanks a lot for any help.
>>>>>>> Thanks
>>>>>>> Kiran
>>>>>>>
>>>>>>> Exception in thread "streaming-start" java.lang.NoClassDefFoundError:
>>>>>>> org/apache/kafka/common/security/JaasContext
>>>>>>>
>>>>>>> 	at org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:155)
>>>>>>> 	at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
>>>>>>> 	at org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
>>>>>>> 	at org.apache.spark.streaming.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:64)
>>>>>>> 	at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:91)
>>>>>>> 	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>>>>>>> 	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>>>>>>> 	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>>>>>>> 	at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>>>>>>> 	at scala.collection.Iterator.foreach(Iterator.scala:941)
>>>>>>> 	at scala.collection.Iterator.foreach$(Iterator.scala:941)
>>>>>>> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>>>>>>> 	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>>>>>>> 	at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>>>>>>> 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>>>>>> 	at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>>>>>>> 	at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>>>>>>> 	at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>>>>>>> 	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>>>>>>> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>>>>>>> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>>>>>>> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>>>>>>> 	at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>>>>>>> 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>>>>>> 	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>>>>>> 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>>>>>> 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.security.JaasContext
>>>>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>>>>>> 	... 27 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 8, 2021 at 7:26 AM Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kiran,
>>>>>>>>
>>>>>>>> I don't seem to have a reference to handling offsets in my old code.
>>>>>>>>
>>>>>>>> However, in Spark structured streaming (SSS) I handle it using a
>>>>>>>> reference to checkpointLocation as below (this is in Python):
>>>>>>>>
>>>>>>>>        checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>>>>>>>
>>>>>>>>        result = resultMF.withColumn("uuid", uuidUdf()) \
>>>>>>>>                     .selectExpr("CAST(uuid AS STRING) AS key", "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>>>>>>>>                     .writeStream \
>>>>>>>>                     .outputMode('complete') \
>>>>>>>>                     .format("kafka") \
>>>>>>>>                     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>>>>>>                     .option("topic", "avgtemperature") \
>>>>>>>>                     .option('checkpointLocation', checkpoint_path) \
>>>>>>>>                     .queryName("avgtemperature") \
>>>>>>>>                     .start()
>>>>>>>>
>>>>>>>> Now within that checkpoint_path directory you have five
>>>>>>>> sub-directories containing all you need, including offsets:
>>>>>>>>
>>>>>>>> /ssd/hduser/avgtemperature/chkpt> ls
>>>>>>>> commits  metadata  offsets  sources  state
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>>
>>>>>>>>    view my Linkedin profile
>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, 8 Jun 2021 at 01:21, Kiran Biswal <biswalkiran@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Mich
>>>>>>>>>
>>>>>>>>> Thanks a lot. Using code similar to yours I was able to compile.
>>>>>>>>>
>>>>>>>>> One outstanding question: in my older code, the *getConsumerOffsets*
>>>>>>>>> method was handling offsets (latestLeaderOffsets, earliestLeaderOffsets,
>>>>>>>>> etc.) and was calling KafkaCluster.
>>>>>>>>>
>>>>>>>>> Will there be data loss if I don't handle offsets? In your
>>>>>>>>> example, was handling offsets not required? If I were to handle offsets,
>>>>>>>>> are there any examples you could share?
>>>>>>>>>
>>>>>>>>> Thanks a lot again and appreciate the great help.
>>>>>>>>> Regards
>>>>>>>>> Kiran
>>>>>>>>>
>>>>>>>>> On Mon, Jun 7, 2021 at 2:58 AM Mich Talebzadeh <
>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Kiran,
>>>>>>>>>>
>>>>>>>>>> As you may be aware, createDirectStream is deprecated and you ought
>>>>>>>>>> to use Spark Structured Streaming, especially since you are moving to
>>>>>>>>>> version 3.0.1.
>>>>>>>>>>
>>>>>>>>>> If you still want to use dstream then that page seems to be correct.
>>>>>>>>>>
>>>>>>>>>> Looking at my old code I have
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.streaming._
>>>>>>>>>> import org.apache.spark.streaming.kafka._
>>>>>>>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>>>>>>>
>>>>>>>>>>     val kafkaParams = Map[String, String](
>>>>>>>>>>       "bootstrap.servers" -> bootstrapServers,
>>>>>>>>>>       "schema.registry.url" -> schemaRegistryURL,
>>>>>>>>>>       "zookeeper.connect" -> zookeeperConnect,
>>>>>>>>>>       "group.id" -> sparkAppName,
>>>>>>>>>>       "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeoutMs,
>>>>>>>>>>       "rebalance.backoff.ms" -> rebalanceBackoffMS,
>>>>>>>>>>       "zookeeper.session.timeout.ms" -> zookeeperSessionTimeOutMs,
>>>>>>>>>>       "auto.commit.interval.ms" -> autoCommitIntervalMS
>>>>>>>>>>     )
>>>>>>>>>>     //val topicsSet = topics.split(",").toSet
>>>>>>>>>>     val topicsValue = Set(topics)
>>>>>>>>>>     val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)
>>>>>>>>>>     dstream.cache()
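>>>>>>>>>>
>>>>>>>>>> That was written against the old Kafka 0.8 style API. On Spark 3.x with
>>>>>>>>>> spark-streaming-kafka-0-10 the equivalent would look roughly like the
>>>>>>>>>> sketch below (untested; bootstrapServers, sparkAppName, topicsValue and
>>>>>>>>>> streamingContext are the same placeholders as above):
>>>>>>>>>>
>>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer
>>>>>>>>>> import org.apache.spark.streaming.kafka010._
>>>>>>>>>>
>>>>>>>>>>     // consumer properties now go straight to the Kafka consumer
>>>>>>>>>>     val kafkaParams010 = Map[String, Object](
>>>>>>>>>>       "bootstrap.servers" -> bootstrapServers,
>>>>>>>>>>       "key.deserializer" -> classOf[StringDeserializer],
>>>>>>>>>>       "value.deserializer" -> classOf[StringDeserializer],
>>>>>>>>>>       "group.id" -> sparkAppName,
>>>>>>>>>>       "auto.offset.reset" -> "latest",
>>>>>>>>>>       "enable.auto.commit" -> (false: java.lang.Boolean)
>>>>>>>>>>     )
>>>>>>>>>>     // the decoders are gone; the deserializers above do that job
>>>>>>>>>>     val dstream010 = KafkaUtils.createDirectStream[String, String](
>>>>>>>>>>       streamingContext,
>>>>>>>>>>       LocationStrategies.PreferConsistent,
>>>>>>>>>>       ConsumerStrategies.Subscribe[String, String](topicsValue, kafkaParams010)
>>>>>>>>>>     )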
>>>>>>>>>>
>>>>>>>>>> HTH,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Mich
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    view my Linkedin profile
>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>> responsibility for any loss, damage or destruction of data or any other
>>>>>>>>>> property which may arise from relying on this email's technical content is
>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 7 Jun 2021 at 10:34, Kiran Biswal <biswalkiran@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Mich, thanks a lot for your response. I am basically trying
>>>>>>>>>>> to get some older code (a streaming job reading from Kafka) that worked in
>>>>>>>>>>> Spark 2.0.1 to work in 3.0.1. The specific area where I am having a
>>>>>>>>>>> problem (KafkaCluster) most likely has to do with getting/setting commit
>>>>>>>>>>> offsets in Kafka.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> // Create message Dstream for each (topic, schema class)
>>>>>>>>>>>
>>>>>>>>>>>     val msgStreams = config.getTopicSchemaClassMap.map {
>>>>>>>>>>>       case (kafkaTopic, schemaClass) => {
>>>>>>>>>>>         val consumerOffsets = getConsumerOffsets(kafkaTopic)
>>>>>>>>>>>         val msgDStream = (KafkaUtils.createDirectStream
>>>>>>>>>>>           [Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder,
>>>>>>>>>>>           Tuple2[Array[Byte], Array[Byte]]]
>>>>>>>>>>>           (ssc, kafkaParams, consumerOffsets,
>>>>>>>>>>>           (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.key, msg.message)
>>>>>>>>>>>           ))
>>>>>>>>>>>         (kafkaTopic, schemaClass, msgDStream)
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The *getConsumerOffsets* method internally used KafkaCluster,
>>>>>>>>>>> which is probably deprecated.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Do you think I need to mimic the code shown here to get/set
>>>>>>>>>>> offsets rather than use KafkaCluster?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> https://spark.apache.org/docs/3.0.0-preview/streaming-kafka-0-10-integration.html
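>>>>>>>>>>>
>>>>>>>>>>> i.e. something roughly like the offset pattern on that page (just a
>>>>>>>>>>> sketch -- stream here stands for the createDirectStream result, and the
>>>>>>>>>>> processing step is a placeholder):
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
>>>>>>>>>>>
>>>>>>>>>>> stream.foreachRDD { rdd =>
>>>>>>>>>>>   // capture the offset ranges for this batch before any shuffle
>>>>>>>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>>>>>   // ... process rdd ...
>>>>>>>>>>>   // commit the offsets back to Kafka once processing has succeeded
>>>>>>>>>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>>>>>>>>>>> }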
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> Kiran
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jun 7, 2021 at 1:04 AM Mich Talebzadeh <
>>>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Are you trying to read topics from Kafka in Spark 3.0.1?
>>>>>>>>>>>>
>>>>>>>>>>>> Have you checked Spark 3.0.1 documentation?
>>>>>>>>>>>>
>>>>>>>>>>>> Integrating Spark with Kafka is pretty straightforward with
>>>>>>>>>>>> 3.0.1 and higher.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> HTH
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>    view my Linkedin profile
>>>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>>>> responsibility for any loss, damage or destruction of data or any other
>>>>>>>>>>>> property which may arise from relying on this email's technical content is
>>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, 6 Jun 2021 at 21:18, Kiran Biswal <
>>>>>>>>>>>> biswalkiran@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> *I am using spark 3.0.1 AND Kafka 0.10 AND Scala 2.12. Getting
>>>>>>>>>>>>> an error related to KafkaCluster (not found: type KafkaCluster). Is this
>>>>>>>>>>>>> class deprecated? How do I find a replacement?*
>>>>>>>>>>>>>
>>>>>>>>>>>>> *I am upgrading from spark 2.0.1 to spark 3.0.1*
>>>>>>>>>>>>>
>>>>>>>>>>>>> *In spark 2.0.1 KafkaCluster was supported*
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just looking for ideas on how to achieve the same functionality in
>>>>>>>>>>>>> spark 3.0.1. Any thoughts and examples will be highly appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Kiran
>>>>>>>>>>>>>
>>>>>>>>>>>>
