spark-user mailing list archives

From Aakash Basu <aakash.spark....@gmail.com>
Subject Re: [Structured Streaming] More than 1 streaming in a code
Date Mon, 16 Apr 2018 14:55:37 GMT
If I use timestamp-based windowing, my average will not be a global
average; it will be grouped by timestamp, which is not my requirement. I want to
recalculate the average of the entire column every time new rows come in, and
divide the other column by the updated average.

Let me know in case you or anyone else has a solution for this.
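
To make the requirement concrete, here is a minimal plain-Python sketch of the intended arithmetic. It is not Spark code, and the names `RunningAverage` and `process_batch` are hypothetical. It assumes that only the newly arrived rows are divided by the updated global average of col1, and it reuses the sample rows from the nc session quoted below, split into the same two batches.

```python
class RunningAverage:
    """Keeps a global sum and count so the average covers every row seen so far."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, values):
        # Fold the new values into the global sum and count.
        for value in values:
            self.total += value
            self.count += 1
        return self.total / self.count if self.count else None


def process_batch(state, rows):
    """Update the global average of col1, then divide each new col2 by it."""
    average = state.update(col1 for col1, _ in rows)
    if not average:  # no rows seen yet, or an average of zero
        return [(col1, col2, None) for col1, col2 in rows]
    return [(col1, col2, col2 / average) for col1, col2 in rows]


state = RunningAverage()
first = process_batch(state, [(1, 2), (3, 4), (5, 6)])  # global average of col1 is 3.0
second = process_batch(state, [(7, 8)])                 # global average of col1 is now 4.0
```

The state lives outside any window, which is why a timestamp-grouped average does not give the same result.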

On Mon, Apr 16, 2018 at 7:52 PM, Lalwani, Jayesh <
Jayesh.Lalwani@capitalone.com> wrote:

> You could do it if you had a timestamp in your data. You can use windowed
> operations to divide a value by its own average over a window. However, in
> Structured Streaming, you can only window by timestamp columns. You cannot
> do windowed aggregations on integers.
>
>
>
> *From: *Aakash Basu <aakash.spark.raj@gmail.com>
> *Date: *Monday, April 16, 2018 at 4:52 AM
> *To: *"Lalwani, Jayesh" <Jayesh.Lalwani@capitalone.com>
> *Cc: *spark receiver <spark.receiver@gmail.com>, Panagiotis Garefalakis <
> pangaref@gmail.com>, user <user@spark.apache.org>
>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hey Jayesh and Others,
>
> Is there, then, any other way to arrive at a solution for this use case?
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Mon, Apr 16, 2018 at 8:11 AM, Lalwani, Jayesh <
> Jayesh.Lalwani@capitalone.com> wrote:
>
> Note that what you are trying to do here is join a streaming data frame
> with an aggregated streaming data frame. As per the documentation, joining
> an aggregated streaming data frame with another streaming data frame is not
> supported.
>
>
>
>
>
> *From: *spark receiver <spark.receiver@gmail.com>
> *Date: *Friday, April 13, 2018 at 11:49 PM
> *To: *Aakash Basu <aakash.spark.raj@gmail.com>
> *Cc: *Panagiotis Garefalakis <pangaref@gmail.com>, user <
> user@spark.apache.org>
> *Subject: *Re: [Structured Streaming] More than 1 streaming in a code
>
>
>
> Hi Panagiotis ,
>
>
>
> I was wondering whether you solved the problem, because I ran into the same
> issue today. I'd appreciate it very much if you could paste the code snippet
> if it's working.
>
>
>
> Thanks.
>
>
>
>
>
> On Apr 6, 2018, at 7:40 AM, Aakash Basu <aakash.spark.raj@gmail.com> wrote:
>
>
>
> Hi Panagiotis,
>
> I did that, but it still prints only the result of the first query and then
> waits for new data; it doesn't even get to the next one.
>
> *Data -*
>
> $ nc -lk 9998
>
> 1,2
> 3,4
> 5,6
> 7,8
>
> *Result -*
>
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 3.0|
> +----+
>
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +----+
> |aver|
> +----+
> | 4.0|
> +----+
>
>
>
> *Updated Code -*
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split
>
> spark = SparkSession \
>     .builder \
>     .appName("StructuredNetworkWordCount") \
>     .getOrCreate()
>
> data = spark \
>     .readStream \
>     .format("socket") \
>     .option("header","true") \
>     .option("host", "localhost") \
>     .option("port", 9998) \
>     .load("csv")
>
>
> id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
>                     split(data.value, ",").getItem(1).alias("col2"))
>
> id_DF.createOrReplaceTempView("ds")
>
> df = spark.sql("select avg(col1) as aver from ds")
>
> df.createOrReplaceTempView("abcd")
>
> wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")  # (select aver from abcd)
>
> query2 = df \
>     .writeStream \
>     .format("console") \
>     .outputMode("complete") \
>     .trigger(processingTime='5 seconds') \
>     .start()
>
> query = wordCounts \
>     .writeStream \
>     .format("console") \
>     .trigger(processingTime='5 seconds') \
>     .start()
>
> spark.streams.awaitAnyTermination()
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> On Fri, Apr 6, 2018 at 4:18 PM, Panagiotis Garefalakis <pangaref@gmail.com>
> wrote:
>
> Hello Aakash,
>
>
>
> When you use query.awaitTermination you are pretty much blocking there
> waiting for the current query to stop or throw an exception. In your case
> the second query will not even start.
>
> What you could do instead is remove all the blocking calls and use
> spark.streams.awaitAnyTermination instead (waiting for either query1 or
> query2 to terminate). Make sure you do that after the query2.start call.
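
The ordering described above (start every query first, then block once on the session) can be sketched as follows. This is an illustration, not code from the thread: the helper name `start_all_then_wait` and its parameters are hypothetical, and it assumes `spark` is a SparkSession and each frame is a streaming DataFrame. The only Spark calls used are `writeStream`, `format`, `outputMode`, `trigger`, `start` and `spark.streams.awaitAnyTermination`.

```python
def start_all_then_wait(spark, frames_and_modes, interval="5 seconds"):
    """Start every streaming query, then block once until any of them stops.

    frames_and_modes is a list of (streaming DataFrame, output mode) pairs.
    """
    queries = []
    for frame, mode in frames_and_modes:
        query = (frame.writeStream
                 .format("console")
                 .outputMode(mode)
                 .trigger(processingTime=interval)
                 .start())  # start() returns immediately; it does not block
        queries.append(query)

    # Block only after all queries are running. Calling awaitTermination()
    # on the first query before starting the second would never reach the
    # second start() call.
    spark.streams.awaitAnyTermination()
    return queries
```

With the names from the updated code quoted earlier in this message, a call might look like `start_all_then_wait(spark, [(df, "complete"), (id_DF, "append")])`. This only addresses the blocking order; it does not make an unsupported query plan valid.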
>
>
>
> I hope this helps.
>
>
>
> Cheers,
>
> Panagiotis
>
>
>
> On Fri, Apr 6, 2018 at 11:23 AM, Aakash Basu <aakash.spark.raj@gmail.com>
> wrote:
>
> Any help?
>
> Need urgent help. Someone please clarify the doubt?
>
>
>
> ---------- Forwarded message ----------
> From: *Aakash Basu* <aakash.spark.raj@gmail.com>
> Date: Thu, Apr 5, 2018 at 3:18 PM
> Subject: [Structured Streaming] More than 1 streaming in a code
> To: user <user@spark.apache.org>
>
> Hi,
>
> If I have more than one writeStream in my code, all operating on the same
> readStream data, why does only the first writeStream produce output? I want the
> second one to be printed on the console as well.
>
> How can I do that?
>
>
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import split, col
>
> class test:
>
>
>     spark = SparkSession.builder \
>         .appName("Stream_Col_Oper_Spark") \
>         .getOrCreate()
>
>     data = spark.readStream.format("kafka") \
>         .option("startingOffsets", "latest") \
>         .option("kafka.bootstrap.servers", "localhost:9092") \
>         .option("subscribe", "test1") \
>         .load()
>
>     ID = data.select('value') \
>         .withColumn('value', data.value.cast("string")) \
>         .withColumn("Col1", split(col("value"), ",").getItem(0)) \
>         .withColumn("Col2", split(col("value"), ",").getItem(1)) \
>         .drop('value')
>
>     ID.createOrReplaceTempView("transformed_Stream_DF")
>
>     df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")
>
>     df.createOrReplaceTempView("abcd")
>
>     wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")
>
>
>     # -----------------------#
>
>     query1 = df \
>         .writeStream \
>         .format("console") \
>         .outputMode("complete") \
>         .trigger(processingTime='3 seconds') \
>         .start()
>
>     query1.awaitTermination()
>     # -----------------------#
>
>     query2 = wordCounts \
>         .writeStream \
>         .format("console") \
>         .trigger(processingTime='3 seconds') \
>         .start()
>
>     query2.awaitTermination()
>
>     # /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3 /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py
>
>
>
> Thanks,
>
> Aakash.
>
>
>
> ------------------------------
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
