Hi all,
I was hoping someone could clarify how to handle errors in SequentialDos. I
was really surprised to learn that SequentialDos get called even for failed
Jobs. See the example below.
From what I can tell, this happens because
CrunchJobControl#executeReadySeqDoFns runs every PipelineCallable whose
Targets are not "unfinished", and CrunchJobControl#getUnfinishedTargets
defines unfinished Targets as those produced by waiting, running or ready
Jobs. In other words, Targets that belong to failed Jobs are considered
finished. Is this intentional?
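To make the rule above concrete, here is a toy model of it in plain Java. This is not Crunch's actual code: `UnfinishedTargetsModel`, `Job`, `JobState`, `unfinishedTargets` and `readyToRun` are hypothetical names, and the logic only mirrors the behaviour as I described it (outputs of waiting, ready or running Jobs are "unfinished"; a callable is eligible once none of its inputs is unfinished).

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model with hypothetical names; NOT Crunch's real CrunchJobControl.
public class UnfinishedTargetsModel {

    enum JobState { WAITING, READY, RUNNING, SUCCESS, FAILED }

    static final class Job {
        final String name;
        final JobState state;
        final Set<String> outputTargets;

        Job(String name, JobState state, Set<String> outputTargets) {
            this.name = name;
            this.state = state;
            this.outputTargets = outputTargets;
        }
    }

    // Only these states make a job's outputs count as "unfinished".
    private static final Set<JobState> PENDING =
            EnumSet.of(JobState.WAITING, JobState.READY, JobState.RUNNING);

    // Outputs of waiting/ready/running jobs; SUCCESS and FAILED jobs contribute nothing.
    static Set<String> unfinishedTargets(List<Job> jobs) {
        Set<String> unfinished = new HashSet<>();
        for (Job job : jobs) {
            if (PENDING.contains(job.state)) {
                unfinished.addAll(job.outputTargets);
            }
        }
        return unfinished;
    }

    // A sequential callable is eligible once none of its input targets is unfinished.
    static boolean readyToRun(Set<String> callableInputs, Set<String> unfinished) {
        for (String target : callableInputs) {
            if (unfinished.contains(target)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(
                new Job("brokenMapperJob", JobState.FAILED, Set.of("brokenOutput")));
        Set<String> unfinished = unfinishedTargets(jobs);
        // The failed job's output is not in the unfinished set...
        System.out.println("unfinished = " + unfinished);
        // ...so the downstream callable is treated as ready to run.
        System.out.println("readyToRun = " + readyToRun(Set.of("brokenOutput"), unfinished));
    }
}
```

Running `main` prints `unfinished = []` followed by `readyToRun = true`, which matches the surprise in the example: the failed Job's Target never blocks the callable.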
I can see this being useful in scenarios where my PipelineCallable can
recover from a failed Job, but I don't see a clear way for my
PipelineCallable to detect an upstream failure.
Does anyone know whether this is a bug (and failed Targets should be
considered unfinished) or a feature? If it's a feature, am I missing a way
to check for upstream failure?
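One caveat on the "bug" option: in a simplified model where a failed Job never changes state, treating its Targets as unfinished would leave the downstream callable waiting forever. The sketch below illustrates one hypothetical alternative rule of my own, not existing Crunch behaviour or API: Targets of failed Jobs are tracked separately, and any callable that depends on one is skipped instead of run. `FailedUpstreamModel`, `Decision` and `decide` are invented names.

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a HYPOTHETICAL scheduling rule; not Crunch code or API.
public class FailedUpstreamModel {

    enum JobState { WAITING, READY, RUNNING, SUCCESS, FAILED }

    // What the scheduler would do with a sequential callable.
    enum Decision { WAIT, RUN, SKIP_UPSTREAM_FAILED }

    static final class Job {
        final JobState state;
        final Set<String> outputTargets;

        Job(JobState state, Set<String> outputTargets) {
            this.state = state;
            this.outputTargets = outputTargets;
        }
    }

    // Collects the output targets of every job whose state is in `states`.
    static Set<String> targetsOf(List<Job> jobs, Set<JobState> states) {
        Set<String> result = new HashSet<>();
        for (Job job : jobs) {
            if (states.contains(job.state)) {
                result.addAll(job.outputTargets);
            }
        }
        return result;
    }

    // Failure takes priority: a callable with any failed input is skipped, rather
    // than run (current behaviour) or blocked forever (failed treated as unfinished).
    static Decision decide(Set<String> callableInputs, List<Job> jobs) {
        Set<String> failed = targetsOf(jobs, EnumSet.of(JobState.FAILED));
        Set<String> pending = targetsOf(jobs,
                EnumSet.of(JobState.WAITING, JobState.READY, JobState.RUNNING));
        for (String target : callableInputs) {
            if (failed.contains(target)) {
                return Decision.SKIP_UPSTREAM_FAILED;
            }
        }
        for (String target : callableInputs) {
            if (pending.contains(target)) {
                return Decision.WAIT;
            }
        }
        return Decision.RUN;
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(new Job(JobState.FAILED, Set.of("brokenOutput")));
        System.out.println(decide(Set.of("brokenOutput"), jobs)); // prints SKIP_UPSTREAM_FAILED
    }
}
```

A real scheduler would also need to report the skip to the callable or the PipelineResult; the model leaves that out.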
Thanks in advance!
Example Code
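import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        MRPipeline pipeline = new MRPipeline(Main.class, getConf());
        PCollection<String> lines = pipeline.read(From.textFile("strings.txt"));
        lines
            .parallelDo(new BrokenMapper(), Writables.strings())
            .sequentialDo("input", new AfterBroken());
        // Nothing executes until the pipeline is run; done() runs all pending
        // jobs (including the sequentialDo) and then cleans up.
        PipelineResult result = pipeline.done();
        return result.succeeded() ? 0 : 1;
    }

    // Throws on every record, so the job producing this output fails.
    static class BrokenMapper extends MapFn<String, String> {
        @Override
        public String map(String input) {
            throw new RuntimeException("I'm broken");
        }
    }

    static class AfterBroken extends PipelineCallable<Void> {
        @Override
        protected Void getOutput(Pipeline pipeline) {
            return null;
        }

        @Override
        public Status call() throws Exception {
            System.out.println(
                "I should not be called because I don't have a materialized collection");
            @SuppressWarnings("unchecked")
            PCollection<String> lines = (PCollection<String>) getOnlyPCollection();
            // Surprise! The collection hasn't been materialized.
            Iterable<String> materialize = lines.materialize();
            return Status.SUCCESS;
        }
    }

    public static void main(String[] args) throws Exception {
        int rc = ToolRunner.run(new Main(), args);
        System.exit(rc);
    }
}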