spark-user mailing list archives

From Jungtaek Lim <kabhwan.opensou...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Mon, 18 Jan 2021 23:50:46 GMT
Please also include some test data. I quickly looked through the code, and
it may require a specific record format.
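To make test records match what the reproducer expects, here is a minimal Scala sketch. It assumes, as a guess based on `START_DATE_INDEX = 21` and `END_DATE_INDEX = 40`, that characters 21 to 39 of each Kafka value hold a 19-character timestamp such as `yyyy-MM-dd HH:mm:ss`; the prefix, the payload, and the `TestRecords` object are hypothetical. Note that the reproducer's `row.substring(21, 40)` throws `StringIndexOutOfBoundsException` on any value shorter than 40 characters, so the sketch uses a bounds-checked variant.

```scala
// Hypothetical test-record helper for the reproducer.
// Assumption: characters [21, 40) of each Kafka value are a 19-char timestamp.
object TestRecords {
  val StartDateIndex = 21 // mirrors START_DATE_INDEX in the reproducer
  val EndDateIndex = 40   // mirrors END_DATE_INDEX in the reproducer

  // Builds a record: a prefix padded/truncated to 21 chars, then the timestamp, then a payload.
  def record(prefix: String, timestamp: String, payload: String): String = {
    require(timestamp.length == EndDateIndex - StartDateIndex, "timestamp must be 19 characters")
    prefix.take(StartDateIndex).padTo(StartDateIndex, ' ') + timestamp + payload
  }

  // Same slice as the reproducer's groupByKey, but None for short values instead of throwing.
  def key(row: String): Option[String] =
    if (row.length >= EndDateIndex) Some(row.substring(StartDateIndex, EndDateIndex)) else None

  def main(args: Array[String]): Unit = {
    val r = record("event-id-0001", "2021-01-18 23:50:00", ",payload")
    println(key(r))           // Some(2021-01-18 23:50:00)
    println(key("too short")) // None
  }
}
```

Records produced this way can be sent to `inputTopic` with any Kafka producer; the prefix content itself is irrelevant to the grouping key.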

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonspark@gmail.com>
wrote:

> Hi,
>
> This is the JIRA <https://issues.apache.org/jira/projects/SPARK/summary>.
> Regarding the repo, I believe you can just commit it to your personal repo;
> that should be it.
>
> Regards
>
> On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglists19@gmail.com>
> wrote:
>
>> Sorry, can you please tell me where to create the JIRA? Also, is there a
>> specific GitHub repository I need to commit the code to, or should it just
>> go in our own? Please let me know. Thanks.
>>
>> On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>> wrote:
>>
>>> Thank you. As we've asked, could you please create a JIRA and commit the
>>> code to GitHub?
>>> It would speed things up a lot.
>>>
>>> G
>>>
>>>
>>> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com>
>>> wrote:
>>>
>>>> Here's a very simple reproducer app. I've attached 3 files:
>>>> SparkTest.scala, QueryListener.scala & pom.xml. I'm also copying their
>>>> contents into the email:
>>>>
>>>> package com.myorg
>>>>
>>>> import org.apache.hadoop.conf.Configuration
>>>> import org.apache.hadoop.fs.{FileSystem, Path}
>>>> import org.apache.hadoop.security.UserGroupInformation
>>>> import org.apache.kafka.clients.producer.ProducerConfig
>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>>>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>>>
>>>> import scala.util.{Failure, Success, Try}
>>>>
>>>> object Spark3Test {
>>>>
>>>>   val isLocal = false
>>>>
>>>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>>>
>>>>   val START_DATE_INDEX = 21
>>>>   val END_DATE_INDEX = 40
>>>>
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>>>>     spark.sparkContext.setLogLevel("WARN")
>>>>
>>>>     readKafkaStream(spark)
>>>>       .groupByKey(row => {
>>>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>>>       })
>>>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>         updateAcrossEvents
>>>>       )
>>>>       .filter(row => !row.inProgress)
>>>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>>>       .writeStream
>>>>       .format("kafka")
>>>>       .option(
>>>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>>>         "10.29.42.141:9092"
>>>> //        "localhost:9092"
>>>>       )
>>>>       .option("topic", "spark3test")
>>>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>>>       .outputMode("update")
>>>>       .start()
>>>>     manageStreamingQueries(spark)
>>>>   }
>>>>
>>>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>>>
>>>>     val stream = sparkSession.readStream
>>>>       .format("kafka")
>>>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>>>       .option("subscribe", "inputTopic")
>>>>       .option("startingOffsets", "latest")
>>>>       .option("failOnDataLoss", "false")
>>>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>>>       .load()
>>>>       .selectExpr("CAST(value AS STRING)")
>>>>       .as[String](Encoders.STRING)
>>>>     stream
>>>>   }
>>>>
>>>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>>>>     if (!oldState.exists) {
>>>>       println(key)
>>>>       val state = MyState(key)
>>>>       oldState.update(state)
>>>>       oldState.setTimeoutDuration("1 minutes")
>>>>       oldState.get
>>>>     } else {
>>>>       if (oldState.hasTimedOut) {
>>>>         oldState.get.inProgress = false
>>>>         val state = oldState.get
>>>>         println("State timed out for key: " + state.dateTime)
>>>>         oldState.remove()
>>>>         state
>>>>       } else {
>>>>         val state = oldState.get
>>>>         state.count = state.count + 1
>>>>         oldState.update(state)
>>>>         oldState.setTimeoutDuration("1 minutes")
>>>>         oldState.get
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>>>>     UserGroupInformation.setLoginUser(
>>>>       UserGroupInformation.createRemoteUser("hduser")
>>>>     )
>>>>
>>>>     val builder = SparkSession
>>>>       .builder()
>>>>       .appName(applicationName)
>>>>
>>>>     if (isLocal) {
>>>>       builder.config("spark.master", "local[2]")
>>>>     }
>>>>
>>>>     builder.getOrCreate()
>>>>   }
>>>>
>>>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>>>
>>>>     val sparkQueryListener = new QueryListener()
>>>>     spark.streams.addListener(sparkQueryListener)
>>>>
>>>>     val shutdownMarker: String = "/tmp/stop_job"
>>>>
>>>>     val timeoutInMilliSeconds = 60000
>>>>     while (!spark.streams.active.isEmpty) {
>>>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>>>         case Success(result) =>
>>>>           if (result) {
>>>>             println("A streaming query was terminated successfully")
>>>>             spark.streams.resetTerminated()
>>>>           }
>>>>         case Failure(e) =>
>>>>           println("Query failed with message: " + e.getMessage)
>>>>           e.printStackTrace()
>>>>           spark.streams.resetTerminated()
>>>>       }
>>>>
>>>>       if (checkMarker(shutdownMarker)) {
>>>>         spark.streams.active.foreach(query => {
>>>>           println(s"Stopping streaming query: ${query.id}")
>>>>           query.stop()
>>>>         })
>>>>         spark.stop()
>>>>         removeMarker(shutdownMarker)
>>>>       }
>>>>     }
>>>>     assert(spark.streams.active.isEmpty)
>>>>     spark.streams.removeListener(sparkQueryListener)
>>>>   }
>>>>
>>>>   def checkMarker(markerFile: String): Boolean = {
>>>>     val fs = FileSystem.get(new Configuration())
>>>>     fs.exists(new Path(markerFile))
>>>>   }
>>>>
>>>>   def removeMarker(markerFile: String): Unit = {
>>>>     val fs = FileSystem.get(new Configuration())
>>>>     fs.delete(new Path(markerFile), true)
>>>>   }
>>>>
>>>> }
>>>>
>>>> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
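The branching in `updateAcrossEvents` can be unit-tested without Spark. The sketch below is a hypothetical, immutable model: `Snapshot` and `StateModel` are made-up names, and `GroupState` is replaced by an `Option` plus a `timedOut` flag. It mirrors the three branches as written and makes two things visible: the count grows once per call for a key, not once per record in `inputs`, and only the timed-out snapshot has `inProgress = false`, so it is the only value that passes the reproducer's `.filter(row => !row.inProgress)` on its way to Kafka.

```scala
// Hypothetical Spark-free model of updateAcrossEvents' three branches.
final case class Snapshot(dateTime: String, inProgress: Boolean = true, count: Int = 1)

object StateModel {
  // Returns (state kept after this call, value emitted downstream).
  def step(key: String, old: Option[Snapshot], timedOut: Boolean): (Option[Snapshot], Snapshot) =
    old match {
      case None =>
        val created = Snapshot(key) // no state yet: count starts at 1
        (Some(created), created)
      case Some(s) if timedOut =>
        val finished = s.copy(inProgress = false) // timeout: emit final value and drop state
        (None, finished)
      case Some(s) =>
        val next = s.copy(count = s.count + 1) // new data: one increment per call, not per record
        (Some(next), next)
    }

  def main(args: Array[String]): Unit = {
    val k = "2021-01-18 23:50:00"
    val (s1, _) = step(k, None, timedOut = false)
    val (s2, _) = step(k, s1, timedOut = false)
    val (s3, out) = step(k, s2, timedOut = true)
    println(s"$s3 $out") // None Snapshot(2021-01-18 23:50:00,false,2)
  }
}
```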
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> package com.myorg
>>>>
>>>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>>>
>>>> class QueryListener extends StreamingQueryListener {
>>>>
>>>>   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>>>>
>>>>   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>     if (event.progress.numInputRows != 0) {
>>>>       println(
>>>>         s"InputRows: ${event.progress.numInputRows}"
>>>>       )
>>>>     }
>>>>   }
>>>>
>>>>   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>>>>     println(s"Query with id ${event.id} terminated")
>>>>   }
>>>> }
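A note on the shutdown-marker helpers in SparkTest.scala: `FileSystem.get(new Configuration())` returns the filesystem named by `fs.defaultFS`, so on a cluster whose default is HDFS, `/tmp/stop_job` is looked up on HDFS rather than on the driver's local disk (without any Hadoop configuration the default is `file:///`). For purely local runs, a plain JVM stand-in is enough. The sketch below is a hypothetical `LocalMarker` built on `java.nio.file`; it is not part of the reproducer.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical local-disk stand-ins for the reproducer's checkMarker/removeMarker.
object LocalMarker {
  // True if the marker file exists on the local filesystem of the JVM running this code.
  def checkMarker(markerFile: String): Boolean = Files.exists(Paths.get(markerFile))

  // Deletes the marker; returns true only if a file was actually removed.
  def removeMarker(markerFile: String): Boolean = Files.deleteIfExists(Paths.get(markerFile))

  def main(args: Array[String]): Unit = {
    val marker = Files.createTempFile("stop_job", ".marker").toString
    println(checkMarker(marker))  // true
    println(removeMarker(marker)) // true
    println(checkMarker(marker))  // false
  }
}
```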
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>>>   <modelVersion>4.0.0</modelVersion>
>>>>   <groupId>com.myorg</groupId>
>>>>   <artifactId>spark-3-conversion</artifactId>
>>>>   <packaging>jar</packaging>
>>>>   <version>1.0-SNAPSHOT</version>
>>>>   <name>spark-3-conversion</name>
>>>>   <url>http://maven.apache.org</url>
>>>>
>>>>   <properties>
>>>>     <spark.version>3.0.0</spark.version>
>>>>     <scala.binary.version>2.12</scala.binary.version>
>>>>     <scala.version>2.12.10</scala.version>
>>>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>>>     <skipTests>true</skipTests>
>>>>     <maven.compiler.source>1.8</maven.compiler.source>
>>>>     <maven.compiler.target>1.8</maven.compiler.target>
>>>>     <encoding>UTF-8</encoding>
>>>>   </properties>
>>>>
>>>>
>>>>   <dependencies>
>>>>     <dependency>
>>>>       <groupId>org.scala-lang</groupId>
>>>>       <artifactId>scala-library</artifactId>
>>>>       <version>${scala.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>       <version>${spark.version}</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.slf4j</groupId>
>>>>       <artifactId>slf4j-log4j12</artifactId>
>>>>       <version>1.7.7</version>
>>>>       <scope>runtime</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>log4j</groupId>
>>>>       <artifactId>log4j</artifactId>
>>>>       <version>1.2.17</version>
>>>>       <scope>compile</scope>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.scalatest</groupId>
>>>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>>>       <version>3.0.7</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>junit</groupId>
>>>>       <artifactId>junit</artifactId>
>>>>       <version>3.8.1</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>   </dependencies>
>>>>
>>>>   <build>
>>>>     <plugins>
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-shade-plugin</artifactId>
>>>>         <version>3.0.0</version>
>>>>         <executions>
>>>>           <!-- Run shade goal on package phase -->
>>>>           <execution>
>>>>             <phase>package</phase>
>>>>             <goals>
>>>>               <goal>shade</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>
>>>>       <!-- Scala Compiler -->
>>>>       <plugin>
>>>>         <groupId>net.alchim31.maven</groupId>
>>>>         <artifactId>scala-maven-plugin</artifactId>
>>>>         <version>3.2.2</version>
>>>>         <executions>
>>>>           <execution>
>>>>             <goals>
>>>>               <goal>compile</goal>
>>>>               <goal>testCompile</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>
>>>>       <plugin>
>>>>         <groupId>org.codehaus.mojo</groupId>
>>>>         <artifactId>build-helper-maven-plugin</artifactId>
>>>>         <version>1.7</version>
>>>>         <executions>
>>>>           <!-- Add src/main/scala to eclipse build path -->
>>>>           <execution>
>>>>             <id>add-source</id>
>>>>             <phase>generate-sources</phase>
>>>>             <goals>
>>>>               <goal>add-source</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <sources>
>>>>                 <source>src/main/scala</source>
>>>>               </sources>
>>>>             </configuration>
>>>>           </execution>
>>>>           <!-- Add src/test/scala to eclipse build path -->
>>>>           <execution>
>>>>             <id>add-test-source</id>
>>>>             <phase>generate-test-sources</phase>
>>>>             <goals>
>>>>               <goal>add-test-source</goal>
>>>>             </goals>
>>>>             <configuration>
>>>>               <sources>
>>>>                 <source>src/test/scala</source>
>>>>               </sources>
>>>>             </configuration>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>
>>>>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-surefire-plugin</artifactId>
>>>>         <version>2.7</version>
>>>>         <configuration>
>>>>           <skipTests>true</skipTests>
>>>>         </configuration>
>>>>       </plugin>
>>>>       <plugin>
>>>>         <groupId>org.scalatest</groupId>
>>>>         <artifactId>scalatest-maven-plugin</artifactId>
>>>>         <version>1.0</version>
>>>>         <configuration>
>>>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>>>           <junitxml>.</junitxml>
>>>>           <filereports>WDF TestSuite.txt</filereports>
>>>>         </configuration>
>>>>         <executions>
>>>>           <execution>
>>>>             <id>test</id>
>>>>             <goals>
>>>>               <goal>test</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>       <plugin>
>>>>         <groupId>org.scalastyle</groupId>
>>>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>>>         <version>1.0.0</version>
>>>>         <configuration>
>>>>           <verbose>false</verbose>
>>>>           <failOnViolation>true</failOnViolation>
>>>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>>>           <failOnWarning>false</failOnWarning>
>>>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>>>           <outputEncoding>UTF-8</outputEncoding>
>>>>         </configuration>
>>>>         <executions>
>>>>           <execution>
>>>>             <goals>
>>>>               <goal>check</goal>
>>>>             </goals>
>>>>           </execution>
>>>>         </executions>
>>>>       </plugin>
>>>>       <plugin>
>>>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>>>         <artifactId>sonar-maven-plugin</artifactId>
>>>>         <version>3.6.0.1398</version>
>>>>       </plugin>
>>>>
>>>>     </plugins>
>>>>   </build>
>>>> </project>
>>>>
>>>>
>>>>
>>>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> Ok. I will work on creating a reproducible app. Thanks.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <
>>>>> gabor.g.somogyi@gmail.com> wrote:
>>>>>
>>>>>> Just reached this thread. +1 on creating a simple reproducer app, and I
>>>>>> suggest creating a JIRA with the full driver and executor logs attached.
>>>>>> Ping me on the JIRA and I'll pick this up right away.
>>>>>> Ping me on the jira and I'll pick this up right away...
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>> Would you mind providing a simple reproducer? It would be nice if you
>>>>>>> could create a repository on GitHub and push the code, including the
>>>>>>> build script.
>>>>>>>
>>>>>>> Thanks in advance!
>>>>>>>
>>>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> I tried both. I first tried 3.0.0; that didn't work, so I tried 3.1.0.
>>>>>>>>
>>>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Which exact Spark version did you use? Did you make sure the
>>>>>>>>> versions of Spark and the spark-sql-kafka artifact are the same?
>>>>>>>>> (I ask because you said you used Spark 3.0, but the spark-sql-kafka
>>>>>>>>> dependency points to 3.1.0.)
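The version question above can be turned into a fail-fast guard at job startup. This is a minimal sketch, not an existing Spark API: `VersionGuard` and `requireSameVersion` are hypothetical names, the runtime version would in a real job come from `spark.version` (`SparkSession.version`), and the connector version is assumed to be passed in from the build (for example the pom's `${spark.version}`).

```scala
import scala.util.Try

// Hypothetical guard: the Spark runtime and the spark-sql-kafka-0-10 artifact should match exactly.
object VersionGuard {
  def requireSameVersion(runtimeVersion: String, connectorVersion: String): Unit =
    require(
      runtimeVersion == connectorVersion,
      s"Spark runtime is $runtimeVersion but spark-sql-kafka-0-10 is $connectorVersion; align them"
    )

  def main(args: Array[String]): Unit = {
    // In a real job, the first argument would be spark.version.
    println(Try(requireSameVersion("3.0.0", "3.1.0")).isFailure) // true: the mismatch in this thread
    println(Try(requireSameVersion("3.0.0", "3.0.0")).isSuccess) // true
  }
}
```

Checking exact equality rather than only major.minor is a deliberately conservative choice, since connector internals can change between Spark releases.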
>>>>>>>>>
>>>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>> === Streaming Query ===
>>>>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>>>>> Current Committed Offsets: {}
>>>>>>>>>> Current Available Offsets: {}
>>>>>>>>>> Current State: INITIALIZING
>>>>>>>>>> Thread State: RUNNABLE
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>>>>     ... 1 more
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Can you post the whole message? I'm trying to find what might be
>>>>>>>>>>> causing it. A small reproducible example would help too. Thank you.
>>>>>>>>>>>
>>>>>>>>>>> Pozdrawiam,
>>>>>>>>>>> Jacek Laskowski
>>>>>>>>>>> ----
>>>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to port my Spark 2.4-based Structured Streaming
>>>>>>>>>>>> application to Spark 3.0. I compiled it using the dependency below:
>>>>>>>>>>>>
>>>>>>>>>>>> <dependency>
>>>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>>>> </dependency>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Every time I run it under Spark 3.0, I get this message: *Data
>>>>>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>>>>>
>>>>>>>>>>>> I am using '*mapGroupsWithState*', so according to this link (
>>>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>>>> the only supported output mode is "*Update*".
>>>>>>>>>>>>
>>>>>>>>>>>> My sink is a Kafka topic, so I am using this:
>>>>>>>>>>>>
>>>>>>>>>>>> .writeStream
>>>>>>>>>>>> .format("kafka")
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> What am I missing?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>
>>>>>>>>>
