spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Calculate average from Spark stream
Date Mon, 17 May 2021 14:32:56 GMT
Hi Giuseppe,

Your error states: Required attribute 'value' not found

First, can you read your streaming data OK?

In my stream the data is in JSON format, with three columns. For example:

{"rowkey":"f0577406-a7d3-4c52-9998-63835ea72133", "timestamp":"2021-05-17T15:17:27", "temperature":27}

The first column is a UUID, the second is a timestamp and the third is the temperature.
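To illustrate what declaring the column types buys you, here is a plain-Python analogue (not Spark code) of what from_json does with such a schema: it decodes the sample message and coerces each field to its declared type, leaving absent fields as None, much as Spark leaves them null. SCHEMA and parse_reading are illustrative names, not part of any API.

```python
import json
from datetime import datetime

# The sample message value shown above.
SAMPLE = ('{"rowkey":"f0577406-a7d3-4c52-9998-63835ea72133", '
          '"timestamp":"2021-05-17T15:17:27", "temperature":27}')

# Field name -> converter, mirroring
#   StructType().add("rowkey", StringType())
#               .add("timestamp", TimestampType())
#               .add("temperature", IntegerType())
SCHEMA = {
    "rowkey": str,
    "timestamp": datetime.fromisoformat,
    "temperature": int,
}


def parse_reading(raw: str) -> dict:
    """Hypothetical helper: decode one JSON payload and coerce every field to
    the type declared in SCHEMA; fields missing from the payload become None."""
    payload = json.loads(raw)
    return {name: (convert(payload[name]) if payload.get(name) is not None else None)
            for name, convert in SCHEMA.items()}


reading = parse_reading(SAMPLE)
print(reading["timestamp"], reading["temperature"])  # 2021-05-17 15:17:27 27
```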

I need to tell Spark Structured Streaming (SSS) how the columns are formatted, so I define the schema as follows:

        # Imports this excerpt relies on (at module level):
        #   import sys
        #   from pyspark.sql.functions import col, from_json, window
        #   from pyspark.sql.types import IntegerType, StringType, StructType, TimestampType

        schema = StructType() \
            .add("rowkey", StringType()) \
            .add("timestamp", TimestampType()) \
            .add("temperature", IntegerType())
        checkpoint_path = "file:///ssd/hduser/temperature2/chkpt"
        try:
            # Construct a streaming DataFrame that subscribes to the topic "temperature".
            # Kafka client settings only reach the consumer when prefixed with "kafka."
            # (e.g. kafka.group.id); the unprefixed ones below are likely ignored.
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))  # note the value here

            # Get the individual columns from the parsed struct.
            resultM = streamingDataFrame.select(
                col("parsed_value.rowkey").alias("rowkey"),
                col("parsed_value.timestamp").alias("timestamp"),
                col("parsed_value.temperature").alias("temperature"))

            # Here I do my windowing and say that I am interested in avg("temperature") over timestamp.
            result = resultM \
                .withWatermark("timestamp", "5 minutes") \
                .groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")) \
                .avg('temperature') \
                .writeStream \
                .outputMode('complete') \
                .option("numRows", 1000) \
                .option("truncate", "false") \
                .format('console') \
                .option('checkpointLocation', checkpoint_path) \
                .queryName("temperature") \
                .start()

        except Exception as e:
            print(f"{e}, quitting")
            sys.exit(1)

        # print(result.status)
        # print(result.recentProgress)
        # print(result.lastProgress)

        result.awaitTermination()

This works. I have attached the .py file for you; have a look at it.

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 17 May 2021 at 15:00, Giuseppe Ricci <peppepegasus@gmail.com> wrote:

> Hi Mich, hi all,
>
> Thank you for your valuable support. It seems your solution worked!
>
> 21/05/17 15:53:38 WARN HDFSBackedStateStoreProvider: The state for version 83 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
> -------------------------------------------
> Batch: 83
> -------------------------------------------
> +------------------------------------------+------------------+
> |window                                    |avg(temperature)  |
> +------------------------------------------+------------------+
> |{2021-05-13 15:02:30, 2021-05-13 15:02:40}|11.90999984741211 |
> |{2021-05-14 16:04:20, 2021-05-14 16:04:30}|12.859999656677246|
> |{2021-05-13 16:04:10, 2021-05-13 16:04:20}|18.649999618530273|
> |{2021-05-14 16:03:30, 2021-05-14 16:03:40}|18.540000915527344|
> |{2021-05-13 16:01:10, 2021-05-13 16:01:20}|19.889999389648438|
> |{2021-05-13 16:01:50, 2021-05-13 16:02:00}|16.489999771118164|
> |{2021-05-14 16:02:30, 2021-05-14 16:02:40}|13.640000343322754|
>
>
> I tried to save the data to another Kafka topic, but my solution doesn't work:
>
> qk = (resultM
>       .selectExpr("CAST(timestamp AS STRING)", "CAST(temperature AS STRING)")
>       .writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "localhost:9092")
>       .option('checkpointLocation', "/home/kafka/Documenti/confluent/examples-6.1.0-post/clients/cloud/python/kafkaStream")
>       .option("topic", "avgtemperature")
>       .start())
>
> because I receive the error:
>
> 21/05/17 15:56:29 WARN StreamingQueryManager: Stopping existing streaming query [id=81f48019-534c-446e-90a5-a90598883370, runId=ad277cbb-e906-4d60-8d9c-0f24285041c6], as a new run is being started.
> 21/05/17 15:56:29 ERROR MicroBatchExecution: Query [id = 81f48019-534c-446e-90a5-a90598883370, runId = 0fd83640-1176-4695-a7e5-b65717f46a9a] terminated with error
> org.apache.spark.sql.AnalysisException: Required attribute 'value' not found
>         at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:59)
>         at org.apache.spark.sql.kafka010.KafkaStreamingWrite.<init>(KafkaStreamingWrite.scala:42)
>         at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable$$anon$2.buildForStreaming(KafkaSourceProvider.scala:411)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:623)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:144)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:321)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
>
> Is my solution wrong?
> Thanks.
>
> PhD. Giuseppe Ricci
>
>
>
>
> On Sat, 15 May 2021 at 23:47, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>
>>
>> Hi,
>>
>> In answer to your question, I did some tests using broadly your approach.
>> With regard to your questions:
>>
>> "but it does not work well because it does not give a temperature average
>> as you can see in the attached pic.
>> Why is the average not calculated on temperature?
>> How can I view data in each window of 5 minutes and related average?"
>>
>> This is similar to your code:
>>
>>             streamingDataFrame = self.spark \
>>                 .readStream \
>>                 .format("kafka") \
>>                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>                 .option("group.id", config['common']['appName']) \
>>                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>                 .option("subscribe", "temperature") \
>>                 .option("failOnDataLoss", "false") \
>>                 .option("includeHeaders", "false") \
>>                 .option("startingOffsets", "latest") \
>>                 .load()
>>
>>             streamingDataFrame.printSchema()
>>             result = streamingDataFrame \
>>                 .withWatermark("timestamp", "5 minutes") \
>>                 .groupBy(window(streamingDataFrame.timestamp, "5 minutes", "5 minutes")) \
>>                 .avg() \
>>                 .writeStream \
>>                 .outputMode('complete') \
>>                 .option("numRows", 100) \
>>                 .option("truncate", "false") \
>>                 .format('console') \
>>                 .option('checkpointLocation', checkpoint_path) \
>>                 .queryName("temperature") \
>>                 .start()
>>
>> OK
>>
>> To simulate your data, which I believe comprises two keys, timestamp and
>> temperature, I am sending a single temperature message to Kafka every
>> minute, with the temperature between 20 and 30 degrees. An example:
>>
>> {"timestamp":"2021-05-15T22:16:31", "temperature":29}
>>
>> So let us print the schema
>>
>> streamingDataFrame.printSchema()
>>
>> root
>>  |-- key: binary (nullable = true)
>>  |-- value: binary (nullable = true)
>>  |-- topic: string (nullable = true)
>>  |-- partition: integer (nullable = true)
>>  |-- offset: long (nullable = true)
>>  |-- timestamp: timestamp (nullable = true)
>>  |-- timestampType: integer (nullable = true)
>>
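>> There is no temperature column there, as you have not extracted one from
>> the JSON in the value column (see later), and the window is computed on the
>> Kafka message timestamp. Called with no arguments, avg() averages every
>> numeric column instead. So this is what you get if you run this code (note
>> that the batch cycle is 1 minute in my case):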
>>
>> -------------------------------------------
>> Batch: 2
>> -------------------------------------------
>> +------------------------------------------+--------------+-----------+------------------+
>> |window                                    |avg(partition)|avg(offset)|avg(timestampType)|
>> +------------------------------------------+--------------+-----------+------------------+
>> |{2021-05-15 22:05:00, 2021-05-15 22:10:00}|7.0           |7071.0     |0.0               |
>> |{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
>> +------------------------------------------+--------------+-----------+------------------+
>>
>> -------------------------------------------
>> Batch: 3
>> -------------------------------------------
>> +------------------------------------------+--------------+-----------+------------------+
>> |window                                    |avg(partition)|avg(offset)|avg(timestampType)|
>> +------------------------------------------+--------------+-----------+------------------+
>> |{2021-05-15 22:05:00, 2021-05-15 22:10:00}|5.5           |7147.5     |0.0               |
>> |{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
>> +------------------------------------------+--------------+-----------+------------------+
>>
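The three averaged columns in the output above can be predicted from the printed schema. This small plain-Python model (an illustration, not Spark code) applies the rule that avg() called with no column names averages every numeric column; the type names are copied from printSchema(), and the helper columns_averaged_by_avg is hypothetical.

```python
# Column -> Spark type, as printed by streamingDataFrame.printSchema() above.
KAFKA_SOURCE_SCHEMA = {
    "key": "binary",
    "value": "binary",
    "topic": "string",
    "partition": "integer",
    "offset": "long",
    "timestamp": "timestamp",
    "timestampType": "integer",
}

# Spark SQL numeric type names as printSchema() shows them (decimal simplified).
NUMERIC_TYPES = {"byte", "short", "integer", "long", "float", "double", "decimal"}


def columns_averaged_by_avg(schema):
    """Model of avg() called with no arguments: one average per numeric column,
    in schema order. Strings, binaries and timestamps are skipped."""
    return [f"avg({name})" for name, dtype in schema.items() if dtype in NUMERIC_TYPES]


print(columns_averaged_by_avg(KAFKA_SOURCE_SCHEMA))
# ['avg(partition)', 'avg(offset)', 'avg(timestampType)']

# After extracting the JSON fields, temperature is the only numeric column left.
print(columns_averaged_by_avg({"timestamp": "timestamp", "temperature": "integer"}))
# ['avg(temperature)']
```

This is why either naming the column, as in avg('temperature'), or selecting only the parsed fields before calling avg() gives a temperature average.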
>> So this, I think, is what you need to do with your schema:
>>
>>
>>         schema = StructType().add("timestamp", TimestampType()).add("temperature", IntegerType())
>>
>>         streamingDataFrame = self.spark \
>>             .readStream \
>>             .format("kafka") \
>>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>             .option("group.id", config['common']['appName']) \
>>             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>             .option("subscribe", "temperature") \
>>             .option("failOnDataLoss", "false") \
>>             .option("includeHeaders", "true") \
>>             .option("startingOffsets", "latest") \
>>             .load() \
>>             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>
>>         # get columns from struct
>>         resultM = streamingDataFrame.select(
>>             col("parsed_value.timestamp").alias("timestamp"),
>>             col("parsed_value.temperature").alias("temperature"))
>>
>>         result = resultM \
>>             .withWatermark("timestamp", "5 minutes") \
>>             .groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")) \
>>             .avg() \
>>             .writeStream \
>>             .outputMode('complete') \
>>             .option("numRows", 100) \
>>             .option("truncate", "false") \
>>             .format('console') \
>>             .option('checkpointLocation', checkpoint_path) \
>>             .queryName("temperature") \
>>             .start()
>>
>>
>> And you will get:
>>
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|28.0            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 2
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 3
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|22.0            |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 4
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|26.0            |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 5
>> -------------------------------------------
>> +------------------------------------------+------------------+
>> |window                                    |avg(temperature)  |
>> +------------------------------------------+------------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.3              |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5              |
>> +------------------------------------------+------------------+
>>
>> -------------------------------------------
>> Batch: 6
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.75           |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 7
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> -------------------------------------------
>> Batch: 8
>> -------------------------------------------
>> +------------------------------------------+----------------+
>> |window                                    |avg(temperature)|
>> +------------------------------------------+----------------+
>> |{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
>> |{2021-05-15 22:40:00, 2021-05-15 22:45:00}|26.0            |
>> |{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
>> +------------------------------------------+----------------+
>>
>> I believe this should be all you need.
>>
>> HTH
>>
>>
>>
>> On Tue, 11 May 2021 at 14:42, Giuseppe Ricci <peppepegasus@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> As suggested by Jayesh, I followed his solution.
>>> I need to have the average temperature at fixed minutes: 5, 10, 15, etc.
>>> So it seems a tumbling window is the optimal solution (a).
>>> Real sensors may send data with some delay; this can be a few seconds (b).
>>> So this is my new code (I used a window of 5 minutes):
>>>
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import window
>>> from pyspark.sql.types import StringType
>>>
>>> # Spark session & context
>>> spark = (SparkSession
>>>          .builder
>>>          .master('local')
>>>          .appName('TemperatureStreamApp')
>>>          # Add kafka package
>>>          .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
>>>          .getOrCreate())
>>>          .getOrCreate())
>>>
>>> sc = spark.sparkContext
>>>
>>> # Create stream dataframe setting kafka server, topic and offset option
>>> df = (spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092") # kafka server
>>>   .option("subscribe", "temperature") # topic
>>>   .option("startingOffsets", "earliest") # start from beginning
>>>   .load())
>>>
>>> windowedAvg = df\
>>>     .withWatermark("timestamp", "5 minutes") \
>>>     .groupBy(
>>>     window(df.timestamp, "5 minutes", "5 minutes")).avg()
>>>
>>> query = windowedAvg\
>>>         .writeStream\
>>>         .outputMode('complete')\
>>>         .format('console')\
>>>         .option('truncate', 'false')\
>>>         .start()
>>>
>>> query.awaitTermination()
>>>
>>>
>>> but it does not work well because it does not give a temperature average,
>>> as you can see in the attached pic.
>>> Why is the average not calculated on temperature?
>>> How can I view the data in each 5-minute window and the related average?
>>> Thanks for your help.
>>>
>>>
>>> PhD. Giuseppe Ricci
>>>
>>>
>>> On Mon, 10 May 2021 at 18:14, Lalwani, Jayesh <jlalwani@amazon.com> wrote:
>>>
>>>> You don’t need to “launch batches” every 5 minutes. You can launch
>>>> batches every 2 seconds and aggregate over a 5-minute window. Spark will
>>>> read data from the topic every 2 seconds and keep the data in memory for 5
>>>> minutes.
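To make the point concrete: the micro-batch interval is a property of the query's trigger (in PySpark, for example, writeStream.trigger(processingTime="2 seconds")), independent of the window length. The plain-Python model below (a sketch of the idea, not Spark's implementation) keeps a running sum and count per 5-minute window as state and folds each micro-batch into it, so every batch can report the current average of every window without waiting for the window to close. Event times are seconds from an arbitrary origin, and the readings are made up.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # 5-minute tumbling window

# State kept between micro-batches: window start (seconds) -> [running sum, count].
state = defaultdict(lambda: [0.0, 0])


def process_micro_batch(events):
    """Fold one micro-batch of (event_time_seconds, temperature) pairs into the
    per-window state and return the current average of every window seen so far."""
    for event_time, temperature in events:
        window_start = event_time - event_time % WINDOW_SECONDS
        totals = state[window_start]
        totals[0] += temperature
        totals[1] += 1
    return {start: total / count for start, (total, count) in sorted(state.items())}


# Three consecutive micro-batches; the second one crosses into the next window.
print(process_micro_batch([(296, 20), (298, 22)]))  # {0: 21.0}
print(process_micro_batch([(300, 30)]))             # {0: 21.0, 300: 30.0}
print(process_micro_batch([(302, 26)]))             # {0: 21.0, 300: 28.0}
```

Reporting every window after each batch resembles the complete output mode used in the console examples in this thread.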
>>>>
>>>>
>>>>
>>>> You need to make a few decisions:
>>>>
>>>>    1. Do you want a tumbling window or a rolling window? A tumbling
>>>>    window of 5 minutes will produce an aggregate every 5 minutes, covering
>>>>    the preceding 5 minutes of data. A rolling window of 5 minutes sliding
>>>>    every 1 minute will produce an aggregate every minute, each covering the
>>>>    preceding 5 minutes. For example, let’s say you have data every 2
>>>>    seconds. A tumbling window will produce a result at minutes 5, 10, 15,
>>>>    20…. The minute-5 result will have data from minutes 1-5, the minute-10
>>>>    result from minutes 6-10, and so on. A rolling window will produce a
>>>>    result at minutes 5, 6, 7, 8, …. Minute 5 will have the aggregate for
>>>>    minutes 1-5, minute 6 the aggregate for minutes 2-6, and so on. This
>>>>    defines your window. In your code you have
>>>>
>>>>
>>>> window(df_temp.timestamp, "2 minutes", "1 minutes")
>>>>
>>>> This is a rolling window. Here the second parameter (2 minutes) is the
>>>> window interval, and the third parameter (1 minutes) is the slide interval.
>>>> In the above example, it will produce an aggregate every minute for 2
>>>> minutes' worth of data.
>>>>
>>>> If you define
>>>>
>>>> window(df_temp.timestamp, "2 minutes", "2 minutes")
>>>>
>>>> This is a tumbling window. It will produce an aggregate every 2 minutes,
>>>> with 2 minutes' worth of data.
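The difference between the two can be sketched in plain Python (an illustration of the semantics, not Spark's code). Windows are half-open [start, end) and start at multiples of the slide interval; an event then falls into exactly one tumbling window, but into as many rolling windows as the window length divided by the slide (when the slide divides the length evenly). Times are minutes from an arbitrary origin, and windows_for is a hypothetical helper.

```python
def windows_for(t, length, slide):
    """Return the [start, end) windows that contain time t, for windows of the
    given length that start every `slide` units, aligned to time 0."""
    start = t - t % slide  # latest window start at or before t
    windows = []
    while start > t - length:  # [start, start + length) still contains t
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


# Tumbling, like window(ts, "2 minutes", "2 minutes"): one window per event.
print(windows_for(11, 2, 2))  # [(10, 12)]
# Rolling, like window(ts, "2 minutes", "1 minutes"): two overlapping windows.
print(windows_for(11, 2, 1))  # [(10, 12), (11, 13)]
# A 5-minute tumbling window: minute 7 belongs to [5, 10).
print(windows_for(7, 5, 5))   # [(5, 10)]
```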
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>    2. Can you have late data? How late can data arrive? Streaming systems
>>>>    often deliver data out of order. For example, it could happen that you
>>>>    get data for t=11:00:00 AM, and then get data for t=10:59:59 AM. This
>>>>    means that the data is late by 1 second. What’s the worst case for late
>>>>    data? You need to define the watermark for late data. In your code, you
>>>>    have defined a watermark of 2 minutes. For aggregations, the watermark
>>>>    also defines which windows Spark will keep in memory. If you define a
>>>>    watermark of 2 minutes, and you have a rolling window with a slide
>>>>    interval of 1 minute, Spark will keep 2 windows in memory. The watermark
>>>>    interval affects how much memory Spark will use.
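A plain-Python model of the watermark rule as the Structured Streaming programming guide describes it (a sketch, not Spark's implementation): the watermark is the largest event time seen in earlier micro-batches minus the allowed delay, and once a window's end is at or before the watermark its state is evicted and further late rows for it are dropped (the guide notes that dropping is not strictly guaranteed). Per the guide, this cleanup applies in update and append output modes; complete mode, used in the console examples in this thread, keeps every window. Times are in minutes, and the class WindowedAverager is hypothetical.

```python
class WindowedAverager:
    """Hypothetical model of tumbling-window averages with a watermark.

    The watermark is the largest event time seen in earlier micro-batches minus
    the allowed delay. A window's state is evicted, and late rows for it are
    dropped, once the window's end is at or before the watermark."""

    def __init__(self, window, delay):
        self.window = window        # window length, in minutes
        self.delay = delay          # allowed lateness, in minutes
        self.max_event_time = None  # largest event time seen so far
        self.state = {}             # window start -> [sum, count]

    @property
    def watermark(self):
        if self.max_event_time is None:
            return float("-inf")  # no data yet, so nothing counts as late
        return self.max_event_time - self.delay

    def process_batch(self, events):
        """Apply one micro-batch of (event_time, temperature) pairs and return
        the rows dropped as too late."""
        wm = self.watermark  # fixed for the whole batch
        # Evict windows that the watermark has already passed.
        self.state = {s: v for s, v in self.state.items() if s + self.window > wm}
        dropped = []
        for t, value in events:
            start = t - t % self.window
            if start + self.window <= wm:
                dropped.append((t, value))
                continue
            totals = self.state.setdefault(start, [0.0, 0])
            totals[0] += value
            totals[1] += 1
        # Advance the watermark for the next batch.
        if events:
            batch_max = max(t for t, _ in events)
            if self.max_event_time is None or batch_max > self.max_event_time:
                self.max_event_time = batch_max
        return dropped


agg = WindowedAverager(window=5, delay=2)
print(agg.process_batch([(1, 20), (6, 30)]))   # []
# Watermark is now 6 - 2 = 4. Minute 3 is 3 minutes late, but its window
# [0, 5) ends after the watermark, so it is still counted.
print(agg.process_batch([(3, 22), (12, 25)]))  # []
# Watermark is now 12 - 2 = 10: window [0, 5) is closed, so minute 4 is dropped.
print(agg.process_batch([(4, 99), (13, 27)]))  # [(4, 99)]
print(agg.state)                               # {10: [52.0, 2]}
```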
>>>>
>>>>
>>>>
>>>> It might help if you try to follow the example in this guide very
>>>> carefully
>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
>>>> That is a pretty good example, but you need to follow it event by event
>>>> very carefully to get all the nuances.
>>>>
>>>>
>>>>
>>>> *From: *Giuseppe Ricci <peppepegasus@gmail.com>
>>>> *Date: *Monday, May 10, 2021 at 11:19 AM
>>>> *To: *"user@spark.apache.org" <user@spark.apache.org>
>>>> *Subject: *[EXTERNAL] Calculate average from Spark stream
>>>>
>>>>
>>>>
>>>> Hi, I'm new to Apache Spark.
>>>>
>>>> I'm trying to read data from an Apache Kafka topic (I have a simulated
>>>> temperature sensor producer which sends data every 2 seconds), and every
>>>> 5 minutes I need to calculate the average temperature. Reading the
>>>> documentation, I understand I need to use windows, but I'm not able to
>>>> finalize my code. Can someone help me?
>>>> How can I launch batches every 5 minutes? My code runs once and then
>>>> finishes. Why can't I find any helpful information in the console about
>>>> whether it executed correctly? See attached picture.
>>>>
>>>> This is my code:
>>>>
>>>> https://pastebin.com/4S31jEeP
>>>>
>>>>
>>>>
>>>> Thanks for your valuable help.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> PhD. Giuseppe Ricci
>>>>
>>>>
>>>>
>>>
>>
>>
