spark-user mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: Writing the contents of spark dataframe to Kafka with Spark 2.2
Date Tue, 19 Mar 2019 07:48:39 GMT
Hi Anna,

  Have you added the spark-sql-kafka-0-10_2.11:2.2.0 package as well?
Further info can be found here:
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#deploying
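For example, with Maven that would roughly be the following extra dependency (a sketch; pick the version that matches your spark-sql artifact, i.e. 2.2.2 in your case):

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.2</version>
</dependency>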
The same --packages option can be used with spark-shell as well...
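For example:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0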

BR,
G


On Mon, Mar 18, 2019 at 10:07 PM anna stax <annastax80@gmail.com> wrote:

> Hi all,
> I am unable to write the contents of a Spark dataframe to Kafka.
> I am using Spark 2.2
>
> This is my code:
>
> // spark-shell provides the "spark" SparkSession; toDF needs its implicits
> import spark.implicits._
>
> val df = Seq(("1","One"),("2","two")).toDF("key","value")
> df.printSchema()
> df.show(false)
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .write
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "127.0.0.1:9092")
>   .option("topic", "testtopic")
>   .save()
>
> and I am getting the following error message:
>
> [Stage 0:> (0 + 2) / 2]
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
> at org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
> at org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
> at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
> at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I have added this dependency:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.2.2</version>
> </dependency>
>
> Appreciate any help. Thanks.
>
> https://stackoverflow.com/questions/55229945/writing-the-contents-of-spark-dataframe-to-kafka-with-spark-2-2
>
