spark-user mailing list archives

From Jungtaek Lim <kabhwan.opensou...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Mon, 18 Jan 2021 23:59:04 GMT
I see no issue running this code in local dev (I changed the scope of the
Spark artifacts to "compile", of course).

Could you please make sure you're not using "3.0.0-preview"? In
3.0.0-preview, Update mode was restricted (as the error message says), and
that restriction was reverted before Spark 3.0.0 was released. Clearing the
Spark artifacts in your .m2 cache may help.
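As a minimal sketch of that cleanup (assuming Maven's default local repository under ~/.m2; adjust the path if your setup differs):

```shell
# Sketch: check whether any cached Spark artifacts still come from a
# preview build, assuming the default local Maven repository location.
SPARK_CACHE="${HOME}/.m2/repository/org/apache/spark"
if [ -d "$SPARK_CACHE" ]; then
  find "$SPARK_CACHE" -type d -name '3.0.0-preview*'
fi

# To force a clean re-download, remove the cached Spark artifacts and rebuild:
# rm -rf "$SPARK_CACHE" && mvn clean package
```

If the `find` prints any directories, stale preview artifacts are still on the classpath of your build.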

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensource@gmail.com>
wrote:

> Please include some test data as well. I quickly looked through the code,
> and it may require a specific record format.
>
> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonspark@gmail.com>
> wrote:
>
>> Hi,
>>
>> This is the jira <https://issues.apache.org/jira/projects/SPARK/summary>.
>> Regarding the repo, I believe you can just commit it to your personal repo
>> and that should be it.
>>
>> Regards
>>
>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglists19@gmail.com>
>> wrote:
>>
>>> Sorry, can you please tell me where to create the JIRA? Also, is there
>>> any specific Github repository I need to commit the code into, or just
>>> our own? Please let me know. Thanks.
>>>
>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>> wrote:
>>>
>>>> Thank you. As we've asked, could you please create a jira and commit
>>>> the code to github? It would speed things up a lot.
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the email as well:
>>>>>
>>>>> package com.myorg
>>>>>
>>>>> import org.apache.hadoop.conf.Configuration
>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>> import org.apache.hadoop.security.UserGroupInformation
>>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>>
>>>>> import scala.util.{Failure, Success, Try}
>>>>>
>>>>> object Spark3Test {
>>>>>
>>>>>   val isLocal = false
>>>>>
>>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>>
>>>>>   val START_DATE_INDEX = 21
>>>>>   val END_DATE_INDEX = 40
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>>>>>     spark.sparkContext.setLogLevel("WARN")
>>>>>
>>>>>     readKafkaStream(spark)
>>>>>       .groupByKey(row => {
>>>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>>       })
>>>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>         updateAcrossEvents
>>>>>       )
>>>>>       .filter(row => !row.inProgress)
>>>>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>>>       .writeStream
>>>>>       .format("kafka")
>>>>>       .option(
>>>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>>         "10.29.42.141:9092"
>>>>> //        "localhost:9092"
>>>>>       )
>>>>>       .option("topic", "spark3test")
>>>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>>       .outputMode("update")
>>>>>       .start()
>>>>>     manageStreamingQueries(spark)
>>>>>   }
>>>>>
>>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>>>
>>>>>     val stream = sparkSession.readStream
>>>>>       .format("kafka")
>>>>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>>       .option("subscribe", "inputTopic")
>>>>>       .option("startingOffsets", "latest")
>>>>>       .option("failOnDataLoss", "false")
>>>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>>>       .load()
>>>>>       .selectExpr("CAST(value AS STRING)")
>>>>>       .as[String](Encoders.STRING)
>>>>>     stream
>>>>>   }
>>>>>
>>>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>>>>>     if (!oldState.exists) {
>>>>>       println(key)
>>>>>       val state = MyState(key)
>>>>>       oldState.update(state)
>>>>>       oldState.setTimeoutDuration("1 minutes")
>>>>>       oldState.get
>>>>>     } else {
>>>>>       if (oldState.hasTimedOut) {
>>>>>         oldState.get.inProgress = false
>>>>>         val state = oldState.get
>>>>>         println("State timed out for key: " + state.dateTime)
>>>>>         oldState.remove()
>>>>>         state
>>>>>       } else {
>>>>>         val state = oldState.get
>>>>>         state.count = state.count + 1
>>>>>         oldState.update(state)
>>>>>         oldState.setTimeoutDuration("1 minutes")
>>>>>         oldState.get
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>>>>>     UserGroupInformation.setLoginUser(
>>>>>       UserGroupInformation.createRemoteUser("hduser")
>>>>>     )
>>>>>
>>>>>     val builder = SparkSession
>>>>>       .builder()
>>>>>       .appName(applicationName)
>>>>>
>>>>>     if (isLocal) {
>>>>>       builder.config("spark.master", "local[2]")
>>>>>     }
>>>>>
>>>>>     builder.getOrCreate()
>>>>>   }
>>>>>
>>>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>>>
>>>>>     val sparkQueryListener = new QueryListener()
>>>>>     spark.streams.addListener(sparkQueryListener)
>>>>>
>>>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>>>
>>>>>     val timeoutInMilliSeconds = 60000
>>>>>     while (!spark.streams.active.isEmpty) {
>>>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>>>>         case Success(result) =>
>>>>>           if (result) {
>>>>>             println("A streaming query was terminated successfully")
>>>>>             spark.streams.resetTerminated()
>>>>>           }
>>>>>         case Failure(e) =>
>>>>>           println("Query failed with message: " + e.getMessage)
>>>>>           e.printStackTrace()
>>>>>           spark.streams.resetTerminated()
>>>>>       }
>>>>>
>>>>>       if (checkMarker(shutdownMarker)) {
>>>>>         spark.streams.active.foreach(query => {
>>>>>           println(s"Stopping streaming query: ${query.id}")
>>>>>           query.stop()
>>>>>         })
>>>>>         spark.stop()
>>>>>         removeMarker(shutdownMarker)
>>>>>       }
>>>>>     }
>>>>>     assert(spark.streams.active.isEmpty)
>>>>>     spark.streams.removeListener(sparkQueryListener)
>>>>>   }
>>>>>
>>>>>   def checkMarker(markerFile: String): Boolean = {
>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>     fs.exists(new Path(markerFile))
>>>>>   }
>>>>>
>>>>>   def removeMarker(markerFile: String): Unit = {
>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>     fs.delete(new Path(markerFile), true)
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
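The per-group logic in updateAcrossEvents above can also be factored into a pure function and unit-tested without a SparkSession. A sketch (MyState mirrors the case class in the reproducer; Decision and transition are illustrative names, not part of the Spark API):

```scala
// Sketch of the state transition in updateAcrossEvents, factored out so it
// can be exercised without Spark. MyState mirrors the case class above;
// Decision and transition are illustrative names, not Spark API.
case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)

sealed trait Decision
case object KeepState extends Decision      // update state and reset the timeout
case object EmitAndRemove extends Decision  // emit the closed window and remove state

def transition(existing: Option[MyState], timedOut: Boolean, key: String): (MyState, Decision) =
  existing match {
    case None =>
      (MyState(key), KeepState)             // first record seen for this key
    case Some(s) if timedOut =>
      s.inProgress = false                  // timeout: close the window
      (s, EmitAndRemove)
    case Some(s) =>
      s.count += 1                          // another record in the open window
      (s, KeepState)
  }
```

The real Spark callback would still be responsible for calling oldState.update, setTimeoutDuration, and remove based on the returned Decision.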
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> package com.myorg
>>>>>
>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>>
>>>>> class QueryListener extends StreamingQueryListener {
>>>>>
>>>>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>>>>>
>>>>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>>     if (event.progress.numInputRows != 0) {
>>>>>       println(
>>>>>         s"InputRows: ${event.progress.numInputRows}"
>>>>>       )
>>>>>     }
>>>>>   }
>>>>>
>>>>>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>>>>>     println(s"Query with id ${event.id} terminated")
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>>   <modelVersion>4.0.0</modelVersion>
>>>>>   <groupId>com.myorg</groupId>
>>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>>   <packaging>jar</packaging>
>>>>>   <version>1.0-SNAPSHOT</version>
>>>>>   <name>spark-3-conversion</name>
>>>>>   <url>http://maven.apache.org</url>
>>>>>
>>>>>   <properties>
>>>>>     <spark.version>3.0.0</spark.version>
>>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>>     <scala.version>2.12.10</scala.version>
>>>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>>>     <skipTests>true</skipTests>
>>>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>>>     <encoding>UTF-8</encoding>
>>>>>   </properties>
>>>>>
>>>>>
>>>>>   <dependencies>
>>>>>     <dependency>
>>>>>       <groupId>org.scala-lang</groupId>
>>>>>       <artifactId>scala-library</artifactId>
>>>>>       <version>${scala.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>       <scope>provided</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.spark</groupId>
>>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>       <version>${spark.version}</version>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.slf4j</groupId>
>>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>>       <version>1.7.7</version>
>>>>>       <scope>runtime</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>log4j</groupId>
>>>>>       <artifactId>log4j</artifactId>
>>>>>       <version>1.2.17</version>
>>>>>       <scope>compile</scope>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.scalatest</groupId>
>>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>>       <version>3.0.7</version>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>junit</groupId>
>>>>>       <artifactId>junit</artifactId>
>>>>>       <version>3.8.1</version>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>   </dependencies>
>>>>>
>>>>>   <build>
>>>>>     <plugins>
>>>>>       <plugin>
>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>>         <version>3.0.0</version>
>>>>>         <executions>
>>>>>           <!-- Run shade goal on package phase -->
>>>>>           <execution>
>>>>>             <phase>install</phase>
>>>>>             <goals>
>>>>>               <goal>shade</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <!-- Scala Compiler -->
>>>>>       <plugin>
>>>>>         <groupId>net.alchim31.maven</groupId>
>>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>>         <version>3.2.2</version>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <goals>
>>>>>               <goal>compile</goal>
>>>>>               <goal>testCompile</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <plugin>
>>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>>         <version>1.7</version>
>>>>>         <executions>
>>>>>           <!-- Add src/main/scala to eclipse build path -->
>>>>>           <execution>
>>>>>             <id>add-source</id>
>>>>>             <phase>generate-sources</phase>
>>>>>             <goals>
>>>>>               <goal>add-source</goal>
>>>>>             </goals>
>>>>>             <configuration>
>>>>>               <sources>
>>>>>                 <source>src/main/scala</source>
>>>>>               </sources>
>>>>>             </configuration>
>>>>>           </execution>
>>>>>           <!-- Add src/test/scala to eclipse build path -->
>>>>>           <execution>
>>>>>             <id>add-test-source</id>
>>>>>             <phase>generate-test-sources</phase>
>>>>>             <goals>
>>>>>               <goal>add-test-source</goal>
>>>>>             </goals>
>>>>>             <configuration>
>>>>>               <sources>
>>>>>                 <source>src/test/scala</source>
>>>>>               </sources>
>>>>>             </configuration>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>
>>>>>       <!-- Disable surefire and enable scalatest so that Maven can run our tests -->
>>>>>       <plugin>
>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>>>         <version>2.7</version>
>>>>>         <configuration>
>>>>>           <skipTests>true</skipTests>
>>>>>         </configuration>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.scalatest</groupId>
>>>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>>>         <version>1.0</version>
>>>>>         <configuration>
>>>>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>>>           <junitxml>.</junitxml>
>>>>>           <filereports>WDF TestSuite.txt</filereports>
>>>>>         </configuration>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <id>test</id>
>>>>>             <goals>
>>>>>               <goal>test</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.scalastyle</groupId>
>>>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>>>         <version>1.0.0</version>
>>>>>         <configuration>
>>>>>           <verbose>false</verbose>
>>>>>           <failOnViolation>true</failOnViolation>
>>>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>>>           <failOnWarning>false</failOnWarning>
>>>>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>>>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>>>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>>>           <outputEncoding>UTF-8</outputEncoding>
>>>>>         </configuration>
>>>>>         <executions>
>>>>>           <execution>
>>>>>             <goals>
>>>>>               <goal>check</goal>
>>>>>             </goals>
>>>>>           </execution>
>>>>>         </executions>
>>>>>       </plugin>
>>>>>       <plugin>
>>>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>>>         <version>3.6.0.1398</version>
>>>>>       </plugin>
>>>>>
>>>>>     </plugins>
>>>>>   </build>
>>>>> </project>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Ok. I will work on creating a reproducible app. Thanks.
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <
>>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>>
>>>>>>> Just reached this thread. +1 on creating a simple reproducer app, and
>>>>>>> I suggest creating a jira and attaching the full driver and executor
>>>>>>> logs. Ping me on the jira and I'll pick this up right away...
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>
>>>>>>>> Would you mind if I ask for a simple reproducer? It would be nice if
>>>>>>>> you could create a repository on Github and push the code, including
>>>>>>>> the build script.
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I tried both. First tried 3.0.0. That didn't work, so I tried 3.1.0.
>>>>>>>>>
>>>>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Which exact Spark version did you use? Did you make sure the version
>>>>>>>>>> for Spark and the version for the spark-sql-kafka artifact are the
>>>>>>>>>> same? (I asked this because you've said you used Spark 3.0 but the
>>>>>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
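One way to keep those versions from drifting apart is to pin every Spark artifact to a single shared property in the pom. A sketch (property names follow the reproducer's pom.xml; treat this as an illustrative fragment, not a drop-in build file):

```xml
<!-- Sketch: pin every Spark artifact, including spark-sql-kafka, to one
     shared property so the versions cannot diverge. -->
<properties>
  <spark.version>3.0.0</spark.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
```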
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>> === Streaming Query ===
>>>>>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>>>>>> Current Committed Offsets: {}
>>>>>>>>>>> Current Available Offsets: {}
>>>>>>>>>>> Current State: INITIALIZING
>>>>>>>>>>> Thread State: RUNNABLE
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>>>> ... 1 more
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Can you post the whole message? I'm trying to find what might be
>>>>>>>>>>>> causing it. A small reproducible example would be of help too.
>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>
>>>>>>>>>>>> Pozdrawiam,
>>>>>>>>>>>> Jacek Laskowski
>>>>>>>>>>>> ----
>>>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>>>>>> application to Spark 3.0. I compiled it using the dependency
>>>>>>>>>>>>> given below:
>>>>>>>>>>>>>
>>>>>>>>>>>>> <dependency>
>>>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>>>> </dependency>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>>>>>
>>>>>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> .writeStream
>>>>>>>>>>>>> .format("kafka")
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What am I missing?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>
