beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3287) Go SDK support for portable pipelines
Date Wed, 21 Mar 2018 00:28:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3287?focusedWorklogId=82561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82561
]

ASF GitHub Bot logged work on BEAM-3287:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/18 00:27
            Start Date: 21/Mar/18 00:27
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4888: [BEAM-3287] Add Go support for
universal runners, incl Flink
URL: https://github.com/apache/beam/pull/4888
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 5259246a276..a7357a2999b 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -77,24 +77,41 @@ func main() {
 	// (2) Retrieve the staged files.
 	//
 	// The Go SDK harness downloads the worker binary and invokes
-	// it. For now, we assume that the first (and only) package
-	// is the binary.
+	// it. The binary is required to be keyed as "worker", if there
+	// is more than one artifact.
 
 	dir := filepath.Join(*semiPersistDir, "staged")
 	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, dir)
 	if err != nil {
 		log.Fatalf("Failed to retrieve staged files: %v", err)
 	}
-	if len(artifacts) == 0 {
-		log.Fatal("No binaries staged")
+
+	const worker = "worker"
+	name := worker
+
+	switch len(artifacts) {
+	case 0:
+		log.Fatal("No artifacts staged")
+	case 1:
+		name = artifacts[0].Name
+	default:
+		found := false
+		for _, a := range artifacts {
+			if a.Name == worker {
+				found = true
+				break
+			}
+		}
+		if !found {
+			log.Fatalf("No artifact named '%v' found", worker)
+		}
 	}
 
 	// (3) The persist dir may be on a noexec volume, so we must
 	// copy the binary to a different location to execute.
-
-	prog := filepath.Join("/bin", artifacts[0].Name)
-	if err := copyExe(filepath.Join(dir, artifacts[0].Name), prog); err != nil {
-		log.Fatalf("Failed to copy binary: %v", err)
+	const prog = "/bin/worker"
+	if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+		log.Fatalf("Failed to copy worker binary: %v", err)
 	}
 
 	args := []string{
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 7c3519ada83..616a7f44d0f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -28,12 +28,12 @@ import (
 const (
 	// Model constants
 
-	URNImpulse = "urn:beam:transform:impulse:v1"
+	URNImpulse = "beam:transform:impulse:v1"
 	URNParDo   = "urn:beam:transform:pardo:v1"
-	URNFlatten = "urn:beam:transform:flatten:v1"
-	URNGBK     = "urn:beam:transform:groupbykey:v1"
-	URNCombine = "urn:beam:transform:combine:v1"
-	URNWindow  = "urn:beam:transform:window:v1"
+	URNFlatten = "beam:transform:flatten:v1"
+	URNGBK     = "beam:transform:group_by_key:v1"
+	URNCombine = "beam:transform:combine:v1"
+	URNWindow  = "beam:transform:window:v1"
 
 	URNGlobalWindowsWindowFn = "beam:windowfn:global_windows:v0.1"
 
@@ -42,7 +42,7 @@ const (
 	// TODO: remove URNJavaDoFN when the Dataflow runner
 	// uses the model pipeline and no longer falls back to Java.
 	URNJavaDoFn = "urn:beam:dofn:javasdk:0.1"
-	URNDoFn     = "urn:beam:go:transform:dofn:v1"
+	URNDoFn     = "beam:go:transform:dofn:v1"
 )
 
 // TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction.
@@ -372,6 +372,7 @@ func (m *marshaller) addWindowingStrategy(w *window.Window) string {
 			OutputTime:      pb.OutputTime_END_OF_WINDOW,
 			ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
 			AllowedLateness: 0,
+			OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
 		}
 		m.windowing[id] = ws
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index c99601da989..9b37f0dac98 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -49,28 +49,6 @@ func init() {
 	runtime.RegisterInit(hook)
 }
 
-// TODO(herohde) 7/7/2017: Dataflow has a concept of sdk pipeline options and
-// various values in this map:
-//
-// { "display_data": [...],
-//   "options":{
-//      "autoscalingAlgorithm":"NONE",
-//      "dataflowJobId":"2017-07-07_xxx",
-//      "gcpTempLocation":"",
-//      "maxNumWorkers":0,
-//      "numWorkers":1,
-//      "project":"xxx",
-//      "options": <Go SDK pipeline options>,
-//  }}
-//
-// Which we may or may not want to be visible as first-class pipeline options.
-// It is also TBD how/if to support global display data, but we certainly don't
-// want it served back to the harness.
-
-type wrapper struct {
-	Options runtime.RawOptions `json:"options"`
-}
-
 // hook starts the harness, if in worker mode. Otherwise, is is a no-op.
 func hook() {
 	if !*worker {
@@ -84,7 +62,7 @@ func hook() {
 	// harness.Main returns. We want to be sure any error makes it out.
 
 	if *options != "" {
-		var opt wrapper
+		var opt runtime.RawOptionsWrapper
 		if err := json.Unmarshal([]byte(*options), &opt); err != nil {
 			fmt.Fprintf(os.Stderr, "Failed to parse pipeline options '%v': %v", *options, err)
 			os.Exit(1)
diff --git a/sdks/go/pkg/beam/core/runtime/options.go b/sdks/go/pkg/beam/core/runtime/options.go
index 23fa1609f7e..19f2303144e 100644
--- a/sdks/go/pkg/beam/core/runtime/options.go
+++ b/sdks/go/pkg/beam/core/runtime/options.go
@@ -38,6 +38,37 @@ type RawOptions struct {
 	Options map[string]string `json:"options"`
 }
 
+// TODO(herohde) 7/7/2017: Dataflow has a concept of sdk pipeline options and
+// various values in this map:
+//
+// { "display_data": [...],
+//   "options":{
+//      "autoscalingAlgorithm":"NONE",
+//      "dataflowJobId":"2017-07-07_xxx",
+//      "gcpTempLocation":"",
+//      "maxNumWorkers":0,
+//      "numWorkers":1,
+//      "project":"xxx",
+//      "options": <Go SDK pipeline options>,
+//  }}
+//
+// Which we may or may not want to be visible as first-class pipeline options.
+// It is also TBD how/if to support global display data, but we certainly don't
+// want it served back to the harness.
+
+// TODO(herohde) 3/12/2018: remove the extra options wrapper and the bogus
+// fields currently required by the Java runners.
+
+// RawOptionsWrapper wraps RawOptions to the form expected by the
+// harness. The extra layer is currently needed due to Dataflow
+// expectations about this representation. Subject to change.
+type RawOptionsWrapper struct {
+	Options     RawOptions `json:"beam:option:go_options:v1"`
+	Runner      string     `json:"beam:option:runner:v1"`
+	AppName     string     `json:"beam:option:app_name:v1"`
+	Experiments []string   `json:"beam:option:experiments:v1"`
+}
+
 // Import imports the options from previously exported data and makes the
 // options read-only. It panics if import is called twice.
 func (o *Options) Import(opt RawOptions) {
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
new file mode 100644
index 00000000000..1079ec17073
--- /dev/null
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package jobopts contains shared options for job submission. These options
+// are exposed to allow user code to inspect and modify them.
+package jobopts
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+)
+
+var (
+	// Endpoint is the job service endpoint.
+	Endpoint = flag.String("endpoint", "", "Job service endpoint (required).")
+
+	// JobName is the name of the job.
+	JobName = flag.String("job_name", "", "Job name (optional).")
+
+	// ContainerImage is the location of the SDK harness container image.
+	ContainerImage = flag.String("container_image", "", "Container image")
+
+	// Experiments toggle experimental features in the runner.
+	Experiments = flag.String("experiments", "", "Comma-separated list of experiments (optional).")
+
+	// Async determines whether to wait for job completion.
+	Async = flag.Bool("async", false, "Do not wait for job completion.")
+
+	// InternalJavaRunner is the java class needed at this time for Java runners.
+	// To be removed.
+	InternalJavaRunner = flag.String("internal_java_runner", "", "Internal java runner class.")
+)
+
+// GetEndpoint returns the endpoint, if non-empty, and an error otherwise. Runners
+// such as Dataflow set a reasonable default. Convenience function.
+func GetEndpoint() (string, error) {
+	if *Endpoint == "" {
+		return "", fmt.Errorf("no job service endpoint specified. Use --endpoint=<endpoint>")
+	}
+	return *Endpoint, nil
+}
+
+// GetJobName returns the specified job name or, if not present, an
+// autogenerated name. Convenience function.
+func GetJobName() string {
+	if *JobName == "" {
+		*JobName = fmt.Sprintf("go-job-%v", time.Now().UnixNano())
+	}
+	return *JobName
+}
+
+// GetContainerImage returns the specified SDK harness container image or,
+// if not present, the default development container for the current user.
+// Convenience function.
+func GetContainerImage(ctx context.Context) string {
+	if *ContainerImage == "" {
+		*ContainerImage = os.ExpandEnv("$USER-docker-apache.bintray.io/beam/go:latest")
+		log.Infof(ctx, "No container image specified. Using dev image: '%v'", *ContainerImage)
+	}
+	return *ContainerImage
+}
+
+// GetExperiments returns the experiments.
+func GetExperiments() []string {
+	if *Experiments == "" {
+		return nil
+	}
+	return strings.Split(*Experiments, ",")
+}
diff --git a/sdks/go/pkg/beam/provision/provision.go b/sdks/go/pkg/beam/provision/provision.go
index 656b3f27539..7349ef2a343 100644
--- a/sdks/go/pkg/beam/provision/provision.go
+++ b/sdks/go/pkg/beam/provision/provision.go
@@ -45,7 +45,7 @@ func Info(ctx context.Context, endpoint string) (*pb.ProvisionInfo, error) {
 		return nil, fmt.Errorf("failed to get manifest: %v", err)
 	}
 	if resp.GetInfo() == nil {
-		return nil, fmt.Errorf("empty manifest",)
+		return nil, fmt.Errorf("empty manifest")
 	}
 	return resp.GetInfo(), nil
 }
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index c750a408011..c50d76fb9b4 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -23,12 +23,8 @@ import (
 	"flag"
 	"fmt"
 	"os"
-	"os/exec"
 	"os/user"
 	"path"
-	"path/filepath"
-	"runtime"
-	"strings"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
@@ -38,6 +34,8 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+	"github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 	"github.com/golang/protobuf/proto"
 	"golang.org/x/oauth2/google"
@@ -49,14 +47,11 @@ import (
 
 var (
 	endpoint        = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
-	jobName         = flag.String("job_name", "", "Dataflow job name (optional).")
 	stagingLocation = flag.String("staging_location", "", "GCS staging location (required).")
 	image           = flag.String("worker_harness_container_image", "", "Worker harness container image (required).")
 	numWorkers      = flag.Int64("num_workers", 0, "Number of workers (optional).")
-	experiments     = flag.String("experiments", "", "Comma-separated list of experiments (optional).")
 
 	dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
-	block          = flag.Bool("block", true, "Wait for job to terminate.")
 	teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
 
 	// SDK options
@@ -85,12 +80,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 		return errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
 	}
 	if *image == "" {
-		return errors.New("no container image specified. Use --worker_harness_container_image=<image>")
-	}
-
-	if *jobName == "" {
-		*jobName = fmt.Sprintf("go-%v-%v", username(), time.Now().UnixNano())
+		*image = jobopts.GetContainerImage(ctx)
 	}
+	jobName := jobopts.GetJobName()
 
 	edges, _, err := p.Build()
 	if err != nil {
@@ -111,10 +103,12 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	// (1) Upload Go binary and model to GCS.
 
-	worker, err := buildLocalBinary(ctx)
+	worker, err := runnerlib.BuildTempWorkerBinary(ctx)
 	if err != nil {
 		return err
 	}
+	defer os.Remove(worker)
+
 	binary, err := stageWorker(ctx, project, *stagingLocation, worker)
 	if err != nil {
 		return err
@@ -139,7 +133,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
 	job := &df.Job{
 		ProjectId: project,
-		Name:      *jobName,
+		Name:      jobName,
 		Type:      "JOB_TYPE_BATCH",
 		Environment: &df.Environment{
 			UserAgent: newMsg(userAgent{
@@ -167,6 +161,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 				NumWorkers:                  1,
 			}},
 			TempStoragePrefix: *stagingLocation + "/tmp",
+			Experiments:       jobopts.GetExperiments(),
 		},
 		Steps: steps,
 	}
@@ -177,9 +172,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	if *teardownPolicy != "" {
 		job.Environment.WorkerPools[0].TeardownPolicy = *teardownPolicy
 	}
-	if *experiments != "" {
-		job.Environment.Experiments = strings.Split(*experiments, ",")
-	}
 	printJob(ctx, job)
 
 	if *dryRun {
@@ -205,7 +197,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 	}
 	log.Infof(ctx, "Logs: https://console.cloud.google.com/logs/viewer?project=%v&resource=dataflow_step%%2Fjob_id%%2F%v", project, upd.Id)
 
-	if !*block {
+	if *jobopts.Async {
 		return nil
 	}
 
@@ -282,46 +274,6 @@ func stageWorker(ctx context.Context, project, location, worker string) (string,
 	return gcsx.Upload(client, project, bucket, obj, fd)
 }
 
-// buildLocalBinary creates a local worker binary suitable to run on Dataflow. It finds the filename
-// by examining the call stack. We want the user entry (*), for example:
-//
-//   /Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/beamexec/main.go (skip: 2)
-// * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3)
-//   /usr/local/go/src/runtime/proc.go (skip: 4)
-//   /usr/local/go/src/runtime/asm_amd64.s (skip: 5)
-func buildLocalBinary(ctx context.Context) (string, error) {
-	ret := filepath.Join(os.TempDir(), fmt.Sprintf("dataflow-go-%v", time.Now().UnixNano()))
-	if *dryRun {
-		log.Infof(ctx, "Dry-run: not building binary %v", ret)
-		return ret, nil
-	}
-
-	program := ""
-	for i := 3; ; i++ {
-		_, file, _, ok := runtime.Caller(i)
-		if !ok || strings.HasSuffix(file, "runtime/proc.go") {
-			break
-		}
-		program = file
-	}
-	if program == "" {
-		return "", fmt.Errorf("could not detect user main")
-	}
-
-	log.Infof(ctx, "Cross-compiling %v as %v", program, ret)
-
-	// Cross-compile given go program. Not awesome.
-	build := []string{"go", "build", "-o", ret, program}
-
-	cmd := exec.Command(build[0], build[1:]...)
-	cmd.Env = append(os.Environ(), "GOOS=linux", "GOARCH=amd64")
-	if out, err := cmd.CombinedOutput(); err != nil {
-		log.Infof(ctx, string(out))
-		return "", fmt.Errorf("failed to cross-compile %v: %v", program, err)
-	}
-	return ret, nil
-}
-
 func username() string {
 	if u, err := user.Current(); err == nil {
 		return u.Username
diff --git a/sdks/go/pkg/beam/runners/dataflow/translate.go b/sdks/go/pkg/beam/runners/dataflow/translate.go
index c2d4f88a11c..440dcd7a9ab 100644
--- a/sdks/go/pkg/beam/runners/dataflow/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/translate.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
@@ -32,7 +33,6 @@ import (
 	rnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/golang/protobuf/proto"
 	df "google.golang.org/api/dataflow/v1b3"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 )
 
 const (
diff --git a/sdks/go/pkg/beam/runners/flink/flink.go b/sdks/go/pkg/beam/runners/flink/flink.go
new file mode 100644
index 00000000000..74bd307c9d8
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/flink/flink.go
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package flink contains the Flink runner.
+package flink
+
+import (
+	"context"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+	"github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
+)
+
+func init() {
+	beam.RegisterRunner("flink", Execute)
+}
+
+// Execute runs the given pipeline on Flink. Convenience wrapper over the
+// universal runner.
+func Execute(ctx context.Context, p *beam.Pipeline) error {
+	if *jobopts.InternalJavaRunner == "" {
+		*jobopts.InternalJavaRunner = "org.apache.beam.runners.flink.FlinkRunner"
+	}
+	return universal.Execute(ctx, p)
+}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
new file mode 100644
index 00000000000..1b612d64d11
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/compile.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package runnerlib contains utilities for submitting Go pipelines
+// to a Beam model runner.
+package runnerlib
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+)
+
+// BuildTempWorkerBinary creates a local worker binary in the tmp directory
+// for linux/amd64. The caller is responsible for deleting the binary.
+func BuildTempWorkerBinary(ctx context.Context) (string, error) {
+	filename := filepath.Join(os.TempDir(), fmt.Sprintf("beam-go-%v", time.Now().UnixNano()))
+	if err := BuildWorkerBinary(ctx, filename); err != nil {
+		return "", err
+	}
+	return filename, nil
+}
+
+// BuildWorkerBinary creates a local worker binary for linux/amd64. It finds the filename
+// by examining the call stack. We want the user entry (*), for example:
+//
+//   /Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/beamexec/main.go (skip: 2)
+// * /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go (skip: 3)
+//   /usr/local/go/src/runtime/proc.go (skip: 4)
+//   /usr/local/go/src/runtime/asm_amd64.s (skip: 5)
+func BuildWorkerBinary(ctx context.Context, filename string) error {
+	program := ""
+	for i := 3; ; i++ {
+		_, file, _, ok := runtime.Caller(i)
+		if !ok || strings.HasSuffix(file, "runtime/proc.go") {
+			break
+		}
+		program = file
+	}
+	if program == "" {
+		return fmt.Errorf("could not detect user main")
+	}
+
+	log.Infof(ctx, "Cross-compiling %v as %v", program, filename)
+
+	// Cross-compile given go program. Not awesome.
+	build := []string{"go", "build", "-o", filename, program}
+
+	cmd := exec.Command(build[0], build[1:]...)
+	cmd.Env = append(os.Environ(), "GOOS=linux", "GOARCH=amd64")
+	if out, err := cmd.CombinedOutput(); err != nil {
+		return fmt.Errorf("failed to cross-compile %v: %v\n%v", program, err, out)
+	}
+	return nil
+}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
new file mode 100644
index 00000000000..c4cdc2b0a13
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runnerlib
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+// Execute executes a pipeline on the universal runner serving the given endpoint.
+// Convenience function.
+func Execute(ctx context.Context, p *pb.Pipeline, endpoint string, opt *JobOptions, async bool) (string, error) {
+	// (1) Prepare job to obtain artifact staging instructions.
+
+	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return "", fmt.Errorf("failed to connect to job service: %v", err)
+	}
+	defer cc.Close()
+	client := jobpb.NewJobServiceClient(cc)
+
+	prepID, artifactEndpoint, err := Prepare(ctx, client, p, opt)
+	if err != nil {
+		return "", err
+	}
+
+	log.Infof(ctx, "Prepared job with id: %v", prepID)
+
+	// (2) Stage artifacts.
+
+	worker, err := BuildTempWorkerBinary(ctx)
+	if err != nil {
+		return "", err
+	}
+	defer os.Remove(worker)
+
+	token, err := Stage(ctx, prepID, artifactEndpoint, worker)
+	if err != nil {
+		return "", err
+	}
+
+	log.Infof(ctx, "Staged binary artifact with token: %v", token)
+
+	// (3) Submit job
+
+	jobID, err := Submit(ctx, client, prepID, token)
+	if err != nil {
+		return "", err
+	}
+
+	log.Infof(ctx, "Submitted job: %v", jobID)
+
+	// (4) Wait for completion.
+
+	if async {
+		return jobID, nil
+	}
+	return jobID, WaitForCompletion(ctx, client, jobID)
+}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
new file mode 100644
index 00000000000..218513b74b2
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runnerlib
+
+import (
+	"context"
+	"fmt"
+	"io"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/provision"
+	"github.com/golang/protobuf/proto"
+)
+
+// JobOptions capture the various options for submitting jobs
+// to universal runners.
+type JobOptions struct {
+	// Name is the job name.
+	Name string
+	// Experiments are additional experiments.
+	Experiments []string
+
+	// TODO(herohde) 3/17/2018: add further parametrization as needed
+
+	// InternalJavaRunner is the class of the receiving Java runner. To be removed.
+	InternalJavaRunner string
+}
+
+// Prepare prepares a job to the given job service. It returns the preparation id
+// and artifact staging endpoint, if successful.
+func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline, opt *JobOptions) (id, endpoint string, err error) {
+	raw := runtime.RawOptionsWrapper{
+		Options:     beam.PipelineOptions.Export(),
+		Runner:      opt.InternalJavaRunner,
+		AppName:     opt.Name,
+		Experiments: append(opt.Experiments, "beam_fn_api"),
+	}
+
+	options, err := provision.OptionsToProto(raw)
+	if err != nil {
+		return "", "", fmt.Errorf("failed to produce pipeline options: %v", err)
+	}
+	req := &jobpb.PrepareJobRequest{
+		Pipeline:        p,
+		PipelineOptions: options,
+		JobName:         opt.Name,
+	}
+	resp, err := client.Prepare(ctx, req)
+	if err != nil {
+		return "", "", fmt.Errorf("failed to connect to job service: %v", err)
+	}
+	return resp.GetPreparationId(), resp.GetArtifactStagingEndpoint().GetUrl(), nil
+}
+
+// Submit submits a job to the given job service. It returns a jobID, if successful.
+func Submit(ctx context.Context, client jobpb.JobServiceClient, id, token string) (string, error) {
+	req := &jobpb.RunJobRequest{
+		PreparationId: id,
+		StagingToken:  token,
+	}
+
+	resp, err := client.Run(ctx, req)
+	if err != nil {
+		return "", fmt.Errorf("failed to submit job: %v", err)
+	}
+	return resp.GetJobId(), nil
+}
+
+// WaitForCompletion monitors the given job until completion. It logs any messages
+// and state changes received.
+func WaitForCompletion(ctx context.Context, client jobpb.JobServiceClient, jobID string) error {
+	stream, err := client.GetMessageStream(ctx, &jobpb.JobMessagesRequest{JobId: jobID})
+	if err != nil {
+		return fmt.Errorf("failed to get job stream: %v", err)
+	}
+
+	for {
+		msg, err := stream.Recv()
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+
+		switch {
+		case msg.GetStateResponse() != nil:
+			resp := msg.GetStateResponse()
+
+			log.Infof(ctx, "Job state: %v", resp.GetState().String())
+
+			switch resp.State {
+			case jobpb.JobState_DONE, jobpb.JobState_CANCELLED:
+				return nil
+			case jobpb.JobState_FAILED:
+				return fmt.Errorf("job %v failed", jobID)
+			}
+
+		case msg.GetMessageResponse() != nil:
+			resp := msg.GetMessageResponse()
+
+			text := fmt.Sprintf("%v (%v): %v", resp.GetTime(), resp.GetMessageId(), resp.GetMessageText())
+			log.Output(ctx, messageSeverity(resp.GetImportance()), 1, text)
+
+		default:
+			return fmt.Errorf("unexpected job update: %v", proto.MarshalTextString(msg))
+		}
+	}
+}
+
+func messageSeverity(importance jobpb.JobMessage_MessageImportance) log.Severity {
+	switch importance {
+	case jobpb.JobMessage_JOB_MESSAGE_ERROR:
+		return log.SevError
+	case jobpb.JobMessage_JOB_MESSAGE_WARNING:
+		return log.SevWarn
+	case jobpb.JobMessage_JOB_MESSAGE_BASIC:
+		return log.SevInfo
+	case jobpb.JobMessage_JOB_MESSAGE_DEBUG, jobpb.JobMessage_JOB_MESSAGE_DETAILED:
+		return log.SevDebug
+	default:
+		return log.SevUnspecified
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
new file mode 100644
index 00000000000..369398f73bd
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package runnerlib
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+)
+
+// Stage stages the worker binary and any additional files to the given
+// artifact staging endpoint. It returns the commit token if successful.
+func Stage(ctx context.Context, id, endpoint, binary string, files ...artifact.KeyedFile) (string, error) {
+	ctx = grpcx.WriteWorkerID(ctx, id)
+	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return "", fmt.Errorf("failed to connect to artifact service: %v", err)
+	}
+	defer cc.Close()
+
+	client := jobpb.NewArtifactStagingServiceClient(cc)
+
+	files = append(files, artifact.KeyedFile{Key: "worker", Filename: binary})
+
+	md, err := artifact.MultiStage(ctx, client, 10, files)
+	if err != nil {
+		return "", fmt.Errorf("failed to stage artifacts: %v", err)
+	}
+	token, err := artifact.Commit(ctx, client, md)
+	if err != nil {
+		return "", fmt.Errorf("failed to commit artifacts: %v", err)
+	}
+	return token, nil
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
new file mode 100644
index 00000000000..a64367968df
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package universal contains a general-purpose runner that can submit jobs
+// to any portable Beam runner.
+package universal
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+	"github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
+	"github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+)
+
+func init() {
+	// Note that we also _ import harness/init to setup the remote execution hook.
+	beam.RegisterRunner("universal", Execute)
+}
+
+// Execute executes the pipeline on a universal beam runner.
+func Execute(ctx context.Context, p *beam.Pipeline) error {
+	endpoint, err := jobopts.GetEndpoint()
+	if err != nil {
+		return err
+	}
+
+	edges, _, err := p.Build()
+	if err != nil {
+		return err
+	}
+	pipeline, err := graphx.Marshal(edges, &graphx.Options{ContainerImageURL: jobopts.GetContainerImage(ctx)})
+	if err != nil {
+		return fmt.Errorf("failed to generate model pipeline: %v", err)
+	}
+
+	opt := &runnerlib.JobOptions{
+		Name:               jobopts.GetJobName(),
+		Experiments:        jobopts.GetExperiments(),
+		InternalJavaRunner: *jobopts.InternalJavaRunner,
+	}
+	_, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
+	return err
+}
diff --git a/sdks/go/pkg/beam/x/beamx/run.go b/sdks/go/pkg/beam/x/beamx/run.go
index 917c079f505..c6eaf294b4c 100644
--- a/sdks/go/pkg/beam/x/beamx/run.go
+++ b/sdks/go/pkg/beam/x/beamx/run.go
@@ -23,12 +23,14 @@ import (
 	"github.com/apache/beam/sdks/go/pkg/beam"
 	// Import the reflection-optimized runtime.
 	_ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/optimized"
-	// The imports here are for the side effect of runner registration.
 	_ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/gcs"
 	_ "github.com/apache/beam/sdks/go/pkg/beam/io/textio/local"
+	// The imports here are for the side effect of runner registration.
 	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
 	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
 	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dot"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
 )
 
 var runner = flag.String("runner", "direct", "Pipeline runner.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82561)
    Time Spent: 4.5h  (was: 4h 20m)

> Go SDK support for portable pipelines
> -------------------------------------
>
>                 Key: BEAM-3287
>                 URL: https://issues.apache.org/jira/browse/BEAM-3287
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>              Labels: portability
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The Go SDK should participate in the portability framework, incl. job submission w/ a
> docker container image.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message