spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Tue, 19 Jan 2021 06:41:18 GMT
Confirmed. The cluster Admin said his team installed the latest version
from Cloudera which comes with Spark 3.0.0-preview2. They are going to try
to upgrade it with the Community edition Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglists19@gmail.com>
wrote:

> >> "Could you please make sure you're not using "3.0.0-preview".
>
> This could be the reason. I will check with our Hadoop cluster
> administrator. It's quite possible that they installed the "Preview" mode.
> Yes, the code works in the Local dev environment.
>
>
> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <kabhwan.opensource@gmail.com>
> wrote:
>
>> I see no issue from running this code in local dev. (changed the scope of
>> Spark artifacts to "compile" of course)
>>
>> Could you please make sure you're not using "3.0.0-preview"? In
>> 3.0.0-preview update mode was restricted (as the error message says) and it
>> was reverted before releasing Spark 3.0.0. Clearing Spark artifacts in your
>> .m2 cache may work.
>>
>> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
>> kabhwan.opensource@gmail.com> wrote:
>>
>>> And also include some test data as well. I quickly looked through the
>>> code and the code may require a specific format of the record.
>>>
>>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>>> gschiavonspark@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is the jira
>>>> <https://issues.apache.org/jira/projects/SPARK/summary> and regarding
>>>> the repo, I believe just commit it to your personal repo and that should
be
>>>> it.
>>>>
>>>> Regards
>>>>
>>>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> Sorry. Can you please tell me where to create the JIRA? Also is there
>>>>> any specific Github repository I need to commit code into - OR - just
in
>>>>> our own? Please let me know. Thanks.
>>>>>
>>>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>
>>>>>> Thanks you, as we've asked could you please create a jira and commit
>>>>>> the code into github?
>>>>>> It would speed things up a lot.
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>>>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents
in the
>>>>>>> email as well:
>>>>>>>
>>>>>>> package com.myorg
>>>>>>>
>>>>>>> import org.apache.hadoop.conf.Configuration
>>>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>>>> import org.apache.hadoop.security.UserGroupInformation
>>>>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>>>>
>>>>>>> import scala.util.{Failure, Success, Try}
>>>>>>>
>>>>>>> object Spark3Test {
>>>>>>>
>>>>>>>   val isLocal = false
>>>>>>>
>>>>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>>>>
>>>>>>>   val START_DATE_INDEX = 21
>>>>>>>   val END_DATE_INDEX = 40
>>>>>>>
>>>>>>>   def main(args: Array[String]) {
>>>>>>>
>>>>>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0
Upgrade", isLocal)
>>>>>>>     spark.sparkContext.setLogLevel("WARN")
>>>>>>>
>>>>>>>     readKafkaStream(spark)
>>>>>>>       .groupByKey(row => {
>>>>>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>>>>       })
>>>>>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>         updateAcrossEvents
>>>>>>>       )
>>>>>>>       .filter(row => !row.inProgress)
>>>>>>>       .map(row => "key: " + row.dateTime + " " + "count: "
+ row.count)
>>>>>>>       .writeStream
>>>>>>>       .format("kafka")
>>>>>>>       .option(
>>>>>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>>>>         "10.29.42.141:9092"
>>>>>>> //        "localhost:9092"
>>>>>>>       )
>>>>>>>       .option("topic", "spark3test")
>>>>>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>>>>       .outputMode("update")
>>>>>>>       .start()
>>>>>>>     manageStreamingQueries(spark)
>>>>>>>   }
>>>>>>>
>>>>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String]
= {
>>>>>>>
>>>>>>>     val stream = sparkSession.readStream
>>>>>>>       .format("kafka")
>>>>>>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>>>>       .option("subscribe", "inputTopic")
>>>>>>>       .option("startingOffsets", "latest")
>>>>>>>       .option("failOnDataLoss", "false")
>>>>>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>>>>>       .load()
>>>>>>>       .selectExpr("CAST(value AS STRING)")
>>>>>>>       .as[String](Encoders.STRING)
>>>>>>>     stream
>>>>>>>   }
>>>>>>>
>>>>>>>   def updateAcrossEvents(key: String, inputs: Iterator[String],
oldState: GroupState[MyState]): MyState = {
>>>>>>>     if (!oldState.exists) {
>>>>>>>       println(key)
>>>>>>>       val state = MyState(key)
>>>>>>>       oldState.update(state)
>>>>>>>       oldState.setTimeoutDuration("1 minutes")
>>>>>>>       oldState.get
>>>>>>>     } else {
>>>>>>>       if (oldState.hasTimedOut) {
>>>>>>>         oldState.get.inProgress = false
>>>>>>>         val state = oldState.get
>>>>>>>         println("State timed out for key: " + state.dateTime)
>>>>>>>         oldState.remove()
>>>>>>>         state
>>>>>>>       } else {
>>>>>>>         val state = oldState.get
>>>>>>>         state.count = state.count + 1
>>>>>>>         oldState.update(state)
>>>>>>>         oldState.setTimeoutDuration("1 minutes")
>>>>>>>         oldState.get
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>>   def initializeSparkSession(applicationName: String, isLocal:
Boolean): SparkSession = {
>>>>>>>     UserGroupInformation.setLoginUser(
>>>>>>>       UserGroupInformation.createRemoteUser("hduser")
>>>>>>>     )
>>>>>>>
>>>>>>>     val builder = SparkSession
>>>>>>>       .builder()
>>>>>>>       .appName(applicationName)
>>>>>>>
>>>>>>>     if (isLocal) {
>>>>>>>       builder.config("spark.master", "local[2]")
>>>>>>>     }
>>>>>>>
>>>>>>>     builder.getOrCreate()
>>>>>>>   }
>>>>>>>
>>>>>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>>>>>
>>>>>>>     val sparkQueryListener = new QueryListener()
>>>>>>>     spark.streams.addListener(sparkQueryListener)
>>>>>>>
>>>>>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>>>>>
>>>>>>>     val timeoutInMilliSeconds = 60000
>>>>>>>     while (!spark.streams.active.isEmpty) {
>>>>>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds))
match {
>>>>>>>         case Success(result) =>
>>>>>>>           if (result) {
>>>>>>>             println("A streaming query was terminated successfully")
>>>>>>>             spark.streams.resetTerminated()
>>>>>>>           }
>>>>>>>         case Failure(e) =>
>>>>>>>           println("Query failed with message: " + e.getMessage)
>>>>>>>           e.printStackTrace()
>>>>>>>           spark.streams.resetTerminated()
>>>>>>>       }
>>>>>>>
>>>>>>>       if (checkMarker(shutdownMarker)) {
>>>>>>>         spark.streams.active.foreach(query => {
>>>>>>>           println(s"Stopping streaming query: ${query.id}")
>>>>>>>           query.stop()
>>>>>>>         })
>>>>>>>         spark.stop()
>>>>>>>         removeMarker(shutdownMarker)
>>>>>>>       }
>>>>>>>     }
>>>>>>>     assert(spark.streams.active.isEmpty)
>>>>>>>     spark.streams.removeListener(sparkQueryListener)
>>>>>>>   }
>>>>>>>
>>>>>>>   def checkMarker(markerFile: String): Boolean = {
>>>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>>>     fs.exists(new Path(markerFile))
>>>>>>>   }
>>>>>>>
>>>>>>>   def removeMarker(markerFile: String): Unit = {
>>>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>>>     fs.delete(new Path(markerFile), true)
>>>>>>>   }
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> case class MyState(var dateTime: String, var inProgress: Boolean
= true, var count: Int = 1)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> package com.myorg
>>>>>>>
>>>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>>>>
>>>>>>> class QueryListener extends StreamingQueryListener {
>>>>>>>
>>>>>>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent):
Unit = {}
>>>>>>>
>>>>>>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent):
Unit = {
>>>>>>>     if (event.progress.numInputRows != 0) {
>>>>>>>       println(
>>>>>>>         s"InputRows: ${event.progress.numInputRows}"
>>>>>>>       )
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent):
Unit = {
>>>>>>>     println(s"Query with id ${event.id} terminated")
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>>>>   <modelVersion>4.0.0</modelVersion>
>>>>>>>   <groupId>com.myorg</groupId>
>>>>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>>>>   <packaging>jar</packaging>
>>>>>>>   <version>1.0-SNAPSHOT</version>
>>>>>>>   <name>spark-3-conversion</name>
>>>>>>>   <url>http://maven.apache.org</url>
>>>>>>>
>>>>>>>   <properties>
>>>>>>>     <spark.version>3.0.0</spark.version>
>>>>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>>>>     <scala.version>2.12.10</scala.version>
>>>>>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>>>>>     <skipTests>true</skipTests>
>>>>>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>>>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>>>>>     <encoding>UTF-8</encoding>
>>>>>>>   </properties>
>>>>>>>
>>>>>>>
>>>>>>>   <dependencies>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.scala-lang</groupId>
>>>>>>>       <artifactId>scala-library</artifactId>
>>>>>>>       <version>${scala.version}</version>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>>>>       <version>${spark.version}</version>
>>>>>>>       <scope>provided</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>>>>       <version>${spark.version}</version>
>>>>>>>       <scope>provided</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>>>>       <version>${spark.version}</version>
>>>>>>>       <scope>provided</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>       <version>${spark.version}</version>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>       <version>${spark.version}</version>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.slf4j</groupId>
>>>>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>>>>       <version>1.7.7</version>
>>>>>>>       <scope>runtime</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>       <groupId>log4j</groupId>
>>>>>>>       <artifactId>log4j</artifactId>
>>>>>>>       <version>1.2.17</version>
>>>>>>>       <scope>compile</scope>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.scalatest</groupId>
>>>>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>>>>       <version>3.0.7</version>
>>>>>>>       <scope>test</scope>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>junit</groupId>
>>>>>>>       <artifactId>junit</artifactId>
>>>>>>>       <version>3.8.1</version>
>>>>>>>       <scope>test</scope>
>>>>>>>     </dependency>
>>>>>>>   </dependencies>
>>>>>>>
>>>>>>>   <build>
>>>>>>>     <plugins>
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>>>>         <version>3.0.0</version>
>>>>>>>         <executions>
>>>>>>>           <!-- Run shade goal on package phase -->
>>>>>>>           <execution>
>>>>>>>             <phase>install</phase>
>>>>>>>             <goals>
>>>>>>>               <goal>shade</goal>
>>>>>>>             </goals>
>>>>>>>           </execution>
>>>>>>>         </executions>
>>>>>>>       </plugin>
>>>>>>>
>>>>>>>       <!-- Scala Compiler -->
>>>>>>>       <plugin>
>>>>>>>         <groupId>net.alchim31.maven</groupId>
>>>>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>>>>         <version>3.2.2</version>
>>>>>>>         <executions>
>>>>>>>           <execution>
>>>>>>>             <goals>
>>>>>>>               <goal>compile</goal>
>>>>>>>               <goal>testCompile</goal>
>>>>>>>             </goals>
>>>>>>>           </execution>
>>>>>>>         </executions>
>>>>>>>       </plugin>
>>>>>>>
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>>>>         <version>1.7</version>
>>>>>>>         <executions>
>>>>>>>           <!-- Add src/main/scala to eclipse build path -->
>>>>>>>           <execution>
>>>>>>>             <id>add-source</id>
>>>>>>>             <phase>generate-sources</phase>
>>>>>>>             <goals>
>>>>>>>               <goal>add-source</goal>
>>>>>>>             </goals>
>>>>>>>             <configuration>
>>>>>>>               <sources>
>>>>>>>                 <source>src/main/scala</source>
>>>>>>>               </sources>
>>>>>>>             </configuration>
>>>>>>>           </execution>
>>>>>>>           <!-- Add src/test/scala to eclipse build path -->
>>>>>>>           <execution>
>>>>>>>             <id>add-test-source</id>
>>>>>>>             <phase>generate-test-sources</phase>
>>>>>>>             <goals>
>>>>>>>               <goal>add-test-source</goal>
>>>>>>>             </goals>
>>>>>>>             <configuration>
>>>>>>>               <sources>
>>>>>>>                 <source>src/test/scala</source>
>>>>>>>               </sources>
>>>>>>>             </configuration>
>>>>>>>           </execution>
>>>>>>>         </executions>
>>>>>>>       </plugin>
>>>>>>>
>>>>>>>       <!-- we disable surefile and enable scalatest so that
maven can run our tests -->
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>>>>>         <version>2.7</version>
>>>>>>>         <configuration>
>>>>>>>           <skipTests>true</skipTests>
>>>>>>>         </configuration>
>>>>>>>       </plugin>
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.scalatest</groupId>
>>>>>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>>>>>         <version>1.0</version>
>>>>>>>         <configuration>
>>>>>>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>>>>>           <junitxml>.</junitxml>
>>>>>>>           <filereports>WDF TestSuite.txt</filereports>
>>>>>>>         </configuration>
>>>>>>>         <executions>
>>>>>>>           <execution>
>>>>>>>             <id>test</id>
>>>>>>>             <goals>
>>>>>>>               <goal>test</goal>
>>>>>>>             </goals>
>>>>>>>           </execution>
>>>>>>>         </executions>
>>>>>>>       </plugin>
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.scalastyle</groupId>
>>>>>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>>>>>         <version>1.0.0</version>
>>>>>>>         <configuration>
>>>>>>>           <verbose>false</verbose>
>>>>>>>           <failOnViolation>true</failOnViolation>
>>>>>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>>>>>           <failOnWarning>false</failOnWarning>
>>>>>>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>>>>>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>>>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>>>>>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>>>>>           <outputEncoding>UTF-8</outputEncoding>
>>>>>>>         </configuration>
>>>>>>>         <executions>
>>>>>>>           <execution>
>>>>>>>             <goals>
>>>>>>>               <goal>check</goal>
>>>>>>>             </goals>
>>>>>>>           </execution>
>>>>>>>         </executions>
>>>>>>>       </plugin>
>>>>>>>       <plugin>
>>>>>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>>>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>>>>>         <version>3.6.0.1398</version>
>>>>>>>       </plugin>
>>>>>>>
>>>>>>>     </plugins>
>>>>>>>   </build>
>>>>>>> </project>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok. I will work on creating a reproducible app. Thanks.
>>>>>>>>
>>>>>>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <
>>>>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just reached this thread. +1 on to create a simple reproducer
app
>>>>>>>>> and I suggest to create a jira attaching the full driver
and executor logs.
>>>>>>>>> Ping me on the jira and I'll pick this up right away...
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Would you mind if I ask for a simple reproducer?
Would be nice if
>>>>>>>>>> you could create a repository in Github and push
the code including the
>>>>>>>>>> build script.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance!
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <
>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I tried both. First tried 3.0.0. That didn't
work so I
>>>>>>>>>>> tried 3.1.0.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim
<
>>>>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Which exact Spark version did you use? Did
you make sure the
>>>>>>>>>>>> version for Spark and the version for spark-sql-kafka
artifact are the
>>>>>>>>>>>> same? (I asked this because you've said you've
used Spark 3.0 but
>>>>>>>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes
<
>>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException:
Data
>>>>>>>>>>>>> source v2 streaming sinks does not support
Update mode. === Streaming Query
>>>>>>>>>>>>> === Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e,
runId =
>>>>>>>>>>>>> 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {} Current
>>>>>>>>>>>>> Available Offsets: {} Current State:
INITIALIZING Thread State: RUNNABLE at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException:
Data source v2 streaming
>>>>>>>>>>>>> sinks does not support Update mode. at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>>>>>> at
>>>>>>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>>>>>> ... 1 more
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Please see the attached image for more
information.*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek
Laskowski <
>>>>>>>>>>>>> jacek@japila.pl> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you post the whole message? I'm
trying to find what might
>>>>>>>>>>>>>> be causing it. A small reproducible
example would be of help too. Thank you.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pozdrawiam,
>>>>>>>>>>>>>> Jacek Laskowski
>>>>>>>>>>>>>> ----
>>>>>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric
Beabes <
>>>>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Trying to port my Spark 2.4 based
(Structured) streaming
>>>>>>>>>>>>>>> application to Spark 3.0. I compiled
it using the dependency given below:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> <dependency>
>>>>>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>>>>>> </dependency>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Every time I run it under Spark
3.0, I get this message: *Data
>>>>>>>>>>>>>>> source v2 streaming sinks does
not support Update mode*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using '*mapGroupsWithState*'
so as per this link (
>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>>>>>> the only supported Output mode
is "*Update*".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My Sink is a Kafka topic so I
am using this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> .writeStream
>>>>>>>>>>>>>>> .format("kafka")
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What am I missing?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>>>
>>>>>>>>>>>>

Mime
View raw message