No, that seems like a failure, at least the part where it doesn't let you decide (based on the status of the job) whether and how to execute the PipelineCallable. I think this is tied to the idea that the Pipeline API should give you insight into the PipelineResult for any job, target, etc. Does that make sense?
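To make the suggestion concrete, here is a minimal Java sketch of the kind of API meant above. It assumes a hypothetical `StatusLookup` that the pipeline would hand to a PipelineCallable; `StatusLookup`, `TargetStatus`, `MapStatusLookup` and `decide` are invented for illustration and are not part of Crunch. The only point is that a callable with access to per-target status could choose to run, skip, or wait instead of blindly reading its input.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: none of these types exist in Crunch.
public class UpstreamStatusSketch {

  // Hypothetical per-target outcome, loosely analogous to a stage in a PipelineResult.
  enum TargetStatus { PENDING, SUCCEEDED, FAILED }

  // Hypothetical lookup that a pipeline could pass to a PipelineCallable.
  interface StatusLookup {
    TargetStatus statusOf(String target);
  }

  // Map-backed implementation used only for this example.
  static final class MapStatusLookup implements StatusLookup {
    private final Map<String, TargetStatus> statuses = new HashMap<>();

    MapStatusLookup put(String target, TargetStatus status) {
      statuses.put(target, status);
      return this;
    }

    @Override
    public TargetStatus statusOf(String target) {
      // Targets with no recorded status are reported as PENDING.
      TargetStatus status = statuses.get(target);
      return status == null ? TargetStatus.PENDING : status;
    }
  }

  // What a callable could do with such a lookup: skip its work when the job that
  // produces its input failed, rather than reading an unmaterialized collection.
  static String decide(StatusLookup lookup, String inputTarget) {
    switch (lookup.statusOf(inputTarget)) {
      case SUCCEEDED:
        return "run";
      case FAILED:
        return "skip";
      default:
        return "wait";
    }
  }

  public static void main(String[] args) {
    StatusLookup lookup = new MapStatusLookup()
        .put("/tmp/good", TargetStatus.SUCCEEDED)
        .put("/tmp/broken", TargetStatus.FAILED);
    System.out.println(decide(lookup, "/tmp/good"));   // prints "run"
    System.out.println(decide(lookup, "/tmp/broken")); // prints "skip"
    System.out.println(decide(lookup, "/tmp/other"));  // prints "wait"
  }
}
```

Returning a decision rather than throwing keeps the choice (recover, skip, or fail) with the callable, which is what the question below asks for.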

J

On Tue, Jan 26, 2016 at 8:50 AM, Igor Bernstein <igorbernstein@spotify.com> wrote:
Hi all,

I was hoping someone could clarify how to handle errors in SequentialDos. I was really surprised to learn that SequentialDos are called even when the Jobs they depend on have failed. See the example below.

From what I can tell, this happens because CrunchJobControl#executeReadySeqDoFns runs every PipelineCallable whose Targets are not "unfinished", and CrunchJobControl#getUnfinishedTargets defines unfinished Targets as those produced by waiting, running, or ready Jobs. In other words, Targets that belong to failed Jobs are considered finished. Is this intentional?
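The readiness rule described above can be modeled in a few lines. This is a simplified, hypothetical Java sketch: `Job`, `JobState`, `isReady` and `hasFailedUpstream` are invented names, not Crunch classes, and only mimic the described getUnfinishedTargets / executeReadySeqDoFns behavior. Only targets of waiting, ready, or running jobs count as unfinished, so a target whose job failed does not block a callable; `hasFailedUpstream` shows the extra check a callable would need in order to notice that failure.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of the rule described above (NOT Crunch's classes).
public class ReadinessRuleSketch {

  enum JobState { WAITING, READY, RUNNING, SUCCESS, FAILED }

  // Only outputs of jobs in these states count as "unfinished".
  static final Set<JobState> UNFINISHED_STATES =
      EnumSet.of(JobState.WAITING, JobState.READY, JobState.RUNNING);

  static final class Job {
    final JobState state;
    final Set<String> outputs;

    Job(JobState state, String... outputs) {
      this.state = state;
      this.outputs = new HashSet<>(Arrays.asList(outputs));
    }
  }

  // Targets produced by waiting, ready, or running jobs; FAILED jobs are not included.
  static Set<String> unfinishedTargets(List<Job> jobs) {
    Set<String> unfinished = new HashSet<>();
    for (Job job : jobs) {
      if (UNFINISHED_STATES.contains(job.state)) {
        unfinished.addAll(job.outputs);
      }
    }
    return unfinished;
  }

  // A sequential callable is treated as ready once none of its inputs is unfinished.
  static boolean isReady(Set<String> callableInputs, List<Job> jobs) {
    return Collections.disjoint(callableInputs, unfinishedTargets(jobs));
  }

  // The missing check: is any input produced by a FAILED job?
  static boolean hasFailedUpstream(Set<String> callableInputs, List<Job> jobs) {
    for (Job job : jobs) {
      if (job.state == JobState.FAILED && !Collections.disjoint(callableInputs, job.outputs)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Set<String> inputs = Collections.singleton("/tmp/broken-out");
    List<Job> failedJobs = Arrays.asList(new Job(JobState.FAILED, "/tmp/broken-out"));
    List<Job> runningJobs = Arrays.asList(new Job(JobState.RUNNING, "/tmp/broken-out"));

    System.out.println(isReady(inputs, failedJobs));           // true: failed output counts as finished
    System.out.println(hasFailedUpstream(inputs, failedJobs)); // true
    System.out.println(isReady(inputs, runningJobs));          // false: still waiting on the job
  }
}
```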

I can see this being useful in some scenarios, where my PipelineCallable can recover from a failed Job, but I don't see a clear way for my PipelineCallable to detect an upstream failure.

Does anyone know whether this is a bug (and failed Targets should be considered unfinished) or a feature, in which case what am I missing to check for an upstream failure?

Thanks in advance!


Example Code
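import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    MRPipeline pipeline = new MRPipeline(Main.class, getConf());
    PCollection<String> lines = pipeline.read(From.textFile("strings.txt"));

    // BrokenMapper throws on every record, so the job producing this collection
    // fails; AfterBroken is still invoked afterwards.
    lines
        .parallelDo(new BrokenMapper(), Writables.strings())
        .sequentialDo("input", new AfterBroken());

    // Nothing executes until the pipeline is run; report failure via the exit code.
    PipelineResult result = pipeline.done();
    return result.succeeded() ? 0 : 1;
  }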

  static class BrokenMapper extends MapFn<String, String> {
    @Override
    public String map(String input) {
      throw new RuntimeException("I'm broken");
    }
  }

  static class AfterBroken extends PipelineCallable<Void> {
    @Override
    protected Void getOutput(Pipeline pipeline) {
      return null;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Status call() throws Exception {
      System.out.println("I should not be called because I don't have a materialized collection");

      PCollection<String> lines = (PCollection<String>) getOnlyPCollection();
      // Surprise! The collection hasn't been materialized, because the job that
      // should have produced it failed.
      Iterable<String> materialized = lines.materialize();

      return Status.SUCCESS;
    }
  }

  public static void main(String[] args) throws Exception {
    int rc = ToolRunner.run(new Main(), args);
    System.exit(rc);
  }
}