flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Side outputs never getting consumed
Date Fri, 06 Apr 2018 16:22:07 GMT
Hi Julio,

thanks for this great example. I could reproduce it on my machine and I 
could find the problem.

You need to store the newly created branch of your pipeline in some 
variable like `val test = pipeline.process()` in order to access the 
side outputs via `test.getSideOutput(outputSimple)`. Right now your 
program expects a side output from the wrong operator (namely the 
window operation).
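
In code, the fix would look roughly like this (a sketch, not compilable on its own; it assumes the Flink streaming API on the classpath and reuses `MetricTypeSplitter`, `accountTag`, and `analysisTag` from the quoted program below):

```scala
// Keep a handle on the operator created by .process() ...
val splitted = pipeline
  .process(new MetricTypeSplitter(accountTag, analysisTag))

// ... and ask *that* operator for its side output,
// instead of calling getSideOutput() on the windowed stream:
splitted
  .getSideOutput(accountTag)
  .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
  .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
```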

Regards,
Timo


Am 04.04.18 um 16:35 schrieb Julio Biason:
> Hey Timo,
>
> To be completely honest, I _think_ they are POJO, although I use case 
> classes (because I want our data to be immutable).
>
> I wrote a sample code, which basically reflects our pipeline: 
> https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala
>
> The thing to notice is that we do the split to side outputs _after_ 
> the window functions -- because we want to split the results just 
> before the sinks (we had a split there instead, but the job would, 
> sometimes, crash because "splits can't be used with side outputs", or 
> something along those lines). Are we correct in assuming that there 
> can't be side outputs once a window is processed?
>
> On Tue, Apr 3, 2018 at 10:17 AM, Timo Walther <twalthr@apache.org 
> <mailto:twalthr@apache.org>> wrote:
>
>     Hi Julio,
>
>     I tried to reproduce your problem locally but everything run
>     correctly. Could you share a little example job with us?
>
>     This worked for me:
>
>     class TestingClass {
>        var hello: Int = 0
>     }
>
>     class TestA extends TestingClass {
>        var test: String = _
>     }
>
>     def main(args: Array[String]) {
>
>        // set up the execution environment
>        val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>        // get input data
>        val text = env.fromElements(WordCountData.WORDS: _*)
>
>        val outputTag = OutputTag[(String, Int)]("side-output")
>        val outputTag2 = OutputTag[TestingClass]("side-output2")
>
>        val counts: DataStream[(String, Int)] = text
>          // split up the lines in pairs (2-tuples) containing: (word, 1)
>          .flatMap(_.toLowerCase.split("\\W+"))
>          .filter(_.nonEmpty)
>          .map((_, 1))
>          // group by the tuple field "0" and sum up tuple field "1"
>          .keyBy(0)
>          .sum(1)
>          .process(new ProcessFunction[(String, Int), (String, Int)] {
>            override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
>              ctx.output(outputTag, value)
>              ctx.output(outputTag2, new TestingClass)
>              ctx.output(outputTag2, new TestA)
>            }
>          })
>
>        counts.getSideOutput(outputTag).print()
>        counts.getSideOutput(outputTag2).print()
>
>        // execute program
>        env.execute("Streaming WordCount")
>     }
>
>
>     Are the Metric classes proper POJO types?
>
>     Regards,
>     Timo
>
>
>     Am 02.04.18 um 21:53 schrieb Julio Biason:
>>     Hey guys,
>>
>>     I have a pipeline that generates two different types of data (but
>>     both use the same trait) and I need to save each on a different sink.
>>
>>     So far, things were working with splits, but it seems using
>>     splits with side outputs (for the late data, which we'll plug a
>>     late arrival handling) causes errors, so I changed everything to
>>     side outputs.
>>
>>     To select a side output based on type, I did the following:
>>
>>     class MetricTypeSplitter(accountingTag: OutputTag[Metric],
>>         analysingTag: OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {
>>
>>       val logger = LoggerFactory.getLogger(this.getClass)
>>
>>       override def processElement(
>>         value: Metric,
>>         ctx: ProcessFunction[Metric, Metric]#Context,
>>         out: Collector[Metric]
>>       ): Unit = {
>>         out.collect(value)
>>         value match {
>>           case record: AccountingMetric => {
>>             logger.info(s"Sending ${record} to Accounting")
>>             ctx.output(accountingTag, record)
>>           }
>>           case record: AnalysingMetric => {
>>             logger.info(s"Sending ${record} to Analysis")
>>             ctx.output(analysingTag, record)
>>           }
>>           case _ => {
>>             logger.error(s"Don't know the type of ${value}")
>>           }
>>         }
>>       }
>>     }
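
The type-based routing in the splitter above is plain Scala pattern matching; stripped of the Flink types, the dispatch logic can be exercised on its own. The `Metric` hierarchy and its fields below are made up for illustration and are not the actual classes from Julio's job:

```scala
// Minimal stand-in for the Metric hierarchy (hypothetical fields):
sealed trait Metric
case class AccountingMetric(name: String, value: Long) extends Metric
case class AnalysingMetric(name: String, value: Double) extends Metric

// Same match structure as MetricTypeSplitter.processElement, but
// returning the chosen route instead of writing to a side output:
def route(metric: Metric): String = metric match {
  case record: AccountingMetric => s"accounting:${record.name}"
  case record: AnalysingMetric  => s"analysis:${record.name}"
  case _                        => "unknown"
}
```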
>>
>>     And at the end of the pipeline I add the splitter:
>>
>>         pipeline
>>           .process(new MetricTypeSplitter(accountTag, analysisTag))
>>
>>     So far, this works and I can see the log lines showing which
>>     tag each metric is being sent to. The second part, in which I
>>     capture the side output and send the data to sink, doesn't seem
>>     to work, though:
>>
>>         pipeline
>>           .getSideOutput(accountTag)
>>           .map { tuple => AccountingSink.toRow(tuple)
>>     }.name("Accounting rows")
>>     .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")
>>
>>     And here is the problem: It seems .getSideOutput() is never
>>     actually getting the side output, because the logger in
>>     AccountingSink.toRow() never fires and the data is not
>>     showing up in our database (toRow() converts the Metric to a
>>     Row and accountingSink.output returns the JDBCOutputFormat).
>>
>>     Any ideas what I need to do for side outputs to be actually captured?
>>
>>     -- 
>>     *Julio Biason*, Software Engineer
>>     *AZION*  | Deliver. Accelerate. Protect.
>>     Office: +55 51 3083 8101  |  Mobile: +55 51 _99907 0554_
>
>
>
>
>
> -- 
> *Julio Biason*, Software Engineer
> *AZION*  | Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101  |  Mobile: +55 51 _99907 0554_


