From: Josh Wills
Date: Tue, 26 Jan 2016 12:14:32 -0800
Subject: Re: SequentialDo for failed Jobs: bug or feature?
To: user@crunch.apache.org

No, that seems like a fail, at least in that it doesn't let you decide (based on the status of the job) whether and how to execute the PipelineCallable. This ties into the idea that the Pipeline interface should expose, through its API, the PipelineResult for any job, target, etc. Does that make sense?

J
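One way to picture this proposal is a callable that receives the outcome of each upstream target and then chooses whether to run, recover, or skip. The sketch below is hypothetical plain Java and not part of Crunch. `TargetStatus`, `Decision`, and `decide` are invented names. The status map stands in for whatever per-target information the Pipeline API might eventually expose, for example something derived from `PipelineResult`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of a status-aware sequentialDo; none of these types exist in Crunch.
final class StatusAwareSeqDo {

  // Outcome of the job that produced a target (illustrative).
  enum TargetStatus { SUCCEEDED, FAILED }

  // What the callable chooses to do with that information (illustrative).
  enum Decision { RUN, RECOVER, SKIP }

  // Runs normally only if every upstream target succeeded. Otherwise the callable
  // recovers when it knows how to, and skips when it does not. A target with no
  // recorded status is treated conservatively as failed.
  static Decision decide(Set<String> dependsOn, Map<String, TargetStatus> statuses, boolean canRecover) {
    for (String target : dependsOn) {
      TargetStatus status = statuses.getOrDefault(target, TargetStatus.FAILED);
      if (status != TargetStatus.SUCCEEDED) {
        return canRecover ? Decision.RECOVER : Decision.SKIP;
      }
    }
    return Decision.RUN;
  }

  public static void main(String[] args) {
    // Mirrors the thread's example: the job producing "mapped" failed.
    Map<String, TargetStatus> statuses = Map.of("mapped", TargetStatus.FAILED);
    System.out.println(decide(Set.of("mapped"), statuses, false)); // prints SKIP
    System.out.println(decide(Set.of("mapped"), statuses, true));  // prints RECOVER
  }
}
```

This design hands the decision to the callable rather than hard-coding a skip in the controller. That keeps the recovery scenario from the original question possible.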

On Tue, Jan 26, 2016 at 8:50 AM, Igor Bernstein <igorbernstein@spotify.com> wrote:
> Hi all,
>
> I was hoping someone could clarify how to handle errors in SequentialDos. I was really surprised to learn that SequentialDos get called for failed Jobs. See the example below.
>
> From what I can tell, this happens because CrunchJobControl#executeReadySeqDoFns runs all PipelineCallables whose Targets are not "unfinished". CrunchJobControl#getUnfinishedTargets defines unfinished Targets as Targets produced by waiting, running, or ready Jobs. In other words, Targets that belong to failed Jobs are considered finished. Is this intentional?
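The readiness rule described in this paragraph can be reduced to a small model. This is a simplified, hypothetical sketch, not Crunch's source. `SeqDoReadiness`, `blockedTargets`, and `callableIsReady` are illustrative names, and the job states are modeled loosely on Hadoop's `ControlledJob.State`. The second rule, which also blocks targets of failed jobs, is one possible alternative and not an existing Crunch option.

```java
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the sequentialDo readiness check (not Crunch code).
final class SeqDoReadiness {

  // Job states, modeled loosely on Hadoop's ControlledJob.State.
  enum JobState { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

  // A job together with the targets (output names) it produces.
  static final class Job {
    final JobState state;
    final Set<String> targets;

    Job(JobState state, Set<String> targets) {
      this.state = state;
      this.targets = targets;
    }
  }

  // Rule described above: only waiting, ready, or running jobs leave targets unfinished.
  static final Set<JobState> IN_FLIGHT =
      EnumSet.of(JobState.WAITING, JobState.READY, JobState.RUNNING);

  // Hypothetical alternative: targets of failed jobs stay blocked as well.
  static final Set<JobState> IN_FLIGHT_OR_FAILED = EnumSet.of(
      JobState.WAITING, JobState.READY, JobState.RUNNING,
      JobState.FAILED, JobState.DEPENDENT_FAILED);

  // Collects every target produced by a job whose state is in blockingStates.
  static Set<String> blockedTargets(List<Job> jobs, Set<JobState> blockingStates) {
    Set<String> blocked = new HashSet<>();
    for (Job job : jobs) {
      if (blockingStates.contains(job.state)) {
        blocked.addAll(job.targets);
      }
    }
    return blocked;
  }

  // A callable is ready when none of the targets it depends on is blocked.
  static boolean callableIsReady(Set<String> dependsOn, List<Job> jobs, Set<JobState> blockingStates) {
    Set<String> blocked = blockedTargets(jobs, blockingStates);
    for (String target : dependsOn) {
      if (blocked.contains(target)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // The job that produces "mapped" has failed, as in the example below.
    List<Job> jobs = List.of(new Job(JobState.FAILED, Set.of("mapped")));
    Set<String> deps = Set.of("mapped");
    System.out.println(callableIsReady(deps, jobs, IN_FLIGHT));           // prints true
    System.out.println(callableIsReady(deps, jobs, IN_FLIGHT_OR_FAILED)); // prints false
  }
}
```

Under the first rule, a failed job releases its targets exactly as a successful one does, which is why the callable runs. The second rule prevents that, but it also removes any chance for the callable to recover.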

> I can see this being useful in some scenarios, where my PipelineCallable can recover from a failed Job, but I don't see a clear way for my PipelineCallable to detect an upstream failure.
>
> Does anyone know whether this is a bug (and failed Targets should be considered unfinished), or whether it is a feature and I'm missing a way to check for upstream failure?
>
> Thanks in advance!
>
> Example code:
> import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.PipelineCallable;
> import org.apache.crunch.PipelineResult;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.From;
> import org.apache.crunch.types.writable.Writables;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> class Main extends Configured implements Tool {
>   @Override
>   public int run(String[] args) throws Exception {
>     MRPipeline pipeline = new MRPipeline(Main.class, getConf());
>     PCollection<String> lines = pipeline.read(From.textFile("strings.txt"));
>
>     lines
>         .parallelDo(new BrokenMapper(), Writables.strings())
>         .sequentialDo("input", new AfterBroken());
>
>     // Run the pipeline; without this call no job (and no sequentialDo) executes.
>     PipelineResult result = pipeline.done();
>     return result.succeeded() ? 0 : 1;
>   }
>
>   static class BrokenMapper extends MapFn<String, String> {
>     @Override
>     public String map(String input) {
>       // Every record throws, so the job backing this stage fails.
>       throw new RuntimeException("I'm broken");
>     }
>   }
>
>   static class AfterBroken extends PipelineCallable<Void> {
>     @Override
>     protected Void getOutput(Pipeline pipeline) {
>       return null;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public Status call() throws Exception {
>       System.out.println("I should not be called because I don't have a materialized collection");
>
>       PCollection<String> lines = (PCollection<String>) getOnlyPCollection();
>       // Surprise! The collection hasn't been materialized.
>       Iterable<String> materialize = lines.materialize();
>
>       return Status.SUCCESS;
>     }
>   }
>
>   public static void main(String[] args) throws Exception {
>     int rc = ToolRunner.run(new Main(), args);
>     System.exit(rc);
>   }
> }
