flink-issues mailing list archives

From "Vasia Kalavri (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4101) Calculating average in Flink DataStream on window time
Date Thu, 23 Jun 2016 10:04:16 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346204#comment-15346204
] 

Vasia Kalavri commented on FLINK-4101:
--------------------------------------

Hi [~mrakki3110], JIRA is used for reporting bugs or proposing new features. Your question
should be posted either on the user mailing list or on Stack Overflow, as you've already done.
I'm closing this issue.

> Calculating average in Flink DataStream on window time
> ------------------------------------------------------
>
>                 Key: FLINK-4101
>                 URL: https://issues.apache.org/jira/browse/FLINK-4101
>             Project: Flink
>          Issue Type: Task
>          Components: DataStream API
>    Affects Versions: 1.0.2
>            Reporter: Akshay Shingote
>
> I am using the Flink DataStream API where data from racks is available, and I want to
> calculate the average temperature grouped by rack ID. My window is 40 seconds long and
> slides every 10 seconds. The following code calculates the sum of temperatures every
> 10 seconds for each rack ID, but now I want to calculate the average temperature instead:
> static Properties properties = new Properties();
>
> public static Properties getProperties()
> {
>     properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
>     properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
>     //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
>     //properties.setProperty("group.id", "akshay");
>     properties.setProperty("auto.offset.reset", "earliest");
>     return properties;
> }
>
> @SuppressWarnings("rawtypes")
> public static void main(String[] args) throws Exception
> {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     Properties props = Program.getProperties();
>     // Read TemperatureEvent records from Kafka; timestamps are assigned at ingestion.
>     DataStream<TemperatureEvent> dstream = env
>         .addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props))
>         .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
>     // Sum temperatures per rack over a 40-second window that slides every 10 seconds.
>     DataStream<TemperatureEvent> ds1 = dstream
>         .keyBy("rackId")
>         .timeWindow(Time.seconds(40), Time.seconds(10))
>         .sum("temperature");
>     env.execute("Temperature Consumer");
> }
> How can I calculate the average temperature for the above example?
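
For readers of this archive: the question quoted above comes down to keeping a running (sum, count) pair per rack and dividing only once the window is complete. The following is a minimal plain-Java sketch of that idea, not the reporter's code and not Flink API code. The `Reading` class is a hypothetical stand-in for `TemperatureEvent`, and `RackAverageSketch`, `SumCount`, and `averageByRack` are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackAverageSketch {

    /** Hypothetical stand-in for the TemperatureEvent type in the question. */
    static final class Reading {
        final String rackId;
        final double temperature;

        Reading(String rackId, double temperature) {
            this.rackId = rackId;
            this.temperature = temperature;
        }
    }

    /** Running (sum, count) pair; the average is derived only when asked for. */
    static final class SumCount {
        double sum;
        long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double average() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Averages the readings of a single window, grouped by rack ID. */
    static Map<String, Double> averageByRack(List<Reading> window) {
        Map<String, SumCount> accumulators = new LinkedHashMap<>();
        for (Reading r : window) {
            accumulators.computeIfAbsent(r.rackId, k -> new SumCount()).add(r.temperature);
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, SumCount> e : accumulators.entrySet()) {
            averages.put(e.getKey(), e.getValue().average());
        }
        return averages;
    }

    public static void main(String[] args) {
        // One window's worth of made-up readings.
        List<Reading> window = new ArrayList<>();
        window.add(new Reading("rack-1", 20.0));
        window.add(new Reading("rack-1", 30.0));
        window.add(new Reading("rack-2", 40.0));
        System.out.println(averageByRack(window));
    }
}
```

In a Flink job, logic of this shape would presumably live in a window function passed to `apply` on the windowed stream in place of `sum("temperature")`. That wiring is not shown here, and the user mailing list remains the right place to discuss it.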



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
