From: "Aljoscha Krettek (JIRA)"
To: issues@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Mon, 26 Oct 2015 20:02:27 +0000 (UTC)
Subject: [jira] [Commented] (FLINK-2922) Add Queryable Window Operator

    [ https://issues.apache.org/jira/browse/FLINK-2922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974914#comment-14974914 ]

Aljoscha Krettek commented on FLINK-2922:
-----------------------------------------

I don't know exactly what you mean by that, but this would still require a special kind of operator to support it. That's where the queryable window operator comes in.
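As a rough illustration of the semantics such an operator would need (see the issue description quoted below), here is a minimal plain-Java sketch. It is not Flink code: the class `QueryableSumSketch`, the type `QueryResult`, and both method names are hypothetical, and it assumes a per-key count window that sums integer values. Elements arrive on one input and update the in-flight aggregate; queries arrive on the other input and read that aggregate without discarding it.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration only; none of these names are Flink API. */
final class QueryableSumSketch {

    /** Pairs a query with the in-flight value it observed (hypothetical type). */
    static final class QueryResult {
        final String query;
        final int currentSum;

        QueryResult(String query, int currentSum) {
            this.query = query;
            this.currentSum = currentSum;
        }
    }

    // In-flight (not yet emitted) sum and element count per key.
    private final Map<String, Integer> sums = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();
    private final int windowSize;

    QueryableSumSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /**
     * Input one: a normal element. Returns the window sum once the count
     * window for this key is full, otherwise null.
     */
    Integer processElement(String key, int value) {
        int sum = sums.merge(key, value, Integer::sum);
        int count = counts.merge(key, 1, Integer::sum);
        if (count < windowSize) {
            return null;
        }
        // The window fires: emit the result and start a fresh window.
        sums.remove(key);
        counts.remove(key);
        return sum;
    }

    /** Input two: a query. Reads the in-flight sum without modifying state. */
    QueryResult processQuery(String key) {
        return new QueryResult(key, sums.getOrDefault(key, 0));
    }

    public static void main(String[] args) {
        QueryableSumSketch op = new QueryableSumSketch(3);
        op.processElement("flink", 1);
        op.processElement("flink", 1);
        System.out.println(op.processQuery("flink").currentSum); // prints 2
        System.out.println(op.processElement("flink", 1));       // prints 3
        System.out.println(op.processQuery("flink").currentSum); // prints 0
    }
}
```

Returning the query alongside the value is what lets a caller match each result to the query that produced it. A real operator would also need the query stream partitioned by the same key as the element stream so that both reach the same state.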
This is a mockup of what it would look like in practice:

{code}
DataStream<String> text = env.socketTextStream("localhost", 9999);
DataStream<String> query = env.socketTextStream("localhost", 9998);

WindowStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> winStream = text
    .flatMap(new WordCount.Tokenizer())
    .keyBy(0)
    .countWindow(10)
    .query(query.keyBy(new IdentityKey()))
    .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, GlobalWindow>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(
                Tuple tuple,
                GlobalWindow window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> val : values) {
                sum += val.f1;
            }
            out.collect(Tuple2.of((String) tuple.getField(0), sum));
        }
    });

winStream.print();

// WindowResult<QT, T>
// QT = query type
// T = window result type
DataStream<WindowResult<String, Tuple2<String, Integer>>> queryResults = winStream.getQueryResultStream();
queryResults.print();
{code}

> Add Queryable Window Operator
> -----------------------------
>
>                 Key: FLINK-2922
>                 URL: https://issues.apache.org/jira/browse/FLINK-2922
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> The idea is to provide a window operator that allows querying the current window result at any time without discarding it.
> For example, a user might have an aggregation window operation with tumbling windows of 1 hour. At any time, they might be interested in the current aggregated value for the in-flight hour window.
> The idea is to make the operator a two-input operator where normal elements arrive on input one while queries arrive on input two. The query stream must be keyed by the same key as the input stream. If a query arrives for a key, the current value for that key is emitted along with the query element so that the user can map the result to the query.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)