flink-user mailing list archives

From Julio Biason <julio.bia...@azion.com>
Subject Side outputs never getting consumed
Date Mon, 02 Apr 2018 19:53:23 GMT
Hey guys,

I have a pipeline that generates two different types of data (both extend
the same trait), and I need to save each to a different sink.

So far, things were working with splits, but it seems that combining splits
with side outputs (for the late data, for which we'll plug in late-arrival
handling) causes errors, so I changed everything to side outputs.

To select a side output based on type, I did the following:

class MetricTypeSplitter(
  accountingTag: OutputTag[Metric],
  analysingTag: OutputTag[Metric]
) extends ProcessFunction[Metric, Metric] {

  val logger = LoggerFactory.getLogger(this.getClass)

  override def processElement(
    value:Metric,
    ctx:ProcessFunction[Metric, Metric]#Context,
    out:Collector[Metric]
  ): Unit = {
    out.collect(value)
    value match {
      case record:AccountingMetric => {
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      }
      case record:AnalysingMetric => {
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      }
      case _ => {
        logger.error(s"Don't know the type of ${value}")
      }
    }
  }
}

And at the end of the pipeline I add the splitter:

    pipeline
      .process(new MetricTypeSplitter(accountTag, analysisTag))

So far, this works, and I can see the log lines showing which tag each
metric is being sent to. The second part, in which I capture the side output
and send the data to the sink, doesn't seem to work, though:

    pipeline
      .getSideOutput(accountTag)
      .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
      .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")


And here is the problem: it seems .getSideOutput() never actually receives
the side output, because the logger in AccountingSink.toRow() never fires
and the data is not showing up in our database (toRow() converts the Metric
to a Row, and accountingSink.output returns the JDBCOutputFormat).

Any ideas what I need to do for side outputs to be actually captured?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
