beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: [BEAM-7154] Updating Go SDK errors (Part 2)
Date Thu, 02 May 2019 22:42:59 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6edf005  [BEAM-7154] Updating Go SDK errors (Part 2)
     new 75eee78  Merge pull request #8462 from youngoli/beam7154
6edf005 is described below

commit 6edf0058a1019fb2c42079c1d7a10e89089d718c
Author: Daniel Oliveira <daniel.o.programmer@gmail.com>
AuthorDate: Tue Apr 30 16:19:05 2019 -0700

    [BEAM-7154] Updating Go SDK errors (Part 2)
    
    This commit changes errors from using standard Go error functionality
    to using the internal Beam "errors" package. It covers the
    subdirectories io, options, provision, and runners.
---
 sdks/go/pkg/beam/io/bigqueryio/bigquery.go         | 13 ++++++------
 sdks/go/pkg/beam/io/databaseio/database.go         | 23 +++++++++++-----------
 sdks/go/pkg/beam/io/databaseio/mapper.go           |  5 +++--
 sdks/go/pkg/beam/io/databaseio/util.go             |  5 +++--
 sdks/go/pkg/beam/io/databaseio/writer.go           |  8 +++++---
 sdks/go/pkg/beam/io/datastoreio/datastore.go       |  4 ++--
 sdks/go/pkg/beam/io/filesystem/filesystem.go       |  4 +++-
 sdks/go/pkg/beam/io/filesystem/gcs/gcs.go          |  3 ++-
 sdks/go/pkg/beam/options/jobopts/options.go        |  3 ++-
 sdks/go/pkg/beam/provision/provision.go            |  6 +++---
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      | 12 +++++------
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   |  8 ++++----
 .../pkg/beam/runners/dataflow/dataflowlib/stage.go |  6 +++---
 .../beam/runners/dataflow/dataflowlib/translate.go | 23 +++++++++++-----------
 sdks/go/pkg/beam/runners/direct/direct.go          |  8 ++++----
 sdks/go/pkg/beam/runners/direct/gbk.go             |  5 +++--
 sdks/go/pkg/beam/runners/dot/dot.go                |  2 +-
 sdks/go/pkg/beam/runners/session/session.go        | 15 +++++++-------
 .../beam/runners/universal/runnerlib/compile.go    |  5 +++--
 .../beam/runners/universal/runnerlib/execute.go    |  4 ++--
 .../go/pkg/beam/runners/universal/runnerlib/job.go | 13 ++++++------
 .../pkg/beam/runners/universal/runnerlib/stage.go  |  8 ++++----
 sdks/go/pkg/beam/runners/universal/universal.go    |  3 ++-
 sdks/go/pkg/beam/util/syscallx/syscall.go          |  2 +-
 24 files changed, 102 insertions(+), 86 deletions(-)

diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
index 626df12..52cee62 100644
--- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go
@@ -28,6 +28,7 @@ import (
 	"cloud.google.com/go/bigquery"
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	bq "google.golang.org/api/bigquery/v2"
 	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
@@ -67,14 +68,14 @@ func NewQualifiedTableName(s string) (QualifiedTableName, error) {
 	c := strings.LastIndex(s, ":")
 	d := strings.LastIndex(s, ".")
 	if c == -1 || d == -1 || d < c {
-		return QualifiedTableName{}, fmt.Errorf("table name missing components: %v", s)
+		return QualifiedTableName{}, errors.Errorf("table name missing components: %v", s)
 	}
 
 	project := s[:c]
 	dataset := s[c+1 : d]
 	table := s[d+1:]
 	if strings.TrimSpace(project) == "" || strings.TrimSpace(dataset) == "" || strings.TrimSpace(table)
== "" {
-		return QualifiedTableName{}, fmt.Errorf("table name has empty components: %v", s)
+		return QualifiedTableName{}, errors.Errorf("table name has empty components: %v", s)
 	}
 	return QualifiedTableName{Project: project, Dataset: dataset, Table: table}, nil
 }
@@ -150,7 +151,7 @@ func mustInferSchema(t reflect.Type) bigquery.Schema {
 	}
 	schema, err := bigquery.InferSchema(reflect.Zero(t).Interface())
 	if err != nil {
-		panic(fmt.Sprintf("invalid schema type: %v", err))
+		panic(errors.Wrapf(err, "invalid schema type: %v", t))
 	}
 	return schema
 }
@@ -251,12 +252,12 @@ func (f *writeFn) ProcessElement(ctx context.Context, _ int, iter func(*beam.X)
 	for iter(&val) {
 		current, err := getInsertSize(val.(interface{}), schema)
 		if err != nil {
-			return fmt.Errorf("biquery write error: %v", err)
+			return errors.Wrapf(err, "bigquery write error")
 		}
 		if len(data)+1 > writeRowLimit || size+current > writeSizeLimit {
 			// Write rows in batches to comply with BQ limits.
 			if err := put(ctx, table, f.Type.T, data); err != nil {
-				return fmt.Errorf("bigquery write error [len=%d, size=%d]: %v", len(data), size, err)
+				return errors.Wrapf(err, "bigquery write error [len=%d, size=%d]", len(data), size)
 			}
 			data = nil
 			size = writeOverheadBytes
@@ -269,7 +270,7 @@ func (f *writeFn) ProcessElement(ctx context.Context, _ int, iter func(*beam.X)
 		return nil
 	}
 	if err := put(ctx, table, f.Type.T, data); err != nil {
-		return fmt.Errorf("bigquery write error [len=%d, size=%d]: %v", len(data), size, err)
+		return errors.Wrapf(err, "bigquery write error [len=%d, size=%d]", len(data), size)
 	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/io/databaseio/database.go b/sdks/go/pkg/beam/io/databaseio/database.go
index e9de9cb..2b41671 100644
--- a/sdks/go/pkg/beam/io/databaseio/database.go
+++ b/sdks/go/pkg/beam/io/databaseio/database.go
@@ -22,6 +22,7 @@ import (
 	"database/sql"
 	"fmt"
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"reflect"
 	"strings"
@@ -70,17 +71,17 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X
 	//TODO move DB Open and Close to Setup and Teardown methods or StartBundle and FinishBundle
 	db, err := sql.Open(f.Driver, f.Dsn)
 	if err != nil {
-		return fmt.Errorf("failed to open database: %v, %v", f.Driver, err)
+		return errors.Wrapf(err, "failed to open database: %v", f.Driver)
 	}
 	defer db.Close()
 	statement, err := db.PrepareContext(ctx, f.Query)
 	if err != nil {
-		return fmt.Errorf("failed to prepare query: %v, %v", f.Query, err)
+		return errors.Wrapf(err, "failed to prepare query: %v", f.Query)
 	}
 	defer statement.Close()
 	rows, err := statement.QueryContext(ctx)
 	if err != nil {
-		return fmt.Errorf("failed to run query: %v, %v", f.Query, err)
+		return errors.Wrapf(err, "failed to run query: %v", f.Query)
 	}
 	defer rows.Close()
 	var mapper rowMapper
@@ -95,7 +96,7 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X
 			}
 			columnsTypes, _ := rows.ColumnTypes()
 			if mapper, err = newQueryMapper(columns, columnsTypes, f.Type.T); err != nil {
-				return fmt.Errorf("failed to create rowValues mapper: %v", err)
+				return errors.WithContext(err, "creating rowValues mapper")
 			}
 		}
 		rowValues, err := mapper(reflectRow)
@@ -104,7 +105,7 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X
 		}
 		err = rows.Scan(rowValues...)
 		if err != nil {
-			return fmt.Errorf("failed to scan %v, %v", f.Query, err)
+			return errors.Wrapf(err, "failed to scan %v", f.Query)
 		}
 		if loader, ok := row.(MapLoader); ok {
 			asDereferenceSlice(rowValues)
@@ -155,7 +156,7 @@ func (f *writeFn) ProcessElement(ctx context.Context, _ int, iter func(*beam.X)
 	//TODO move DB Open and Close to Setup and Teardown methods or StartBundle and FinishBundle
 	db, err := sql.Open(f.Driver, f.Dsn)
 	if err != nil {
-		return fmt.Errorf("failed to open database: %v, %v", f.Driver, err)
+		return errors.Wrapf(err, "failed to open database: %v", f.Driver)
 	}
 	defer db.Close()
 	projection := "*"
@@ -165,21 +166,21 @@ func (f *writeFn) ProcessElement(ctx context.Context, _ int, iter func(*beam.X)
 	dql := fmt.Sprintf("SELECT %v FROM  %v WHERE 1 = 0", projection, f.Table)
 	query, err := db.Prepare(dql)
 	if err != nil {
-		return fmt.Errorf("failed to prepare query: %v, %v", f.Table, err)
+		return errors.Wrapf(err, "failed to prepare query: %v", f.Table)
 	}
 	defer query.Close()
 	rows, err := query.Query()
 	if err != nil {
-		return fmt.Errorf("failed to query: %v, %v", f.Table, err)
+		return errors.Wrapf(err, "failed to query: %v", f.Table)
 	}
 	columns, err := rows.Columns()
 	if err != nil {
-		return fmt.Errorf("failed to discover column: %v, %v", f.Table, err)
+		return errors.Wrapf(err, "failed to discover column: %v", f.Table)
 	}
 	//TODO move to Setup methods
 	mapper, err := newWriterRowMapper(columns, f.Type.T)
 	if err != nil {
-		return fmt.Errorf("failed to create row mapper: %v", err)
+		return errors.WithContext(err, "creating row mapper")
 	}
 	writer, err := newWriter(f.BatchSize, f.Table, columns)
 	if err != nil {
@@ -200,7 +201,7 @@ func (f *writeFn) ProcessElement(ctx context.Context, _ int, iter func(*beam.X)
 			row, err = mapper(reflect.ValueOf(val))
 		}
 		if err != nil {
-			return fmt.Errorf("failed to map row %T: %v", val, err)
+			return errors.Wrapf(err, "failed to map row %T", val)
 		}
 		if err = writer.add(row); err != nil {
 			return err
diff --git a/sdks/go/pkg/beam/io/databaseio/mapper.go b/sdks/go/pkg/beam/io/databaseio/mapper.go
index 7c55f80..56a98a5 100644
--- a/sdks/go/pkg/beam/io/databaseio/mapper.go
+++ b/sdks/go/pkg/beam/io/databaseio/mapper.go
@@ -19,10 +19,11 @@ package databaseio
 
 import (
 	"database/sql"
-	"fmt"
 	"reflect"
 	"strings"
 	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //rowMapper represents a record mapper
@@ -36,7 +37,7 @@ func newQueryMapper(columns []string, columnTypes []*sql.ColumnType, recordType
 	} else if recordType.Kind() == reflect.Struct {
 		return newQueryStructMapper(columns, recordType)
 	}
-	return nil, fmt.Errorf("unsupported type %s", recordType)
+	return nil, errors.Errorf("unsupported type %s", recordType)
 }
 
 //newQueryStructMapper creates a new record mapper for supplied struct type
diff --git a/sdks/go/pkg/beam/io/databaseio/util.go b/sdks/go/pkg/beam/io/databaseio/util.go
index 88888ba..04324fa 100644
--- a/sdks/go/pkg/beam/io/databaseio/util.go
+++ b/sdks/go/pkg/beam/io/databaseio/util.go
@@ -18,9 +18,10 @@
 package databaseio
 
 import (
-	"fmt"
 	"reflect"
 	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //mapFields maps column into field index in record type
@@ -48,7 +49,7 @@ func mapFields(columns []string, recordType reflect.Type) ([]int, error)
{
 			fieldIndex, ok = indexedFields[strings.Replace(strings.ToLower(column), "_", "", strings.Count(column,
"_"))]
 		}
 		if !ok {
-			return nil, fmt.Errorf("failed to matched a %v field for SQL column: %v", recordType,
column)
+			return nil, errors.Errorf("failed to matched a %v field for SQL column: %v", recordType,
column)
 		}
 		mappedFieldIndex[i] = fieldIndex
 	}
diff --git a/sdks/go/pkg/beam/io/databaseio/writer.go b/sdks/go/pkg/beam/io/databaseio/writer.go
index 7b5a638..67a1b08 100644
--- a/sdks/go/pkg/beam/io/databaseio/writer.go
+++ b/sdks/go/pkg/beam/io/databaseio/writer.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"golang.org/x/net/context"
 	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // Writer returns a row of data to be inserted into a table.
@@ -44,7 +46,7 @@ func (w *writer) add(row []interface{}) error {
 	w.rowCount++
 	w.totalCount++
 	if len(row) != w.columnCount {
-		return fmt.Errorf("expected %v row values, but had: %v", w.columnCount, len(row))
+		return errors.Errorf("expected %v row values, but had: %v", w.columnCount, len(row))
 	}
 	w.binding = append(w.binding, row...)
 	return nil
@@ -59,7 +61,7 @@ func (w *writer) write(ctx context.Context, db *sql.DB) error {
 	}
 	affected, _ := resultSet.RowsAffected()
 	if int(affected) != w.rowCount {
-		return fmt.Errorf("expected to write: %v, but written: %v", w.rowCount, affected)
+		return errors.Errorf("expected to write: %v, but written: %v", w.rowCount, affected)
 	}
 	w.binding = []interface{}{}
 	w.rowCount = 0
@@ -82,7 +84,7 @@ func (w *writer) writeIfNeeded(ctx context.Context, db *sql.DB) error {
 
 func newWriter(batchSize int, table string, columns []string) (*writer, error) {
 	if len(columns) == 0 {
-		return nil, fmt.Errorf("columns were empty")
+		return nil, errors.New("columns were empty")
 	}
 	values := strings.Repeat("?,", len(columns))
 	return &writer{
diff --git a/sdks/go/pkg/beam/io/datastoreio/datastore.go b/sdks/go/pkg/beam/io/datastoreio/datastore.go
index 7d3b4bc..d80395c 100644
--- a/sdks/go/pkg/beam/io/datastoreio/datastore.go
+++ b/sdks/go/pkg/beam/io/datastoreio/datastore.go
@@ -20,7 +20,6 @@ package datastoreio
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"math"
 	"reflect"
 	"sort"
@@ -30,6 +29,7 @@ import (
 	"cloud.google.com/go/datastore"
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"google.golang.org/api/iterator"
 )
@@ -209,7 +209,7 @@ func (f *queryFn) ProcessElement(ctx context.Context, _ string, v func(*string)
 	// lookup type
 	t, ok := runtime.LookupType(f.Type)
 	if !ok {
-		return fmt.Errorf("No type registered %s", f.Type)
+		return errors.Errorf("No type registered %s", f.Type)
 	}
 
 	// Translate BoundedQuery to datastore.Query
diff --git a/sdks/go/pkg/beam/io/filesystem/filesystem.go b/sdks/go/pkg/beam/io/filesystem/filesystem.go
index dd4833f..018c928 100644
--- a/sdks/go/pkg/beam/io/filesystem/filesystem.go
+++ b/sdks/go/pkg/beam/io/filesystem/filesystem.go
@@ -22,6 +22,8 @@ import (
 	"fmt"
 	"io"
 	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 var registry = make(map[string]func(context.Context) Interface)
@@ -41,7 +43,7 @@ func New(ctx context.Context, path string) (Interface, error) {
 	scheme := getScheme(path)
 	mkfs, ok := registry[scheme]
 	if !ok {
-		return nil, fmt.Errorf("file system scheme %v not registered for %v", scheme, path)
+		return nil, errors.Errorf("file system scheme %v not registered for %v", scheme, path)
 	}
 	return mkfs(ctx), nil
 }
diff --git a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
index 356c718..f167532 100644
--- a/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
+++ b/sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
@@ -25,6 +25,7 @@ import (
 	"strings"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
@@ -50,7 +51,7 @@ func New(ctx context.Context) filesystem.Interface {
 
 		client, err = storage.NewClient(ctx, option.WithoutAuthentication())
 		if err != nil {
-			panic(fmt.Sprintf("failed to create GCS client: %v", err))
+			panic(errors.Wrapf(err, "failed to create GCS client"))
 		}
 	}
 	return &fs{client: client}
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
index b71d7b0..86beb94 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -27,6 +27,7 @@ import (
 
 	"sync/atomic"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -66,7 +67,7 @@ var (
 // such as Dataflow set a reasonable default. Convenience function.
 func GetEndpoint() (string, error) {
 	if *Endpoint == "" {
-		return "", fmt.Errorf("no job service endpoint specified. Use --endpoint=<endpoint>")
+		return "", errors.New("no job service endpoint specified. Use --endpoint=<endpoint>")
 	}
 	return *Endpoint, nil
 }
diff --git a/sdks/go/pkg/beam/provision/provision.go b/sdks/go/pkg/beam/provision/provision.go
index 7349ef2..9b44d2f 100644
--- a/sdks/go/pkg/beam/provision/provision.go
+++ b/sdks/go/pkg/beam/provision/provision.go
@@ -20,10 +20,10 @@ package provision
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/jsonpb"
@@ -42,10 +42,10 @@ func Info(ctx context.Context, endpoint string) (*pb.ProvisionInfo, error)
{
 
 	resp, err := client.GetProvisionInfo(ctx, &pb.GetProvisionInfoRequest{})
 	if err != nil {
-		return nil, fmt.Errorf("failed to get manifest: %v", err)
+		return nil, errors.Wrap(err, "failed to get manifest")
 	}
 	if resp.GetInfo() == nil {
-		return nil, fmt.Errorf("empty manifest")
+		return nil, errors.New("empty manifest")
 	}
 	return resp.GetInfo(), nil
 }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 14389a7..2600188 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -20,7 +20,6 @@ package dataflow
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"flag"
 	"fmt"
 	"io"
@@ -32,6 +31,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
@@ -95,7 +95,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	var jobLabels map[string]string
 	if *labels != "" {
 		if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil {
-			return fmt.Errorf("error reading --label flag as JSON: %v", err)
+			return errors.Wrapf(err, "error reading --label flag as JSON")
 		}
 	}
 
@@ -155,7 +155,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	model, err := graphx.Marshal(edges, &graphx.Options{Environment: createEnvironment(ctx)})
 	if err != nil {
-		return fmt.Errorf("failed to generate model pipeline: %v", err)
+		return errors.WithContext(err, "generating model pipeline")
 	}
 
 	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
@@ -189,7 +189,7 @@ func gcsRecorderHook(opts []string) perf.CaptureHook {
 	return func(ctx context.Context, spec string, r io.Reader) error {
 		client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
 		if err != nil {
-			return fmt.Errorf("couldn't establish GCS client: %v", err)
+			return errors.WithContext(err, "establishing GCS client")
 		}
 		return gcsx.WriteObject(ctx, client, bucket, path.Join(prefix, spec), r)
 	}
@@ -216,8 +216,8 @@ func createEnvironment(ctx context.Context) pb.Environment {
 		payload := &pb.DockerPayload{ContainerImage: config}
 		serializedPayload, err := proto.Marshal(payload)
 		if err != nil {
-			panic(fmt.Sprintf(
-				"Failed to serialize Environment payload %v for config %v: %v", payload, config, err))
+			panic(errors.Wrapf(err,
+				"Failed to serialize Environment payload %v for config %v", payload, config))
 		}
 		environment = pb.Environment{
 			Urn:     urn,
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index dbd2e5d..3979d6e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -17,7 +17,6 @@ package dataflowlib
 
 import (
 	"context"
-	"fmt"
 	"strings"
 	"time"
 
@@ -25,6 +24,7 @@ import (
 	// Importing to get the side effect of the remote execution hook. See init().
 	_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"golang.org/x/oauth2/google"
@@ -85,7 +85,7 @@ func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, jarURL, modelURL
str
 
 	images := pipelinex.ContainerImages(p)
 	if len(images) != 1 {
-		return nil, fmt.Errorf("Dataflow supports one container image only: %v", images)
+		return nil, errors.Errorf("Dataflow supports one container image only: %v", images)
 	}
 
 	packages := []*df.Package{{
@@ -177,7 +177,7 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project,
region,
 	for {
 		j, err := client.Projects.Locations.Jobs.Get(project, region, jobID).Do()
 		if err != nil {
-			return fmt.Errorf("failed to get job: %v", err)
+			return errors.Wrap(err, "failed to get job")
 		}
 
 		switch j.CurrentState {
@@ -190,7 +190,7 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project,
region,
 			return nil
 
 		case "JOB_STATE_FAILED":
-			return fmt.Errorf("job %s failed", jobID)
+			return errors.Errorf("job %s failed", jobID)
 
 		case "JOB_STATE_RUNNING":
 			log.Info(ctx, "Job still running ...")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index 0b76859..67a2bde 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -18,11 +18,11 @@ package dataflowlib
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"io"
 	"os"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 )
 
@@ -35,7 +35,7 @@ func StageModel(ctx context.Context, project, modelURL string, model []byte)
err
 func StageFile(ctx context.Context, project, url, filename string) error {
 	fd, err := os.Open(filename)
 	if err != nil {
-		return fmt.Errorf("failed to open file %s: %v", filename, err)
+		return errors.Wrapf(err, "failed to open file %s", filename)
 	}
 	defer fd.Close()
 
@@ -45,7 +45,7 @@ func StageFile(ctx context.Context, project, url, filename string) error
{
 func upload(ctx context.Context, project, object string, r io.Reader) error {
 	bucket, obj, err := gcsx.ParseObject(object)
 	if err != nil {
-		return fmt.Errorf("invalid staging location %v: %v", object, err)
+		return errors.Wrapf(err, "invalid staging location %v", object)
 	}
 	client, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
 	if err != nil {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index ca65932..047d818 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -31,6 +31,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/stringx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pubsub_v1 "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio/v1"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
@@ -129,7 +130,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step,
er
 	case graphx.URNParDo:
 		var payload pb.ParDoPayload
 		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
-			return nil, fmt.Errorf("invalid ParDo payload for %v: %v", t, err)
+			return nil, errors.Wrapf(err, "invalid ParDo payload for %v", t)
 		}
 
 		var steps []*df.Step
@@ -174,24 +175,24 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step,
er
 		// Combine ParDo with the CombineValues kind, set its SerializedFn to map to the
 		// composite payload, and the accumulator coding.
 		if len(t.Subtransforms) != 2 {
-			return nil, fmt.Errorf("invalid CombinePerKey, expected 2 subtransforms but got %d in
%v", len(t.Subtransforms), t)
+			return nil, errors.Errorf("invalid CombinePerKey, expected 2 subtransforms but got %d
in %v", len(t.Subtransforms), t)
 		}
 		steps, err := x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)),
t.Subtransforms)
 		if err != nil {
-			return nil, fmt.Errorf("invalid CombinePerKey, couldn't extract GBK from %v: %v", t, err)
+			return nil, errors.Wrapf(err, "invalid CombinePerKey, couldn't extract GBK from %v", t)
 		}
 		var payload pb.CombinePayload
 		if err := proto.Unmarshal(t.Spec.Payload, &payload); err != nil {
-			return nil, fmt.Errorf("invalid Combine payload for %v: %v", t, err)
+			return nil, errors.Wrapf(err, "invalid Combine payload for %v", t)
 		}
 
 		c, err := x.coders.Coder(payload.AccumulatorCoderId)
 		if err != nil {
-			return nil, fmt.Errorf("invalid Combine payload , missing Accumulator Coder %v: %v", t,
err)
+			return nil, errors.Wrapf(err, "invalid Combine payload , missing Accumulator Coder %v",
t)
 		}
 		enc, err := graphx.EncodeCoderRef(c)
 		if err != nil {
-			return nil, fmt.Errorf("invalid Combine payload, couldn't encode Accumulator Coder %v:
%v", t, err)
+			return nil, errors.Wrapf(err, "invalid Combine payload, couldn't encode Accumulator Coder
%v", t)
 		}
 		json.Unmarshal([]byte(steps[1].Properties), &prop)
 		prop.Encoding = enc
@@ -226,7 +227,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step,
er
 
 		var msg pubsub_v1.PubSubPayload
 		if err := proto.Unmarshal(t.Spec.Payload, &msg); err != nil {
-			return nil, fmt.Errorf("bad pubsub payload: %v", err)
+			return nil, errors.Wrap(err, "bad pubsub payload")
 		}
 
 		prop.Format = "pubsub"
@@ -252,7 +253,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step,
er
 			return []*df.Step{x.newStep(id, writeKind, prop)}, nil
 
 		default:
-			return nil, fmt.Errorf("bad pubsub op: %v", msg.Op)
+			return nil, errors.Errorf("bad pubsub op: %v", msg.Op)
 		}
 
 	default:
@@ -260,7 +261,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step,
er
 			return x.translateTransforms(fmt.Sprintf("%v%v/", trunk, path.Base(t.UniqueName)), t.Subtransforms)
 		}
 
-		return nil, fmt.Errorf("unexpected primitive urn: %v", t)
+		return nil, errors.Errorf("unexpected primitive urn: %v", t)
 	}
 }
 
@@ -320,11 +321,11 @@ func (x *translator) wrapCoder(pcol *pb.PCollection, c *coder.Coder)
*graphx.Cod
 	ws := x.comp.WindowingStrategies[pcol.WindowingStrategyId]
 	wc, err := x.coders.WindowCoder(ws.WindowCoderId)
 	if err != nil {
-		panic(fmt.Sprintf("failed to decode window coder %v for windowing strategy %v: %v", ws.WindowCoderId,
pcol.WindowingStrategyId, err))
+		panic(errors.Wrapf(err, "failed to decode window coder %v for windowing strategy %v", ws.WindowCoderId,
pcol.WindowingStrategyId))
 	}
 	ret, err := graphx.EncodeCoderRef(coder.NewW(c, wc))
 	if err != nil {
-		panic(fmt.Sprintf("failed to wrap coder %v for windowing strategy %v: %v", c, pcol.WindowingStrategyId,
err))
+		panic(errors.Wrapf(err, "failed to wrap coder %v for windowing strategy %v", c, pcol.WindowingStrategyId))
 	}
 	return ret
 }
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 608fe98..bd1d324 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -19,7 +19,6 @@ package direct
 
 import (
 	"context"
-	"fmt"
 	"path"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
@@ -27,6 +26,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -47,11 +47,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	edges, _, err := p.Build()
 	if err != nil {
-		return fmt.Errorf("invalid pipeline: %v", err)
+		return errors.Wrap(err, "invalid pipeline")
 	}
 	plan, err := Compile(edges)
 	if err != nil {
-		return fmt.Errorf("translation failed: %v", err)
+		return errors.Wrap(err, "translation failed")
 	}
 	log.Info(ctx, plan)
 
@@ -294,7 +294,7 @@ func (b *builder) makeLink(id linkID) (exec.Node, error) {
 		u = &exec.WindowInto{UID: b.idgen.New(), Fn: edge.WindowFn, Out: out[0]}
 
 	default:
-		return nil, fmt.Errorf("unexpected edge: %v", edge)
+		return nil, errors.Errorf("unexpected edge: %v", edge)
 	}
 
 	b.links[id] = u
diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go
index 4ce0e00..13763ec 100644
--- a/sdks/go/pkg/beam/runners/direct/gbk.go
+++ b/sdks/go/pkg/beam/runners/direct/gbk.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 type group struct {
@@ -65,10 +66,10 @@ func (n *CoGBK) ProcessElement(ctx context.Context, elm *exec.FullValue,
_ ...ex
 
 		var buf bytes.Buffer
 		if err := n.enc.Encode(&exec.FullValue{Elm: value.Elm}, &buf); err != nil {
-			return fmt.Errorf("failed to encode key %v for CoGBK: %v", elm, err)
+			return errors.WithContextf(err, "encoding key %v for CoGBK", elm)
 		}
 		if err := n.wEnc.Encode(ws, &buf); err != nil {
-			return fmt.Errorf("failed to encode window %v for CoGBK: %v", w, err)
+			return errors.WithContextf(err, "encoding window %v for CoGBK", w)
 		}
 		key := buf.String()
 
diff --git a/sdks/go/pkg/beam/runners/dot/dot.go b/sdks/go/pkg/beam/runners/dot/dot.go
index 4b68c57..547aa10 100644
--- a/sdks/go/pkg/beam/runners/dot/dot.go
+++ b/sdks/go/pkg/beam/runners/dot/dot.go
@@ -20,12 +20,12 @@ package dot
 import (
 	"bytes"
 	"context"
-	"errors"
 	"flag"
 	"io/ioutil"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	dotlib "github.com/apache/beam/sdks/go/pkg/beam/core/util/dot"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 func init() {
diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go
index aa8ae1e..5145ecf 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -32,6 +32,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
 	rapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -118,7 +119,7 @@ func (c *controlServer) readSession(filename string) {
 	for {
 		b, err := br.Peek(peekLen)
 		if err != nil && err != io.EOF {
-			panic(fmt.Sprintf("problem peeking length value: %v", err))
+			panic(errors.Wrap(err, "Problem peeking length value"))
 		}
 		if err == io.EOF {
 			break
@@ -130,18 +131,18 @@ func (c *controlServer) readSession(filename string) {
 		b, err = br.Peek(int(l))
 		var hMsg session.EntryHeader
 		if err := proto.Unmarshal(b, &hMsg); err != nil {
-			panic(fmt.Sprintf("Error decoding entry header: %v", err))
+			panic(errors.Wrap(err, "Error decoding entry header"))
 		}
 		br.Discard(int(l))
 
 		msgBytes, err := br.Peek(int(hMsg.Len))
 		if err != nil {
-			panic(fmt.Sprintf("Couldn't peek message: %v", err))
+			panic(errors.Wrap(err, "Couldn't peek message"))
 		}
 
 		var bMsg session.Entry
 		if err := proto.Unmarshal(msgBytes, &bMsg); err != nil {
-			panic(fmt.Sprintf("Error decoding message: %v", err))
+			panic(errors.Wrap(err, "Error decoding message"))
 		}
 		c.handleEntry(&bMsg)
 		br.Discard(int(hMsg.Len))
@@ -261,7 +262,7 @@ func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) err
 func Execute(ctx context.Context, p *beam.Pipeline) error {
 	worker, err := buildLocalBinary(ctx)
 	if err != nil {
-		return fmt.Errorf("Couldn't build worker binary: %v", err)
+		return errors.WithContext(err, "building worker binary")
 	}
 
 	log.Infof(ctx, "built worker binary at %s\n", worker)
@@ -320,7 +321,7 @@ func buildLocalBinary(ctx context.Context) (string, error) {
 		program = file
 	}
 	if program == "" {
-		return "", fmt.Errorf("could not detect user main")
+		return "", errors.New("could not detect user main")
 	}
 
 	log.Infof(ctx, "Compiling %v as %v", program, ret)
@@ -331,7 +332,7 @@ func buildLocalBinary(ctx context.Context) (string, error) {
 	cmd := exec.Command(build[0], build[1:]...)
 	if out, err := cmd.CombinedOutput(); err != nil {
 		log.Info(ctx, string(out))
-		return "", fmt.Errorf("failed to compile %v: %v", program, err)
+		return "", errors.Wrapf(err, "failed to compile %v", program)
 	}
 	return ret, nil
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
index f447d9c..3a5709b 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
@@ -29,6 +29,7 @@ import (
 
 	"sync/atomic"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
@@ -68,7 +69,7 @@ func BuildWorkerBinary(ctx context.Context, filename string) error {
 		program = file
 	}
 	if !strings.HasSuffix(program, ".go") {
-		return fmt.Errorf("could not detect user main")
+		return errors.New("could not detect user main")
 	}
 
 	log.Infof(ctx, "Cross-compiling %v as %v", program, filename)
@@ -79,7 +80,7 @@ func BuildWorkerBinary(ctx context.Context, filename string) error {
 	cmd := exec.Command(build[0], build[1:]...)
 	cmd.Env = append(os.Environ(), "GOOS=linux", "GOARCH=amd64")
 	if out, err := cmd.CombinedOutput(); err != nil {
-		return fmt.Errorf("failed to cross-compile %v: %v\n%v", program, err, string(out))
+		return errors.Errorf("failed to cross-compile %v: %v\n%v", program, err, string(out))
 	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 8af2f59..14249f3 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -17,10 +17,10 @@ package runnerlib
 
 import (
 	"context"
-	"fmt"
 	"os"
 	"time"
 
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -34,7 +34,7 @@ func Execute(ctx context.Context, p *pb.Pipeline, endpoint string, opt *JobOptio
 
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
-		return "", fmt.Errorf("failed to connect to job service: %v", err)
+		return "", errors.WithContextf(err, "connecting to job service")
 	}
 	defer cc.Close()
 	client := jobpb.NewJobServiceClient(cc)
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index c9640b2..aac8a73 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -23,6 +23,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
@@ -56,7 +57,7 @@ func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline,
 
 	options, err := provision.OptionsToProto(raw)
 	if err != nil {
-		return "", "", "", fmt.Errorf("failed to produce pipeline options: %v", err)
+		return "", "", "", errors.WithContext(err, "producing pipeline options")
 	}
 	req := &jobpb.PrepareJobRequest{
 		Pipeline:        p,
@@ -65,7 +66,7 @@ func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline,
 	}
 	resp, err := client.Prepare(ctx, req)
 	if err != nil {
-		return "", "", "", fmt.Errorf("failed to connect to job service: %v", err)
+		return "", "", "", errors.Wrap(err, "failed to connect to job service")
 	}
 	return resp.GetPreparationId(), resp.GetArtifactStagingEndpoint().GetUrl(), resp.GetStagingSessionToken(), nil
 }
@@ -79,7 +80,7 @@ func Submit(ctx context.Context, client jobpb.JobServiceClient, id, token string
 
 	resp, err := client.Run(ctx, req)
 	if err != nil {
-		return "", fmt.Errorf("failed to submit job: %v", err)
+		return "", errors.Wrap(err, "failed to submit job")
 	}
 	return resp.GetJobId(), nil
 }
@@ -89,7 +90,7 @@ func Submit(ctx context.Context, client jobpb.JobServiceClient, id, token string
 func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID string) error {
 	stream, err := client.GetMessageStream(ctx, &jobpb.JobMessagesRequest{JobId: jobID})
 	if err != nil {
-		return fmt.Errorf("failed to get job stream: %v", err)
+		return errors.Wrap(err, "failed to get job stream")
 	}
 
 	for {
@@ -111,7 +112,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
 			case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
 				return nil
 			case jobpb.JobState_FAILED:
-				return fmt.Errorf("job %v failed", jobID)
+				return errors.Errorf("job %v failed", jobID)
 			}
 
 		case msg.GetMessageResponse() != nil:
@@ -121,7 +122,7 @@ func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID
 			log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text)
 
 		default:
-			return fmt.Errorf("unexpected job update: %v", proto.MarshalTextString(msg))
+			return errors.Errorf("unexpected job update: %v", proto.MarshalTextString(msg))
 		}
 	}
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
index 9516dde..acbbe70 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -17,10 +17,10 @@ package runnerlib
 
 import (
 	"context"
-	"fmt"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 )
@@ -31,7 +31,7 @@ func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifa
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
-		return "", fmt.Errorf("failed to connect to artifact service: %v", err)
+		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
@@ -41,11 +41,11 @@ func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifa
 
 	md, err := artifact.MultiStage(ctx, client, 10, files, st)
 	if err != nil {
-		return "", fmt.Errorf("failed to stage artifacts: %v", err)
+		return "", errors.WithContext(err, "staging artifacts")
 	}
 	token, err := artifact.Commit(ctx, client, md, st)
 	if err != nil {
-		return "", fmt.Errorf("failed to commit artifacts: %v", err)
+		return "", errors.WithContext(err, "committing artifacts")
 	}
 	return token, nil
 }
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 3a470a9..0e22db7 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	// Importing to get the side effect of the remote execution hook. See init().
 	_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
@@ -50,7 +51,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	pipeline, err := graphx.Marshal(edges, &graphx.Options{Environment: createEnvironment(ctx)})
 	if err != nil {
-		return fmt.Errorf("failed to generate model pipeline: %v", err)
+		return errors.WithContextf(err, "generating model pipeline")
 	}
 
 	log.Info(ctx, proto.MarshalTextString(pipeline))
diff --git a/sdks/go/pkg/beam/util/syscallx/syscall.go b/sdks/go/pkg/beam/util/syscallx/syscall.go
index 61f3a03..4d0babc 100644
--- a/sdks/go/pkg/beam/util/syscallx/syscall.go
+++ b/sdks/go/pkg/beam/util/syscallx/syscall.go
@@ -20,7 +20,7 @@
 package syscallx
 
 import (
-	"errors"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 // ErrUnsupported is the error returned for unsupported operations.


Mime
View raw message