spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Mon, 18 Jan 2021 13:37:41 GMT
Thank you. As we've asked, could you please create a Jira and commit the
code to GitHub?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com>
wrote:

> Here's a very simple reproducer app. I've attached 3 files:
> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
> email as well:
>
> package com.myorg
>
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.hadoop.security.UserGroupInformation
> import org.apache.kafka.clients.producer.ProducerConfig
> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>
> import scala.util.{Failure, Success, Try}
>
> object Spark3Test {
>
>   val isLocal = false
>
>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>
>   val START_DATE_INDEX = 21
>   val END_DATE_INDEX = 40
>
>   def main(args: Array[String]) {
>
>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>     spark.sparkContext.setLogLevel("WARN")
>
>     readKafkaStream(spark)
>       .groupByKey(row => {
>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>       })
>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>       .filter(row => !row.inProgress)
>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>       .writeStream
>       .format("kafka")
>       .option(
>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>         "10.29.42.141:9092"
> //        "localhost:9092"
>       )
>       .option("topic", "spark3test")
>       .option("checkpointLocation", "/tmp/checkpoint_5")
>       .outputMode("update")
>       .start()
>     manageStreamingQueries(spark)
>   }
>
>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>
>     val stream = sparkSession.readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>       .option("subscribe", "inputTopic")
>       .option("startingOffsets", "latest")
>       .option("failOnDataLoss", "false")
>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>       .load()
>       .selectExpr("CAST(value AS STRING)")
>       .as[String](Encoders.STRING)
>     stream
>   }
>
>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
>     if (!oldState.exists) {
>       println(key)
>       val state = MyState(key)
>       oldState.update(state)
>       oldState.setTimeoutDuration("1 minutes")
>       oldState.get
>     } else {
>       if (oldState.hasTimedOut) {
>         oldState.get.inProgress = false
>         val state = oldState.get
>         println("State timed out for key: " + state.dateTime)
>         oldState.remove()
>         state
>       } else {
>         val state = oldState.get
>         state.count = state.count + 1
>         oldState.update(state)
>         oldState.setTimeoutDuration("1 minutes")
>         oldState.get
>       }
>     }
>   }
>
>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
>     UserGroupInformation.setLoginUser(
>       UserGroupInformation.createRemoteUser("hduser")
>     )
>
>     val builder = SparkSession
>       .builder()
>       .appName(applicationName)
>
>     if (isLocal) {
>       builder.config("spark.master", "local[2]")
>     }
>
>     builder.getOrCreate()
>   }
>
>   def manageStreamingQueries(spark: SparkSession): Unit = {
>
>     val sparkQueryListener = new QueryListener()
>     spark.streams.addListener(sparkQueryListener)
>
>     val shutdownMarker: String = "/tmp/stop_job"
>
>     val timeoutInMilliSeconds = 60000
>     while (!spark.streams.active.isEmpty) {
>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>         case Success(result) =>
>           if (result) {
>             println("A streaming query was terminated successfully")
>             spark.streams.resetTerminated()
>           }
>         case Failure(e) =>
>           println("Query failed with message: " + e.getMessage)
>           e.printStackTrace()
>           spark.streams.resetTerminated()
>       }
>
>       if (checkMarker(shutdownMarker)) {
>         spark.streams.active.foreach(query => {
>           println(s"Stopping streaming query: ${query.id}")
>           query.stop()
>         })
>         spark.stop()
>         removeMarker(shutdownMarker)
>       }
>     }
>     assert(spark.streams.active.isEmpty)
>     spark.streams.removeListener(sparkQueryListener)
>   }
>
>   def checkMarker(markerFile: String): Boolean = {
>     val fs = FileSystem.get(new Configuration())
>     fs.exists(new Path(markerFile))
>   }
>
>   def removeMarker(markerFile: String): Unit = {
>     val fs = FileSystem.get(new Configuration())
>     fs.delete(new Path(markerFile), true)
>   }
>
> }
>
> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
>
>
>
>
>
>
>
> package com.myorg
>
> import org.apache.spark.sql.streaming.StreamingQueryListener
>
> class QueryListener extends StreamingQueryListener {
>
>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>
>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>     if (event.progress.numInputRows != 0) {
>       println(
>         s"InputRows: ${event.progress.numInputRows}"
>       )
>     }
>   }
>
>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
>     println(s"Query with id ${event.id} terminated")
>   }
> }
>
>
>
>
>
>
>
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>   <groupId>com.myorg</groupId>
>   <artifactId>spark-3-conversion</artifactId>
>   <packaging>jar</packaging>
>   <version>1.0-SNAPSHOT</version>
>   <name>spark-3-conversion</name>
>   <url>http://maven.apache.org</url>
>
>   <properties>
>     <spark.version>3.0.0</spark.version>
>     <scala.binary.version>2.12</scala.binary.version>
>     <scala.version>2.12.10</scala.version>
>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>     <skipTests>true</skipTests>
>     <maven.compiler.source>1.5</maven.compiler.source>
>     <maven.compiler.target>1.5</maven.compiler.target>
>     <encoding>UTF-8</encoding>
>   </properties>
>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.scala-lang</groupId>
>       <artifactId>scala-library</artifactId>
>       <version>${scala.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>       <version>${spark.version}</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.slf4j</groupId>
>       <artifactId>slf4j-log4j12</artifactId>
>       <version>1.7.7</version>
>       <scope>runtime</scope>
>     </dependency>
>     <dependency>
>       <groupId>log4j</groupId>
>       <artifactId>log4j</artifactId>
>       <version>1.2.17</version>
>       <scope>compile</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.scalatest</groupId>
>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>       <version>3.0.7</version>
>       <scope>test</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>3.8.1</version>
>       <scope>test</scope>
>     </dependency>
>   </dependencies>
>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>3.0.0</version>
>         <executions>
>           <!-- Run shade goal on install phase -->
>           <execution>
>             <phase>install</phase>
>             <goals>
>               <goal>shade</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>       <!-- Scala Compiler -->
>       <plugin>
>         <groupId>net.alchim31.maven</groupId>
>         <artifactId>scala-maven-plugin</artifactId>
>         <version>3.2.2</version>
>         <executions>
>           <execution>
>             <goals>
>               <goal>compile</goal>
>               <goal>testCompile</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>
>       <plugin>
>         <groupId>org.codehaus.mojo</groupId>
>         <artifactId>build-helper-maven-plugin</artifactId>
>         <version>1.7</version>
>         <executions>
>           <!-- Add src/main/scala to eclipse build path -->
>           <execution>
>             <id>add-source</id>
>             <phase>generate-sources</phase>
>             <goals>
>               <goal>add-source</goal>
>             </goals>
>             <configuration>
>               <sources>
>                 <source>src/main/scala</source>
>               </sources>
>             </configuration>
>           </execution>
>           <!-- Add src/test/scala to eclipse build path -->
>           <execution>
>             <id>add-test-source</id>
>             <phase>generate-test-sources</phase>
>             <goals>
>               <goal>add-test-source</goal>
>             </goals>
>             <configuration>
>               <sources>
>                 <source>src/test/scala</source>
>               </sources>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>
>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-surefire-plugin</artifactId>
>         <version>2.7</version>
>         <configuration>
>           <skipTests>true</skipTests>
>         </configuration>
>       </plugin>
>       <plugin>
>         <groupId>org.scalatest</groupId>
>         <artifactId>scalatest-maven-plugin</artifactId>
>         <version>1.0</version>
>         <configuration>
>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>           <junitxml>.</junitxml>
>           <filereports>WDF TestSuite.txt</filereports>
>         </configuration>
>         <executions>
>           <execution>
>             <id>test</id>
>             <goals>
>               <goal>test</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.scalastyle</groupId>
>         <artifactId>scalastyle-maven-plugin</artifactId>
>         <version>1.0.0</version>
>         <configuration>
>           <verbose>false</verbose>
>           <failOnViolation>true</failOnViolation>
>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>           <failOnWarning>false</failOnWarning>
>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>           <configLocation>lib/scalastyle_config.xml</configLocation>
>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>           <outputEncoding>UTF-8</outputEncoding>
>         </configuration>
>         <executions>
>           <execution>
>             <goals>
>               <goal>check</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>       <plugin>
>         <groupId>org.sonarsource.scanner.maven</groupId>
>         <artifactId>sonar-maven-plugin</artifactId>
>         <version>3.6.0.1398</version>
>       </plugin>
>
>     </plugins>
>   </build>
> </project>
>
>
>
> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com>
> wrote:
>
>> Ok. I will work on creating a reproducible app. Thanks.
>>
>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>> wrote:
>>
>>> Just reached this thread. +1 on creating a simple reproducer app, and I
>>> suggest creating a Jira with the full driver and executor logs attached.
>>> Ping me on the jira and I'll pick this up right away...
>>>
>>> Thanks!
>>>
>>> G
>>>
>>>
>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>> kabhwan.opensource@gmail.com> wrote:
>>>
>>>> Would you mind if I ask for a simple reproducer? Would be nice if you
>>>> could create a repository in Github and push the code including the build
>>>> script.
>>>>
>>>> Thanks in advance!
>>>>
>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglists19@gmail.com>
>>>> wrote:
>>>>
>>>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>>>
>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>
>>>>>> Which exact Spark version did you use? Did you make sure the version
>>>>>> for Spark and the version for spark-sql-kafka artifact are the same?
>>>>>> (I asked this because you've said you've used Spark 3.0 but spark-sql-kafka
>>>>>> dependency pointed to 3.1.0.)
>>>>>>
>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>
>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source
>>>>>>> v2 streaming sinks does not support Update mode.
>>>>>>> === Streaming Query ===
>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
>>>>>>> Current Committed Offsets: {}
>>>>>>> Current Available Offsets: {}
>>>>>>> Current State: INITIALIZING
>>>>>>> Thread State: RUNNABLE
>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
>>>>>>> sinks does not support Update mode.
>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>> ... 1 more
>>>>>>>
>>>>>>>
>>>>>>> *Please see the attached image for more information.*
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Can you post the whole message? I'm trying to find what might be
>>>>>>>> causing it. A small reproducible example would be of help too.
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>> Pozdrawiam,
>>>>>>>> Jacek Laskowski
>>>>>>>> ----
>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>
>>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>> application to Spark 3.0. I compiled it using the dependency
>>>>>>>>> given below:
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Every time I run it under Spark 3.0, I get this message:
>>>>>>>>> *Data source v2 streaming sinks does not support Update mode*
>>>>>>>>>
>>>>>>>>> I am using '*mapGroupsWithState*' so as per this link
>>>>>>>>> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>
>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>
>>>>>>>>> .writeStream
>>>>>>>>> .format("kafka")
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What am I missing?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>
>>>>>>

Mime
View raw message