beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: Reduce internal functions available on beam package.
Date Wed, 13 Feb 2019 17:55:24 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de2620  Reduce internal functions available on beam package.
     new 869f233  Merge pull request #7832 from lostluck/surface
9de2620 is described below

commit 9de2620a7d85d2955214afb4f8b7d9ec496ffb1b
Author: Robert Burke <robert@frantil.com>
AuthorDate: Mon Feb 11 22:38:41 2019 +0000

    Reduce internal functions available on beam package.
---
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 48 +++++++++++------------
 sdks/go/pkg/beam/core/runtime/graphx/user.go      | 29 ++------------
 sdks/go/pkg/beam/encoding.go                      | 47 ++++------------------
 sdks/go/pkg/beam/util.go                          |  8 ----
 4 files changed, 33 insertions(+), 99 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 6e2ac90..d893a93 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -124,11 +124,11 @@ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 	if err != nil {
 		return nil, fmt.Errorf("bad underlying type: %v", err)
 	}
-	enc, err := EncodeUserFn(c.Enc)
+	enc, err := encodeUserFn(c.Enc)
 	if err != nil {
 		return nil, fmt.Errorf("bad enc: %v", err)
 	}
-	dec, err := EncodeUserFn(c.Dec)
+	dec, err := encodeUserFn(c.Dec)
 	if err != nil {
 		return nil, fmt.Errorf("bad dec: %v", err)
 	}
@@ -145,22 +145,20 @@ func encodeCustomCoder(c *coder.CustomCoder) (*v1.CustomCoder, error) {
 func decodeCustomCoder(c *v1.CustomCoder) (*coder.CustomCoder, error) {
 	t, err := decodeType(c.Type)
 	if err != nil {
-		return nil, fmt.Errorf("bad type: %v", err)
+		return nil, fmt.Errorf("decodeCustomCoder bad type: %v", err)
 	}
-	enc, err := DecodeUserFn(c.Enc)
+	enc, err := decodeUserFn(c.Enc)
 	if err != nil {
-		return nil, fmt.Errorf("bad dec: %v", err)
+		return nil, fmt.Errorf("decodeCustomCoder bad encoder: %v", err)
 	}
-	dec, err := DecodeUserFn(c.Dec)
+	dec, err := decodeUserFn(c.Dec)
 	if err != nil {
-		return nil, fmt.Errorf("bad dec: %v", err)
+		return nil, fmt.Errorf("decodeCustomCoder bad decoder: %v", err)
 	}
 
-	ret := &coder.CustomCoder{
-		Name: c.Name,
-		Type: t,
-		Enc:  enc,
-		Dec:  dec,
+	ret, err := coder.NewCustomCoder(c.Name, t, enc, dec)
+	if err != nil {
+		return nil, fmt.Errorf("decodeCustomCoder: %v", err)
 	}
 	return ret, nil
 }
@@ -207,7 +205,7 @@ func encodeFn(u *graph.Fn) (*v1.Fn, error) {
 		}}, nil
 
 	case u.Fn != nil:
-		fn, err := EncodeUserFn(u.Fn)
+		fn, err := encodeUserFn(u.Fn)
 		if err != nil {
 			return nil, fmt.Errorf("bad userfn: %v", err)
 		}
@@ -257,11 +255,15 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 		})
 	}
 	if u.Fn != nil {
-		fn, err := DecodeUserFn(u.Fn)
+		fn, err := decodeUserFn(u.Fn)
+		if err != nil {
+			return nil, fmt.Errorf("bad userfn: %v", err)
+		}
+		fx, err := funcx.New(reflectx.MakeFunc(fn))
 		if err != nil {
 			return nil, fmt.Errorf("bad userfn: %v", err)
 		}
-		return &graph.Fn{Fn: fn}, nil
+		return &graph.Fn{Fn: fx}, nil
 	}
 
 	t, err := decodeType(u.Type)
@@ -275,34 +277,30 @@ func decodeFn(u *v1.Fn) (*graph.Fn, error) {
 	return graph.NewFn(fn)
 }
 
-// EncodeUserFn translates the preprocessed representation of a Beam user function
+// encodeUserFn translates the preprocessed representation of a Beam user function
 // into the wire representation, capturing all the inputs and outputs needed.
-func EncodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
+func encodeUserFn(u *funcx.Fn) (*v1.UserFn, error) {
 	// TODO(herohde) 5/23/2017: reject closures and dynamic functions. They can't
 	// be serialized.
 
 	symbol := u.Fn.Name()
 	t, err := encodeType(u.Fn.Type())
 	if err != nil {
-		return nil, fmt.Errorf("encode: bad function type: %v", err)
+		return nil, fmt.Errorf("encodeUserFn: bad function type: %v", err)
 	}
 	return &v1.UserFn{Name: symbol, Type: t}, nil
 }
 
-// DecodeUserFn receives the wire representation of a Beam user function,
+// decodeUserFn receives the wire representation of a Beam user function,
 // extracting the preprocessed representation, expanding all inputs and outputs
 // of the function.
-func DecodeUserFn(ref *v1.UserFn) (*funcx.Fn, error) {
+func decodeUserFn(ref *v1.UserFn) (interface{}, error) {
 	t, err := decodeType(ref.GetType())
 	if err != nil {
 		return nil, err
 	}
 
-	fn, err := runtime.ResolveFunction(ref.Name, t)
-	if err != nil {
-		return nil, fmt.Errorf("decode: failed to find symbol %v: %v", ref.Name, err)
-	}
-	return funcx.New(reflectx.MakeFunc(fn))
+	return runtime.ResolveFunction(ref.Name, t)
 }
 
 func encodeFullType(t typex.FullType) (*v1.FullType, error) {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/user.go b/sdks/go/pkg/beam/core/runtime/graphx/user.go
index 969a513..73addb1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/user.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/user.go
@@ -23,7 +23,6 @@ import (
 	"encoding/base64"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx/v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
@@ -60,7 +59,7 @@ func EncodeFn(fn reflectx.Func) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	ref, err := EncodeUserFn(u)
+	ref, err := encodeUserFn(u)
 	if err != nil {
 		return "", err
 	}
@@ -74,33 +73,11 @@ func DecodeFn(data string) (reflectx.Func, error) {
 	if err := protox.DecodeBase64(data, &ref); err != nil {
 		return nil, err
 	}
-	fn, err := DecodeUserFn(&ref)
+	fn, err := decodeUserFn(&ref)
 	if err != nil {
 		return nil, err
 	}
-	return fn.Fn, nil
-}
-
-// EncodeGraphFn encodes a *graph.Fn as a string.
-func EncodeGraphFn(u *graph.Fn) (string, error) {
-	ref, err := encodeFn(u)
-	if err != nil {
-		return "", err
-	}
-	return protox.EncodeBase64(ref)
-}
-
-// DecodeGraphFn decodes an encoded *graph.Fn.
-func DecodeGraphFn(data string) (*graph.Fn, error) {
-	var ref v1.Fn
-	if err := protox.DecodeBase64(data, &ref); err != nil {
-		return nil, err
-	}
-	fn, err := decodeFn(&ref)
-	if err != nil {
-		return nil, err
-	}
-	return fn, nil
+	return reflectx.MakeFunc(fn), nil
 }
 
 // EncodeCoder encodes a coder as a string. Any custom coder function
diff --git a/sdks/go/pkg/beam/encoding.go b/sdks/go/pkg/beam/encoding.go
index f41f2fa..0ab524d 100644
--- a/sdks/go/pkg/beam/encoding.go
+++ b/sdks/go/pkg/beam/encoding.go
@@ -23,18 +23,6 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
-// EncodeType encodes a type as a string. Unless registered, the decoded type
-// is only guaranteed to the isomorphic to the input and with no methods.
-func EncodeType(t reflect.Type) (string, error) {
-	return graphx.EncodeType(t)
-}
-
-// DecodeType decodes a type. Unless registered, the decoded type
-// is only guaranteed to the isomorphic to the input and with no methods.
-func DecodeType(data string) (reflect.Type, error) {
-	return graphx.DecodeType(data)
-}
-
 // EncodedType is a serialization wrapper around a type for convenience.
 type EncodedType struct {
 	// T is the type to preserve across serialization.
@@ -43,7 +31,7 @@ type EncodedType struct {
 
 // MarshalJSON returns the JSON encoding this value.
 func (w EncodedType) MarshalJSON() ([]byte, error) {
-	str, err := EncodeType(w.T)
+	str, err := graphx.EncodeType(w.T)
 	if err != nil {
 		return nil, err
 	}
@@ -56,7 +44,7 @@ func (w *EncodedType) UnmarshalJSON(buf []byte) error {
 	if err := json.Unmarshal(buf, &s); err != nil {
 		return err
 	}
-	t, err := DecodeType(s)
+	t, err := graphx.DecodeType(s)
 	if err != nil {
 		return err
 	}
@@ -64,20 +52,6 @@ func (w *EncodedType) UnmarshalJSON(buf []byte) error {
 	return nil
 }
 
-// EncodeFunc encodes a function and parameter types as a string. The function
-// symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must
-// be encodable.
-func EncodeFunc(fn reflectx.Func) (string, error) {
-	return graphx.EncodeFn(fn)
-}
-
-// DecodeFunc encodes a function as a string. The function symbol must be
-// resolvable via the runtime.GlobalSymbolResolver. The parameter types must
-// be encodable.
-func DecodeFunc(data string) (reflectx.Func, error) {
-	return graphx.DecodeFn(data)
-}
-
 // EncodedFunc is a serialization wrapper around a function for convenience.
 type EncodedFunc struct {
 	// Fn is the function to preserve across serialization.
@@ -86,7 +60,7 @@ type EncodedFunc struct {
 
 // MarshalJSON returns the JSON encoding this value.
 func (w EncodedFunc) MarshalJSON() ([]byte, error) {
-	str, err := EncodeFunc(w.Fn)
+	str, err := graphx.EncodeFn(w.Fn)
 	if err != nil {
 		return nil, err
 	}
@@ -99,7 +73,7 @@ func (w *EncodedFunc) UnmarshalJSON(buf []byte) error {
 	if err := json.Unmarshal(buf, &s); err != nil {
 		return err
 	}
-	fn, err := DecodeFunc(s)
+	fn, err := graphx.DecodeFn(s)
 	if err != nil {
 		return err
 	}
@@ -107,13 +81,6 @@ func (w *EncodedFunc) UnmarshalJSON(buf []byte) error {
 	return nil
 }
 
-// EncodeCoder encodes a coder as a string. Any custom coder function
-// symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must
-// be encodable.
-func EncodeCoder(c Coder) (string, error) {
-	return graphx.EncodeCoder(c.coder)
-}
-
 // DecodeCoder decodes a coder. Any custom coder function symbol must be
 // resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.
 func DecodeCoder(data string) (Coder, error) {
@@ -132,7 +99,7 @@ type EncodedCoder struct {
 
 // MarshalJSON returns the JSON encoding this value.
 func (w EncodedCoder) MarshalJSON() ([]byte, error) {
-	str, err := EncodeCoder(w.Coder)
+	str, err := graphx.EncodeCoder(w.Coder.coder)
 	if err != nil {
 		return nil, err
 	}
@@ -145,10 +112,10 @@ func (w *EncodedCoder) UnmarshalJSON(buf []byte) error {
 	if err := json.Unmarshal(buf, &s); err != nil {
 		return err
 	}
-	c, err := DecodeCoder(s)
+	c, err := graphx.DecodeCoder(s)
 	if err != nil {
 		return err
 	}
-	w.Coder = c
+	w.Coder = Coder{coder: c}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/util.go b/sdks/go/pkg/beam/util.go
index aa0ff7c..9fad037 100644
--- a/sdks/go/pkg/beam/util.go
+++ b/sdks/go/pkg/beam/util.go
@@ -112,11 +112,3 @@ func Must(a PCollection, err error) PCollection {
 	}
 	return a
 }
-
-// Must2 returns the input, but panics if err != nil.
-func Must2(a, b PCollection, err error) (PCollection, PCollection) {
-	if err != nil {
-		panic(err)
-	}
-	return a, b
-}


Mime
View raw message