beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [beam] branch master updated: [BEAM-7154] Updating Go SDK errors (Part 3)
Date Wed, 15 May 2019 19:38:05 GMT
This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c92fe0  [BEAM-7154] Updating Go SDK errors (Part 3)
     new 4f9361b  Merge pull request #8560 from youngoli/beam7154-2
7c92fe0 is described below

commit 7c92fe0bc3ba73320ee26b6323eb01884381afcc
Author: Daniel Oliveira <daniel.o.programmer@gmail.com>
AuthorDate: Sun May 12 20:51:46 2019 -0700

    [BEAM-7154] Updating Go SDK errors (Part 3)
---
 sdks/go/pkg/beam/core/funcx/fn.go                  |  10 +-
 sdks/go/pkg/beam/core/funcx/signature.go           |  15 ++-
 sdks/go/pkg/beam/core/graph/bind.go                |  60 +++++----
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  13 +-
 sdks/go/pkg/beam/core/graph/coder/registry.go      |   7 +-
 sdks/go/pkg/beam/core/graph/coder/varint.go        |   2 +-
 sdks/go/pkg/beam/core/graph/edge.go                |  41 ++++--
 sdks/go/pkg/beam/core/graph/fn.go                  |  16 +--
 sdks/go/pkg/beam/core/graph/graph.go               |   7 +-
 sdks/go/pkg/beam/core/runtime/coderx/float.go      |   5 +-
 sdks/go/pkg/beam/core/runtime/coderx/varint.go     |   9 +-
 sdks/go/pkg/beam/core/runtime/exec/coder.go        |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/cogbk.go        |   4 +-
 sdks/go/pkg/beam/core/runtime/exec/combine.go      |  23 ++--
 sdks/go/pkg/beam/core/runtime/exec/combine_test.go |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/datasink.go     |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  17 +--
 sdks/go/pkg/beam/core/runtime/exec/fn.go           |  15 ++-
 sdks/go/pkg/beam/core/runtime/exec/input.go        |   3 +-
 sdks/go/pkg/beam/core/runtime/exec/pardo.go        |   9 +-
 sdks/go/pkg/beam/core/runtime/exec/plan.go         |  11 +-
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  33 ++---
 sdks/go/pkg/beam/core/runtime/exec/unit_test.go    |  12 +-
 sdks/go/pkg/beam/core/runtime/exec/util.go         |   5 +-
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  28 ++--
 sdks/go/pkg/beam/core/runtime/graphx/dataflow.go   |  27 ++--
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  | 149 ++++++++++++++-------
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |   3 +-
 sdks/go/pkg/beam/core/runtime/harness/datamgr.go   |  12 +-
 sdks/go/pkg/beam/core/runtime/harness/harness.go   |   7 +-
 sdks/go/pkg/beam/core/runtime/harness/logging.go   |   3 +-
 sdks/go/pkg/beam/core/runtime/harness/session.go   |  13 +-
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go  |  14 +-
 sdks/go/pkg/beam/core/runtime/pipelinex/replace.go |   3 +-
 sdks/go/pkg/beam/core/runtime/symbols.go           |   4 +-
 sdks/go/pkg/beam/core/typex/fulltype.go            |  12 +-
 sdks/go/pkg/beam/core/util/dot/dot.go              |   5 +-
 sdks/go/pkg/beam/core/util/hooks/hooks.go          |  12 +-
 sdks/go/pkg/beam/core/util/ioutilx/read.go         |   3 +-
 sdks/go/pkg/beam/core/util/protox/any.go           |   9 +-
 sdks/go/pkg/beam/core/util/protox/base64.go        |   4 +-
 sdks/go/pkg/beam/core/util/reflectx/call.go        |   4 +-
 sdks/go/pkg/beam/core/util/reflectx/json.go        |   5 +-
 sdks/go/pkg/beam/core/util/symtab/symtab.go        |  15 ++-
 44 files changed, 376 insertions(+), 282 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 48129a5..c924782 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -16,12 +16,12 @@
 package funcx
 
 import (
-	"errors"
 	"fmt"
 	"reflect"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Note that we can't tell the difference between K, V and V, S before binding.
@@ -250,7 +250,7 @@ func New(fn reflectx.Func) (*Fn, error) {
 		case IsReIter(t):
 			kind = FnReIter
 		default:
-			return nil, fmt.Errorf("bad parameter type for %s: %v", fn.Name(), t)
+			return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t)
 		}
 
 		param = append(param, FnParam{Kind: kind, T: t})
@@ -269,7 +269,7 @@ func New(fn reflectx.Func) (*Fn, error) {
 		case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t):
 			kind = RetValue
 		default:
-			return nil, fmt.Errorf("bad return type for %s: %v", fn.Name(), t)
+			return nil, errors.Errorf("bad return type for %s: %v", fn.Name(), t)
 		}
 
 		ret = append(ret, ReturnParam{Kind: kind, T: t})
@@ -314,14 +314,14 @@ func validateOrder(u *Fn) error {
 	// Validate the parameter ordering.
 	for i, p := range u.Param {
 		if paramState, err = nextParamState(paramState, p.Kind); err != nil {
-			return fmt.Errorf("%s at parameter %d for %s", err.Error(), i, u.Fn.Name())
+			return errors.WithContextf(err, "validating parameter %d for %s", i, u.Fn.Name())
 		}
 	}
 	// Validate the return value ordering.
 	retState := rsStart
 	for i, r := range u.Ret {
 		if retState, err = nextRetState(retState, r.Kind); err != nil {
-			return fmt.Errorf("%s for return value %d for %s", err.Error(), i, u.Fn.Name())
+			return errors.WithContextf(err, "validating return value %d for %s", i, u.Fn.Name())
 		}
 	}
 	return nil
diff --git a/sdks/go/pkg/beam/core/funcx/signature.go b/sdks/go/pkg/beam/core/funcx/signature.go
index 6caa5cd..8bcd386 100644
--- a/sdks/go/pkg/beam/core/funcx/signature.go
+++ b/sdks/go/pkg/beam/core/funcx/signature.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Signature is a concise representation of a group of function types. The
@@ -118,7 +119,7 @@ func Satisfy(fn interface{}, sig *Signature) error {
 	default:
 		value := reflect.ValueOf(fn)
 		if value.Kind() != reflect.Func {
-			return fmt.Errorf("not a function: %v", value)
+			return errors.Errorf("not a function: %v", value)
 		}
 		typ = value.Type()
 	}
@@ -129,11 +130,11 @@ func Satisfy(fn interface{}, sig *Signature) error {
 		out = append(out, typ.Out(i))
 	}
 	if len(in) < len(sig.Args) || len(out) < len(sig.Return) {
-		return fmt.Errorf("not enough required parameters: %v", typ)
+		return errors.Errorf("not enough required parameters: %v", typ)
 	}
 
 	if len(in) > len(sig.Args)+len(sig.OptArgs) || len(out) > len(sig.Return)+len(sig.OptReturn) {
-		return fmt.Errorf("too many parameters: %v", typ)
+		return errors.Errorf("too many parameters: %v", typ)
 	}
 
 	// (1) Create generic binding. If inconsistent, reject fn. We do not allow
@@ -170,7 +171,7 @@ func bind(list, models []reflect.Type, m map[string]reflect.Type) error {
 
 		name := list[i].Name()
 		if current, ok := m[name]; ok && current != t {
-			return fmt.Errorf("bind conflict for %v: %v != %v", name, current, t)
+			return errors.Errorf("bind conflict for %v: %v != %v", name, current, t)
 		}
 		m[name] = t
 	}
@@ -210,7 +211,7 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error {
 			// Substitute optional types, if bound.
 			subst, ok := m[t.Name()]
 			if !ok {
-				return fmt.Errorf("optional generic parameter not bound %v", t.Name())
+				return errors.Errorf("optional generic parameter not bound %v", t.Name())
 			}
 			t = subst
 		}
@@ -219,7 +220,7 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error {
 		}
 
 		if i == len(models) {
-			return fmt.Errorf("failed to match optional parameter %v", t)
+			return errors.Errorf("failed to match optional parameter %v", t)
 		}
 	}
 	return nil
@@ -228,6 +229,6 @@ func matchOpt(list, models []reflect.Type, m map[string]reflect.Type) error {
 // MustSatisfy panics if the given fn does not satisfy the signature.
 func MustSatisfy(fn interface{}, sig *Signature) {
 	if err := Satisfy(fn, sig); err != nil {
-		panic(fmt.Sprintf("fn does not satisfy signature %v: %v", sig, err))
+		panic(errors.Wrapf(err, "fn does not satisfy signature %v", sig))
 	}
 }
diff --git a/sdks/go/pkg/beam/core/graph/bind.go b/sdks/go/pkg/beam/core/graph/bind.go
index 3aeb3d2..77a6d69 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -16,11 +16,11 @@
 package graph
 
 import (
-	"fmt"
 	"reflect"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 4/21/2017: Bind is where most user mistakes will likely show
@@ -62,27 +62,28 @@ import (
 func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in ...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, []typex.FullType, error) {
 	inbound, kinds, err := findInbound(fn, in...)
 	if err != nil {
-		return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err)
+		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
 	}
 	outbound, err := findOutbound(fn)
 	if err != nil {
-		return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err)
+		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
 	}
 
 	subst, err := typex.Bind(inbound, in)
 	if err != nil {
-		return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err)
+		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
 	}
 	for k, v := range typedefs {
 		if substK, exists := subst[k]; exists {
-			return nil, nil, nil, nil, fmt.Errorf("binding fn %v: cannot substitute type %v with %v, already defined as %v", fn.Fn.Name(), k, v, substK)
+			err := errors.Errorf("cannot substitute type %v with %v, already defined as %v", k, v, substK)
+			return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
 		}
 		subst[k] = v
 	}
 
 	out, err := typex.Substitute(outbound, subst)
 	if err != nil {
-		return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", fn.Fn.Name(), err)
+		return nil, nil, nil, nil, errors.WithContextf(err, "binding fn %v", fn.Fn.Name())
 	}
 	return inbound, kinds, outbound, out, nil
 }
@@ -102,7 +103,7 @@ func findOutbound(fn *funcx.Fn) ([]typex.FullType, error) {
 	case 2:
 		outbound = append(outbound, typex.NewKV(typex.New(ret[0]), typex.New(ret[1])))
 	default:
-		return nil, fmt.Errorf("too many return values: %v", ret)
+		return nil, errors.Errorf("too many return values: %v", ret)
 	}
 
 	for _, param := range params {
@@ -135,26 +136,29 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) ([]typex.FullType, []InputK
 	for _, input := range in {
 		arity, err := inboundArity(input, index == 0)
 		if err != nil {
-			return nil, nil, fmt.Errorf("binding params %v to input %v: %v", params, input, err)
+			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params, input)
 		}
 		if len(params)-index < arity {
-			return nil, nil, fmt.Errorf("binding params %v to input %v: too few params", params[index:], input)
+			err := errors.New("too few params")
+			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", params[index:], input)
 		}
 
 		paramsToBind := params[index : index+arity]
 		elm, kind, err := tryBindInbound(input, paramsToBind, index == 0)
 		if err != nil {
-			return nil, nil, fmt.Errorf("binding params %v to input %v: %v", paramsToBind, input, err)
+			return nil, nil, errors.WithContextf(err, "binding params %v to input %v", paramsToBind, input)
 		}
 		inbound = append(inbound, elm)
 		kinds = append(kinds, kind)
 		index += arity
 	}
 	if index < len(params) {
-		return nil, nil, fmt.Errorf("binding params %v to inputs %v: too few inputs. Forgot an input or to annotate options?", params, in)
+		err := errors.New("too few inputs: forgot an input or to annotate options?")
+		return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
 	}
 	if index > len(params) {
-		return nil, nil, fmt.Errorf("binding params %v to inputs %v: too many inputs", params, in)
+		err := errors.New("too many inputs")
+		return nil, nil, errors.WithContextf(err, "binding params %v to inputs %v:", params, in)
 	}
 	return inbound, kinds, nil
 }
@@ -188,7 +192,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 				values, _ := funcx.UnfoldIter(args[0].T)
 				trimmed := trimIllegal(values)
 				if len(trimmed) != 1 {
-					return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0])
+					return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
 				}
 
 				kind = Iter
@@ -198,14 +202,14 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 				values, _ := funcx.UnfoldReIter(args[0].T)
 				trimmed := trimIllegal(values)
 				if len(trimmed) != 1 {
-					return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0])
+					return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
 				}
 
 				kind = ReIter
 				other = typex.New(trimmed[0])
 
 			default:
-				return nil, kind, fmt.Errorf("unexpected param kind: %v", arg)
+				return nil, kind, errors.Errorf("unexpected param kind: %v", arg)
 			}
 		}
 	case typex.Composite:
@@ -213,10 +217,10 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 		case typex.KVType:
 			if isMain {
 				if args[0].Kind != funcx.FnValue {
-					return nil, kind, fmt.Errorf("key of %v cannot bind to %v", t, args[0])
+					return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
 				}
 				if args[1].Kind != funcx.FnValue {
-					return nil, kind, fmt.Errorf("value of %v cannot bind to %v", t, args[1])
+					return nil, kind, errors.Errorf("value of %v cannot bind to %v", t, args[1])
 				}
 				other = typex.NewKV(typex.New(args[0].T), typex.New(args[1].T))
 			} else {
@@ -227,7 +231,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 					values, _ := funcx.UnfoldIter(args[0].T)
 					trimmed := trimIllegal(values)
 					if len(trimmed) != 2 {
-						return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0])
+						return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
 					}
 
 					kind = Iter
@@ -237,20 +241,20 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 					values, _ := funcx.UnfoldReIter(args[0].T)
 					trimmed := trimIllegal(values)
 					if len(trimmed) != 2 {
-						return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0])
+						return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
 					}
 
 					kind = ReIter
 					other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
 
 				default:
-					return nil, kind, fmt.Errorf("%v cannot bind to %v", t, args[0])
+					return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
 				}
 			}
 
 		case typex.CoGBKType:
 			if args[0].Kind != funcx.FnValue {
-				return nil, kind, fmt.Errorf("key of %v cannot bind to %v", t, args[0])
+				return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
 			}
 
 			components := []typex.FullType{typex.New(args[0].T)}
@@ -261,7 +265,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 					values, _ := funcx.UnfoldIter(args[i].T)
 					trimmed := trimIllegal(values)
 					if len(trimmed) != 1 {
-						return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i])
+						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
 					}
 					components = append(components, typex.New(trimmed[0]))
 
@@ -269,25 +273,25 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.
 					values, _ := funcx.UnfoldReIter(args[i].T)
 					trimmed := trimIllegal(values)
 					if len(trimmed) != 1 {
-						return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i])
+						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
 					}
 					components = append(components, typex.New(trimmed[0]))
 				default:
-					return nil, kind, fmt.Errorf("values of %v cannot bind to %v", t, args[i])
+					return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
 				}
 			}
 			other = typex.NewCoGBK(components...)
 
 		default:
-			return nil, kind, fmt.Errorf("unexpected inbound type: %v", t.Type())
+			return nil, kind, errors.Errorf("unexpected inbound type: %v", t.Type())
 		}
 
 	default:
-		return nil, kind, fmt.Errorf("unexpected inbound class: %v", t.Class())
+		return nil, kind, errors.Errorf("unexpected inbound class: %v", t.Class())
 	}
 
 	if !typex.IsStructurallyAssignable(t, other) {
-		return nil, kind, fmt.Errorf("%v is not assignable to %v", t, other)
+		return nil, kind, errors.Errorf("%v is not assignable to %v", t, other)
 	}
 	return other, kind, nil
 }
@@ -304,7 +308,7 @@ func inboundArity(t typex.FullType, isMain bool) (int, error) {
 		case typex.CoGBKType:
 			return len(t.Components()), nil
 		default:
-			return 0, fmt.Errorf("unexpected composite inbound type: %v", t.Type())
+			return 0, errors.Errorf("unexpected composite inbound type: %v", t.Type())
 		}
 	}
 	return 1, nil
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index cfcca35..4de630a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CustomCoder contains possibly untyped encode/decode user functions that are
@@ -106,7 +107,7 @@ type ElementDecoder interface {
 func validateEncoder(t reflect.Type, encode interface{}) error {
 	// Check if it uses the real type in question.
 	if err := funcx.Satisfy(encode, funcx.Replace(encodeSig, typex.TType, t)); err != nil {
-		return fmt.Errorf("validateEncoder: incorrect signature: %v", err)
+		return errors.WithContext(err, "validateEncoder: validating signature")
 	}
 	// TODO(lostluck): 2019.02.03 - Determine if there are encode allocation bottlenecks.
 	return nil
@@ -115,7 +116,7 @@ func validateEncoder(t reflect.Type, encode interface{}) error {
 func validateDecoder(t reflect.Type, decode interface{}) error {
 	// Check if it uses the real type in question.
 	if err := funcx.Satisfy(decode, funcx.Replace(decodeSig, typex.TType, t)); err != nil {
-		return fmt.Errorf("validateDecoder: incorrect signature: %v", err)
+		return errors.WithContext(err, "validateDecoder: validating signature")
 	}
 	// TODO(lostluck): 2019.02.03 - Expand cases to avoid []byte -> interface{} conversion
 	// in exec, & a beam Decoder interface.
@@ -126,19 +127,19 @@ func validateDecoder(t reflect.Type, decode interface{}) error {
 // particular encoding strategy.
 func NewCustomCoder(id string, t reflect.Type, encode, decode interface{}) (*CustomCoder, error) {
 	if err := validateEncoder(t, encode); err != nil {
-		return nil, fmt.Errorf("NewCustomCoder: %v", err)
+		return nil, errors.WithContext(err, "NewCustomCoder")
 	}
 	enc, err := funcx.New(reflectx.MakeFunc(encode))
 	if err != nil {
-		return nil, fmt.Errorf("bad encode: %v", err)
+		return nil, errors.Wrap(err, "bad encode")
 	}
 	if err := validateDecoder(t, decode); err != nil {
-		return nil, fmt.Errorf("NewCustomCoder: %v", err)
+		return nil, errors.WithContext(err, "NewCustomCoder")
 	}
 
 	dec, err := funcx.New(reflectx.MakeFunc(decode))
 	if err != nil {
-		return nil, fmt.Errorf("bad decode: %v", err)
+		return nil, errors.Wrap(err, "bad decode")
 	}
 
 	c := &CustomCoder{
diff --git a/sdks/go/pkg/beam/core/graph/coder/registry.go b/sdks/go/pkg/beam/core/graph/coder/registry.go
index d95258d..5be9556 100644
--- a/sdks/go/pkg/beam/core/graph/coder/registry.go
+++ b/sdks/go/pkg/beam/core/graph/coder/registry.go
@@ -16,8 +16,9 @@
 package coder
 
 import (
-	"fmt"
 	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -45,7 +46,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) {
 	key := tkey(t)
 
 	if _, err := NewCustomCoder(t.String(), t, enc, dec); err != nil {
-		panic(fmt.Sprintf("RegisterCoder failed for type %v: %v", t, err))
+		panic(errors.Wrapf(err, "RegisterCoder failed for type %v", t))
 	}
 
 	if t.Kind() == reflect.Interface {
@@ -72,7 +73,7 @@ func RegisterCoder(t reflect.Type, enc, dec interface{}) {
 		cc, err := NewCustomCoder(name, rt, enc, dec)
 		if err != nil {
 			// An error on look up shouldn't happen after the validation.
-			panic(fmt.Sprintf("Creating %v CustomCoder for type %v failed: %v", name, rt, err))
+			panic(errors.Wrapf(err, "Creating %v CustomCoder for type %v failed", name, rt))
 		}
 		return cc
 	}
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 1c93d1d..4f311a0 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,10 +16,10 @@
 package coder
 
 import (
-	"errors"
 	"io"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // ErrVarIntTooLong indicates a data corruption issue that needs special
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 24b57fe..846dceb 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Opcode represents a primitive Beam instruction kind.
@@ -187,10 +188,12 @@ func (e *MultiEdge) String() string {
 func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
 	if len(ns) == 0 {
 		// TODO(BEAM-7086) Reduce the repetition in the context of all the errors in this file.
-		return nil, fmt.Errorf("creating new CoGBK in scope %v: needs at least 1 input", s)
+		err := errors.New("needs at least 1 input")
+		return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 	}
 	if !typex.IsKV(ns[0].Type()) {
-		return nil, fmt.Errorf("creating new CoGBK in scope %v: input type must be KV: %v", s, ns[0])
+		err := errors.Errorf("input type must be KV: %v", ns[0])
+		return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 	}
 
 	// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> CoGBK<T,U,..,Z>.
@@ -203,16 +206,20 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
 	for i := 1; i < len(ns); i++ {
 		n := ns[i]
 		if !typex.IsKV(n.Type()) {
-			return nil, fmt.Errorf("creating new CoGBK in scope %v: input type must be KV: %v", s, n)
+			err := errors.Errorf("input type must be KV: %v", n)
+			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 		}
 		if !n.Coder.Components[0].Equals(c) {
-			return nil, fmt.Errorf("creating new CoGBK in scope %v: key coder for %v is %v, want %v", s, n, n.Coder.Components[0], c)
+			err := errors.Errorf("key coder for %v is %v, want %v", n, n.Coder.Components[0], c)
+			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 		}
 		if !w.Equals(n.WindowingStrategy()) {
-			return nil, fmt.Errorf("creating new CoGBK in scope %v: mismatched CoGBK windowing strategies: %v, want %v", s, n.WindowingStrategy(), w)
+			err := errors.Errorf("mismatched CoGBK windowing strategies: %v, want %v", n.WindowingStrategy(), w)
+			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 		}
 		if bounded != n.Bounded() {
-			return nil, fmt.Errorf("creating new CoGBK in scope %v: unmatched CoGBK boundedness: %v, want %v", s, n.Bounded(), bounded)
+			err := errors.Errorf("unmatched CoGBK boundedness: %v, want %v", n.Bounded(), bounded)
+			return nil, errors.WithContextf(err, "creating new CoGBK in scope %v", s)
 		}
 
 		comp = append(comp, n.Type().Components()[1])
@@ -236,7 +243,8 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
 // the shared input type.
 func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 	if len(in) < 2 {
-		return nil, fmt.Errorf("creating new Flatten in scope %v: Flatten needs at least 2 input, got %v", s, len(in))
+		err := errors.Errorf("Flatten needs at least 2 input, got %v", len(in))
+		return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
 	}
 	t := in[0].Type()
 	w := inputWindow(in)
@@ -252,14 +260,17 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
 	}
 	for _, n := range in {
 		if !typex.IsEqual(t, n.Type()) {
-			return nil, fmt.Errorf("creating new Flatten in scope %v: mismatched Flatten input types: %v, want %v", s, n.Type(), t)
+			err := errors.Errorf("mismatched Flatten input types: %v, want %v", n.Type(), t)
+			return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
 		}
 		if !w.Equals(n.WindowingStrategy()) {
-			return nil, fmt.Errorf("creating new Flatten in scope %v: mismatched Flatten window types: %v, want %v", s, n.WindowingStrategy(), w)
+			err := errors.Errorf("mismatched Flatten window types: %v, want %v", n.WindowingStrategy(), w)
+			return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
 		}
 	}
 	if typex.IsCoGBK(t) {
-		return nil, fmt.Errorf("creating new Flatten in scope %v: Flatten input type cannot be CoGBK: %v", s, t)
+		err := errors.Errorf("Flatten input type cannot be CoGBK: %v", t)
+		return nil, errors.WithContextf(err, "creating new Flatten in scope %v", s)
 	}
 
 	edge := g.NewEdge(s)
@@ -300,7 +311,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, typedefs ma
 
 	inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(), typedefs, NodeTypes(in)...)
 	if err != nil {
-		return nil, fmt.Errorf("creating new DoFn in scope %v: %v", s, err)
+		return nil, errors.WithContextf(err, "creating new DoFn in scope %v", s)
 	}
 
 	edge := g.NewEdge(s)
@@ -329,10 +340,12 @@ const CombinePerKeyScope = "CombinePerKey"
 func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*MultiEdge, error) {
 	inT := in.Type()
 	if !typex.IsCoGBK(inT) {
-		return nil, fmt.Errorf("creating new Combine in scope %v: Combine requires CoGBK type: %v", s, inT)
+		err := errors.Errorf("Combine requires CoGBK type: %v", inT)
+		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
 	}
 	if len(inT.Components()) > 2 {
-		return nil, fmt.Errorf("creating new Combine in scope %v: Combine cannot follow multi-input CoGBK: %v", s, inT)
+		err := errors.Errorf("Combine cannot follow multi-input CoGBK: %v", inT)
+		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
 	}
 
 	// Create a synthetic function for binding purposes. It takes main input
@@ -381,7 +394,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) (*M
 
 	inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
 	if err != nil {
-		return nil, fmt.Errorf("creating new Combine in scope %v: %v", s, err)
+		return nil, errors.WithContextf(err, "creating new Combine in scope %v", s)
 	}
 
 	edge := g.NewEdge(s)
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 7418c8c..d174c4f 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -94,7 +94,7 @@ func NewFn(fn interface{}) (*Fn, error) {
 
 	case reflect.Ptr:
 		if val.Elem().Kind() != reflect.Struct {
-			return nil, fmt.Errorf("value %v must be ptr to struct", fn)
+			return nil, errors.Errorf("value %v must be ptr to struct", fn)
 		}
 
 		// Note that a ptr receiver is necessary if struct fields are updated in the
@@ -107,7 +107,7 @@ func NewFn(fn interface{}) (*Fn, error) {
 			for name, mfn := range methodsFuncs {
 				f, err := funcx.New(mfn)
 				if err != nil {
-					return nil, fmt.Errorf("method %v invalid: %v", name, err)
+					return nil, errors.Wrapf(err, "method %v invalid", name)
 				}
 				methods[name] = f
 			}
@@ -133,14 +133,14 @@ func NewFn(fn interface{}) (*Fn, error) {
 
 			f, err := funcx.New(reflectx.MakeFunc(val.Method(i).Interface()))
 			if err != nil {
-				return nil, fmt.Errorf("method %v invalid: %v", m.Name, err)
+				return nil, errors.Wrapf(err, "method %v invalid", m.Name)
 			}
 			methods[m.Name] = f
 		}
 		return &Fn{Recv: fn, methods: methods}, nil
 
 	default:
-		return nil, fmt.Errorf("value %v must be function or (ptr to) struct", fn)
+		return nil, errors.Errorf("value %v must be function or (ptr to) struct", fn)
 	}
 }
 
@@ -220,7 +220,7 @@ func AsDoFn(fn *Fn) (*DoFn, error) {
 	}
 
 	if _, ok := fn.methods[processElementName]; !ok {
-		return nil, fmt.Errorf("graph.AsDoFn: failed to find %v method: %v", processElementName, fn)
+		return nil, errors.Errorf("graph.AsDoFn: failed to find %v method: %v", processElementName, fn)
 	}
 
 	// TODO(herohde) 5/18/2017: validate the signatures, incl. consistency.
@@ -296,7 +296,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) {
 
 	mergeFn, ok := fn.methods[mergeAccumulatorsName]
 	if !ok {
-		return nil, fmt.Errorf("%v: failed to find required %v method on type: %v", fnKind, mergeAccumulatorsName, fn.Name())
+		return nil, errors.Errorf("%v: failed to find required %v method on type: %v", fnKind, mergeAccumulatorsName, fn.Name())
 	}
 
 	// CombineFn methods must satisfy the following:
@@ -306,7 +306,7 @@ func AsCombineFn(fn *Fn) (*CombineFn, error) {
 	// ExtractOutput func(A) (O, error?)
 	// This means that the other signatures *must* match the type used in MergeAccumulators.
 	if len(mergeFn.Ret) <= 0 {
-		return nil, fmt.Errorf("%v: %v requires at least 1 return value. : %v", fnKind, mergeAccumulatorsName, mergeFn)
+		return nil, errors.Errorf("%v: %v requires at least 1 return value. : %v", fnKind, mergeAccumulatorsName, mergeFn)
 	}
 	accumType := mergeFn.Ret[0].T
 
@@ -359,7 +359,7 @@ func verifyValidNames(fnKind string, fn *Fn, names ...string) error {
 
 	for key := range fn.methods {
 		if !m[key] {
-			return fmt.Errorf("%s: unexpected exported method %v present. Valid methods are: %v", fnKind, key, names)
+			return errors.Errorf("%s: unexpected exported method %v present. Valid methods are: %v", fnKind, key, names)
 		}
 	}
 	return nil
diff --git a/sdks/go/pkg/beam/core/graph/graph.go b/sdks/go/pkg/beam/core/graph/graph.go
index dd68885..fb80cba 100644
--- a/sdks/go/pkg/beam/core/graph/graph.go
+++ b/sdks/go/pkg/beam/core/graph/graph.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Graph represents an in-progress deferred execution graph and is easily
@@ -87,7 +88,7 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) {
 	for _, n := range g.nodes {
 		nodes[n] = true
 		if n.Coder == nil {
-			return nil, nil, fmt.Errorf("node %v in graph has undefined coder", n.id)
+			return nil, nil, errors.Errorf("node %v in graph has undefined coder", n.id)
 		}
 	}
 	// Build a map of all nodes that are reachable by g.edges.
@@ -102,12 +103,12 @@ func (g *Graph) Build() ([]*MultiEdge, []*Node, error) {
 	}
 	for n := range nodes {
 		if _, ok := reachable[n]; !ok {
-			return nil, nil, fmt.Errorf("node %v in graph is unconnected", n.id)
+			return nil, nil, errors.Errorf("node %v in graph is unconnected", n.id)
 		}
 	}
 	for n, e := range reachable {
 		if _, ok := nodes[n]; !ok {
-			return nil, nil, fmt.Errorf("node %v is reachable by edge %v, but it's not in same graph", n.id, e.id)
+			return nil, nil, errors.Errorf("node %v is reachable by edge %v, but it's not in same graph", n.id, e.id)
 		}
 	}
 	return g.edges, g.nodes, nil
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/float.go b/sdks/go/pkg/beam/core/runtime/coderx/float.go
index 3179f73..b502d05 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/float.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/float.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 func encFloat(v typex.T) []byte {
@@ -43,7 +44,7 @@ func encFloat(v typex.T) []byte {
 func decFloat(t reflect.Type, data []byte) (typex.T, error) {
 	uval, err := decVarUintZ(reflectx.Uint64, data)
 	if err != nil {
-		return nil, fmt.Errorf("invalid float encoding for: %v", data)
+		return nil, errors.Errorf("invalid float encoding for: %v", data)
 	}
 
 	n := math.Float64frombits(bits.ReverseBytes64(uval.(uint64)))
@@ -64,6 +65,6 @@ func NewFloat(t reflect.Type) (*coder.CustomCoder, error) {
 	case reflect.Float32, reflect.Float64:
 		return coder.NewCustomCoder("float", t, encFloat, decFloat)
 	default:
-		return nil, fmt.Errorf("not a float type: %v", t)
+		return nil, errors.Errorf("not a float type: %v", t)
 	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/coderx/varint.go b/sdks/go/pkg/beam/core/runtime/coderx/varint.go
index c0b2b79..07e4ce1 100644
--- a/sdks/go/pkg/beam/core/runtime/coderx/varint.go
+++ b/sdks/go/pkg/beam/core/runtime/coderx/varint.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // NewVarIntZ returns a varint coder for the given integer type. It uses a zig-zag scheme,
@@ -31,7 +32,7 @@ func NewVarIntZ(t reflect.Type) (*coder.CustomCoder, error) {
 	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
 		return coder.NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ)
 	default:
-		return nil, fmt.Errorf("not a signed integer type: %v", t)
+		return nil, errors.Errorf("not a signed integer type: %v", t)
 	}
 }
 
@@ -42,7 +43,7 @@ func NewVarUintZ(t reflect.Type) (*coder.CustomCoder, error) {
 	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
 		return coder.NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ)
 	default:
-		return nil, fmt.Errorf("not a unsigned integer type: %v", t)
+		return nil, errors.Errorf("not a unsigned integer type: %v", t)
 	}
 }
 
@@ -70,7 +71,7 @@ func encVarIntZ(v typex.T) []byte {
 func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
 	n, size := binary.Varint(data)
 	if size <= 0 {
-		return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+		return nil, errors.Errorf("invalid varintz encoding for: %v", data)
 	}
 	switch t.Kind() {
 	case reflect.Int:
@@ -112,7 +113,7 @@ func encVarUintZ(v typex.T) []byte {
 func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
 	n, size := binary.Uvarint(data)
 	if size <= 0 {
-		return nil, fmt.Errorf("invalid varuintz encoding for: %v", data)
+		return nil, errors.Errorf("invalid varuintz encoding for: %v", data)
 	}
 	switch t.Kind() {
 	case reflect.Uint:
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index dfe2ac1..a55d7f7 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
@@ -119,7 +120,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) error {
 	var data []byte
 	data, ok := val.Elm.([]byte)
 	if !ok {
-		return fmt.Errorf("received unknown value type: want []byte, got %T", val.Elm)
+		return errors.Errorf("received unknown value type: want []byte, got %T", val.Elm)
 	}
 	size := len(data)
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
index 40f5636..5f207a2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
@@ -19,6 +19,8 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(BEAM-490): This file contains support for the handling of CoGBK
@@ -167,7 +169,7 @@ func (f *filterStream) Read() (*FullValue, error) {
 
 		v, err := f.dec.Decode(bytes.NewReader(value))
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode union value '%v' for key %v: %v", value, key, err)
+			return nil, errors.Wrapf(err, "failed to decode union value '%v' for key %v", value, key)
 		}
 		v.Timestamp = elm.Timestamp
 		v.Windows = elm.Windows
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index ef8c3cd..7815ac1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
 
@@ -64,7 +65,7 @@ func (n *Combine) ID() UnitID {
 // Up initializes this CombineFn and runs its SetupFn() method.
 func (n *Combine) Up(ctx context.Context) error {
 	if n.status != Initializing {
-		return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status)
 	}
 	n.status = Up
 
@@ -103,7 +104,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte
 	in := &MainInput{Key: FullValue{Elm: a}}
 	val, err := n.mergeInv.InvokeWithoutEventTime(ctx, in, b)
 	if err != nil {
-		return nil, n.fail(fmt.Errorf("MergeAccumulators failed: %v", err))
+		return nil, n.fail(errors.WithContext(err, "invoking MergeAccumulators"))
 	}
 	return val.Elm, nil
 }
@@ -111,7 +112,7 @@ func (n *Combine) mergeAccumulators(ctx context.Context, a, b interface{}) (inte
 // StartBundle initializes processing this bundle for combines.
 func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext) error {
 	if n.status != Up {
-		return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status)
 	}
 	n.status = Active
 
@@ -130,7 +131,7 @@ func (n *Combine) StartBundle(ctx context.Context, id string, data DataContext)
 // AddInput, MergeAccumulators, and ExtractOutput functions.
 func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status)
 	}
 
 	// Note that we do not explicitly call merge, although it may
@@ -173,7 +174,7 @@ func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values .
 // FinishBundle completes this node's processing of a bundle.
 func (n *Combine) FinishBundle(ctx context.Context) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for combine %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine %v: %v", n.UID, n.status)
 	}
 	n.status = Up
 	if n.createAccumInv != nil {
@@ -221,7 +222,7 @@ func (n *Combine) newAccum(ctx context.Context, key interface{}) (interface{}, e
 
 	val, err := n.createAccumInv.InvokeWithoutEventTime(ctx, opt)
 	if err != nil {
-		return nil, fmt.Errorf("CreateAccumulator failed: %v", err)
+		return nil, n.fail(errors.WithContext(err, "invoking CreateAccumulator"))
 	}
 	return val.Elm, nil
 }
@@ -264,7 +265,7 @@ func (n *Combine) addInput(ctx context.Context, accum, key, value interface{}, t
 
 	val, err := n.addInputInv.InvokeWithoutEventTime(ctx, opt, v)
 	if err != nil {
-		return nil, n.fail(fmt.Errorf("AddInput failed: %v", err))
+		return nil, n.fail(errors.WithContext(err, "invoking AddInput"))
 	}
 	return val.Elm, err
 }
@@ -278,7 +279,7 @@ func (n *Combine) extract(ctx context.Context, accum interface{}) (interface{},
 
 	val, err := n.extractOutputInv.InvokeWithoutEventTime(ctx, nil, accum)
 	if err != nil {
-		return nil, n.fail(fmt.Errorf("ExtractOutput failed: %v", err))
+		return nil, n.fail(errors.WithContext(err, "invoking ExtractOutput"))
 	}
 	return val.Elm, err
 }
@@ -335,7 +336,7 @@ func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataCon
 // policy is used.
 func (n *LiftedCombine) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
 	}
 
 	key, err := n.keyHash.Hash(value.Elm)
@@ -435,7 +436,7 @@ func (n *MergeAccumulators) String() string {
 // runs the MergeAccumulatorsFn over them repeatedly.
 func (n *MergeAccumulators) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for combine merge %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine merge %v: %v", n.UID, n.status)
 	}
 	a, err := n.newAccum(n.Combine.ctx, value.Elm)
 	if err != nil {
@@ -490,7 +491,7 @@ func (n *ExtractOutput) String() string {
 // ProcessElement accepts an accumulator value, and extracts the final return type from it.
 func (n *ExtractOutput) ProcessElement(ctx context.Context, value *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for combine extract %v: %v", n.UID, n.status)
+		return errors.Errorf("invalid status for combine extract %v: %v", n.UID, n.status)
 	}
 	out, err := n.extract(n.Combine.ctx, value.Elm2)
 	if err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index c206fc0..c60ab52 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var intInput = []interface{}{int(1), int(2), int(3), int(4), int(5), int(6)}
@@ -315,7 +316,7 @@ func (*MyErrorCombine) MergeAccumulators(a, b int64) (int64, error) {
 func intCoder(t reflect.Type) *coder.Coder {
 	c, err := coderx.NewVarIntZ(t)
 	if err != nil {
-		panic(fmt.Sprintf("Couldn't get VarInt coder for %v: %v", t, err))
+		panic(errors.Wrapf(err, "Couldn't get VarInt coder for %v", t))
 	}
 	return &coder.Coder{Kind: coder.Custom, T: typex.New(t), Custom: c}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index 66eb380..e11c47a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -71,7 +72,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values
 		return err
 	}
 	if err := n.enc.Encode(value, &b); err != nil {
-		return fmt.Errorf("failed to encode element %v with coder %v: %v", value, n.enc, err)
+		return errors.WithContextf(err, "encoding element %v with coder %v", value, n.enc)
 	}
 	if _, err := n.w.Write(b.Bytes()); err != nil {
 		return err
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index a6e141c..5d40b4a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -74,14 +75,14 @@ func (n *DataSource) Process(ctx context.Context) error {
 				if err == io.EOF {
 					return nil
 				}
-				return fmt.Errorf("source failed: %v", err)
+				return errors.Wrap(err, "source failed")
 			}
 
 			// Decode key
 
 			key, err := ck.Decode(r)
 			if err != nil {
-				return fmt.Errorf("source decode failed: %v", err)
+				return errors.Wrap(err, "source decode failed")
 			}
 			key.Timestamp = t
 			key.Windows = ws
@@ -94,7 +95,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 
 			size, err := coder.DecodeInt32(r)
 			if err != nil {
-				return fmt.Errorf("stream size decoding failed: %v", err)
+				return errors.Wrap(err, "stream size decoding failed")
 			}
 
 			if size > -1 {
@@ -106,7 +107,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 				for i := int32(0); i < size; i++ {
 					value, err := cv.Decode(r)
 					if err != nil {
-						return fmt.Errorf("stream value decode failed: %v", err)
+						return errors.Wrap(err, "stream value decode failed")
 					}
 					buf = append(buf, *value)
 				}
@@ -116,7 +117,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 				for {
 					chunk, err := coder.DecodeVarUint64(r)
 					if err != nil {
-						return fmt.Errorf("stream chunk size decoding failed: %v", err)
+						return errors.Wrap(err, "stream chunk size decoding failed")
 					}
 
 					// log.Printf("Chunk size=%v", chunk)
@@ -129,7 +130,7 @@ func (n *DataSource) Process(ctx context.Context) error {
 					for i := uint64(0); i < chunk; i++ {
 						value, err := cv.Decode(r)
 						if err != nil {
-							return fmt.Errorf("stream value decode failed: %v", err)
+							return errors.Wrap(err, "stream value decode failed")
 						}
 						buf = append(buf, *value)
 					}
@@ -152,12 +153,12 @@ func (n *DataSource) Process(ctx context.Context) error {
 				if err == io.EOF {
 					return nil
 				}
-				return fmt.Errorf("source failed: %v", err)
+				return errors.Wrap(err, "source failed")
 			}
 
 			elm, err := ec.Decode(r)
 			if err != nil {
-				return fmt.Errorf("source decode failed: %v", err)
+				return errors.Wrap(err, "source decode failed")
 			}
 			elm.Timestamp = t
 			elm.Windows = ws
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 0b13744..b6c479c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //go:generate specialize --input=fn_arity.tmpl
@@ -127,7 +128,7 @@ func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventT
 	}
 	if n.wndIdx >= 0 {
 		if len(ws) != 1 {
-			return nil, fmt.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows)
+			return nil, errors.Errorf("DoFns that observe windows must be invoked with single window: %v", opt.Key.Windows)
 		}
 		args[n.wndIdx] = ws[0]
 	}
@@ -157,7 +158,7 @@ func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts typex.EventT
 			param := fn.Param[in[i]]
 
 			if param.Kind != funcx.FnIter {
-				return nil, fmt.Errorf("GBK/CoGBK result values must be iterable: %v", param)
+				return nil, errors.Errorf("GBK/CoGBK result values must be iterable: %v", param)
 			}
 
 			// TODO(herohde) 12/12/2017: allow form conversion on GBK results?
@@ -250,11 +251,11 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) ([]Reusa
 	}
 
 	if len(in) != len(side)+1 {
-		return nil, fmt.Errorf("found %v inbound, want %v", len(in), len(side)+1)
+		return nil, errors.Errorf("found %v inbound, want %v", len(in), len(side)+1)
 	}
 	param := fn.Params(funcx.FnValue | funcx.FnIter | funcx.FnReIter)
 	if len(param) <= len(side) {
-		return nil, fmt.Errorf("found %v params, want >%v", len(param), len(side))
+		return nil, errors.Errorf("found %v params, want >%v", len(param), len(side))
 	}
 
 	// The side input are last of the above params, so we can compute the offset easily.
@@ -264,7 +265,7 @@ func makeSideInputs(fn *funcx.Fn, in []*graph.Inbound, side []ReStream) ([]Reusa
 	for i := 0; i < len(side); i++ {
 		s, err := makeSideInput(in[i+1].Kind, fn.Param[param[i+offset]].T, side[i])
 		if err != nil {
-			return nil, fmt.Errorf("failed to make side input %v: %v", i, err)
+			return nil, errors.WithContextf(err, "making side input %v", i)
 		}
 		ret = append(ret, s)
 	}
@@ -282,7 +283,7 @@ func makeEmitters(fn *funcx.Fn, nodes []Node) ([]ReusableEmitter, error) {
 	}
 	out := fn.Params(funcx.FnEmit)
 	if len(out) != len(nodes)-offset {
-		return nil, fmt.Errorf("found %v emitters, want %v", len(out), len(nodes)-offset)
+		return nil, errors.Errorf("found %v emitters, want %v", len(out), len(nodes)-offset)
 	}
 
 	var ret []ReusableEmitter
@@ -302,7 +303,7 @@ func makeSideInput(kind graph.InputKind, t reflect.Type, values ReStream) (Reusa
 			return nil, err
 		}
 		if len(elms) != 1 {
-			return nil, fmt.Errorf("singleton side input %v for %v ill-defined", kind, t)
+			return nil, errors.Errorf("singleton side input %v for %v ill-defined", kind, t)
 		}
 		return &fixedValue{val: Convert(elms[0].Elm, t)}, nil
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go b/sdks/go/pkg/beam/core/runtime/exec/input.go
index c6ddb25..74851c7 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/input.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/input.go
@@ -23,6 +23,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 4/26/2017: SideInput representation? We want it to be amenable
@@ -158,7 +159,7 @@ func (v *iterValue) invoke(args []reflect.Value) []reflect.Value {
 		if err == io.EOF {
 			return []reflect.Value{reflect.ValueOf(false)}
 		}
-		panic(fmt.Sprintf("broken stream: %v", err))
+		panic(errors.Wrap(err, "broken stream"))
 	}
 
 	// We expect 1-3 out parameters: func (*int, *string) bool.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 49f74c5..612e7ff 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 )
 
@@ -69,7 +70,7 @@ func (n *ParDo) ID() UnitID {
 // Up initializes this ParDo and does one-time DoFn setup.
 func (n *ParDo) Up(ctx context.Context) error {
 	if n.status != Initializing {
-		return fmt.Errorf("invalid status for pardo %v: %v, want Initializing", n.UID, n.status)
+		return errors.Errorf("invalid status for pardo %v: %v, want Initializing", n.UID, n.status)
 	}
 	n.status = Up
 	n.inv = newInvoker(n.Fn.ProcessElementFn())
@@ -89,7 +90,7 @@ func (n *ParDo) Up(ctx context.Context) error {
 // StartBundle does pre-bundle processing operation for the DoFn.
 func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error {
 	if n.status != Up {
-		return fmt.Errorf("invalid status for pardo %v: %v, want Up", n.UID, n.status)
+		return errors.Errorf("invalid status for pardo %v: %v, want Up", n.UID, n.status)
 	}
 	n.status = Active
 	n.side = data.SideInput
@@ -113,7 +114,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) er
 // ProcessElement processes each parallel element with the DoFn.
 func (n *ParDo) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
+		return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
 	}
 	// If the function observes windows, we must invoke it for each window. The expected fast path
 	// is that either there is a single window or the function doesn't observes windows.
@@ -162,7 +163,7 @@ func mustExplodeWindows(fn *funcx.Fn, elm *FullValue, usesSideInput bool) bool {
 // persisted at this point.
 func (n *ParDo) FinishBundle(ctx context.Context) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
+		return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
 	}
 	n.status = Up
 	n.inv.Reset()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 2436f08..5031c3e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -23,6 +23,7 @@ import (
 	"strings"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
 
@@ -55,7 +56,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 
 	for _, u := range units {
 		if u == nil {
-			return nil, fmt.Errorf("no <nil> units")
+			return nil, errors.Errorf("no <nil> units")
 		}
 		if r, ok := u.(Root); ok {
 			roots = append(roots, r)
@@ -68,7 +69,7 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 		}
 	}
 	if len(roots) == 0 {
-		return nil, fmt.Errorf("no root units")
+		return nil, errors.Errorf("no root units")
 	}
 
 	return &Plan{
@@ -102,7 +103,7 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro
 	}
 
 	if p.status != Up {
-		return fmt.Errorf("invalid status for plan %v: %v", p.id, p.status)
+		return errors.Errorf("invalid status for plan %v: %v", p.id, p.status)
 	}
 
 	// Process bundle. If there are any kinds of failures, we bail and mark the plan broken.
@@ -149,9 +150,9 @@ func (p *Plan) Down(ctx context.Context) error {
 	case 0:
 		return nil
 	case 1:
-		return fmt.Errorf("plan %v failed: %v", p.id, errs[0])
+		return errors.Wrapf(errs[0], "plan %v failed", p.id)
 	default:
-		return fmt.Errorf("plan %v failed with multiple errors: %v", p.id, errs)
+		return errors.Errorf("plan %v failed with multiple errors: %v", p.id, errs)
 	}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index df69a9a..e3981d3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
@@ -55,7 +56,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
 			continue
 		}
 		if len(transform.GetOutputs()) != 1 {
-			return nil, fmt.Errorf("expected one output from DataSource, got %v", transform.GetOutputs())
+			return nil, errors.Errorf("expected one output from DataSource, got %v", transform.GetOutputs())
 		}
 
 		port, cid, err := unmarshalPort(transform.GetSpec().GetPayload())
@@ -85,7 +86,7 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) (*Plan, error) {
 					return nil, err
 				}
 				if !coder.IsW(u.Coder) {
-					return nil, fmt.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder)
+					return nil, errors.Errorf("unwindowed coder %v on DataSource %v: %v", cid, id, u.Coder)
 				}
 			}
 		}
@@ -165,7 +166,7 @@ func (b *builder) makeWindowingStrategy(id string) (*window.WindowingStrategy, e
 
 	ws, ok := b.desc.GetWindowingStrategies()[id]
 	if !ok {
-		return nil, fmt.Errorf("windowing strategy %v not found", id)
+		return nil, errors.Errorf("windowing strategy %v not found", id)
 	}
 	wfn, err := unmarshalWindowFn(ws.GetWindowFn().GetSpec())
 	if err != nil {
@@ -219,7 +220,7 @@ func unmarshalWindowFn(wfn *pb.FunctionSpec) (*window.Fn, error) {
 		return window.NewSessions(gap), nil
 
 	default:
-		return nil, fmt.Errorf("unsupported window type: %v", urn)
+		return nil, errors.Errorf("unsupported window type: %v", urn)
 	}
 }
 
@@ -238,7 +239,7 @@ func (b *builder) makePCollections(out []string) ([]Node, error) {
 func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.WindowCoder, error) {
 	col, ok := b.desc.GetPcollections()[id]
 	if !ok {
-		return nil, nil, fmt.Errorf("pcollection %v not found", id)
+		return nil, nil, errors.Errorf("pcollection %v not found", id)
 	}
 	c, err := b.coders.Coder(col.CoderId)
 	if err != nil {
@@ -254,7 +255,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
 
 	ws, ok := b.desc.GetWindowingStrategies()[col.GetWindowingStrategyId()]
 	if !ok {
-		return nil, nil, fmt.Errorf("windowing strategy %v not found", id)
+		return nil, nil, errors.Errorf("windowing strategy %v not found", id)
 	}
 	wc, err := b.coders.WindowCoder(ws.GetWindowCoderId())
 	if err != nil {
@@ -342,13 +343,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 		case graphx.URNParDo:
 			var pardo pb.ParDoPayload
 			if err := proto.Unmarshal(payload, &pardo); err != nil {
-				return nil, fmt.Errorf("invalid ParDo payload for %v: %v", transform, err)
+				return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform)
 			}
 			data = string(pardo.GetDoFn().GetSpec().GetPayload())
 		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract:
 			var cmb pb.CombinePayload
 			if err := proto.Unmarshal(payload, &cmb); err != nil {
-				return nil, fmt.Errorf("invalid CombinePayload payload for %v: %v", transform, err)
+				return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform)
 			}
 			data = string(cmb.GetCombineFn().GetSpec().GetPayload())
 		default:
@@ -363,7 +364,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 
 		var tp v1.TransformPayload
 		if err := protox.DecodeBase64(data, &tp); err != nil {
-			return nil, fmt.Errorf("invalid transform payload for %v: %v", transform, err)
+			return nil, errors.Wrapf(err, "invalid transform payload for %v", transform)
 		}
 
 		switch tpUrn := tp.GetUrn(); tpUrn {
@@ -422,14 +423,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 				case urnPerKeyCombinePre:
 					inputs := unmarshalKeyedValues(transform.GetInputs())
 					if len(inputs) != 1 {
-						return nil, fmt.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
+						return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
 					}
 					ec, _, err := b.makeCoderForPCollection(inputs[0])
 					if err != nil {
 						return nil, err
 					}
 					if !coder.IsKV(ec) {
-						return nil, fmt.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
+						return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
 					}
 					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0]}
 				case urnPerKeyCombineMerge:
@@ -452,7 +453,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 				return nil, err
 			}
 			if !coder.IsKV(c) {
-				return nil, fmt.Errorf("unexpected inject coder: %v", c)
+				return nil, errors.Errorf("unexpected inject coder: %v", c)
 			}
 			u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
 
@@ -466,7 +467,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 				return nil, err
 			}
 			if !coder.IsCoGBK(c) {
-				return nil, fmt.Errorf("unexpected expand coder: %v", c)
+				return nil, errors.Errorf("unexpected expand coder: %v", c)
 			}
 
 			var decoders []ElementDecoder
@@ -476,13 +477,13 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 			u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: out[0]}
 
 		default:
-			return nil, fmt.Errorf("unexpected payload: %v", tp)
+			return nil, errors.Errorf("unexpected payload: %v", tp)
 		}
 
 	case graphx.URNWindow:
 		var wp pb.WindowIntoPayload
 		if err := proto.Unmarshal(payload, &wp); err != nil {
-			return nil, fmt.Errorf("invalid WindowInto payload for %v: %v", transform, err)
+			return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform)
 		}
 		wfn, err := unmarshalWindowFn(wp.GetWindowFn().GetSpec())
 		if err != nil {
@@ -516,7 +517,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 					return nil, err
 				}
 				if !coder.IsW(sink.Coder) {
-					return nil, fmt.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
+					return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
 				}
 			}
 		}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
index aa0934a..0d17746 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
@@ -17,9 +17,9 @@ package exec
 
 import (
 	"context"
-	"fmt"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // CaptureNode is a test Node that captures all elements for verification. It also
@@ -37,7 +37,7 @@ func (n *CaptureNode) ID() UnitID {
 
 func (n *CaptureNode) Up(ctx context.Context) error {
 	if n.status != Initializing {
-		return fmt.Errorf("invalid status for %v: %v, want Initializing", n.UID, n.status)
+		return errors.Errorf("invalid status for %v: %v, want Initializing", n.UID, n.status)
 	}
 	n.status = Up
 	return nil
@@ -45,7 +45,7 @@ func (n *CaptureNode) Up(ctx context.Context) error {
 
 func (n *CaptureNode) StartBundle(ctx context.Context, id string, data DataContext) error {
 	if n.status != Up {
-		return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, n.status)
+		return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status)
 	}
 	n.status = Active
 	return nil
@@ -53,7 +53,7 @@ func (n *CaptureNode) StartBundle(ctx context.Context, id string, data DataConte
 
 func (n *CaptureNode) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
+		return errors.Errorf("invalid status for pardo %v: %v, want Active", n.UID, n.status)
 	}
 
 	n.Elements = append(n.Elements, *elm)
@@ -62,7 +62,7 @@ func (n *CaptureNode) ProcessElement(ctx context.Context, elm *FullValue, values
 
 func (n *CaptureNode) FinishBundle(ctx context.Context) error {
 	if n.status != Active {
-		return fmt.Errorf("invalid status for %v: %v, want Active", n.UID, n.status)
+		return errors.Errorf("invalid status for %v: %v, want Active", n.UID, n.status)
 	}
 	n.status = Up
 	return nil
@@ -70,7 +70,7 @@ func (n *CaptureNode) FinishBundle(ctx context.Context) error {
 
 func (n *CaptureNode) Down(ctx context.Context) error {
 	if n.status != Up {
-		return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, n.status)
+		return errors.Errorf("invalid status for %v: %v, want Up", n.UID, n.status)
 	}
 	n.status = Down
 	return nil
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go
index 1fdfd76..16682b9 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/util.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/util.go
@@ -17,8 +17,9 @@ package exec
 
 import (
 	"context"
-	"fmt"
 	"runtime/debug"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // GenID is a simple UnitID generator.
@@ -36,7 +37,7 @@ func (g *GenID) New() UnitID {
 func callNoPanic(ctx context.Context, fn func(context.Context) error) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = fmt.Errorf("panic: %v %s", r, debug.Stack())
+			err = errors.Errorf("panic: %v %s", r, debug.Stack())
 		}
 	}()
 	return fn(ctx)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index 43458f9..50e4591 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
 )
@@ -61,7 +62,7 @@ func UnmarshalCoders(ids []string, m map[string]*pb.Coder) ([]*coder.Coder, erro
 	for _, id := range ids {
 		c, err := b.Coder(id)
 		if err != nil {
-			return nil, fmt.Errorf("failed to unmarshal coder %v: %v", id, err)
+			return nil, err
 		}
 		coders = append(coders, c)
 	}
@@ -105,12 +106,13 @@ func (b *CoderUnmarshaller) Coder(id string) (*coder.Coder, error) {
 	}
 	c, ok := b.models[id]
 	if !ok {
-		return nil, fmt.Errorf("coder with id %v not found", id)
+		err := errors.Errorf("coder with id %v not found", id)
+		return nil, errors.WithContextf(err, "unmarshalling coder %v", id)
 	}
 
 	ret, err := b.makeCoder(c)
 	if err != nil {
-		return nil, fmt.Errorf("failed to unmarshal coder %v: %v", id, err)
+		return nil, errors.WithContextf(err, "unmarshalling coder %v", id)
 	}
 
 	b.coders[id] = ret
@@ -157,7 +159,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnKVCoder:
 		if len(components) != 2 {
-			return nil, fmt.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
+			return nil, errors.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
 		}
 
 		key, err := b.Coder(components[0])
@@ -206,7 +208,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnLengthPrefixCoder:
 		if len(components) != 1 {
-			return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components))
+			return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components))
 		}
 
 		elm, err := b.peek(components[0])
@@ -217,7 +219,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 		// the portable pipeline model directly (BEAM-2885)
 		if elm.GetSpec().GetSpec().GetUrn() != "" && elm.GetSpec().GetSpec().GetUrn() != urnCustomCoder {
 			// TODO(herohde) 11/17/2017: revisit this restriction
-			return nil, fmt.Errorf("could not unmarshal length prefix coder from %v, want a custom coder as a sub component but got %v", c, elm)
+			return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a custom coder as a sub component but got %v", c, elm)
 		}
 
 		var ref v1.CustomCoder
@@ -233,7 +235,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 	case urnWindowedValueCoder:
 		if len(components) != 2 {
-			return nil, fmt.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components))
+			return nil, errors.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components))
 		}
 
 		elm, err := b.Coder(components[0])
@@ -248,7 +250,7 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 		return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil
 
 	case streamType:
-		return nil, fmt.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c)
+		return nil, errors.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c)
 
 	case "":
 		// TODO(herohde) 11/27/2017: we still see CoderRefs from Dataflow. Handle that
@@ -258,23 +260,23 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
 
 		var ref CoderRef
 		if err := json.Unmarshal(payload, &ref); err != nil {
-			return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\": %v", c, string(payload), err)
+			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\"", c, string(payload))
 		}
 		c, err := DecodeCoderRef(&ref)
 		if err != nil {
-			return nil, fmt.Errorf("could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\": %v", c, string(payload), err)
+			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\"", c, string(payload))
 		}
 		return c, nil
 
 	default:
-		return nil, fmt.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn)
+		return nil, errors.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn)
 	}
 }
 
 func (b *CoderUnmarshaller) peek(id string) (*pb.Coder, error) {
 	c, ok := b.models[id]
 	if !ok {
-		return nil, fmt.Errorf("coder with id %v not found", id)
+		return nil, errors.Errorf("coder with id %v not found", id)
 	}
 	return c, nil
 }
@@ -326,7 +328,7 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
 		}
 		data, err := protox.EncodeBase64(ref)
 		if err != nil {
-			panic(fmt.Sprintf("Failed to marshal custom coder %v: %v", c, err))
+			panic(errors.Wrapf(err, "Failed to marshal custom coder %v", c))
 		}
 		inner := b.internCoder(&pb.Coder{
 			Spec: &pb.SdkFunctionSpec{
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
index 52d5f6a..fd29f40 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/dataflow.go
@@ -16,12 +16,11 @@
 package graphx
 
 import (
-	"fmt"
-
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // TODO(herohde) 7/17/2018: move CoderRef to dataflowlib once Dataflow
@@ -95,7 +94,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
 	case coder.KV:
 		if len(c.Components) != 2 {
-			return nil, fmt.Errorf("bad KV: %v", c)
+			return nil, errors.Errorf("bad KV: %v", c)
 		}
 
 		key, err := EncodeCoderRef(c.Components[0])
@@ -110,7 +109,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
 	case coder.CoGBK:
 		if len(c.Components) < 2 {
-			return nil, fmt.Errorf("bad CoGBK: %v", c)
+			return nil, errors.Errorf("bad CoGBK: %v", c)
 		}
 
 		refs, err := EncodeCoderRefs(c.Components)
@@ -131,7 +130,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 
 	case coder.WindowedValue:
 		if len(c.Components) != 1 || c.Window == nil {
-			return nil, fmt.Errorf("bad windowed value: %v", c)
+			return nil, errors.Errorf("bad windowed value: %v", c)
 		}
 
 		elm, err := EncodeCoderRef(c.Components[0])
@@ -152,7 +151,7 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
 		return &CoderRef{Type: varIntType}, nil
 
 	default:
-		return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
+		return nil, errors.Errorf("bad coder kind: %v", c.Kind)
 	}
 }
 
@@ -180,7 +179,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
 	case pairType:
 		if len(c.Components) != 2 {
-			return nil, fmt.Errorf("bad pair: %+v", c)
+			return nil, errors.Errorf("bad pair: %+v", c)
 		}
 
 		key, err := DecodeCoderRef(c.Components[0])
@@ -223,12 +222,12 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
 	case lengthPrefixType:
 		if len(c.Components) != 1 {
-			return nil, fmt.Errorf("bad length prefix: %+v", c)
+			return nil, errors.Errorf("bad length prefix: %+v", c)
 		}
 
 		var ref v1.CustomCoder
 		if err := protox.DecodeBase64(c.Components[0].Type, &ref); err != nil {
-			return nil, fmt.Errorf("base64 decode for %v failed: %v", c.Components[0].Type, err)
+			return nil, errors.Wrapf(err, "base64 decode for %v failed", c.Components[0].Type)
 		}
 		custom, err := decodeCustomCoder(&ref)
 		if err != nil {
@@ -239,7 +238,7 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 
 	case windowedValueType:
 		if len(c.Components) != 2 {
-			return nil, fmt.Errorf("bad windowed value: %+v", c)
+			return nil, errors.Errorf("bad windowed value: %+v", c)
 		}
 
 		elm, err := DecodeCoderRef(c.Components[0])
@@ -255,10 +254,10 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
 		return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}, nil
 
 	case streamType:
-		return nil, fmt.Errorf("stream must be pair value: %+v", c)
+		return nil, errors.Errorf("stream must be pair value: %+v", c)
 
 	default:
-		return nil, fmt.Errorf("custom coders must be length prefixed: %+v", c)
+		return nil, errors.Errorf("custom coders must be length prefixed: %+v", c)
 	}
 }
 
@@ -283,7 +282,7 @@ func encodeWindowCoder(w *coder.WindowCoder) (*CoderRef, error) {
 	case coder.IntervalWindow:
 		return &CoderRef{Type: intervalWindowType}, nil
 	default:
-		return nil, fmt.Errorf("bad window kind: %v", w.Kind)
+		return nil, errors.Errorf("bad window kind: %v", w.Kind)
 	}
 }
 
@@ -296,6 +295,6 @@ func decodeWindowCoder(w *CoderRef) (*coder.WindowCoder, error) {
 	case intervalWindowType:
 		return coder.NewIntervalWindow(), nil
 	default:
-		return nil, fmt.Errorf("bad window: %v", w.Type)
+		return nil, errors.Errorf("bad window: %v", w.Type)
 	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 5f0e944..61e5174 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var genFnType = reflect.TypeOf((*func(string, reflect.Type, []byte) reflectx.Func)(nil)).Elem()
@@ -43,14 +44,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
 	if edge.DoFn != nil {
 		ref, err := encodeFn((*graph.Fn)(edge.DoFn))
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode userfn %v, bad userfn: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad userfn")
+			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
 		}
 		ret.Fn = ref
 	}
 	if edge.CombineFn != nil {
 		ref, err := encodeFn((*graph.Fn)(edge.CombineFn))
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode userfn %v, bad combinefn: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad combinefn")
+			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
 		}
 		ret.Fn = ref
 	}
@@ -62,14 +65,16 @@ func EncodeMultiEdge(edge *graph.MultiEdge) (*v1.MultiEdge, error) {
 		kind := encodeInputKind(in.Kind)
 		t, err := encodeFullType(in.Type)
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode userfn %v, bad input type: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad input type")
+			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
 		}
 		ret.Inbound = append(ret.Inbound, &v1.MultiEdge_Inbound{Kind: kind, Type: t})
 	}
 	for _, out := range edge.Output {
 		t, err := encodeFullType(out.Type)
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode userfn %v, bad output type: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad output type")
+			return nil, errors.WithContextf(wrapped, "encoding userfn %v", edge)
 		}
 		ret.Outbound = append(ret.Outbound, &v1.MultiEdge_Outbound{Type: t})
 	}
@@ -91,7 +96,8 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 		var err error
 		u, err = decodeFn(edge.Fn)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad function: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad function")
+			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
 		}
 	}
 	if edge.WindowFn != nil {
@@ -100,18 +106,21 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 	for _, in := range edge.Inbound {
 		kind, err := decodeInputKind(in.Kind)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input kind: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad input kind")
+			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
 		}
 		t, err := decodeFullType(in.Type)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad input type: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad input type")
+			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
 		}
 		inbound = append(inbound, &graph.Inbound{Kind: kind, Type: t})
 	}
 	for _, out := range edge.Outbound {
 		t, err := decodeFullType(out.Type)
 		if err != nil {
-			return "", nil, nil, nil, nil, fmt.Errorf("failed to decode userfn %v, bad output type: %v", edge, err)
+			wrapped := errors.Wrap(err, "bad output type")
+			return "", nil, nil, nil, nil, errors.WithContextf(wrapped, "decoding userfn %v", edge)
 		}
 		outbound = append(outbound, &graph.Outbound{Type: t})
 	}
@@ -122,15 +131,17 @@ func DecodeMultiEdge(edge *v1.MultiEdge) (graph.Opcode, *graph.Fn, *window.Fn, [
 func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 	t, err := encodeType(c.Type)
 	if err != nil {
-		return nil, fmt.Errorf("failed to encode custom coder %v for type %v: %v", c, c.Type, err)
+		return nil, errors.WithContextf(err, "encoding custom coder %v for type %v", c, c.Type)
 	}
 	enc, err := encodeUserFn(c.Enc)
 	if err != nil {
-		return nil, fmt.Errorf("failed to encode custom coder %v, bad encoding function: %v", c, err)
+		wrapped := errors.Wrap(err, "bad encoding function")
+		return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
 	}
 	dec, err := encodeUserFn(c.Dec)
 	if err != nil {
-		return nil, fmt.Errorf("failed to encode custom coder %v, bad decoding function: %v", c, err)
+		wrapped := errors.Wrap(err, "bad decoding function")
+		return nil, errors.WithContextf(wrapped, "encoding custom coder %v", c)
 	}
 
 	ret := &v1.CustomCoder{
@@ -145,20 +156,22 @@ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
 	t, err := decodeType(c.Type)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode custom coder %v for type %v: %v", c, c.Type, err)
+		return nil, errors.WithContextf(err, "decoding custom coder %v for type %v", c, c.Type)
 	}
 	enc, err := decodeUserFn(c.Enc)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode custom coder %v, bad encoding function: %v", c, err)
+		wrapped := errors.Wrap(err, "bad encoding function")
+		return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
 	}
 	dec, err := decodeUserFn(c.Dec)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode custom coder %v, bad decoding function: %v", c, err)
+		wrapped := errors.Wrap(err, "bad decoding function")
+		return nil, errors.WithContextf(wrapped, "decoding custom coder %v", c)
 	}
 
 	ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode custom coder %v: %v", c, err)
+		return nil, errors.WithContextf(err, "decoding custom coder %v", c)
 	}
 	return ret, nil
 }
@@ -195,7 +208,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 		gen := reflectx.FunctionName(u.DynFn.Gen)
 		t, err := encodeType(u.DynFn.T)
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode dynamic DoFn %v, bad function type: %v", u, err)
+			wrapped := errors.Wrap(err, "bad function type")
+			return nil, errors.WithContextf(wrapped, "encoding dynamic DoFn %v", u)
 		}
 		return &v1.Fn{Dynfn: &v1.DynFn{
 			Name: u.DynFn.Name,
@@ -207,7 +221,8 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 	case u.Fn != nil:
 		fn, err := encodeUserFn(u.Fn)
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode DoFn %v, bad userfn: %v", u, err)
+			wrapped := errors.Wrap(err, "bad userfn")
+			return nil, errors.WithContextf(wrapped, "encoding DoFn %v", u)
 		}
 		return &v1.Fn{Fn: fn}, nil
 
@@ -215,19 +230,23 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 		t := reflect.TypeOf(u.Recv)
 		k, ok := runtime.TypeKey(reflectx.SkipPtr(t))
 		if !ok {
-			return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to create TypeKey for receiver type %T", u, u.Recv)
+			err := errors.Errorf("failed to create TypeKey for receiver type %T", u.Recv)
+			return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
 		}
 		if _, ok := runtime.LookupType(k); !ok {
-			return nil, fmt.Errorf("failed to encode structural DoFn %v, receiver type %v must be registered", u, t)
+			err := errors.Errorf("receiver type %v must be registered", t)
+			return nil, errors.WithContextf(err, "encoding structural DoFn %v", u)
 		}
 		typ, err := encodeType(t)
 		if err != nil {
-			panic(fmt.Sprintf("Failed to encode structural DoFn %v, failed to encode receiver type %T: %v", u, u.Recv, err))
+			wrapped := errors.Wrapf(err, "failed to encode receiver type %T", u.Recv)
+			panic(errors.WithContextf(wrapped, "encoding structural DoFn %v", u))
 		}
 
 		data, err := json.Marshal(u.Recv)
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode structural DoFn %v, failed to marshal receiver %v: %v", u, u.Recv, err)
+			wrapped := errors.Wrapf(err, "failed to marshal receiver %v", u.Recv)
+			return nil, errors.WithContextf(wrapped, "encoding structural DoFn %v", u)
 		}
 		return &v1.Fn{Type: typ, Opt: string(data)}, nil
 
@@ -240,12 +259,14 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 	if u.Dynfn != nil {
 		gen, err := runtime.ResolveFunction(u.Dynfn.Gen, genFnType)
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad symbol %v: %v", u, u.Dynfn.Gen, err)
+			wrapped := errors.Wrapf(err, "bad symbol %v", u.Dynfn.Gen)
+			return nil, errors.WithContextf(wrapped, "decoding dynamic DoFn %v", u)
 		}
 
 		t, err := decodeType(u.Dynfn.Type)
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode dynamic DoFn %v, bad type: %v", u, err)
+			wrapped := errors.Wrap(err, "bad type")
+			return nil, errors.WithContextf(wrapped, "failed to decode dynamic DoFn %v", u)
 		}
 		return graph.NewFn(&graph.DynFn{
 			Name: u.Dynfn.Name,
@@ -257,22 +278,26 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 	if u.Fn != nil {
 		fn, err := decodeUserFn(u.Fn)
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode DoFn %v, failed to decode userfn: %v", u, err)
+			wrapped := errors.Wrap(err, "failed to decode userfn")
+			return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
 		}
 		fx, err := funcx.New(reflectx.MakeFunc(fn))
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode DoFn %v, failed to construct userfn: %v", u, err)
+			wrapped := errors.Wrap(err, "failed to construct userfn")
+			return nil, errors.WithContextf(wrapped, "decoding DoFn %v", u)
 		}
 		return &graph.Fn{Fn: fx}, nil
 	}
 
 	t, err := decodeType(u.Type)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode structural DoFn %v, bad type: %v", u, err)
+		wrapped := errors.Wrap(err, "bad type")
+		return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
 	}
 	fn, err := reflectx.UnmarshalJSON(t, u.Opt)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode structural DoFn %v, bad struct encoding: %v", u, err)
+		wrapped := errors.Wrap(err, "bad struct encoding")
+		return nil, errors.WithContextf(wrapped, "decoding structural DoFn %v", u)
 	}
 	return graph.NewFn(fn)
 }
@@ -286,7 +311,8 @@ func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
 	symbol := u.Fn.Name()
 	t, err := encodeType(u.Fn.Type())
 	if err != nil {
-		return nil, fmt.Errorf("failed to encode userfn %v, bad function type: %v", u, err)
+		wrapped := errors.Wrap(err, "bad function type")
+		return nil, errors.WithContextf(wrapped, "encoding userfn %v", u)
 	}
 	return &v1.UserFn{Name: symbol, Type: t}, nil
 }
@@ -319,7 +345,8 @@ func encodeFullType(t typex.FullType) (*v1.FullType, error) {
 
 	prim, err := encodeType(t.Type())
 	if err != nil {
-		return nil, fmt.Errorf("failed to encode full type %v, bad type: %v", t, err)
+		wrapped := errors.Wrap(err, "bad type")
+		return nil, errors.WithContextf(wrapped, "encoding full type %v", t)
 	}
 	return &v1.FullType{Type: prim, Components: components}, nil
 }
@@ -336,7 +363,8 @@ func decodeFullType(t *v1.FullType) (typex.FullType, error) {
 
 	prim, err := decodeType(t.Type)
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode full type %v, bad type: %v", t, err)
+		wrapped := errors.Wrap(err, "bad type")
+		return nil, errors.WithContextf(wrapped, "decoding full type %v", t)
 	}
 	return typex.New(prim, components...), nil
 }
@@ -388,7 +416,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Slice:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode slice %v, bad element type: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element type")
+			return nil, errors.WithContextf(wrapped, "encoding slice %v", t)
 		}
 		return &v1.Type{Kind: v1.Type_SLICE, Element: elm}, nil
 
@@ -399,7 +428,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 
 			fType, err := encodeType(f.Type)
 			if err != nil {
-				return nil, fmt.Errorf("failed to encode struct %v, bad field type: %v", t, err)
+				wrapped := errors.Wrap(err, "bad field type")
+				return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
 			}
 
 			field := &v1.Type_StructField{
@@ -420,7 +450,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 		for i := 0; i < t.NumIn(); i++ {
 			param, err := encodeType(t.In(i))
 			if err != nil {
-				return nil, fmt.Errorf("failed to encode function %v, bad parameter type: %v", t, err)
+				wrapped := errors.Wrap(err, "bad parameter type")
+				return nil, errors.WithContextf(wrapped, "encoding function %v", t)
 			}
 			in = append(in, param)
 		}
@@ -428,7 +459,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 		for i := 0; i < t.NumOut(); i++ {
 			ret, err := encodeType(t.Out(i))
 			if err != nil {
-				return nil, fmt.Errorf("failed to encode function %v, bad return type: %v", t, err)
+				wrapped := errors.Wrap(err, "bad return type")
+				return nil, errors.WithContextf(wrapped, "encoding function %v", t)
 			}
 			out = append(out, ret)
 		}
@@ -437,7 +469,8 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Chan:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode channel %v, bad element type: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element type")
+			return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
 		}
 		dir := encodeChanDir(t.ChanDir())
 		return &v1.Type{Kind: v1.Type_CHAN, Element: elm, ChanDir: dir}, nil
@@ -445,12 +478,13 @@ func encodeType(t reflect.Type) (*v1.Type, error) {
 	case reflect.Ptr:
 		elm, err := encodeType(t.Elem())
 		if err != nil {
-			return nil, fmt.Errorf("failed to encode pointer %v, bad base type: %v", t, err)
+			wrapped := errors.Wrap(err, "bad base type")
+			return nil, errors.WithContextf(wrapped, "encoding pointer %v", t)
 		}
 		return &v1.Type{Kind: v1.Type_PTR, Element: elm}, nil
 
 	default:
-		return nil, fmt.Errorf("unencodable type %v", t)
+		return nil, errors.Errorf("unencodable type %v", t)
 	}
 }
 
@@ -496,7 +530,8 @@ func tryEncodeSpecial(t reflect.Type) (v1.Type_Special, bool) {
 
 func decodeType(t *v1.Type) (reflect.Type, error) {
 	if t == nil {
-		return nil, fmt.Errorf("failed to decode type %v, empty type", t)
+		err := errors.New("empty type")
+		return nil, errors.WithContextf(err, "decoding type %v", t)
 	}
 
 	switch t.Kind {
@@ -532,7 +567,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 	case v1.Type_SLICE:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element")
+			return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad element: %v", t)
 		}
 		return reflect.SliceOf(elm), nil
 
@@ -541,7 +577,8 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 		for _, f := range t.Fields {
 			fType, err := decodeType(f.Type)
 			if err != nil {
-				return nil, fmt.Errorf("failed to decode type %v, bad field type: %v", t, err)
+				wrapped := errors.Wrap(err, "bad field type")
+				return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad field type: %v", t)
 			}
 
 			field := reflect.StructField{
@@ -560,48 +597,56 @@ func decodeType(t *v1.Type) (reflect.Type, error) {
 	case v1.Type_FUNC:
 		in, err := decodeTypes(t.GetParameterTypes())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad parameter type: %v", t, err)
+			wrapped := errors.Wrap(err, "bad parameter type")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		out, err := decodeTypes(t.GetReturnTypes())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad return type: %v", t, err)
+			wrapped := errors.Wrap(err, "bad return type")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		return reflect.FuncOf(in, out, t.GetIsVariadic()), nil
 
 	case v1.Type_CHAN:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		dir, err := decodeChanDir(t.GetChanDir())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad channel direction: %v", t, err)
+			wrapped := errors.Wrap(err, "bad channel direction")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		return reflect.ChanOf(dir, elm), nil
 
 	case v1.Type_PTR:
 		elm, err := decodeType(t.GetElement())
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		return reflect.PtrTo(elm), nil
 
 	case v1.Type_SPECIAL:
 		ret, err := decodeSpecial(t.Special)
 		if err != nil {
-			return nil, fmt.Errorf("failed to decode type %v, bad element: %v", t, err)
+			wrapped := errors.Wrap(err, "bad element")
+			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
 		}
 		return ret, nil
 
 	case v1.Type_EXTERNAL:
 		ret, ok := runtime.LookupType(t.ExternalKey)
 		if !ok {
-			return nil, fmt.Errorf("failed to decode type %v, external key not found %v", t, t.ExternalKey)
+			err := errors.Errorf("external key not found %v", t.ExternalKey)
+			return nil, errors.WithContextf(err, "decoding type %v", t)
 		}
 		return ret, nil
 
 	default:
-		return nil, fmt.Errorf("failed to decode type %v, unexpected type kind %v", t, t.Kind)
+		err := errors.Errorf("unexpected type kind %v", t.Kind)
+		return nil, errors.WithContextf(err, "failed to decode type %v", t)
 	}
 }
 
@@ -641,7 +686,7 @@ func decodeSpecial(s v1.Type_Special) (reflect.Type, error) {
 		return typex.ZType, nil
 
 	default:
-		return nil, fmt.Errorf("failed to decode special type, unknown type %v", s)
+		return nil, errors.Errorf("failed to decode special type, unknown type %v", s)
 	}
 }
 
@@ -695,7 +740,8 @@ func decodeChanDir(dir v1.Type_ChanDir) (reflect.ChanDir, error) {
 	case v1.Type_BOTH:
 		return reflect.BothDir, nil
 	default:
-		return reflect.BothDir, fmt.Errorf("failed to decode channel direction, invalid value: %v", dir)
+		err := errors.Errorf("invalid value: %v", dir)
+		return reflect.BothDir, errors.WithContext(err, "decoding channel direction")
 	}
 }
 
@@ -737,6 +783,7 @@ func decodeInputKind(k v1.MultiEdge_Inbound_InputKind) (graph.InputKind, error)
 	case v1.MultiEdge_Inbound_REITER:
 		return graph.ReIter, nil
 	default:
-		return graph.Main, fmt.Errorf("failed to decode input kind, invalid value: %v", k)
+		err := errors.Errorf("invalid value: %v", k)
+		return graph.Main, errors.WithContext(err, "decoding input kind")
 	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 16328a7..3798be4 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -24,6 +24,7 @@ import (
 	v1 "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
@@ -567,7 +568,7 @@ func makeWindowCoder(w *window.Fn) *coder.WindowCoder {
 func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) string {
 	ref, err := EncodeMultiEdge(edge)
 	if err != nil {
-		panic(fmt.Sprintf("Failed to serialize %v: %v", edge, err))
+		panic(errors.Wrapf(err, "Failed to serialize %v", edge))
 	}
 	return protox.MustEncodeBase64(&v1.TransformPayload{
 		Urn:  URNDoFn,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index 5c25be0..2d985d4 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -17,12 +17,12 @@ package harness
 
 import (
 	"context"
-	"fmt"
 	"io"
 	"sync"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
@@ -69,7 +69,7 @@ func (s *ScopedDataManager) open(ctx context.Context, port exec.Port) (*DataChan
 	s.mu.Lock()
 	if s.closed {
 		s.mu.Unlock()
-		return nil, fmt.Errorf("instruction %v no longer processing", s.instID)
+		return nil, errors.Errorf("instruction %v no longer processing", s.instID)
 	}
 	local := s.mgr
 	s.mu.Unlock()
@@ -147,12 +147,12 @@ type DataChannel struct {
 func newDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) {
 	cc, err := dial(ctx, port.URL, 15*time.Second)
 	if err != nil {
-		return nil, fmt.Errorf("failed to connect: %v", err)
+		return nil, errors.Wrap(err, "failed to connect")
 	}
 	client, err := pb.NewBeamFnDataClient(cc).Data(ctx)
 	if err != nil {
 		cc.Close()
-		return nil, fmt.Errorf("failed to connect to data service: %v", err)
+		return nil, errors.Wrap(err, "failed to connect to data service")
 	}
 	return makeDataChannel(ctx, port.URL, client), nil
 }
@@ -187,7 +187,7 @@ func (c *DataChannel) read(ctx context.Context) {
 				log.Warnf(ctx, "DataChannel %v closed", c.id)
 				return
 			}
-			panic(fmt.Errorf("channel %v bad: %v", c.id, err))
+			panic(errors.Wrapf(err, "channel %v bad", c.id))
 		}
 
 		recordStreamReceive(msg)
@@ -374,7 +374,7 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 		l := len(w.buf)
 		// We can't fit this message into the buffer. We need to flush the buffer
 		if err := w.Flush(); err != nil {
-			return 0, fmt.Errorf("datamgr.go: error flushing buffer of length %d: %v", l, err)
+			return 0, errors.Wrapf(err, "datamgr.go: error flushing buffer of length %d", l)
 		}
 	}
 
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 08ee7dc..0c40f27 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -49,13 +50,13 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 
 	conn, err := dial(ctx, controlEndpoint, 60*time.Second)
 	if err != nil {
-		return fmt.Errorf("Failed to connect: %v", err)
+		return errors.Wrap(err, "failed to connect")
 	}
 	defer conn.Close()
 
 	client, err := fnpb.NewBeamFnControlClient(conn).Control(ctx)
 	if err != nil {
-		return fmt.Errorf("Failed to connect to control service: %v", err)
+		return errors.Wrapf(err, "failed to connect to control service")
 	}
 
 	log.Debugf(ctx, "Successfully connected to control @ %v", controlEndpoint)
@@ -101,7 +102,7 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 				recordFooter()
 				return nil
 			}
-			return fmt.Errorf("recv failed: %v", err)
+			return errors.Wrapf(err, "recv failed")
 		}
 
 		// Launch a goroutine to handle the control message.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go b/sdks/go/pkg/beam/core/runtime/harness/logging.go
index f7608cd..ad24148 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/logging.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go
@@ -22,6 +22,7 @@ import (
 	"runtime"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/golang/protobuf/ptypes"
@@ -148,5 +149,5 @@ func (w *remoteWriter) connect(ctx context.Context) error {
 
 		// fmt.Fprintf(os.Stderr, "SENT: %v\n", msg)
 	}
-	return fmt.Errorf("internal: buffer closed?")
+	return errors.New("internal: buffer closed?")
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go b/sdks/go/pkg/beam/core/runtime/harness/session.go
index 7fecf9f..d783533 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/golang/protobuf/proto"
 )
@@ -68,7 +69,7 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error {
 	body := bufPool.Get().(*proto.Buffer)
 	defer bufPool.Put(body)
 	if err := body.Marshal(pb); err != nil {
-		return fmt.Errorf("Unable to marshal message for session recording: %v", err)
+		return errors.Wrap(err, "unable to marshal message for session recording")
 	}
 
 	eh := &session.EntryHeader{
@@ -79,13 +80,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error {
 	hdr := bufPool.Get().(*proto.Buffer)
 	defer bufPool.Put(hdr)
 	if err := hdr.Marshal(eh); err != nil {
-		return fmt.Errorf("Unable to marshal message header for session recording: %v", err)
+		return errors.Wrap(err, "unable to marshal message header for session recording")
 	}
 
 	l := bufPool.Get().(*proto.Buffer)
 	defer bufPool.Put(l)
 	if err := l.EncodeVarint(uint64(len(hdr.Bytes()))); err != nil {
-		return fmt.Errorf("Unable to write entry header length: %v", err)
+		return errors.Wrap(err, "unable to write entry header length")
 	}
 
 	// Acquire the lock to write the file.
@@ -93,13 +94,13 @@ func recordMessage(opcode session.Kind, pb *session.Entry) error {
 	defer sessionLock.Unlock()
 
 	if _, err := capture.Write(l.Bytes()); err != nil {
-		return fmt.Errorf("Unable to write entry header length: %v", err)
+		return errors.Wrap(err, "unable to write entry header length")
 	}
 	if _, err := capture.Write(hdr.Bytes()); err != nil {
-		return fmt.Errorf("Unable to write entry header: %v", err)
+		return errors.Wrap(err, "unable to write entry header")
 	}
 	if _, err := capture.Write(body.Bytes()); err != nil {
-		return fmt.Errorf("Unable to write entry body: %v", err)
+		return errors.Wrap(err, "unable to write entry body")
 	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 6445618..f5d3102 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -24,10 +24,10 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/golang/protobuf/proto"
-	"github.com/pkg/errors"
 )
 
 // ScopedSideInputReader scopes the global gRPC state manager to a single instruction
@@ -55,7 +55,7 @@ func (s *ScopedSideInputReader) Open(ctx context.Context, id exec.StreamID, key,
 	s.mu.Lock()
 	if s.closed {
 		s.mu.Unlock()
-		return nil, fmt.Errorf("instruction %v no longer processing", s.instID)
+		return nil, errors.Errorf("instruction %v no longer processing", s.instID)
 	}
 	ret := newSideInputReader(ch, id.Target, s.instID, key, w)
 	s.opened = append(s.opened, ret)
@@ -67,7 +67,7 @@ func (s *ScopedSideInputReader) open(ctx context.Context, port exec.Port) (*Stat
 	s.mu.Lock()
 	if s.closed {
 		s.mu.Unlock()
-		return nil, fmt.Errorf("instruction %v no longer processing", s.instID)
+		return nil, errors.Errorf("instruction %v no longer processing", s.instID)
 	}
 	local := s.mgr
 	s.mu.Unlock()
@@ -129,7 +129,7 @@ func (r *sideInputReader) Read(buf []byte) (int, error) {
 		r.mu.Lock()
 		if r.closed {
 			r.mu.Unlock()
-			return 0, fmt.Errorf("side input closed")
+			return 0, errors.New("side input closed")
 		}
 		local := r.ch
 		r.mu.Unlock()
@@ -219,12 +219,12 @@ type StateChannel struct {
 func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error) {
 	cc, err := dial(ctx, port.URL, 15*time.Second)
 	if err != nil {
-		return nil, fmt.Errorf("failed to connect: %v", err)
+		return nil, errors.Wrap(err, "failed to connect")
 	}
 	client, err := pb.NewBeamFnStateClient(cc).State(ctx)
 	if err != nil {
 		cc.Close()
-		return nil, fmt.Errorf("failed to connect to data service: %v", err)
+		return nil, errors.Wrap(err, "failed to connect to data service")
 	}
 
 	ret := &StateChannel{
@@ -248,7 +248,7 @@ func (c *StateChannel) read(ctx context.Context) {
 				log.Warnf(ctx, "StateChannel %v closed", c.id)
 				return
 			}
-			panic(fmt.Errorf("state channel %v bad: %v", c.id, err))
+			panic(errors.Wrapf(err, "state channel %v bad", c.id))
 		}
 
 		c.mu.Lock()
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index 56f14b3..ad1175c 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -22,6 +22,7 @@ import (
 	"sort"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
@@ -44,7 +45,7 @@ func Update(p *pb.Pipeline, values *pb.Components) (*pb.Pipeline, error) {
 // subtransform list.
 func Normalize(p *pb.Pipeline) (*pb.Pipeline, error) {
 	if len(p.GetComponents().GetTransforms()) == 0 {
-		return nil, fmt.Errorf("empty pipeline")
+		return nil, errors.New("empty pipeline")
 	}
 
 	ret := shallowClonePipeline(p)
diff --git a/sdks/go/pkg/beam/core/runtime/symbols.go b/sdks/go/pkg/beam/core/runtime/symbols.go
index f5089f1..6dbefad 100644
--- a/sdks/go/pkg/beam/core/runtime/symbols.go
+++ b/sdks/go/pkg/beam/core/runtime/symbols.go
@@ -16,13 +16,13 @@
 package runtime
 
 import (
-	"fmt"
 	"os"
 	"reflect"
 	"sync"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/symtab"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -105,5 +105,5 @@ func ResolveFunction(name string, t reflect.Type) (interface{}, error) {
 type failResolver bool
 
 func (p failResolver) Sym2Addr(name string) (uintptr, error) {
-	return 0, fmt.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
+	return 0, errors.Errorf("%v not found. Use runtime.RegisterFunction in unit tests", name)
 }
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go b/sdks/go/pkg/beam/core/typex/fulltype.go
index 5066023..a96e6f9 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -21,6 +21,8 @@ import (
 	"fmt"
 	"reflect"
 	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // FullType represents the tree structure of data types processed by the graph.
@@ -298,7 +300,7 @@ func IsBound(t FullType) bool {
 // produce {"T" -> string}.
 func Bind(types, models []FullType) (map[string]reflect.Type, error) {
 	if len(types) != len(models) {
-		return nil, fmt.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types))
+		return nil, errors.Errorf("typex.Bind: invalid number of models: %v, want %v", len(models), len(types))
 	}
 
 	m := make(map[string]reflect.Type)
@@ -307,7 +309,7 @@ func Bind(types, models []FullType) (map[string]reflect.Type, error) {
 		model := models[i]
 
 		if !IsStructurallyAssignable(model, t) {
-			return nil, fmt.Errorf("typex.Bind: %v is not assignable to %v", model, t)
+			return nil, errors.Errorf("typex.Bind: %v is not assignable to %v", model, t)
 		}
 		if err := walk(t, model, m); err != nil {
 			return nil, err
@@ -326,7 +328,7 @@ func walk(t, model FullType, m map[string]reflect.Type) error {
 
 		name := t.Type().Name()
 		if current, ok := m[name]; ok && current != model.Type() {
-			return fmt.Errorf("bind conflict for %v: %v != %v", name, current, model.Type())
+			return errors.Errorf("bind conflict for %v: %v != %v", name, current, model.Type())
 		}
 		m[name] = model.Type()
 		return nil
@@ -363,7 +365,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) {
 		name := t.Type().Name()
 		repl, ok := m[name]
 		if !ok {
-			return nil, fmt.Errorf("substituting type %v: type not bound", name)
+			return nil, errors.Errorf("substituting type %v: type not bound", name)
 		}
 		return New(repl), nil
 	case Container:
@@ -374,7 +376,7 @@ func substitute(t FullType, m map[string]reflect.Type) (FullType, error) {
 		if IsList(t.Type()) {
 			return New(reflect.SliceOf(comp[0].Type()), comp...), nil
 		}
-		return nil, fmt.Errorf("unexpected aggregate %v, only slices allowed", t)
+		return nil, errors.Errorf("unexpected aggregate %v, only slices allowed", t)
 	case Composite:
 		comp, err := substituteList(t.Components(), m)
 		if err != nil {
diff --git a/sdks/go/pkg/beam/core/util/dot/dot.go b/sdks/go/pkg/beam/core/util/dot/dot.go
index e92e501..d4d09a2 100644
--- a/sdks/go/pkg/beam/core/util/dot/dot.go
+++ b/sdks/go/pkg/beam/core/util/dot/dot.go
@@ -22,6 +22,7 @@ import (
 	"text/template"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var (
@@ -114,14 +115,14 @@ func Render(edges []*graph.MultiEdge, nodes []*graph.Node, w io.Writer) error {
 		for _, ib := range edge.Input {
 			err := edgeTmpl.Execute(w, struct{ From, To string }{ib.From.String(), e})
 			if err != nil {
-				return fmt.Errorf("render DOT failed: %v", err)
+				return errors.Wrap(err, "render DOT failed")
 			}
 		}
 		for _, ob := range edge.Output {
 			uniqNodes[ob.To].From = ob
 			err := edgeTmpl.Execute(w, struct{ From, To string }{e, ob.To.String()})
 			if err != nil {
-				return fmt.Errorf("render DOT failed: %v", err)
+				return errors.Wrap(err, "render DOT failed")
 			}
 		}
 	}
diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go b/sdks/go/pkg/beam/core/util/hooks/hooks.go
index db31792..9c873aa 100644
--- a/sdks/go/pkg/beam/core/util/hooks/hooks.go
+++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go
@@ -31,10 +31,10 @@ import (
 	"context"
 	"encoding/csv"
 	"encoding/json"
-	"fmt"
 	"strings"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 )
@@ -126,7 +126,7 @@ func SerializeHooksToOptions() {
 	data, err := json.Marshal(enabledHooks)
 	if err != nil {
 		// Shouldn't happen, since all the data is strings.
-		panic(fmt.Sprintf("Couldn't serialize hooks: %v", err))
+		panic(errors.Wrap(err, "Couldn't serialize hooks"))
 	}
 	runtime.GlobalOptions.Set("hooks", string(data))
 }
@@ -141,7 +141,7 @@ func DeserializeHooksFromOptions(ctx context.Context) {
 	}
 	if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil {
 		// Shouldn't happen, since all the data is strings.
-		panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", cfg, err))
+		panic(errors.Wrapf(err, "DeserializeHooks failed on input %q", cfg))
 	}
 
 	for h, opts := range enabledHooks {
@@ -155,7 +155,7 @@ func DeserializeHooksFromOptions(ctx context.Context) {
 // if a hook wants to compose behavior.
 func EnableHook(name string, args ...string) error {
 	if _, ok := hookRegistry[name]; !ok {
-		return fmt.Errorf("EnableHook: hook %s not found", name)
+		return errors.Errorf("EnableHook: hook %s not found", name)
 	}
 	enabledHooks[name] = args
 	return nil
@@ -176,7 +176,7 @@ func Encode(name string, opts []string) string {
 	w := csv.NewWriter(&cfg)
 	// This should never happen since a bytes.Buffer doesn't fail to write.
 	if err := w.Write(append([]string{name}, opts...)); err != nil {
-		panic(fmt.Sprintf("error encoding arguments: %v", err))
+		panic(errors.Wrap(err, "error encoding arguments"))
 	}
 	w.Flush()
 	return cfg.String()
@@ -189,7 +189,7 @@ func Decode(in string) (string, []string) {
 	r := csv.NewReader(strings.NewReader(in))
 	s, err := r.Read()
 	if err != nil {
-		panic(fmt.Sprintf("malformed input for decoding: %s %v", in, err))
+		panic(errors.Wrapf(err, "malformed input for decoding: %s", in))
 	}
 	return s[0], s[1:]
 }
diff --git a/sdks/go/pkg/beam/core/util/ioutilx/read.go b/sdks/go/pkg/beam/core/util/ioutilx/read.go
index 106eff0..cf3568c 100644
--- a/sdks/go/pkg/beam/core/util/ioutilx/read.go
+++ b/sdks/go/pkg/beam/core/util/ioutilx/read.go
@@ -17,9 +17,10 @@
 package ioutilx
 
 import (
-	"errors"
 	"io"
 	"unsafe"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // ReadN reads exactly N bytes from the reader. Fails otherwise.
diff --git a/sdks/go/pkg/beam/core/util/protox/any.go b/sdks/go/pkg/beam/core/util/protox/any.go
index a4ed3c1..0568f81 100644
--- a/sdks/go/pkg/beam/core/util/protox/any.go
+++ b/sdks/go/pkg/beam/core/util/protox/any.go
@@ -16,8 +16,7 @@
 package protox
 
 import (
-	"fmt"
-
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/golang/protobuf/proto"
 	protobuf "github.com/golang/protobuf/ptypes/any"
 	protobufw "github.com/golang/protobuf/ptypes/wrappers"
@@ -30,7 +29,7 @@ const (
 // Unpack decodes a proto.
 func Unpack(data *protobuf.Any, url string, ret proto.Message) error {
 	if data.TypeUrl != url {
-		return fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, url)
+		return errors.Errorf("bad type: %v, want %v", data.TypeUrl, url)
 	}
 	return proto.Unmarshal(data.Value, ret)
 }
@@ -75,12 +74,12 @@ func PackBase64Proto(in proto.Message) (*protobuf.Any, error) {
 // UnpackBytes removes the BytesValue wrapper.
 func UnpackBytes(data *protobuf.Any) ([]byte, error) {
 	if data.TypeUrl != bytesValueTypeURL {
-		return nil, fmt.Errorf("Bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL)
+		return nil, errors.Errorf("bad type: %v, want %v", data.TypeUrl, bytesValueTypeURL)
 	}
 
 	var buf protobufw.BytesValue
 	if err := proto.Unmarshal(data.Value, &buf); err != nil {
-		return nil, fmt.Errorf("BytesValue unmarshal failed: %v", err)
+		return nil, errors.Wrap(err, "BytesValue unmarshal failed")
 	}
 	return buf.Value, nil
 }
diff --git a/sdks/go/pkg/beam/core/util/protox/base64.go b/sdks/go/pkg/beam/core/util/protox/base64.go
index 90fb13f..296d397 100644
--- a/sdks/go/pkg/beam/core/util/protox/base64.go
+++ b/sdks/go/pkg/beam/core/util/protox/base64.go
@@ -17,8 +17,8 @@ package protox
 
 import (
 	"encoding/base64"
-	"fmt"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/golang/protobuf/proto"
 )
 
@@ -44,7 +44,7 @@ func EncodeBase64(msg proto.Message) (string, error) {
 func DecodeBase64(data string, ret proto.Message) error {
 	decoded, err := base64.StdEncoding.DecodeString(data)
 	if err != nil {
-		return fmt.Errorf("base64 decoding failed: %v", err)
+		return errors.Wrap(err, "base64 decoding failed")
 	}
 	return proto.Unmarshal(decoded, ret)
 }
diff --git a/sdks/go/pkg/beam/core/util/reflectx/call.go b/sdks/go/pkg/beam/core/util/reflectx/call.go
index f69ffda..44f0602 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/call.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/call.go
@@ -19,7 +19,7 @@ import (
 	"reflect"
 	"sync"
 
-	"fmt"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"runtime/debug"
 )
 
@@ -90,7 +90,7 @@ func (c *reflectFunc) Call(args []interface{}) []interface{} {
 func CallNoPanic(fn Func, args []interface{}) (ret []interface{}, err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = fmt.Errorf("panic: %v %s", r, debug.Stack())
+			err = errors.Errorf("panic: %v %s", r, debug.Stack())
 		}
 	}()
 	return fn.Call(args), nil
diff --git a/sdks/go/pkg/beam/core/util/reflectx/json.go b/sdks/go/pkg/beam/core/util/reflectx/json.go
index 530b9f8..2ca6fe7 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/json.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/json.go
@@ -17,8 +17,9 @@ package reflectx
 
 import (
 	"encoding/json"
-	"fmt"
 	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // UnmarshalJSON decodes a json string as then given type. It is suitable
@@ -26,7 +27,7 @@ import (
 func UnmarshalJSON(t reflect.Type, str string) (interface{}, error) {
 	data := reflect.New(t).Interface()
 	if err := json.Unmarshal([]byte(str), data); err != nil {
-		return nil, fmt.Errorf("failed to decode data: %v", err)
+		return nil, errors.Wrap(err, "failed to decode data")
 	}
 	return reflect.ValueOf(data).Elem().Interface(), nil
 }
diff --git a/sdks/go/pkg/beam/core/util/symtab/symtab.go b/sdks/go/pkg/beam/core/util/symtab/symtab.go
index 408af22..ded72a7 100644
--- a/sdks/go/pkg/beam/core/util/symtab/symtab.go
+++ b/sdks/go/pkg/beam/core/util/symtab/symtab.go
@@ -21,8 +21,9 @@ import (
 	"debug/elf"
 	"debug/macho"
 	"debug/pe"
-	"fmt"
 	"os"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // SymbolTable allows for mapping between symbols and their addresses.
@@ -48,7 +49,7 @@ func New(filename string) (*SymbolTable, error) {
 		d, err := ef.DWARF()
 		if err != nil {
 			f.Close()
-			return nil, fmt.Errorf("No working DWARF: %v", err)
+			return nil, errors.Wrap(err, "No working DWARF")
 		}
 		return &SymbolTable{d}, nil
 	}
@@ -59,7 +60,7 @@ func New(filename string) (*SymbolTable, error) {
 		d, err := mf.DWARF()
 		if err != nil {
 			f.Close()
-			return nil, fmt.Errorf("No working DWARF: %v", err)
+			return nil, errors.Wrap(err, "No working DWARF")
 		}
 		return &SymbolTable{d}, nil
 	}
@@ -70,14 +71,14 @@ func New(filename string) (*SymbolTable, error) {
 		d, err := pf.DWARF()
 		if err != nil {
 			f.Close()
-			return nil, fmt.Errorf("No working DWARF: %v", err)
+			return nil, errors.Wrap(err, "No working DWARF")
 		}
 		return &SymbolTable{d}, nil
 	}
 
 	// Give up, we don't recognize it
 	f.Close()
-	return nil, fmt.Errorf("Unknown file format")
+	return nil, errors.New("Unknown file format")
 }
 
 // Addr2Sym returns the symbol name for the provided address.
@@ -100,7 +101,7 @@ func (s *SymbolTable) Addr2Sym(addr uintptr) (string, error) {
 			}
 		}
 	}
-	return "", fmt.Errorf("no symbol found at address %x", addr)
+	return "", errors.Errorf("no symbol found at address %x", addr)
 }
 
 // Sym2Addr returns the address of the provided symbol name.
@@ -124,5 +125,5 @@ func (s *SymbolTable) Sym2Addr(symbol string) (uintptr, error) {
 			}
 		}
 	}
-	return 0, fmt.Errorf("no symbol %q", symbol)
+	return 0, errors.Errorf("no symbol %q", symbol)
 }


Mime
View raw message