Any help?

Need urgent help. Could someone please clarify this?

---------- Forwarded message ----------
From: Aakash Basu <aakash.spark.raj@gmail.com>
Date: Thu, Apr 5, 2018 at 2:50 PM
Subject: Spark Structured Streaming Inner Queries fails
To: user <user@spark.apache.org>


Hi,

Why are inner queries (scalar subqueries) not allowed in Spark Structured Streaming? Spark treats the inner query as a separate stream altogether and expects it to be started with its own writeStream.start().

Why so?

Error:

pyspark.sql.utils.StreamingQueryException: 'Queries with streaming sources must be executed with writeStream.start();;
textSocket
=== Streaming Query ===
Identifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c, runId = 0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketSource[host: localhost, port: 9998]: 5}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8 []) AS col3#9]
:  +- Aggregate [avg(cast(col1#3 as double)) AS aver#7]
:     +- SubqueryAlias ds
:        +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
:           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf,socket,List(),None,List(),None,Map(header -> true, host -> localhost, path -> csv, port -> 9998),None), textSocket, [value#1]
+- SubqueryAlias ds
   +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
      +- StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998], [value#1]
'
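For contrast, the same scalar subquery runs fine when "ds" is backed by a static DataFrame; it is only the streaming relation that gets rejected. A minimal sketch (the sample rows and the local master are my assumptions, mirroring the two-string-column schema of the stream):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StaticSubqueryContrast") \
    .master("local[1]") \
    .getOrCreate()

# Hypothetical static rows mirroring the streaming schema (col1, col2)
spark.createDataFrame([("1", "10"), ("2", "20"), ("3", "30")],
                      ["col1", "col2"]).createOrReplaceTempView("ds")

# The very same scalar subquery that the streaming query rejects
result = spark.sql(
    "SELECT col1, col2, col2 / (SELECT avg(col1) FROM ds) AS col3 FROM ds")
result.show()

Here avg(col1) is 2.0, so col3 comes out as col2 divided by 2.0 (Spark implicitly casts the string columns to double for the division).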

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredRunningAvg") \
    .getOrCreate()

# Read a raw text stream from a socket. (The "header" option and a load
# path are ignored by the socket source, so they are dropped here.)
data = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load()

# Split each comma-separated line into two columns
ds = data.select(
    split(data.value, ",").getItem(0).alias("col1"),
    split(data.value, ",").getItem(1).alias("col2"))

ds.createOrReplaceTempView("ds")

# The scalar subquery below is what triggers the exception
final_DF = spark.sql(
    "SELECT col1, col2, col2 / (SELECT avg(col1) AS aver FROM ds) AS col3 FROM ds")

query = final_DF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()


Thanks,
Aakash.