beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lostl...@apache.org
Subject [beam] branch master updated: [BEAM-12438] Add Regression test for issue around LP coding Row coders. (#14924)
Date Mon, 07 Jun 2021 16:46:57 GMT
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a571952  [BEAM-12438] Add Regression test for issue around LP coding Row coders. (#14924)
a571952 is described below

commit a571952e3ce470b3871ebd333eacb3d6368d2737
Author: Robert Burke <lostluck@users.noreply.github.com>
AuthorDate: Mon Jun 7 09:45:26 2021 -0700

    [BEAM-12438] Add Regression test for issue around LP coding Row coders. (#14924)
    
    * [BEAM-12438] Run regression during integration
    * [BEAM-12438] Add LP error repro
    * [BEAM-12438] Ignore extra LP on injects.
    * [BEAM-12438] Populate schema option types.
    
    Co-authored-by: zelliott
---
 model/pipeline/src/main/proto/schema.proto         |  6 +--
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  9 +++-
 .../pkg/beam/core/runtime/graphx/schema/schema.go  | 56 ++++++++++++++-----
 .../beam/core/runtime/graphx/schema/schema_test.go | 16 ++----
 sdks/go/test/regression/lperror.go                 | 63 ++++++++++++++++++++++
 .../regression/{pardo_test.go => lperror_test.go}  | 46 +++++-----------
 sdks/go/test/regression/pardo_test.go              | 35 ++++++------
 .../{pardo_test.go => regression_test.go}          | 38 ++-----------
 sdks/go/test/run_validatesrunner_tests.sh          |  4 +-
 9 files changed, 158 insertions(+), 115 deletions(-)

diff --git a/model/pipeline/src/main/proto/schema.proto b/model/pipeline/src/main/proto/schema.proto
index 837689f..bcab2e7 100644
--- a/model/pipeline/src/main/proto/schema.proto
+++ b/model/pipeline/src/main/proto/schema.proto
@@ -113,9 +113,9 @@ message LogicalType {
 message Option {
   // REQUIRED. Identifier for the option.
   string name = 1;
-  // OPTIONAL. Type specifer for the structure of value.
-  // If not present, assumes no additional configuration is needed
-  // for this option and value is ignored.
+  // REQUIRED. Type specifer for the structure of value.
+  // Conventionally, options that don't require additional configuration should
+  // use a boolean type, with the value set to true.
   FieldType type = 2;
   FieldValue value = 3;
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index fbbdab3..105eb82 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -472,7 +472,14 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 			if !coder.IsKV(c) {
 				return nil, errors.Errorf("unexpected inject coder: %v", c)
 			}
-			u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(c.Components[1]), Out: out[0]}
+			valCoder := c.Components[1]
+			// JIRA BEAM-12438 - an extra LP coder can get added here, but isn't added
+			// on decode. Strip them until we get a better fix.
+			if valCoder.Kind == coder.LP {
+				// strip unexpected length prefix coder.
+				valCoder = valCoder.Components[0]
+			}
+			u = &Inject{UID: b.idgen.New(), N: (int)(tp.Inject.N), ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]}
 
 		case graphx.URNExpand:
 			var pid string
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 087d8c1..2e3ea3f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -356,9 +356,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
 		schm := ftype.GetRowType().GetSchema()
 		schm = proto.Clone(schm).(*pipepb.Schema)
 		if ot.Kind() == reflect.Ptr {
-			schm.Options = append(schm.Options, &pipepb.Option{
-				Name: optGoNillable,
-			})
+			schm.Options = append(schm.Options, optGoNillable())
 		}
 		if lID != "" {
 			schm.Options = append(schm.Options, logicalOption(lID))
@@ -379,9 +377,7 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
 	pt := reflect.PtrTo(t)
 	schm = proto.Clone(schm).(*pipepb.Schema)
 	schm.Id = getUUID(pt)
-	schm.Options = append(schm.Options, &pipepb.Option{
-		Name: optGoNillable,
-	})
+	schm.Options = append(schm.Options, optGoNillable())
 	r.idToType[schm.GetId()] = pt
 	r.typeToSchema[pt] = schm
 
@@ -392,14 +388,46 @@ func (r *Registry) fromType(ot reflect.Type) (*pipepb.Schema, error) {
 // Schema Option urns.
 const (
 	// optGoNillable indicates that this top level schema should be returned as a pointer type.
-	optGoNillable = "beam:schema:go:nillable:v1"
+	optGoNillableUrn = "beam:schema:go:nillable:v1"
 	// optGoEmbedded indicates that this field is an embedded type.
-	optGoEmbedded = "beam:schema:go:embedded_field:v1"
+	optGoEmbeddedUrn = "beam:schema:go:embedded_field:v1"
 	// optGoLogical indicates that this top level schema has a logical type equivalent that need to be looked up.
 	// It has a value type of String representing the URN for the logical type to look up.
-	optGoLogical = "beam:schema:go:logical:v1"
+	optGoLogicalUrn = "beam:schema:go:logical:v1"
 )
 
+func optGoNillable() *pipepb.Option {
+	return newToggleOption(optGoNillableUrn)
+}
+
+func optGoEmbedded() *pipepb.Option {
+	return newToggleOption(optGoEmbeddedUrn)
+}
+
+// newToggleOption constructs an Option whose presence is all
+// that matters, rather than other configuration. The option
+// is not set if the toggle isn't true, so the value is always
+// true.
+func newToggleOption(urn string) *pipepb.Option {
+	return &pipepb.Option{
+		Name: urn,
+		Type: &pipepb.FieldType{
+			TypeInfo: &pipepb.FieldType_AtomicType{
+				AtomicType: pipepb.AtomicType_BOOLEAN,
+			},
+		},
+		Value: &pipepb.FieldValue{
+			FieldValue: &pipepb.FieldValue_AtomicValue{
+				AtomicValue: &pipepb.AtomicTypeValue{
+					Value: &pipepb.AtomicTypeValue_Boolean{
+						Boolean: true,
+					},
+				},
+			},
+		},
+	}
+}
+
 func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option {
 	for _, opt := range opts {
 		if opt.GetName() == urn {
@@ -412,7 +440,7 @@ func checkOptions(opts []*pipepb.Option, urn string) *pipepb.Option {
 // nillableFromOptions converts the passed in type to it's pointer version
 // if the option is present. This permits go types to be pointers.
 func nillableFromOptions(opts []*pipepb.Option, t reflect.Type) reflect.Type {
-	if checkOptions(opts, optGoNillable) != nil {
+	if checkOptions(opts, optGoNillableUrn) != nil {
 		return reflect.PtrTo(t)
 	}
 	return nil
@@ -426,7 +454,7 @@ var optGoLogicalType = &pipepb.FieldType{
 
 func logicalOption(lID string) *pipepb.Option {
 	return &pipepb.Option{
-		Name: optGoLogical,
+		Name: optGoLogicalUrn,
 		Type: optGoLogicalType,
 		Value: &pipepb.FieldValue{
 			FieldValue: &pipepb.FieldValue_AtomicValue{
@@ -443,7 +471,7 @@ func logicalOption(lID string) *pipepb.Option {
 // fromLogicalOption returns the logical type id of this top
 // level type if this schema has a logical equivalent.
 func fromLogicalOption(opts []*pipepb.Option) (string, bool) {
-	o := checkOptions(opts, optGoLogical)
+	o := checkOptions(opts, optGoLogicalUrn)
 	if o == nil {
 		return "", false
 	}
@@ -489,7 +517,7 @@ func (r *Registry) structToSchema(t reflect.Type) (*pipepb.Schema, error) {
 		}
 		if isAnon {
 			f = proto.Clone(f).(*pipepb.Field)
-			f.Options = append(f.Options, &pipepb.Option{Name: optGoEmbedded})
+			f.Options = append(f.Options, optGoEmbedded())
 		}
 		fields = append(fields, f)
 	}
@@ -663,7 +691,7 @@ func (r *Registry) toType(s *pipepb.Schema) (reflect.Type, error) {
 		if err != nil {
 			return nil, errors.Wrapf(err, "cannot convert schema field %v to field", sf.GetName())
 		}
-		if checkOptions(sf.Options, optGoEmbedded) != nil {
+		if checkOptions(sf.Options, optGoEmbeddedUrn) != nil {
 			rf.Anonymous = true
 		}
 		fields = append(fields, rf)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
index b298d43..9fe2132 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema_test.go
@@ -404,9 +404,7 @@ func TestSchemaConversion(t *testing.T) {
 						},
 					},
 				},
-				Options: []*pipepb.Option{{
-					Name: optGoNillable,
-				}},
+				Options: []*pipepb.Option{optGoNillable()},
 			},
 			rt: reflect.TypeOf(&struct {
 				SuperNES int16
@@ -530,9 +528,7 @@ func TestSchemaConversion(t *testing.T) {
 						},
 					},
 				},
-				Options: []*pipepb.Option{{
-					Name: optGoNillable,
-				}, logicalOption("*schema.exportedFunc")},
+				Options: []*pipepb.Option{optGoNillable(), logicalOption("*schema.exportedFunc")},
 			},
 			rt: exportedFuncType,
 		}, {
@@ -568,7 +564,7 @@ func TestSchemaConversion(t *testing.T) {
 				Fields: []*pipepb.Field{
 					{
 						Name:    "Exported",
-						Options: []*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+						Options: []*pipepb.Option{optGoEmbedded()},
 						Type: &pipepb.FieldType{
 							TypeInfo: &pipepb.FieldType_RowType{
 								RowType: &pipepb.RowType{
@@ -610,7 +606,7 @@ func TestSchemaConversion(t *testing.T) {
 				Fields: []*pipepb.Field{
 					{
 						Name:    "Exported",
-						Options: []*pipepb.Option{&pipepb.Option{Name: optGoEmbedded}},
+						Options: []*pipepb.Option{optGoEmbedded()},
 						Type: &pipepb.FieldType{
 							Nullable: true,
 							TypeInfo: &pipepb.FieldType_RowType{
@@ -660,9 +656,7 @@ func TestSchemaConversion(t *testing.T) {
 						},
 					},
 				},
-				Options: []*pipepb.Option{{
-					Name: optGoNillable,
-				}},
+				Options: []*pipepb.Option{optGoNillable()},
 			},
 			rt: reflect.TypeOf(&struct {
 				myInt
diff --git a/sdks/go/test/regression/lperror.go b/sdks/go/test/regression/lperror.go
new file mode 100644
index 0000000..4e27f97
--- /dev/null
+++ b/sdks/go/test/regression/lperror.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package regression
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+)
+
+// REPRO found by https://github.com/zelliott
+
+type fruit struct {
+	Name string
+}
+
+func toFoo(id int, _ func(**fruit) bool) (int, string) {
+	return id, "Foo"
+}
+
+func toID(id int, fruitIter func(**fruit) bool, _ func(*string) bool) int {
+	var fruit *fruit
+	for fruitIter(&fruit) {
+	}
+	return id
+}
+
+// LPErrorPipeline constructs a pipeline that has a GBK followed by a CoGBK using the same
+// input, with schema encoded structs as elements. This ends up having the stage after the
+// CoGBK fail since the decoder post-cogbk is missing a Length Prefix coder that was
+// applied to the GBK input, but not the CoGBK output.
+// Root is likely in that there's no Beam standard CoGBK format for inject and expand.
+// JIRA: BEAM-12438
+func LPErrorPipeline(s beam.Scope) beam.PCollection {
+	// ["Apple", "Banana", "Cherry"]
+	fruits := beam.CreateList(s, []*fruit{{"Apple"}, {"Banana"}, {"Cherry"}})
+
+	// [0 "Apple", 0 "Banana", 0 "Cherry"]
+	fruitsKV := beam.AddFixedKey(s, fruits)
+
+	// [0 ["Apple", "Banana", "Cherry"]]
+	fruitsGBK := beam.GroupByKey(s, fruitsKV)
+
+	// [0 "Foo"]
+	fooKV := beam.ParDo(s, toFoo, fruitsGBK)
+
+	// [0 ["Foo"] ["Apple", "Banana", "Cherry"]]
+	fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
+
+	// [0]
+	return beam.ParDo(s, toID, fruitsFooCoGBK)
+}
diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/lperror_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/lperror_test.go
index 322dd69..773570d 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/lperror_test.go
@@ -18,41 +18,23 @@ package regression
 import (
 	"testing"
 
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
 	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
-)
-
-func TestDirectParDo(t *testing.T) {
-	if err := ptest.Run(DirectParDo()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestEmitParDo(t *testing.T) {
-	if err := ptest.Run(EmitParDo()); err != nil {
-		t.Error(err)
-	}
-}
+	"github.com/apache/beam/sdks/go/test/integration"
 
-func TestMultiEmitParDo(t *testing.T) {
-	if err := ptest.Run(MultiEmitParDo()); err != nil {
-		t.Error(err)
-	}
-}
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
+)
 
-func TestMixedOutputParDo(t *testing.T) {
-	if err := ptest.Run(MixedOutputParDo()); err != nil {
-		t.Error(err)
-	}
-}
+func TestLPErrorPipeline(t *testing.T) {
+	integration.CheckFilters(t)
 
-func TestDirectParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
-}
+	pipeline, s := beam.NewPipelineWithRoot()
+	want := beam.CreateList(s, []int{0})
+	got := LPErrorPipeline(s)
+	passert.Equals(s, got, want)
 
-func TestEmitParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
+	ptest.RunAndValidate(t, pipeline)
 }
diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/pardo_test.go
index 322dd69..cbcb81c 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/pardo_test.go
@@ -19,40 +19,39 @@ import (
 	"testing"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
+	"github.com/apache/beam/sdks/go/test/integration"
+
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
 )
 
 func TestDirectParDo(t *testing.T) {
-	if err := ptest.Run(DirectParDo()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, DirectParDo())
 }
 
 func TestEmitParDo(t *testing.T) {
-	if err := ptest.Run(EmitParDo()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, EmitParDo())
 }
 
 func TestMultiEmitParDo(t *testing.T) {
-	if err := ptest.Run(MultiEmitParDo()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, MultiEmitParDo())
 }
 
 func TestMixedOutputParDo(t *testing.T) {
-	if err := ptest.Run(MixedOutputParDo()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, MixedOutputParDo())
 }
 
 func TestDirectParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, DirectParDoAfterGBK())
 }
 
 func TestEmitParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
+	integration.CheckFilters(t)
+	ptest.RunAndValidate(t, EmitParDoAfterGBK())
 }
diff --git a/sdks/go/test/regression/pardo_test.go b/sdks/go/test/regression/regression_test.go
similarity index 56%
copy from sdks/go/test/regression/pardo_test.go
copy to sdks/go/test/regression/regression_test.go
index 322dd69..132e9a8 100644
--- a/sdks/go/test/regression/pardo_test.go
+++ b/sdks/go/test/regression/regression_test.go
@@ -21,38 +21,8 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
 )
 
-func TestDirectParDo(t *testing.T) {
-	if err := ptest.Run(DirectParDo()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestEmitParDo(t *testing.T) {
-	if err := ptest.Run(EmitParDo()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestMultiEmitParDo(t *testing.T) {
-	if err := ptest.Run(MultiEmitParDo()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestMixedOutputParDo(t *testing.T) {
-	if err := ptest.Run(MixedOutputParDo()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestDirectParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(DirectParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestEmitParDoAfterGBK(t *testing.T) {
-	if err := ptest.Run(EmitParDoAfterGBK()); err != nil {
-		t.Error(err)
-	}
+// TestMain invokes ptest.Main to allow running these tests on
+// non-direct runners.
+func TestMain(m *testing.M) {
+	ptest.Main(m)
 }
diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh
index a43898e..00ec453 100755
--- a/sdks/go/test/run_validatesrunner_tests.sh
+++ b/sdks/go/test/run_validatesrunner_tests.sh
@@ -310,11 +310,11 @@ if [[ "$JENKINS" == true ]]; then
   cd ./src
 
   echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
-  GOPATH=$TEMP_GOPATH go test -v github.com/apache/beam/sdks/go/test/integration/... $ARGS \
+  GOPATH=$TEMP_GOPATH go test -v github.com/apache/beam/sdks/go/test/integration/... github.com/apache/beam/sdks/go/test/regression $ARGS \
       || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting
 else
   echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS"
-  go test -v ./sdks/go/test/integration/... $ARGS \
+  go test -v ./sdks/go/test/integration/... ./sdks/go/test/regression  $ARGS \
       || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting
 fi
 

Mime
View raw message