No, that seems like a fail-- at least, the part where it fails to let you decide (based on the status of the job) whether/how to execute the PipelineCallable. This seems tied to the idea that the Pipeline interface should give you insight into the PipelineResult object for any job/target/etc through its API-- does that make sense? J On Tue, Jan 26, 2016 at 8:50 AM, Igor Bernstein wrote: > Hi all, > > I was hoping someone can clarify how to handle errors in SequentialDos. I > was really surprised to learn that SequentialDos get called for failed > Jobs. See example below. > > From what I can tell, this happens because > CrunchJobControl#executeReadySeqDoFns runs all PipelineCallables whose > Targets are not "unfinished". CrunchJobControl#getUnfinishedTargets defines > unfinished Targets as Targets that are not produced by waiting, running or > ready Jobs. In other words: Targets that belong to failed Jobs are > considered finished. Is this intentional? > > I can see this being useful in some scenarios, where my PipelineCallable > can recover from a Job, but I don't see a clear way for my PipelineCallable > to detect upstream failure. > > Anyone know if this is a bug and failed targets should be considered as > unfinished or is this feature and I'm missing a way for me to check for > upstream failure? > > Thanks in advance! > > > Example Code > class Main extends Configured implements Tool { > @Override > public int run(String[] args) throws Exception { > MRPipeline pipeline = new MRPipeline(Main.class); > PCollection lines = > pipeline.read(From.textFile("strings.txt")); > > lines > .parallelDo(new BrokenMapper(), Writables.strings()) > .sequentialDo("input", new AfterBroken()); > > return 0; > } > > static class BrokenMapper extends MapFn { > @Override > public String map(String input) { > throw new RuntimeException("I'm broken"); > } > } > > static class AfterBroken extends PipelineCallable { > @Override > protected Void getOutput(Pipeline pipeline) { > return null; > } > > @Override > public Status call() throws Exception { > System.out.println("I should not be called because I don't have a > materialized collection"); > > PCollection lines = (PCollection) > getOnlyPCollection(); > // Surprise! collection hasn't been materialized > Iterable materialize = lines.materialize(); > > return Status.SUCCESS; > } > } > > public static void main(String[] args) throws Exception { > int rc = ToolRunner.run(new Main(), args); > System.exit(rc); > } > } >