spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Mon, 18 Jan 2021 14:45:13 GMT
Sorry. Can you please tell me where to create the JIRA? Also is there any
specific Github repository I need to commit code into - OR - just in our
own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
wrote:

> Thank you. As we've asked, could you please create a JIRA and commit the
> code into github?
> It would speed things up a lot.
>
> G
>
>
> On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com>
> wrote:
>
>> Here's a very simple reproducer app. I've attached 3 files:
>> SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the
>> email as well:
>>
>> package com.myorg
>>
>> import org.apache.hadoop.conf.Configuration
>> import org.apache.hadoop.fs.{FileSystem, Path}
>> import org.apache.hadoop.security.UserGroupInformation
>> import org.apache.kafka.clients.producer.ProducerConfig
>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
>> import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
>>
>> import scala.util.{Failure, Success, Try}
>>
>> object Spark3Test {
>>
>>   val isLocal = false
>>
>>   implicit val stringEncoder: Encoder[String] = Encoders.STRING
>>   implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]
>>
>>   val START_DATE_INDEX = 21
>>   val END_DATE_INDEX = 40
>>
>>   def main(args: Array[String]) {
>>
>>     val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
>>     spark.sparkContext.setLogLevel("WARN")
>>
>>     readKafkaStream(spark)
>>       .groupByKey(row => {
>>         row.substring(START_DATE_INDEX, END_DATE_INDEX)
>>       })
>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>         updateAcrossEvents
>>       )
>>       .filter(row => !row.inProgress)
>>       .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
>>       .writeStream
>>       .format("kafka")
>>       .option(
>>         s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
>>         "10.29.42.141:9092"
>> //        "localhost:9092"
>>       )
>>       .option("topic", "spark3test")
>>       .option("checkpointLocation", "/tmp/checkpoint_5")
>>       .outputMode("update")
>>       .start()
>>     manageStreamingQueries(spark)
>>   }
>>
>>   def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {
>>
>>     val stream = sparkSession.readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", "10.29.42.141:9092")
>>       .option("subscribe", "inputTopic")
>>       .option("startingOffsets", "latest")
>>       .option("failOnDataLoss", "false")
>>       .option("kafkaConsumer.pollTimeoutMs", "120000")
>>       .load()
>>       .selectExpr("CAST(value AS STRING)")
>>       .as[String](Encoders.STRING)
>>     stream
>>   }
>>
>>   def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]):
MyState = {
>>     if (!oldState.exists) {
>>       println(key)
>>       val state = MyState(key)
>>       oldState.update(state)
>>       oldState.setTimeoutDuration("1 minutes")
>>       oldState.get
>>     } else {
>>       if (oldState.hasTimedOut) {
>>         oldState.get.inProgress = false
>>         val state = oldState.get
>>         println("State timed out for key: " + state.dateTime)
>>         oldState.remove()
>>         state
>>       } else {
>>         val state = oldState.get
>>         state.count = state.count + 1
>>         oldState.update(state)
>>         oldState.setTimeoutDuration("1 minutes")
>>         oldState.get
>>       }
>>     }
>>   }
>>
>>   def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession
= {
>>     UserGroupInformation.setLoginUser(
>>       UserGroupInformation.createRemoteUser("hduser")
>>     )
>>
>>     val builder = SparkSession
>>       .builder()
>>       .appName(applicationName)
>>
>>     if (isLocal) {
>>       builder.config("spark.master", "local[2]")
>>     }
>>
>>     builder.getOrCreate()
>>   }
>>
>>   def manageStreamingQueries(spark: SparkSession): Unit = {
>>
>>     val sparkQueryListener = new QueryListener()
>>     spark.streams.addListener(sparkQueryListener)
>>
>>     val shutdownMarker: String = "/tmp/stop_job"
>>
>>     val timeoutInMilliSeconds = 60000
>>     while (!spark.streams.active.isEmpty) {
>>       Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
>>         case Success(result) =>
>>           if (result) {
>>             println("A streaming query was terminated successfully")
>>             spark.streams.resetTerminated()
>>           }
>>         case Failure(e) =>
>>           println("Query failed with message: " + e.getMessage)
>>           e.printStackTrace()
>>           spark.streams.resetTerminated()
>>       }
>>
>>       if (checkMarker(shutdownMarker)) {
>>         spark.streams.active.foreach(query => {
>>           println(s"Stopping streaming query: ${query.id}")
>>           query.stop()
>>         })
>>         spark.stop()
>>         removeMarker(shutdownMarker)
>>       }
>>     }
>>     assert(spark.streams.active.isEmpty)
>>     spark.streams.removeListener(sparkQueryListener)
>>   }
>>
>>   def checkMarker(markerFile: String): Boolean = {
>>     val fs = FileSystem.get(new Configuration())
>>     fs.exists(new Path(markerFile))
>>   }
>>
>>   def removeMarker(markerFile: String): Unit = {
>>     val fs = FileSystem.get(new Configuration())
>>     fs.delete(new Path(markerFile), true)
>>   }
>>
>> }
>>
>> case class MyState(var dateTime: String, var inProgress: Boolean = true, var count:
Int = 1)
>>
>>
>>
>>
>>
>>
>>
>> package com.myorg
>>
>> import org.apache.spark.sql.streaming.StreamingQueryListener
>>
>> class QueryListener extends StreamingQueryListener {
>>
>>   def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
>>
>>   def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>     if (event.progress.numInputRows != 0) {
>>       println(
>>         s"InputRows: ${event.progress.numInputRows}"
>>       )
>>     }
>>   }
>>
>>   def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit
= {
>>     println(s"Query with id ${event.id} terminated")
>>   }
>> }
>>
>>
>>
>>
>>
>>
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>   <groupId>com.myorg</groupId>
>>   <artifactId>spark-3-conversion</artifactId>
>>   <packaging>jar</packaging>
>>   <version>1.0-SNAPSHOT</version>
>>   <name>spark-3-conversion</name>
>>   <url>http://maven.apache.org</url>
>>
>>   <properties>
>>     <spark.version>3.0.0</spark.version>
>>     <scala.binary.version>2.12</scala.binary.version>
>>     <scala.version>2.12.10</scala.version>
>>     <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
>>     <skipTests>true</skipTests>
>>     <maven.compiler.source>1.5</maven.compiler.source>
>>     <maven.compiler.target>1.5</maven.compiler.target>
>>     <encoding>UTF-8</encoding>
>>   </properties>
>>
>>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.scala-lang</groupId>
>>       <artifactId>scala-library</artifactId>
>>       <version>${scala.version}</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>>       <version>${spark.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming_${scala.binary.version}</artifactId>
>>       <version>${spark.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>>       <version>${spark.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
>>       <version>${spark.version}</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>       <version>${spark.version}</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.slf4j</groupId>
>>       <artifactId>slf4j-log4j12</artifactId>
>>       <version>1.7.7</version>
>>       <scope>runtime</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>log4j</groupId>
>>       <artifactId>log4j</artifactId>
>>       <version>1.2.17</version>
>>       <scope>compile</scope>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.scalatest</groupId>
>>       <artifactId>scalatest_${scala.binary.version}</artifactId>
>>       <version>3.0.7</version>
>>       <scope>test</scope>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>junit</groupId>
>>       <artifactId>junit</artifactId>
>>       <version>3.8.1</version>
>>       <scope>test</scope>
>>     </dependency>
>>   </dependencies>
>>
>>   <build>
>>     <plugins>
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-shade-plugin</artifactId>
>>         <version>3.0.0</version>
>>         <executions>
>>           <!-- Run shade goal on package phase -->
>>           <execution>
>>             <phase>install</phase>
>>             <goals>
>>               <goal>shade</goal>
>>             </goals>
>>           </execution>
>>         </executions>
>>       </plugin>
>>
>>       <!-- Scala Compiler -->
>>       <plugin>
>>         <groupId>net.alchim31.maven</groupId>
>>         <artifactId>scala-maven-plugin</artifactId>
>>         <version>3.2.2</version>
>>         <executions>
>>           <execution>
>>             <goals>
>>               <goal>compile</goal>
>>               <goal>testCompile</goal>
>>             </goals>
>>           </execution>
>>         </executions>
>>       </plugin>
>>
>>       <plugin>
>>         <groupId>org.codehaus.mojo</groupId>
>>         <artifactId>build-helper-maven-plugin</artifactId>
>>         <version>1.7</version>
>>         <executions>
>>           <!-- Add src/main/scala to eclipse build path -->
>>           <execution>
>>             <id>add-source</id>
>>             <phase>generate-sources</phase>
>>             <goals>
>>               <goal>add-source</goal>
>>             </goals>
>>             <configuration>
>>               <sources>
>>                 <source>src/main/scala</source>
>>               </sources>
>>             </configuration>
>>           </execution>
>>           <!-- Add src/test/scala to eclipse build path -->
>>           <execution>
>>             <id>add-test-source</id>
>>             <phase>generate-test-sources</phase>
>>             <goals>
>>               <goal>add-test-source</goal>
>>             </goals>
>>             <configuration>
>>               <sources>
>>                 <source>src/test/scala</source>
>>               </sources>
>>             </configuration>
>>           </execution>
>>         </executions>
>>       </plugin>
>>
>>       <!-- we disable surefire and enable scalatest so that maven can run our tests -->
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-surefire-plugin</artifactId>
>>         <version>2.7</version>
>>         <configuration>
>>           <skipTests>true</skipTests>
>>         </configuration>
>>       </plugin>
>>       <plugin>
>>         <groupId>org.scalatest</groupId>
>>         <artifactId>scalatest-maven-plugin</artifactId>
>>         <version>1.0</version>
>>         <configuration>
>>           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>           <junitxml>.</junitxml>
>>           <filereports>WDF TestSuite.txt</filereports>
>>         </configuration>
>>         <executions>
>>           <execution>
>>             <id>test</id>
>>             <goals>
>>               <goal>test</goal>
>>             </goals>
>>           </execution>
>>         </executions>
>>       </plugin>
>>       <plugin>
>>         <groupId>org.scalastyle</groupId>
>>         <artifactId>scalastyle-maven-plugin</artifactId>
>>         <version>1.0.0</version>
>>         <configuration>
>>           <verbose>false</verbose>
>>           <failOnViolation>true</failOnViolation>
>>           <includeTestSourceDirectory>true</includeTestSourceDirectory>
>>           <failOnWarning>false</failOnWarning>
>>           <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
>>           <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
>>           <configLocation>lib/scalastyle_config.xml</configLocation>
>>           <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
>>           <outputEncoding>UTF-8</outputEncoding>
>>         </configuration>
>>         <executions>
>>           <execution>
>>             <goals>
>>               <goal>check</goal>
>>             </goals>
>>           </execution>
>>         </executions>
>>       </plugin>
>>       <plugin>
>>         <groupId>org.sonarsource.scanner.maven</groupId>
>>         <artifactId>sonar-maven-plugin</artifactId>
>>         <version>3.6.0.1398</version>
>>       </plugin>
>>
>>     </plugins>
>>   </build>
>> </project>
>>
>>
>>
>> On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com>
>> wrote:
>>
>>> Ok. I will work on creating a reproducible app. Thanks.
>>>
>>> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
>>> wrote:
>>>
>>>> Just reached this thread. +1 on to create a simple reproducer app and I
>>>> suggest to create a jira attaching the full driver and executor logs.
>>>> Ping me on the jira and I'll pick this up right away...
>>>>
>>>> Thanks!
>>>>
>>>> G
>>>>
>>>>
>>>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>>>> kabhwan.opensource@gmail.com> wrote:
>>>>
>>>>> Would you mind if I ask for a simple reproducer? Would be nice if you
>>>>> could create a repository in Github and push the code including the build
>>>>> script.
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglists19@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>>>>
>>>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>>>> kabhwan.opensource@gmail.com> wrote:
>>>>>>
>>>>>>> Which exact Spark version did you use? Did you make sure the
version
>>>>>>> for Spark and the version for spark-sql-kafka artifact are the
same? (I
>>>>>>> asked this because you've said you've used Spark 3.0 but spark-sql-kafka
>>>>>>> dependency pointed to 3.1.0.)
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data
source
>>>>>>>> v2 streaming sinks does not support Update mode. === Streaming
Query ===
>>>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId
=
>>>>>>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets:
{} Current
>>>>>>>> Available Offsets: {} Current State: INITIALIZING Thread
State: RUNNABLE at
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>>>> at
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Data source
v2 streaming
>>>>>>>> sinks does not support Update mode. at
>>>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>>>> at
>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>>>> at
>>>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>>>> ... 1 more
>>>>>>>>
>>>>>>>>
>>>>>>>> *Please see the attached image for more information.*
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Can you post the whole message? I'm trying to find what
might be
>>>>>>>>> causing it. A small reproducible example would be of
help too. Thank you.
>>>>>>>>>
>>>>>>>>> Pozdrawiam,
>>>>>>>>> Jacek Laskowski
>>>>>>>>> ----
>>>>>>>>> https://about.me/JacekLaskowski
>>>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>>>
>>>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>>>> application to Spark 3.0. I compiled it using the
dependency given below:
>>>>>>>>>>
>>>>>>>>>> <dependency>
>>>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>>>     <version>3.1.0</version>
>>>>>>>>>> </dependency>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Every time I run it under Spark 3.0, I get this message:
*Data
>>>>>>>>>> source v2 streaming sinks does not support Update
mode*
>>>>>>>>>>
>>>>>>>>>> I am using '*mapGroupsWithState*' so as per this
link (
>>>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>>>
>>>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>>>
>>>>>>>>>> .writeStream
>>>>>>>>>> .format("kafka")
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What am I missing?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>>>
>>>>>>>

Mime
View raw message