From dev-return-2074-apmail-samza-dev-archive=samza.apache.org@samza.incubator.apache.org Mon Dec 8 20:39:28 2014
From: Chris Riccomini
To: "dev@samza.incubator.apache.org"
Reply-To: dev@samza.incubator.apache.org
Subject: Re: Coast, and a few implementation questions
Date: Mon, 8 Dec 2014 20:37:16
+0000

Hey Ben,

This is awesome. Your blog post is really well written. I'm forwarding it
on to some folks. My notes:

1. Please, please, open tickets for anything you need. Your pain on
offset management is something I've been mulling over. I don't have a
really good idea of how to solve this, but I agree we should figure it
out.

2. "But if our task is the only producer for that partition, it's easy to
calculate the value of n... and by starting from the commit, and then
dropping the first n messages of output we create, we can get ourselves
back to a consistent state." This is true only in cases where the
processing is deterministic. There is a shocking number of cases where
this is not true, including some which are very subtle. If you rely on
wall-clock time, for example, your output can change between the original
run and the replay, so simply dropping n messages could lead to data loss
or duplication. Other, more nuanced sources of non-determinism include
deploying a different (newer) version of the code, dependencies on
external systems, etc.

3. You might want to consider the case where a downstream consumer is
reading from a changelog, as well. This is a use case we've seen pop up
once in a while.

4. One variation of the merge log that we'd considered was to batch reads
from an SSP together. This would allow you to shrink your merge log: you
can say, "Read 100 messages from SSP1, then 100 messages from SSP2," and
so on. This drastically shrinks the merge log's message count, which
should improve performance. Samza's DefaultMessageChooser supports this
style of batching, but it's off by default.

5. You should have a look at Kafka's transactional messaging proposal, as
well (https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka).
This is the only approach I'm aware of that supports all use cases
(including non-deterministic ones).

Cheers,
Chris

On 11/28/14 9:47 AM, "Ben Kirwin" wrote:

>Sorry for the late reply on this!
>
>I ended up writing up some of the tools I'm using to get exactly-once
>semantics.[0] It's still a bit rough, but I decided that another week
>sitting around on my disk wasn't going to do anything to improve it --
>let me know if anything needs clarification.
>
>That's also not a full answer, since it doesn't explain how those
>primitives are used in `coast`. It's intentionally undocumented for
>the moment, since it's a work in progress, but here's where the
>coordination overhead sits right now:
>
>- Any transformations / folds / aggregations that require state use
>Samza's existing state / changelog machinery. `coast` tracks a couple of
>offsets up/downstream from the state along with the state itself, so
>it adds an extra couple of `long`s per changelog message.
>- If messages are grouped differently in the input and output, `coast`
>adds the source partition/offset to the output messages -- which is
>about 12 bytes of per-message overhead.
>- To avoid duplicate messages, `coast` needs to checkpoint the input
>and output offsets together. Samza doesn't give user code access to
>the offsets, so I've been maintaining these within the task as
>additional state.
>- For jobs with multiple inputs, `coast` needs to remember the order
>in which messages arrived so it can reproduce it if there's a failure.
>This 'merge log' itself is not too expensive, but tracking the current
>offset in that log has been a surprising pain, since it too needs to
>be consistent with the checkpointed offsets.
>The only way I've found
>so far involves having the same task both produce to and consume from
>the same 'merge log' stream; this is not quite as awful as it sounds,
>but it does create a lot of latency for no good reason.
>
>So there are a couple of places where more flexible offset handling
>would substantially simplify things -- I'll think this over a bit more
>and open a ticket. If I can work those issues out, though, the main
>cost of enabling exactly-once becomes a few extra bytes of per-message
>overhead for certain streams -- and I'm happy to live with that.
>
>As it stands, though, `coast` *does* implement exactly-once
>semantics on top of the 0.8 Samza branch. I have some more cleanup,
>testing, and polish to do, but I'm hoping to push out a new version of
>`coast` that supports both the exactly-once backend and an
>overhead-free at-least-once version. If all goes well, that should
>happen sometime next week?
>
>[0]: http://ben.kirw.in/2014/11/28/kafka-patterns/
>
>On Fri, Nov 7, 2014 at 1:05 PM, Ben Kirwin wrote:
>>>> but I think coast actually has a pretty good shot at making that
>>>> easier -- it has quite a lot of 'structural' knowledge about the
>>>> flow of data, so it should be able to do a pretty good job of
>>>> inserting the necessary checks / checkpoints / etc. one DAG node at
>>>> a time.
>>>
>>> True. Given that you know exactly what computation is going on, it
>>> seems more tractable. I'm curious how you plan to implement exactly
>>> once. Do you have any docs?
>>
>> Nothing worth reading, but I'm hoping to have something reasonable
>> out in the next week or two. When that comes together, I'll send it
>> along.
>
>-- 
>Ben Kirwin
>http://ben.kirw.in/
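For readers following along: the "drop the first n messages" recovery
discussed above (which, as Chris notes, is only safe when processing is
deterministic) can be sketched in a few lines. This is a hypothetical
illustration -- the class and field names are invented, not `coast`'s or
Samza's actual API:

```java
import java.util.function.Consumer;

/**
 * Sketch of replay deduplication for a partition with a single producer.
 * Assumes deterministic processing: after a restart from the last commit,
 * the first n re-produced messages are identical to messages already in
 * the output log, so they can simply be dropped.
 */
class ReplayDedupingProducer {
    private final long committedOutputOffset; // output offset at the last checkpoint
    private final long actualOutputOffset;    // last offset actually written before the crash
    private long produced = 0;                // messages produced since restart
    private final Consumer<byte[]> downstream;

    ReplayDedupingProducer(long committedOutputOffset, long actualOutputOffset,
                           Consumer<byte[]> downstream) {
        this.committedOutputOffset = committedOutputOffset;
        this.actualOutputOffset = actualOutputOffset;
        this.downstream = downstream;
    }

    /** Number of replayed messages to drop: n = actual - committed. */
    long toDrop() {
        return actualOutputOffset - committedOutputOffset;
    }

    void produce(byte[] message) {
        // The first n messages after restart are already in the log; skip them.
        if (produced++ < toDrop()) {
            return;
        }
        downstream.accept(message);
    }
}
```

The failure mode Chris describes falls directly out of this sketch: if a
replayed message differs from what was originally written (wall-clock
time, new code version, external lookups), dropping by count silently
loses the new value or duplicates the old one.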
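The batched merge log from Chris's point 4 -- recording "read k messages
from SSP s" instead of one entry per message -- might look roughly like
the following. Again a hypothetical sketch with invented names, not how
`coast` or Samza actually store it:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of a batched merge log for a job with multiple inputs. Replaying
 * the recorded batches in order reproduces the original interleaving of
 * the input streams, with far fewer log entries than one per message.
 */
class MergeLog {
    static class Batch {
        final String ssp; // system-stream-partition identifier
        final int count;  // consecutive messages read from that SSP

        Batch(String ssp, int count) {
            this.ssp = ssp;
            this.count = count;
        }
    }

    private final Deque<Batch> entries = new ArrayDeque<>();

    /** Append one read, coalescing consecutive reads from the same SSP. */
    void record(String ssp) {
        Batch last = entries.peekLast();
        if (last != null && last.ssp.equals(ssp)) {
            entries.pollLast();
            entries.addLast(new Batch(ssp, last.count + 1));
        } else {
            entries.addLast(new Batch(ssp, 1));
        }
    }

    int size() {
        return entries.size();
    }

    Deque<Batch> entries() {
        return entries;
    }
}
```

With a batch size of 100, a run of 100 messages from one SSP collapses
from 100 merge-log entries to one, which is where the performance win
comes from.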
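One more note on point 4: as best I can tell from the Samza configuration
docs, the batching supported by the default chooser is switched on with a
consumer batch-size property along these lines -- treat the exact key as
an assumption and verify it against the configuration reference for your
Samza version:

```
# Assumed config key: enables batched consumption in Samza's default
# message chooser (off unless set). Check the Samza configuration
# reference for your version before relying on it.
task.consumer.batch.size=100
```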