spark-user mailing list archives

From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Tue, 19 Jan 2021 08:38:23 GMT
Will do, thanks!

On Tue, Jan 19, 2021 at 1:39 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
wrote:

> Thanks for double-checking the version. Please report back on whether it
> works with version 3.1.
>
> G
>
>
> On Tue, 19 Jan 2021, 07:41 Eric Beabes, <mailinglists19@gmail.com> wrote:
>
>> Confirmed. The cluster Admin said his team installed the latest version
>> from Cloudera which comes with Spark 3.0.0-preview2. They are going to try
>> to upgrade it with the Community edition Spark 3.1.0.
>>
>> Thanks Jungtaek for the tip. Greatly appreciate it.
>>
>> On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglists19@gmail.com>
>> wrote:
>>
>>> >> Could you please make sure you're not using "3.0.0-preview"?
>>>
>>> This could be the reason. I will check with our Hadoop cluster
>>> administrator. It's quite possible that they installed the "Preview" release.
>>> Yes, the code works in the local dev environment.
>>>
>>>
>>> On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <
>>> kabhwan.opensource@gmail.com> wrote:
>>>
>>>> I see no issue running this code in local dev (after changing the scope
>>>> of the Spark artifacts to "compile", of course).
>>>>
>>>> Could you please make sure you're not using "3.0.0-preview"? In
>>>> 3.0.0-preview, Update mode was restricted (as the error message says), and
>>>> the restriction was reverted before Spark 3.0.0 was released. Clearing the
>>>> Spark artifacts from your .m2 cache may help.
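A minimal fail-fast sketch for this case, in plain Scala with no Spark dependency. `SparkVersionGuard`, `isPreview` and `requireReleaseBuild` are hypothetical names, not Spark API. The check only looks for a `-preview` qualifier in the version string, on the assumption (taken from the message above) that the Update-mode restriction was limited to the 3.0.0 preview builds.

```scala
object SparkVersionGuard {
  // Hypothetical helper (not a Spark API): true when the version string carries a
  // "-preview" qualifier, e.g. "3.0.0-preview" or "3.0.0-preview2".
  def isPreview(version: String): Boolean =
    version.toLowerCase.contains("-preview")

  // Throws IllegalArgumentException (via require) when the runtime is a preview build.
  def requireReleaseBuild(version: String): Unit =
    require(
      !isPreview(version),
      s"Running on Spark $version: the 3.0.0 preview builds reject Update mode for " +
        "data source v2 streaming sinks such as Kafka; use a release build."
    )
}
```

In the reproducer this could be called as `SparkVersionGuard.requireReleaseBuild(spark.version)` right after `initializeSparkSession(...)`; `SparkSession.version` reports the Spark version the session is running on. On a preview cluster the job would then stop with a readable message instead of the `StreamingQueryException` reported in this thread.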
>>>>
>>>> On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <
>>>> kabhwan.opensource@gmail.com> wrote:
>>>>
>>>>> Please also include some test data. I quickly looked through the code,
>>>>> and it may require a specific record format.
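On the record-format point: `groupByKey` in the reproducer calls `row.substring(START_DATE_INDEX, END_DATE_INDEX)`, i.e. `substring(21, 40)`, which throws `StringIndexOutOfBoundsException` for any record shorter than 40 characters. Below is a minimal sketch of a defensive key extractor in plain Scala. `RecordKey` is a hypothetical name, and the sample layout (21 filler characters followed by a 19-character timestamp such as `2021-01-19 08:38:23`) is only a guess inferred from the offsets and the `dateTime` field name, not something stated in the thread.

```scala
object RecordKey {
  // Same offsets as START_DATE_INDEX / END_DATE_INDEX in the reproducer.
  val StartIndex = 21
  val EndIndex = 40

  // Returns the grouping key, or None when the record is null or too short.
  // A plain record.substring(21, 40) would throw StringIndexOutOfBoundsException instead.
  def extract(record: String): Option[String] =
    if (record != null && record.length >= EndIndex)
      Some(record.substring(StartIndex, EndIndex))
    else
      None

  // Builds a hypothetical test record: 21 filler characters, then the timestamp,
  // then the payload. Assumes the timestamp is exactly 19 characters long.
  def sample(timestamp: String, payload: String): String =
    ("#" * StartIndex) + timestamp + payload
}
```

In the job, a `.filter(r => RecordKey.extract(r).isDefined)` before `groupByKey` would drop malformed records instead of failing the task; whether dropping them silently is acceptable depends on the use case.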
>>>>>
>>>>> On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <
>>>>> gschiavonspark@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This is the jira
>>>>>> <https://issues.apache.org/jira/projects/SPARK/summary>. As for the repo,
>>>>>> I believe you can just commit it to your personal repo, and that should be it.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglists19@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry. Can you please tell me where to create the JIRA? Also, is there
>>>>>>> a specific GitHub repository I need to commit the code into, or just
>>>>>>> our own? Please let me know. Thanks.
>>>>>>>
>>>>>>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <
>>>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you. As we've asked, could you please create a jira and
>>>>>>>> commit the code into GitHub?
>>>>>>>> It would speed things up a lot.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>>>>>>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>>>>>>>>> email as well:
>>>>>>>>>
>>>>>>>>> package com.myorg
>>>>>>>>>
>>>>>>>>> import org.apache.hadoop.conf.Configuration
>>>>>>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>>>>>>> import org.apache.hadoop.security.UserGroupInformation
>>>>>>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>>>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>>>>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>>>>>>
>>>>>>>>> import scala.util.{Failure, Success, Try}
>>>>>>>>>
>>>>>>>>> object Spark3Test {
>>>>>>>>>
>>>>>>>>>   val isLocal = false
>>>>>>>>>
>>>>>>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>>>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>>>>>>
>>>>>>>>>   val START_DATE_INDEX = 21
>>>>>>>>>   val END_DATE_INDEX = 40
>>>>>>>>>
>>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>>
>>>>>>>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>>>>>>>>>     spark.sparkContext.setLogLevel("WARN")
>>>>>>>>>
>>>>>>>>>     readKafkaStream(spark)
>>>>>>>>>       .groupByKey(row => {
>>>>>>>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>>>>>>       })
>>>>>>>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>         updateAcrossEvents
>>>>>>>>>       )
>>>>>>>>>       .filter(row => !row.inProgress)
>>>>>>>>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>>>>>>>       .writeStream
>>>>>>>>>       .format("kafka")
>>>>>>>>>       .option(
>>>>>>>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>>>>>>         "10.29.42.141:9092"
>>>>>>>>> //        "localhost:9092"
>>>>>>>>>       )
>>>>>>>>>       .option("topic", "spark3test")
>>>>>>>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>>>>>>       .outputMode("update")
>>>>>>>>>       .start()
>>>>>>>>>     manageStreamingQueries(spark)
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>>>>>>>
>>>>>>>>>     val stream = sparkSession.readStream
>>>>>>>>>       .format("kafka")
>>>>>>>>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>>>>>>       .option("subscribe", "inputTopic")
>>>>>>>>>       .option("startingOffsets", "latest")
>>>>>>>>>       .option("failOnDataLoss", "false")
>>>>>>>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>>>>>>>       .load()
>>>>>>>>>       .selectExpr("CAST(value AS STRING)")
>>>>>>>>>       .as[String](Encoders.STRING)
>>>>>>>>>     stream
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>>>>>>>>>     if (!oldState.exists) {
>>>>>>>>>       println(key)
>>>>>>>>>       val state = MyState(key)
>>>>>>>>>       oldState.update(state)
>>>>>>>>>       oldState.setTimeoutDuration("1 minutes")
>>>>>>>>>       oldState.get
>>>>>>>>>     } else {
>>>>>>>>>       if (oldState.hasTimedOut) {
>>>>>>>>>         oldState.get.inProgress = false
>>>>>>>>>         val state = oldState.get
>>>>>>>>>         println("State timed out for key: " + state.dateTime)
>>>>>>>>>         oldState.remove()
>>>>>>>>>         state
>>>>>>>>>       } else {
>>>>>>>>>         val state = oldState.get
>>>>>>>>>         state.count = state.count + 1
>>>>>>>>>         oldState.update(state)
>>>>>>>>>         oldState.setTimeoutDuration("1 minutes")
>>>>>>>>>         oldState.get
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>>>>>>>>>     UserGroupInformation.setLoginUser(
>>>>>>>>>       UserGroupInformation.createRemoteUser("hduser")
>>>>>>>>>     )
>>>>>>>>>
>>>>>>>>>     val builder = SparkSession
>>>>>>>>>       .builder()
>>>>>>>>>       .appName(applicationName)
>>>>>>>>>
>>>>>>>>>     if (isLocal) {
>>>>>>>>>       builder.config("spark.master", "local[2]")
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     builder.getOrCreate()
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>>>>>>>
>>>>>>>>>     val sparkQueryListener = new QueryListener()
>>>>>>>>>     spark.streams.addListener(sparkQueryListener)
>>>>>>>>>
>>>>>>>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>>>>>>>
>>>>>>>>>     val timeoutInMilliSeconds = 60000
>>>>>>>>>     while (!spark.streams.active.isEmpty) {
>>>>>>>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>>>>>>>>         case Success(result) =>
>>>>>>>>>           if (result) {
>>>>>>>>>             println("A streaming query was terminated successfully")
>>>>>>>>>             spark.streams.resetTerminated()
>>>>>>>>>           }
>>>>>>>>>         case Failure(e) =>
>>>>>>>>>           println("Query failed with message: " + e.getMessage)
>>>>>>>>>           e.printStackTrace()
>>>>>>>>>           spark.streams.resetTerminated()
>>>>>>>>>       }
>>>>>>>>>
>>>>>>>>>       if (checkMarker(shutdownMarker)) {
>>>>>>>>>         spark.streams.active.foreach(query => {
>>>>>>>>>           println(s"Stopping streaming query: ${query.id}")
>>>>>>>>>           query.stop()
>>>>>>>>>         })
>>>>>>>>>         spark.stop()
>>>>>>>>>         removeMarker(shutdownMarker)
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>     assert(spark.streams.active.isEmpty)
>>>>>>>>>     spark.streams.removeListener(sparkQueryListener)
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def checkMarker(markerFile: String): Boolean = {
>>>>>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>>>>>     fs.exists(new Path(markerFile))
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def removeMarker(markerFile: String): Unit = {
>>>>>>>>>     val fs = FileSystem.get(new Configuration())
>>>>>>>>>     fs.delete(new Path(markerFile), true)
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> package com.myorg
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>>>>>>
>>>>>>>>> class QueryListener extends StreamingQueryListener {
>>>>>>>>>
>>>>>>>>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>>>>>>>>>
>>>>>>>>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>>>>>>     if (event.progress.numInputRows != 0) {
>>>>>>>>>       println(
>>>>>>>>>         s"InputRows: ${event.progress.numInputRows}"
>>>>>>>>>       )
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>>>>>>>>>     println(s"Query with id ${event.id} terminated")
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>>>>>>   <modelVersion>4.0.0</modelVersion>
>>>>>>>>>   <groupId>com.myorg</groupId>
>>>>>>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>>>>>>   <packaging>jar</packaging>
>>>>>>>>>   <version>1.0-SNAPSHOT</version>
>>>>>>>>>   <name>spark-3-conversion</name>
>>>>>>>>>   <url>http://maven.apache.org</url>
>>>>>>>>>
>>>>>>>>>   <properties>
>>>>>>>>>     <spark.version>3.0.0</spark.version>
>>>>>>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>>>>>>     <scala.version>2.12.10</scala.version>
>>>>>>>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>>>>>>>     <skipTests>true</skipTests>
>>>>>>>>>     <maven.compiler.source>1.5</maven.compiler.source>
>>>>>>>>>     <maven.compiler.target>1.5</maven.compiler.target>
>>>>>>>>>     <encoding>UTF-8</encoding>
>>>>>>>>>   </properties>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>   <dependencies>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.scala-lang</groupId>
>>>>>>>>>       <artifactId>scala-library</artifactId>
>>>>>>>>>       <version>${scala.version}</version>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>${spark.version}</version>
>>>>>>>>>       <scope>provided</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>${spark.version}</version>
>>>>>>>>>       <scope>provided</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>${spark.version}</version>
>>>>>>>>>       <scope>provided</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>${spark.version}</version>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.apache.spark</groupId>
>>>>>>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>${spark.version}</version>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.slf4j</groupId>
>>>>>>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>>>>>>       <version>1.7.7</version>
>>>>>>>>>       <scope>runtime</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>log4j</groupId>
>>>>>>>>>       <artifactId>log4j</artifactId>
>>>>>>>>>       <version>1.2.17</version>
>>>>>>>>>       <scope>compile</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>org.scalatest</groupId>
>>>>>>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>>>>>>       <version>3.0.7</version>
>>>>>>>>>       <scope>test</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>
>>>>>>>>>     <dependency>
>>>>>>>>>       <groupId>junit</groupId>
>>>>>>>>>       <artifactId>junit</artifactId>
>>>>>>>>>       <version>3.8.1</version>
>>>>>>>>>       <scope>test</scope>
>>>>>>>>>     </dependency>
>>>>>>>>>   </dependencies>
>>>>>>>>>
>>>>>>>>>   <build>
>>>>>>>>>     <plugins>
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>>>>>>         <version>3.0.0</version>
>>>>>>>>>         <executions>
>>>>>>>>>           <!-- Run shade goal on package phase -->
>>>>>>>>>           <execution>
>>>>>>>>>             <phase>install</phase>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>shade</goal>
>>>>>>>>>             </goals>
>>>>>>>>>           </execution>
>>>>>>>>>         </executions>
>>>>>>>>>       </plugin>
>>>>>>>>>
>>>>>>>>>       <!-- Scala Compiler -->
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>net.alchim31.maven</groupId>
>>>>>>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>>>>>>         <version>3.2.2</version>
>>>>>>>>>         <executions>
>>>>>>>>>           <execution>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>compile</goal>
>>>>>>>>>               <goal>testCompile</goal>
>>>>>>>>>             </goals>
>>>>>>>>>           </execution>
>>>>>>>>>         </executions>
>>>>>>>>>       </plugin>
>>>>>>>>>
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>>>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>>>>>>         <version>1.7</version>
>>>>>>>>>         <executions>
>>>>>>>>>           <!-- Add src/main/scala to eclipse build path -->
>>>>>>>>>           <execution>
>>>>>>>>>             <id>add-source</id>
>>>>>>>>>             <phase>generate-sources</phase>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>add-source</goal>
>>>>>>>>>             </goals>
>>>>>>>>>             <configuration>
>>>>>>>>>               <sources>
>>>>>>>>>                 <source>src/main/scala</source>
>>>>>>>>>               </sources>
>>>>>>>>>             </configuration>
>>>>>>>>>           </execution>
>>>>>>>>>           <!-- Add src/test/scala to eclipse build path -->
>>>>>>>>>           <execution>
>>>>>>>>>             <id>add-test-source</id>
>>>>>>>>>             <phase>generate-test-sources</phase>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>add-test-source</goal>
>>>>>>>>>             </goals>
>>>>>>>>>             <configuration>
>>>>>>>>>               <sources>
>>>>>>>>>                 <source>src/test/scala</source>
>>>>>>>>>               </sources>
>>>>>>>>>             </configuration>
>>>>>>>>>           </execution>
>>>>>>>>>         </executions>
>>>>>>>>>       </plugin>
>>>>>>>>>
>>>>>>>>>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>>>>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>>>>>>>         <version>2.7</version>
>>>>>>>>>         <configuration>
>>>>>>>>>           <skipTests>true</skipTests>
>>>>>>>>>         </configuration>
>>>>>>>>>       </plugin>
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.scalatest</groupId>
>>>>>>>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>>>>>>>         <version>1.0</version>
>>>>>>>>>         <configuration>
>>>>>>>>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>>>>>>>           <junitxml>.</junitxml>
>>>>>>>>>           <filereports>WDF TestSuite.txt</filereports>
>>>>>>>>>         </configuration>
>>>>>>>>>         <executions>
>>>>>>>>>           <execution>
>>>>>>>>>             <id>test</id>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>test</goal>
>>>>>>>>>             </goals>
>>>>>>>>>           </execution>
>>>>>>>>>         </executions>
>>>>>>>>>       </plugin>
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.scalastyle</groupId>
>>>>>>>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>>>>>>>         <version>1.0.0</version>
>>>>>>>>>         <configuration>
>>>>>>>>>           <verbose>false</verbose>
>>>>>>>>>           <failOnViolation>true</failOnViolation>
>>>>>>>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>>>>>>>           <failOnWarning>false</failOnWarning>
>>>>>>>>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>>>>>>>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>>>>>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>>>>>>>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>>>>>>>           <outputEncoding>UTF-8</outputEncoding>
>>>>>>>>>         </configuration>
>>>>>>>>>         <executions>
>>>>>>>>>           <execution>
>>>>>>>>>             <goals>
>>>>>>>>>               <goal>check</goal>
>>>>>>>>>             </goals>
>>>>>>>>>           </execution>
>>>>>>>>>         </executions>
>>>>>>>>>       </plugin>
>>>>>>>>>       <plugin>
>>>>>>>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>>>>>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>>>>>>>         <version>3.6.0.1398</version>
>>>>>>>>>       </plugin>
>>>>>>>>>
>>>>>>>>>     </plugins>
>>>>>>>>>   </build>
>>>>>>>>> </project>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <
>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ok. I will work on creating a reproducible app. Thanks.
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just reached this thread. +1 on creating a simple reproducer
>>>>>>>>>>> app, and I suggest creating a jira with the full driver and executor
>>>>>>>>>>> logs attached.
>>>>>>>>>>> Ping me on the jira and I'll pick this up right away...
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Would you mind if I ask for a simple reproducer? It would be nice
>>>>>>>>>>>> if you could create a repository on GitHub and push the code, including
>>>>>>>>>>>> the build script.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <
>>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I tried both. I first tried 3.0.0; that didn't work, so I
>>>>>>>>>>>>> tried 3.1.0.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>>>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Which exact Spark version did you use? Did you make sure the
>>>>>>>>>>>>>> Spark version and the spark-sql-kafka artifact version are the
>>>>>>>>>>>>>> same? (I ask because you said you used Spark 3.0, but the
>>>>>>>>>>>>>> spark-sql-kafka dependency pointed to 3.1.0.)
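On the version question: a sketch of comparing the Spark runtime version with the version of the `spark-sql-kafka-0-10` artifact the job was built against. `ConnectorVersionCheck` is a hypothetical plain-Scala helper; it does not discover the connector version by itself, so that string has to come from your own build (for example the same value as the pom's `spark.version` property).

```scala
object ConnectorVersionCheck {
  // Splits "3.0.0-preview2" into ("3.0.0", Some("preview2")) and "3.0.0" into ("3.0.0", None).
  def split(version: String): (String, Option[String]) = {
    val dash = version.indexOf('-')
    if (dash < 0) (version, None)
    else (version.substring(0, dash), Some(version.substring(dash + 1)))
  }

  // None when both versions are identical; otherwise a description of the mismatch.
  def mismatch(runtime: String, connector: String): Option[String] =
    if (runtime == connector) None
    else {
      val (runtimeBase, runtimeQualifier) = split(runtime)
      val (connectorBase, connectorQualifier) = split(connector)
      if (runtimeBase != connectorBase)
        Some(s"spark-sql-kafka $connector does not match Spark runtime $runtime")
      else
        Some(s"Same base version $runtimeBase, but qualifiers differ: runtime " +
          s"${runtimeQualifier.getOrElse("release")}, connector ${connectorQualifier.getOrElse("release")}")
    }
}
```

For example, `ConnectorVersionCheck.mismatch(spark.version, "3.1.0")` on a Spark 3.0.0 runtime returns a message, and it also flags a 3.0.0-preview2 runtime paired with a 3.0.0 connector; matching versions return `None`.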
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>>>>>> === Streaming Query ===
>>>>>>>>>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>>>>>>>>>> Current Committed Offsets: {}
>>>>>>>>>>>>>>> Current Available Offsets: {}
>>>>>>>>>>>>>>> Current State: INITIALIZING
>>>>>>>>>>>>>>> Thread State: RUNNABLE
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>>>>>>>>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>>>>>>>>   ... 1 more
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <
>>>>>>>>>>>>>>> jacek@japila.pl> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Can you post the whole message? I'm trying to find what
>>>>>>>>>>>>>>>> might be causing it. A small reproducible example would be of help too.
>>>>>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Jacek Laskowski
>>>>>>>>>>>>>>>> ----
>>>>>>>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>>>>>>>>>> application to Spark 3.0. I compiled it using the dependency given below:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <dependency>
>>>>>>>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>>>>>>>> </dependency>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>>>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> .writeStream
>>>>>>>>>>>>>>>>> .format("kafka")
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What am I missing?
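To illustrate why Update is the output mode here: with `mapGroupsWithState`, each trigger invokes the user function once per key that received data (or timed out), and each invocation returns exactly one row, so only keys touched in that trigger are written. Below is a plain-Scala model of the reproducer's `updateAcrossEvents` under those rules. It is a hypothetical simplification, not Spark's implementation: `UpdateModeModel` is an invented name, `KeyState` is an immutable stand-in for `MyState`, and timeouts are passed in as a set of keys instead of being driven by processing time.

```scala
object UpdateModeModel {
  // Immutable stand-in for the reproducer's MyState.
  final case class KeyState(dateTime: String, inProgress: Boolean = true, count: Int = 1)

  // Mirrors updateAcrossEvents: a new key starts at count 1, a timed-out key emits a final
  // row with inProgress = false and loses its state, any other call adds 1 to count.
  // As in the reproducer, count grows once per trigger, not once per record.
  def update(key: String, old: Option[KeyState], timedOut: Boolean): (Option[KeyState], KeyState) =
    old match {
      case None =>
        val fresh = KeyState(key)
        (Some(fresh), fresh)
      case Some(s) if timedOut =>
        (None, s.copy(inProgress = false))
      case Some(s) =>
        val next = s.copy(count = s.count + 1)
        (Some(next), next)
    }

  // One trigger: keys with input, plus stored keys that timed out without input, are
  // invoked once each; every invocation yields exactly one output row.
  def trigger(
      store: Map[String, KeyState],
      keysWithInput: Set[String],
      timedOut: Set[String]): (Map[String, KeyState], Seq[KeyState]) = {
    val invoked = (keysWithInput ++ timedOut.intersect(store.keySet)).toSeq.sorted
    invoked.foldLeft((store, Vector.empty[KeyState])) { case ((state, rows), key) =>
      val isTimeout = timedOut.contains(key) && !keysWithInput.contains(key)
      val (next, row) = update(key, state.get(key), isTimeout)
      (next.fold(state - key)(s => state.updated(key, s)), rows :+ row)
    }
  }
}
```

In the reproducer, `.filter(row => !row.inProgress)` then keeps only the final row for each timed-out key, so the Kafka topic would receive one message per key once its one-minute timeout fires.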
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
