spark-user mailing list archives

From Eric Beabes <mailinglist...@gmail.com>
Subject Re: Data source v2 streaming sinks does not support Update mode
Date Mon, 18 Jan 2021 13:14:31 GMT
Here's a very simple reproducer app. I've attached 3 files:
SparkTest.scala, QueryListener.scala & pom.xml. I'm copying their
contents into the email as well.

SparkTest.scala:

package com.myorg

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

import scala.util.{Failure, Success, Try}

object Spark3Test {

  val isLocal = false

  implicit val stringEncoder: Encoder[String] = Encoders.STRING
  implicit val myStateEncoder: Encoder[MyState] = Encoders.kryo[MyState]

  val START_DATE_INDEX = 21
  val END_DATE_INDEX = 40

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = initializeSparkSession("Spark 3.0
Upgrade", isLocal)
    spark.sparkContext.setLogLevel("WARN")

    readKafkaStream(spark)
      .groupByKey(row => {
        row.substring(START_DATE_INDEX, END_DATE_INDEX)
      })
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " " + "count: " + row.count)
      .writeStream
      .format("kafka")
      .option(
        s"kafka.${ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}",
        "10.29.42.141:9092"
//        "localhost:9092"
      )
      .option("topic", "spark3test")
      .option("checkpointLocation", "/tmp/checkpoint_5")
      .outputMode("update")
      .start()
    manageStreamingQueries(spark)
  }

  def readKafkaStream(sparkSession: SparkSession): Dataset[String] = {

    val stream = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.29.42.141:9092")
      .option("subscribe", "inputTopic")
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafkaConsumer.pollTimeoutMs", "120000")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String](Encoders.STRING)
    stream
  }

  // Per-key state update: count batches of events for the key, renew the
  // 1-minute processing-time timeout, and emit the state as finished once
  // the timeout fires.
  def updateAcrossEvents(key: String, inputs: Iterator[String], oldState: GroupState[MyState]): MyState = {
    if (!oldState.exists) {
      println(key)
      val state = MyState(key)
      oldState.update(state)
      oldState.setTimeoutDuration("1 minutes")
      oldState.get
    } else {
      if (oldState.hasTimedOut) {
        oldState.get.inProgress = false
        val state = oldState.get
        println("State timed out for key: " + state.dateTime)
        oldState.remove()
        state
      } else {
        val state = oldState.get
        state.count = state.count + 1
        oldState.update(state)
        oldState.setTimeoutDuration("1 minutes")
        oldState.get
      }
    }
  }
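
  // State lifecycle for one key, as implemented above (my annotation):
  //   first batch with the key     -> state created (count = 1), 1-minute timeout set
  //   each later batch with events -> count += 1 (note: inputs is never iterated,
  //                                   so this counts batches, not events), timeout renewed
  //   1 minute with no events      -> hasTimedOut: inProgress = false, state emitted and removed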

  def initializeSparkSession(applicationName: String, isLocal: Boolean): SparkSession = {
    UserGroupInformation.setLoginUser(
      UserGroupInformation.createRemoteUser("hduser")
    )

    val builder = SparkSession
      .builder()
      .appName(applicationName)

    if (isLocal) {
      builder.config("spark.master", "local[2]")
    }

    builder.getOrCreate()
  }

  // Blocks while any query is active, logging terminations; a marker file
  // (checked roughly every 60s) triggers a graceful stop of all active queries.
  def manageStreamingQueries(spark: SparkSession): Unit = {

    val sparkQueryListener = new QueryListener()
    spark.streams.addListener(sparkQueryListener)

    val shutdownMarker: String = "/tmp/stop_job"

    val timeoutInMilliSeconds = 60000
    while (spark.streams.active.nonEmpty) {
      Try(spark.streams.awaitAnyTermination(timeoutInMilliSeconds)) match {
        case Success(result) =>
          if (result) {
            println("A streaming query was terminated successfully")
            spark.streams.resetTerminated()
          }
        case Failure(e) =>
          println("Query failed with message: " + e.getMessage)
          e.printStackTrace()
          spark.streams.resetTerminated()
      }

      if (checkMarker(shutdownMarker)) {
        spark.streams.active.foreach(query => {
          println(s"Stopping streaming query: ${query.id}")
          query.stop()
        })
        spark.stop()
        removeMarker(shutdownMarker)
      }
    }
    assert(spark.streams.active.isEmpty)
    spark.streams.removeListener(sparkQueryListener)
  }

  def checkMarker(markerFile: String): Boolean = {
    val fs = FileSystem.get(new Configuration())
    fs.exists(new Path(markerFile))
  }

  def removeMarker(markerFile: String): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.delete(new Path(markerFile), true)
  }
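
  // Hypothetical helper (my addition, not in the original app): another process
  // can request a graceful shutdown by creating the marker file through the
  // same FileSystem API that checkMarker/removeMarker use.
  def requestShutdown(markerFile: String = "/tmp/stop_job"): Unit = {
    val fs = FileSystem.get(new Configuration())
    fs.create(new Path(markerFile)).close() // an empty marker file is enough
  }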

}

case class MyState(var dateTime: String, var inProgress: Boolean = true, var count: Int = 1)
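
A side note on isolating the failure: the console sink accepts Update mode, so
swapping it in for the Kafka sink shows whether the error is specific to the
Kafka sink wiring. A minimal sketch of that variant (my assumption, not part of
the attached reproducer; drop it in place of the writeStream block in main):

    // Same stateful pipeline, console sink instead of Kafka.
    readKafkaStream(spark)
      .groupByKey(row => row.substring(START_DATE_INDEX, END_DATE_INDEX))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
      .filter(row => !row.inProgress)
      .map(row => "key: " + row.dateTime + " count: " + row.count)
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint_console") // fresh checkpoint dir
      .outputMode("update")
      .start()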







QueryListener.scala:

package com.myorg

import org.apache.spark.sql.streaming.StreamingQueryListener

class QueryListener extends StreamingQueryListener {

  def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

  def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    if (event.progress.numInputRows != 0) {
      println(
        s"InputRows: ${event.progress.numInputRows}"
      )
    }
  }

  def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println(s"Query with id ${event.id} terminated")
  }
}
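
For what it's worth, the same stateful logic can also be driven without Kafka,
which makes local reproduction easier. Below is a rough harness of mine (not
one of the attached files) built on Spark's internal MemoryStream test source;
the import path is internal API, so treat its stability in 3.0 as an assumption:

package com.myorg

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.GroupStateTimeout

// Feeds in-memory records through groupByKey/mapGroupsWithState and a console
// sink in Update mode, so the sink capability check runs without a cluster.
object LocalHarness {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("update-mode-harness")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext

    val input = MemoryStream[String]
    // 21 filler chars followed by a 19-char key, mirroring substring(21, 40).
    input.addData("x" * 21 + "2021-01-18T13:00:00" + " payload")

    val query = input.toDS()
      .groupByKey(_.substring(Spark3Test.START_DATE_INDEX, Spark3Test.END_DATE_INDEX))
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(Spark3Test.updateAcrossEvents)
      .map(state => state.dateTime + " " + state.count)
      .writeStream
      .format("console")
      .outputMode("update")
      .start()

    query.processAllAvailable()
    query.stop()
  }
}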







<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.myorg</groupId>
  <artifactId>spark-3-conversion</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spark-3-conversion</name>
  <url>http://maven.apache.org</url>

  <properties>
    <spark.version>3.0.0</spark.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <scoverage.plugin.version>1.4.0-RC1</scoverage.plugin.version>
    <skipTests>true</skipTests>
    <maven.compiler.source>1.5</maven.compiler.source>
    <maven.compiler.target>1.5</maven.compiler.target>
    <encoding>UTF-8</encoding>
  </properties>


  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.7</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <!-- Run shade goal on package phase -->
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Scala Compiler -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <version>1.7</version>
        <executions>
          <!-- Add src/main/scala to eclipse build path -->
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <!-- Add src/test/scala to eclipse build path -->
          <execution>
            <id>add-test-source</id>
            <phase>generate-test-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!-- we disable surefile and enable scalatest so that maven can
run our tests -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.7</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
        <version>1.0.0</version>
        <configuration>
          <verbose>false</verbose>
          <failOnViolation>true</failOnViolation>
          <includeTestSourceDirectory>true</includeTestSourceDirectory>
          <failOnWarning>false</failOnWarning>
          <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
          <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
          <configLocation>lib/scalastyle_config.xml</configLocation>
          <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
          <outputEncoding>UTF-8</outputEncoding>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>check</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.sonarsource.scanner.maven</groupId>
        <artifactId>sonar-maven-plugin</artifactId>
        <version>3.6.0.1398</version>
      </plugin>

    </plugins>
  </build>
</project>



On Wed, Jan 13, 2021 at 11:26 PM Eric Beabes <mailinglists19@gmail.com>
wrote:

> Ok. I will work on creating a reproducible app. Thanks.
>
> On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi <gabor.g.somogyi@gmail.com>
> wrote:
>
>> Just reached this thread. +1 on to create a simple reproducer app and I
>> suggest to create a jira attaching the full driver and executor logs.
>> Ping me on the jira and I'll pick this up right away...
>>
>> Thanks!
>>
>> G
>>
>>
>> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim <
>> kabhwan.opensource@gmail.com> wrote:
>>
>>> Would you mind if I ask for a simple reproducer? Would be nice if you
>>> could create a repository in Github and push the code including the build
>>> script.
>>>
>>> Thanks in advance!
>>>
>>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes <mailinglists19@gmail.com>
>>> wrote:
>>>
>>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>>
>>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>>> kabhwan.opensource@gmail.com> wrote:
>>>>
>>>>> Which exact Spark version did you use? Did you make sure the version
>>>>> for Spark and the version for spark-sql-kafka artifact are the same?
(I
>>>>> asked this because you've said you've used Spark 3.0 but spark-sql-kafka
>>>>> dependency pointed to 3.1.0.)
>>>>>
>>>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes <mailinglists19@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> org.apache.spark.sql.streaming.StreamingQueryException: Data source
>>>>>> v2 streaming sinks does not support Update mode. === Streaming Query
===
>>>>>> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
>>>>>> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets:
{} Current
>>>>>> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE
at
>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>>>>>> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
>>>>>> sinks does not support Update mode. at
>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
>>>>>> at
>>>>>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
>>>>>> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
>>>>>> ... 1 more
>>>>>>
>>>>>>
>>>>>> *Please see the attached image for more information.*
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski <jacek@japila.pl>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Can you post the whole message? I'm trying to find what might
be
>>>>>>> causing it. A small reproducible example would be of help too.
Thank you.
>>>>>>>
>>>>>>> Pozdrawiam,
>>>>>>> Jacek Laskowski
>>>>>>> ----
>>>>>>> https://about.me/JacekLaskowski
>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>
>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> Trying to port my Spark 2.4 based (Structured) streaming
>>>>>>>> application to Spark 3.0. I compiled it using the dependency
>>>>>>>> given below:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>org.apache.spark</groupId>
>>>>>>>>     <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>>>>>>     <version>3.1.0</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>>
>>>>>>>> Every time I run it under Spark 3.0, I get this message:
*Data
>>>>>>>> source v2 streaming sinks does not support Update mode*
>>>>>>>>
>>>>>>>> I am using '*mapGroupsWithState*' so as per this link (
>>>>>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>>>>>>> the only supported Output mode is "*Update*".
>>>>>>>>
>>>>>>>> My Sink is a Kafka topic so I am using this:
>>>>>>>>
>>>>>>>> .writeStream
>>>>>>>> .format("kafka")
>>>>>>>>
>>>>>>>>
>>>>>>>> What am I missing?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>>
>>>>>
