beam-commits mailing list archives

From "Simon Waddington (JIRA)" <j...@apache.org>
Subject [jira] [Created] (BEAM-5524) PTransform style guide error
Date Fri, 28 Sep 2018 23:32:00 GMT
Simon Waddington created BEAM-5524:
--------------------------------------

             Summary: PTransform style guide error
                 Key: BEAM-5524
                 URL: https://issues.apache.org/jira/browse/BEAM-5524
             Project: Beam
          Issue Type: Bug
          Components: website
    Affects Versions: 2.6.0
            Reporter: Simon Waddington
            Assignee: Melissa Pashniak


On the page [https://beam.apache.org/contribute/ptransform-style-guide/], the example for transforms
with multiple output collections defines this method:
{code:java}
PCollectionTuple apply(... input) {
    ...
    PCollection<Moo> moo = ...;
    PCollection<Blah> blah = ...;
    return PCollectionTuple.of(mooTag, moo)
                           .and(blahTag, blah);
}
{code}
I'm no expert, just trying to understand this use case, but it seems to me the example should
override the {{expand}} method of {{PTransform}} rather than define {{apply}}, as this [ParDoTest.java|https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L301]
code does for a similar case:
{code:java}
static class MultiFilter
    extends PTransform<PCollection<Integer>, PCollectionTuple> {

  private static final TupleTag<Integer> BY2 = new TupleTag<Integer>("by2"){};
  private static final TupleTag<Integer> BY3 = new TupleTag<Integer>("by3"){};

  @Override
  public PCollectionTuple expand(PCollection<Integer> input) {
    PCollection<Integer> by2 = input.apply("Filter2s", ParDo.of(new FilterFn(2)));
    PCollection<Integer> by3 = input.apply("Filter3s", ParDo.of(new FilterFn(3)));
    return PCollectionTuple.of(BY2, by2).and(BY3, by3);
  }

  static class FilterFn extends DoFn<Integer, Integer> {
    private final int divisor;

    FilterFn(int divisor) {
      this.divisor = divisor;
    }

    @ProcessElement
    public void processElement(@Element Integer element,
                               OutputReceiver<Integer> r) throws Exception {
      if (element % divisor == 0) {
        r.output(element);
      }
    }
  }
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
