spark-user mailing list archives

From Aakash Basu <aakash.spark....@gmail.com>
Subject Spark Structured Streaming Inner Queries fails
Date Thu, 05 Apr 2018 09:20:44 GMT
Hi,

Why are inner queries not allowed in Spark Structured Streaming? Spark
treats the inner (scalar) subquery as a separate streaming query
altogether and expects it to be started with its own
writeStream.start().

Why so?

Error: pyspark.sql.utils.StreamingQueryException:
Queries with streaming sources must be executed with writeStream.start();;
textSocket
=== Streaming Query ===
Identifier: [id = f77611ee-ce1c-4b16-8812-0f1afe05562c, runId =
0bb4d880-1a4d-4a6c-8fe0-2b4977ab52d0]
Current Committed Offsets: {}
Current Available Offsets: {TextSocketSource[host: localhost, port: 9998]: 5}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [col1#3, col2#4, (cast(col2#4 as double) / scalar-subquery#8 []) AS col3#9]
:  +- Aggregate [avg(cast(col1#3 as double)) AS aver#7]
:     +- SubqueryAlias ds
:        +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
:           +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3ac605bf,socket,List(),None,List(),None,Map(header -> true, host -> localhost, path -> csv, port -> 9998),None), textSocket, [value#1]
+- SubqueryAlias ds
   +- Project [split(value#1, ,)[0] AS col1#3, split(value#1, ,)[1] AS col2#4]
      +- StreamingExecutionRelation TextSocketSource[host: localhost, port: 9998], [value#1]
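
For contrast, the identical scalar subquery is accepted when "ds" is a
static DataFrame, so the restriction seems specific to streaming
sources. A minimal sketch (the file name data.csv is hypothetical;
assume it holds the same two comma-separated columns):

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StaticScalarSubquery") \
    .getOrCreate()

# Batch read of the same two-column data (hypothetical file).
static_data = spark.read \
    .option("header", "true") \
    .csv("data.csv") \
    .toDF("col1", "col2")

static_data.createOrReplaceTempView("ds")

# The same query runs without complaint on a batch DataFrame.
spark.sql(
    "SELECT col1, col2, col2 / (SELECT avg(col1) FROM ds) AS col3 FROM ds"
).show()

The full streaming code that raises the error follows.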

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredRunningAvg") \
    .getOrCreate()

# Socket source: only host and port are meaningful here; the header
# option and the "csv" path are ignored by this source.
data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

# Split each "a,b" line from the socket into two string columns.
id = data.select(split(data.value, ",").getItem(0).alias("col1"),
                 split(data.value, ",").getItem(1).alias("col2"))

id.createOrReplaceTempView("ds")

# This scalar subquery is what triggers the exception once the
# stream starts.
final_DF = spark.sql(
    "SELECT col1, col2, "
    "col2 / (SELECT avg(col1) AS aver FROM ds) AS col3 FROM ds")

query = final_DF \
    .writeStream \
    .format("console") \
    .start()

query.awaitTermination()
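
Would something along these lines be a reasonable workaround, assuming
a Spark version with writeStream.foreachBatch (added in Spark 2.4)?
Inside foreachBatch each micro-batch arrives as a static DataFrame,
where the scalar aggregate is legal. A sketch (the function name is
mine, and note it averages over each micro-batch rather than over the
whole stream):

from pyspark.sql.functions import avg, col

def with_avg_ratio(batch_df, batch_id):
    # Each micro-batch is a static DataFrame, so computing a scalar
    # aggregate over it is allowed.
    aver = batch_df.agg(avg(col("col1"))).first()[0]
    if aver is not None:
        batch_df.withColumn("col3", col("col2") / aver) \
            .show(truncate=False)

query = id.writeStream \
    .foreachBatch(with_avg_ratio) \
    .start()

query.awaitTermination()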



Thanks,
Aakash.
