flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: SideOutput Issue
Date Wed, 04 Apr 2018 10:21:00 GMT
Hi,

Which version of Flink are you using?

Could you provide a minimal example that reproduces the problem? I tried
to reproduce it with the following code, based on the information you
provided, but it runs fine for me:

     private static final OutputTag<String> tag = new OutputTag<String>("test"){};

     public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

         DataStream<String> text1 = env.fromElements("foo");
         DataStream<String> text2 = env.fromElements("bar");

         SingleOutputStreamOperator<String> process = text1.connect(text2)
             .process(new CoProcessFunction<String, String, String>() {
                 @Override
                 public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
                 }

                 @Override
                 public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
                 }
             });

         process.getSideOutput(tag).print();

         // execute program
         env.execute("Streaming WordCount");
     }
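
A side note on the trailing {} in the tag declaration above: Flink's documentation asks for an OutputTag to be created as an anonymous subclass so that the element type can be recovered at runtime. The sketch below illustrates that general Java mechanism only. TypedTag and TypedTagSketch are hypothetical names made up for this illustration; the code uses no Flink classes and is not Flink's implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypedTagSketch {

    // Hypothetical stand-in for a typed tag; this is NOT Flink's OutputTag.
    public static class TypedTag<T> {
        private final String id;
        private final Type elementType;

        public TypedTag(String id) {
            this.id = id;
            // A subclass (including an anonymous one created with {}) keeps its
            // type argument in the class metadata; a plain instance does not.
            Type superType = getClass().getGenericSuperclass();
            this.elementType = (superType instanceof ParameterizedType)
                    ? ((ParameterizedType) superType).getActualTypeArguments()[0]
                    : null; // type argument erased: created without the trailing {}
        }

        public String getId() {
            return id;
        }

        public Type getElementType() {
            return elementType;
        }
    }

    public static void main(String[] args) {
        // With {}: an anonymous subclass, so the element type can be read back.
        TypedTag<String> withBraces = new TypedTag<String>("test") {};
        // Without {}: a plain instance, so the element type is lost to erasure.
        TypedTag<String> withoutBraces = new TypedTag<String>("test");

        System.out.println(withBraces.getId() + ": " + withBraces.getElementType());
        System.out.println(withoutBraces.getId() + ": " + withoutBraces.getElementType());
    }
}
```

In this sketch only the tag created with {} can report java.lang.String as its element type, which is why the declaration above keeps the braces.
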
On 03.04.2018 19:55, Navneeth Krishnan wrote:
> Hi All,
>
> I'm having issues with creating side outputs. There are two input 
> sources (both from kafka) and they are connected and fed into a 
> co-process function. Inside the co-process, the regular data stream 
> outputs a POJO and in processElement2 there is a periodic timer which 
> creates the side output. When I start the job I get the below 
> exception. Is there something that I'm doing wrong?
>
> I used the below example to implement the side output.
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
>
> processElement2
> ctx.output("side-output", POJO);
>
> Job
> dataStream.getSideOutput("side-output").print();
>
>
> 2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO org.apache.flink.runtime.taskmanager.Task  - Co-Flat Map (4/8) (20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
> 2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO org.apache.flink.runtime.taskmanager.Task  - Co-Process (1/8) (fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
>     at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
>
> 2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO org.apache.flink.runtime.taskmanager.Task  - Co-Process (7/8) (a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>     at org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
>     at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
>
> Thanks


