flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6023) Fix Scala snippet into Process Function (Low-level Operations) Doc
Date Fri, 10 Mar 2017 15:41:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905279#comment-15905279
] 

ASF GitHub Bot commented on FLINK-6023:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3510#discussion_r105422108
  
    --- Diff: docs/dev/stream/process_function.md ---
    @@ -176,56 +176,57 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -import org.apache.flink.api.common.state.ValueState;
    -import org.apache.flink.api.common.state.ValueStateDescriptor;
    -import org.apache.flink.streaming.api.functions.ProcessFunction;
    -import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
    -import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
    -import org.apache.flink.util.Collector;
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import org.apache.flink.streaming.api.functions.RichProcessFunction
    +import org.apache.flink.streaming.api.functions.ProcessFunction.Context
    +import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
    +import org.apache.flink.util.Collector
     
     // the source data stream
    -DataStream<Tuple2<String, String>> stream = ...;
    +val stream: DataStream[Tuple2[String, String]] = ...
     
     // apply the process function onto a keyed stream
    -DataStream<Tuple2<String, Long>> result = stream
    -    .keyBy(0)
    -    .process(new CountWithTimeoutFunction());
    +val result: DataStream[Tuple2[String, Long]] = stream
    +  .keyBy(0)
    +  .process(new CountWithTimeoutFunction())
     
     /**
    - * The data type stored in the state
    - */
    +  * The data type stored in the state
    +  */
     case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
     
     /**
    - * The implementation of the ProcessFunction that maintains the count and timeouts
    - */
    -class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
    +  * The implementation of the ProcessFunction that maintains the count and timeouts
    +  */
    +class TimeoutStateFunction extends RichProcessFunction[(String, Long), (String, Long)]
{
     
       /** The state that is maintained by this process function */
    -  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
    -      .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
    +  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    +    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
     
     
       override def processElement(value: (String, Long), ctx: Context, out: Collector[(String,
Long)]): Unit = {
         // initialize or retrieve/update the state
    +    val (key, _) = value
    --- End diff --
    
    Sorry i didn't make myself clear. What IDS complains is the variable name `key` is conflicts
with the following lines: 
    ```case CountWithTimestamp(key, count, _) =>
            CountWithTimestamp(key, count + 1, ctx.timestamp)
    ```
    It's not clear whether you want to use the `key` you just defined or the `key` in the
match pattern.


> Fix Scala snippet into Process Function (Low-level Operations) Doc
> ------------------------------------------------------------------
>
>                 Key: FLINK-6023
>                 URL: https://issues.apache.org/jira/browse/FLINK-6023
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>            Reporter: Mauro Cortellazzi
>            Assignee: Mauro Cortellazzi
>            Priority: Trivial
>             Fix For: 1.3.0, 1.2.1
>
>
> The current `/docs/dev/stream/process_function.md` has some errors in the Scala snippet



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message