spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aakash Basu <aakash.spark....@gmail.com>
Subject Re: [Structured Streaming Query] Calculate Running Avg from Kafka feed using SQL query
Date Mon, 02 Apr 2018 14:07:27 GMT
Any help, guys?

On Mon, Apr 2, 2018 at 1:01 PM, Aakash Basu <aakash.spark.raj@gmail.com>
wrote:

> Hi,
>
> This is a very interesting requirement, where I am getting stuck at a few
> places.
>
> *Requirement* -
>
> Col1        Col2
> 1              10
> 2              11
> 3              12
> 4              13
> 5              14
>
>
>
> *I have to calculate avg of col1 and then divide each row of col2 by that
> avg. And, the Avg should be updated with every new data being fed through
> Kafka into Spark Streaming.*
>
> *Avg(Col1) = Running Avg*
> *Col2 = Col2/Avg(Col1)*
>
>
> *Queries* *-*
>
>
> *1) I am currently trying to simply run a inner query inside a query and
> print Avg with other Col value and then later do the calculation. But,
> getting error.*
>
> Query -
>
> select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg from transformed_Stream_DF
t
>
> Error -
>
> pyspark.sql.utils.StreamingQueryException: u'Queries with streaming
> sources must be executed with writeStream.start();
>
> Even though, I already have writeStream.start(); in my code, it is
> probably throwing the error because of the inner select query (I think
> Spark is assuming it as another query altogether which require its own
> writeStream.start. Any help?
>
>
> *2) How to go about it? *I have another point in mind, i.e, querying the
> table to get the avg and store it in a variable. In the second query simply
> pass the variable and divide the second column to produce appropriate
> result. But, is it the right approach?
>
> *3) Final question*: How to do the calculation over the entire data and
> not the latest, do I need to keep appending somewhere and repeatedly use
> it? My average and all the rows of the Col2 shall change with every new
> incoming data.
>
>
> *Code -*
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
>
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
>
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
>
>     ID.createOrReplaceTempView("transformed_Stream_DF")
>     aggregate_func = spark.sql(
>         "select t.Col2 , (Select AVG(Col1) as Avg from transformed_Stream_DF) as myAvg
from transformed_Stream_DF t")  #  (Col2/(AVG(Col1)) as Col3)")
>
>     # -----------For Console Print-----------
>
>     query = aggregate_func \
>         .writeStream \
>         .format("console") \
>         .start()
>     # .outputMode("complete") \
>     # -----------Console Print ends-----------
>
>     query.awaitTermination()
>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
>
> Thanks,
> Aakash.
>

Mime
View raw message