beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ib...@apache.org
Subject [beam] branch master updated: [BEAM-11811] Disallow num_workers > max_num_workers for Go Dataflow runner
Date Tue, 08 Jun 2021 20:48:13 GMT
This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e4c99da  [BEAM-11811] Disallow num_workers > max_num_workers for Go Dataflow runner
     new 2035e1b  Merge pull request #14955 from zhoufek/wsc_go
e4c99da is described below

commit e4c99da30b69768dbda7ba58b1f59de4fd13be58
Author: zhoufek <zhoufek@google.com>
AuthorDate: Mon Jun 7 12:27:04 2021 -0400

    [BEAM-11811] Disallow num_workers > max_num_workers for Go Dataflow runner
---
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   | 13 +++++++++
 .../beam/runners/dataflow/dataflowlib/job_test.go  | 33 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 390082b..6249441 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -17,6 +17,7 @@ package dataflowlib
 
 import (
 	"context"
+	"fmt"
 	"strings"
 	"time"
 
@@ -345,5 +346,17 @@ func validateWorkerSettings(ctx context.Context, opts *JobOptions) error {
 		opts.WorkerZone = opts.Zone
 		opts.Zone = ""
 	}
+
+	numWorkers := opts.NumWorkers
+	maxNumWorkers := opts.MaxNumWorkers
+	if numWorkers < 0 {
+		return fmt.Errorf("num_workers (%d) cannot be negative", numWorkers)
+	}
+	if maxNumWorkers < 0 {
+		return fmt.Errorf("max_num_workers (%d) cannot be negative", maxNumWorkers)
+	}
+	if numWorkers > 0 && maxNumWorkers > 0 && numWorkers > maxNumWorkers {
+		return fmt.Errorf("num_workers (%d) cannot exceed max_num_workers (%d)", numWorkers, maxNumWorkers)
+	}
 	return nil
 }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
index a760229..1bf178f 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
@@ -76,6 +76,28 @@ func TestValidateWorkerSettings(t *testing.T) {
 			},
 			errMessage: "experiment worker_region and option Zone are mutually exclusive",
 		},
+		{
+			name: "test_num_workers_cannot_be_negative",
+			jobOptions: JobOptions{
+				NumWorkers: -1,
+			},
+			errMessage: "num_workers (-1) cannot be negative",
+		},
+		{
+			name: "test_max_num_workers_cannot_be_negative",
+			jobOptions: JobOptions{
+				MaxNumWorkers: -1,
+			},
+			errMessage: "max_num_workers (-1) cannot be negative",
+		},
+		{
+			name: "test_num_workers_cannot_exceed_max_num_workers",
+			jobOptions: JobOptions{
+				NumWorkers:    43,
+				MaxNumWorkers: 42,
+			},
+			errMessage: "num_workers (43) cannot exceed max_num_workers (42)",
+		},
 	}
 
 	for _, test := range testsWithErr {
@@ -110,6 +132,17 @@ func TestValidateWorkerSettings(t *testing.T) {
 			opts:     JobOptions{WorkerRegion: "foo"},
 			expected: JobOptions{WorkerRegion: "foo"},
 		},
+		{
+			name: "test_num_workers_can_equal_max_num_workers",
+			opts: JobOptions{
+				NumWorkers:    42,
+				MaxNumWorkers: 42,
+			},
+			expected: JobOptions{
+				NumWorkers:    42,
+				MaxNumWorkers: 42,
+			},
+		},
 	}
 
 	for _, test := range tests {

Mime
View raw message