Thanks for double-checking the version. Please report back on whether it works with version 3.1.

G


On Tue, 19 Jan 2021, 07:41 Eric Beabes, <mailinglists19@gmail.com> wrote:
Confirmed. The cluster admin said his team installed the latest version from Cloudera, which comes with Spark 3.0.0-preview2. They are going to try to upgrade it to the community edition, Spark 3.1.0.

Thanks Jungtaek for the tip. Greatly appreciate it.

On Tue, Jan 19, 2021 at 8:45 AM Eric Beabes <mailinglists19@gmail.com> wrote:
>> Could you please make sure you're not using "3.0.0-preview"?

This could be the reason. I will check with our Hadoop cluster administrator. It's quite possible that they installed the "Preview" version. Yes, the code works in the local dev environment.


On Tue, Jan 19, 2021 at 5:29 AM Jungtaek Lim <kabhwan.opensource@gmail.com> wrote:
I see no issue from running this code in local dev. (changed the scope of Spark artifacts to "compile" of course)

Could you please make sure you're not using "3.0.0-preview"? In 3.0.0-preview, update mode was restricted (as the error message says), and that restriction was reverted before Spark 3.0.0 was released. Clearing the Spark artifacts in your .m2 cache may help.
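
As a rough way to check for this, the sketch below tests whether a version string looks like a preview build. It assumes that preview builds report a version containing "preview" (for example "3.0.0-preview2"); vendor distributions may label their builds differently. The names SparkVersionCheck and isPreviewBuild are made up for illustration. In a real job the string would come from spark.version on the SparkSession.

```scala
// Sketch only: SparkVersionCheck and isPreviewBuild are hypothetical names.
object SparkVersionCheck {

  // Assumption: preview builds report versions such as "3.0.0-preview"
  // or "3.0.0-preview2". Vendor builds may use a different scheme.
  def isPreviewBuild(version: String): Boolean =
    version.toLowerCase.contains("preview")

  def main(args: Array[String]): Unit = {
    // In a Spark application, pass spark.version here instead of a
    // command-line argument; the fallback below is only an example value.
    val version = args.headOption.getOrElse("3.0.0-preview2")
    if (isPreviewBuild(version)) {
      println(s"Spark $version looks like a preview build; update mode may be rejected")
    } else {
      println(s"Spark $version does not look like a preview build")
    }
  }
}
```

Logging the result at job startup, on the cluster rather than in local dev, would show which Spark build the job is actually running on.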

On Tue, Jan 19, 2021 at 8:50 AM Jungtaek Lim <kabhwan.opensource@gmail.com> wrote:
Please also include some test data. I quickly looked through the code, and it may require a specific record format.

On Tue, Jan 19, 2021 at 12:10 AM German Schiavon <gschiavonspark@gmail.com> wrote:
Hi,

This is the JIRA. Regarding the repo, I believe you can just commit it to your personal repo and that should be it.

Regards

On Mon, 18 Jan 2021 at 15:46, Eric Beabes <mailinglists19@gmail.com> wrote:
Sorry. Can you please tell me where to create the JIRA? Also, is there a specific GitHub repository I need to commit the code into, or just our own? Please let me know. Thanks.

On Mon, Jan 18, 2021 at 7:07 PM Gabor Somogyi <gabor.g.somogyi@gmail.com> wrote:
Thank you. As we've asked, could you please create a JIRA and commit the code to GitHub?
It would speed things up a lot.

G


On Mon, Jan 18, 2021 at 2:14 PM Eric Beabes <mailinglists19@gmail.com> wrote:
Here's a very simple reproducer app. I've attached 3 files: SparkTest.scala, QueryListener.scala & pom.xml. Copying contents in the email as well:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  // Character range of each record that is used as the grouping key.
  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = initializeSparkSession("Spark 3.0 Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => {
        row.substring(START_DATE_INDEX, END_DATE_INDEX)
      })
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
        // "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
    stream
  }

  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      // First event for this key: create the state and start the timeout.
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        // No events within the timeout: mark the state as finished and emit it.
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        // Another batch for an existing key: bump the count and reset the timeout.
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    // Creating this file signals the job to shut down.
    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (!spark.streams.active.isEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)







package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(
        s"InputRows: ${event.progress.numInputRows}"
      )
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>


  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on install phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- we disable surefire and enable scalatest so that maven can run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>

    </plugins>
  </build>
</project>


On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com> wrote:
Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somogyi@gmail.com> wrote:
Just reached this thread. +1 on creating a simple reproducer app, and I suggest creating a JIRA with the full driver and executor logs attached.
Ping me on the JIRA and I'll pick this up right away...

Thanks!

G


On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <kabhwan.opensource@gmail.com> wrote:
Would you mind if I ask for a simple reproducer? Would be nice if you could create a repository in Github and push the code including the build script.

Thanks in advance!

On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglists19@gmail.com> wrote:
I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.

On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <kabhwan.opensource@gmail.com> wrote:
Which exact Spark version did you use? Did you make sure the version for Spark and the version for spark-sql-kafka artifact are the same? (I asked this because you've said you've used Spark 3.0 but spark-sql-kafka dependency pointed to 3.1.0.) 

On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglists19@gmail.com> wrote:
org.apache.spark.sql.streaming.StreamingQueryException: Data source v2 streaming sinks does not support Update mode.
=== Streaming Query ===
Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId = 62410f05-db59-4026-83fc-439a79b01c29]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.IllegalArgumentException: Data source v2 streaming sinks does not support Update mode.
    at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
    ... 1 more


Please see the attached image for more information.


On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl> wrote:
Hi,

Can you post the whole message? I'm trying to find what might be causing it. A small reproducible example would be of help too. Thank you.

On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <mailinglists19@gmail.com> wrote:
I'm trying to port my Spark 2.4-based Structured Streaming application to Spark 3.0. I compiled it using the dependency given below:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
  <version>3.1.0</version>
</dependency>

Every time I run it under Spark 3.0, I get this message: Data source v2 streaming sinks does not support Update mode

I am using 'mapGroupsWithState' so as per this link (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes), the only supported Output mode is "Update". 

My Sink is a Kafka topic so I am using this:

.writeStream
.format("kafka")

What am I missing?


