spark-issues mailing list archives

From "Amit Baghel (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured streaming - "This query does not support recovering from checkpoint location"
Date Tue, 28 Feb 2017 14:19:45 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Baghel updated SPARK-19768:
--------------------------------
    Description: 
I am running JavaStructuredKafkaWordCount.java with checkpointLocation set. The output mode is "complete".
Below is the relevant code.
{code}
    // Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).groupBy("value").count();

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data")
      .start();
{code}

This example runs successfully and writes data to the checkpoint directory. When I re-run the
program, it throws the following exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}
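The exception text suggests what the start-up check looks for: the first run succeeds and writes an offsets subdirectory under the checkpoint location, and the second run refuses to start because that directory now exists. The sketch below models that behavior in plain Java. It is inferred only from the exception message in this report, not taken from Spark's source; the class CheckpointRecoveryCheck, the method checkCanStart, and the supportsRecovery flag are hypothetical names.

{code:java}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical model of the start-up check implied by the exception text.
// This is NOT Spark's implementation; it only mirrors the behavior observed in this report.
class CheckpointRecoveryCheck {

    // Fails if offsets from a previous run exist and the query cannot recover from them.
    static void checkCanStart(String checkpointLocation, boolean supportsRecovery) {
        Path offsets = Paths.get(checkpointLocation, "offsets");
        if (!supportsRecovery && Files.exists(offsets)) {
            throw new IllegalStateException(
                "This query does not support recovering from checkpoint location. "
                    + "Delete " + offsets + " to start over.");
        }
    }

    public static void main(String[] args) throws Exception {
        Path checkpoint = Files.createTempDirectory("checkpoint-data");

        // First run: no offsets directory yet, so the check passes.
        checkCanStart(checkpoint.toString(), false);

        // Simulate the first run writing its offsets into the checkpoint directory.
        Files.createDirectories(checkpoint.resolve("offsets"));

        // Second run: the same check now fails, as in the report.
        try {
            checkCanStart(checkpoint.toString(), false);
        } catch (IllegalStateException e) {
            System.out.println("Second run rejected: " + e.getMessage());
        }
    }
}
{code}

Under this model, the aggregate ("complete") query and the non-aggregate ("append") query fail identically whenever supportsRecovery is false, which matches both runs described here; what actually decides recoverability in Spark 2.1.0 is not established by this report.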

Then I modified JavaStructuredKafkaWordCount.java to use a non-aggregate query with output
mode "append". Please see the code below.
{code}
    // No aggregations
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).select("value");

    // append mode with console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data")
      .start();
{code}

This modified code also runs successfully and writes data to the checkpoint directory. When I re-run
the program, it throws the same exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}
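The exception message itself names a way out: delete the offsets directory under the checkpoint location before re-running. A minimal sketch of that step, assuming the checkpoint location is a local filesystem path such as /tmp/checkpoint-data (an HDFS or other Hadoop-compatible location would need the Hadoop FileSystem API instead). ResetOffsets and deleteOffsets are hypothetical names. Deleting the offsets discards the query's recorded progress, so the query starts over instead of recovering; this works around the error rather than providing the recovery this report asks about.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: removes <checkpointLocation>/offsets, as the exception message advises.
// Assumes a local filesystem path; deleting offsets discards the query's recorded progress.
class ResetOffsets {

    // Returns true if an offsets directory existed and was deleted, false if there was none.
    static boolean deleteOffsets(String checkpointLocation) throws IOException {
        Path offsets = Paths.get(checkpointLocation, "offsets");
        if (!Files.exists(offsets)) {
            return false;
        }
        // Collect every entry under offsets (including offsets itself) in reverse sorted order,
        // so each child comes before its parent and every directory is empty when deleted.
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(offsets)) {
            toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the checkpoint location used in this report.
        String checkpointLocation = args.length > 0 ? args[0] : "/tmp/checkpoint-data";
        if (deleteOffsets(checkpointLocation)) {
            System.out.println("Deleted offsets under " + checkpointLocation + "; the next run starts over.");
        } else {
            System.out.println("No offsets directory under " + checkpointLocation + ".");
        }
    }
}
{code}

The message mentions only the offsets directory; whether other subdirectories of the checkpoint location also need clearing is not covered by this report.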




  was:
I am running JavaStructuredKafkaWordCount.java with checkpointLocation set. The output mode is "complete".
Below is the relevant code.
{code}
    // Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).groupBy("value").count();

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data")
      .start();
{code}

This example runs successfully and writes data to the checkpoint directory. When I re-run the
program, it throws the following exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}

Then I modified JavaStructuredKafkaWordCount.java to use a non-aggregate query with output
mode "append". Please see the code below.
{code}
    // No aggregations
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).select("value");

    // append mode with console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data/offsets")
      .start();
{code}

This modified code also runs successfully and writes data to the checkpoint directory. When I re-run
the program, it throws the same exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}





> Error for both aggregate and non-aggregate queries in Structured streaming - "This query does not support recovering from checkpoint location"
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19768
>                 URL: https://issues.apache.org/jira/browse/SPARK-19768
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Amit Baghel
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

