beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [beam] youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
Date Tue, 24 Mar 2020 04:20:55 GMT
youngoli commented on a change in pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396886566
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
 ##########
 @@ -473,6 +479,164 @@ func (m *marshaller) addNode(n *graph.Node) string {
 	return m.makeNode(id, m.coders.Add(n.Coder), n)
 }
 
+// expandReshuffle translates resharding to a composite reshuffle
+// transform.
+//
+// With proper runner support, the SDK doesn't need to do anything.
+// However, we still need to provide a backup plan in terms of other
+// PTransforms in the event the runner doesn't have a native implementation.
+//
+// In particular, the "backup plan" needs to:
+//
+//  * Encode the windowed element, preserving timestamps.
+//  * Add random keys to the encoded windowed element []bytes
+//  * GroupByKey (in the global window).
+//  * Explode the resulting elements list.
+//  * Decode the windowed element []bytes.
+//
+// While a simple reshard can be written in user terms, (timestamps and windows
+// are accessible to user functions) there are some framework internal
+// optimizations that can be done if the framework is aware of the reshard, though
+// ideally this is handled on the runner side.
+//
+// User code is able to write reshards, but it's easier to access
+// the window coders framework side, which is critical for the reshard
+// to function with unbounded inputs.
+func (m *marshaller) expandReshuffle(edge NamedEdge) string {
+	id := edgeID(edge.Edge)
+	var kvCoderID, gbkCoderID string
+	{
+		kv := makeUnionCoder()
+		kvCoderID = m.coders.Add(kv)
+		gbkCoderID = m.coders.Add(coder.NewCoGBK(kv.Components))
+	}
+
+	var subtransforms []string
+
+	in := edge.Edge.Input[0]
+
+	origInput := m.addNode(in.From)
+	// We need to preserve the old windowing/triggering here
+	// for re-instatement after the GBK.
+	preservedWSId := m.pcollections[origInput].GetWindowingStrategyId()
+
+	// Get the windowing strategy from before:
+	postReify := fmt.Sprintf("%v_%v_reifyts", nodeID(in.From), id)
+	m.makeNode(postReify, kvCoderID, in.From)
+
+	// We need to replace postReify's windowing strategy with one appropriate
+	// for reshuffles.
+	{
+		wfn := window.NewGlobalWindows()
+		m.pcollections[postReify].WindowingStrategyId =
+			m.internWindowingStrategy(&pb.WindowingStrategy{
+				// Not segregated by time...
+				WindowFn: makeWindowFn(wfn),
+				// ...output after every element is received...
+				Trigger: &pb.Trigger{
+					// Should this be an Always trigger instead?
+					Trigger: &pb.Trigger_ElementCount_{
+						ElementCount: &pb.Trigger_ElementCount{
+							ElementCount: 1,
+						},
+					},
+				},
+				// ...and after outputing, discard the output elements...
+				AccumulationMode: pb.AccumulationMode_DISCARDING,
+				// ...and since every pane should have 1 element,
+				// try to preserve the timestamp.
+				OutputTime: pb.OutputTime_EARLIEST_IN_PANE,
+				// Defaults copied from marshalWindowingStrategy.
+				// TODO(BEAM-3304): migrate to user side operations once trigger support is in.
+				EnvironmentId:   m.addDefaultEnv(),
+				MergeStatus:     pb.MergeStatus_NON_MERGING,
+				WindowCoderId:   m.coders.AddWindowCoder(makeWindowCoder(wfn)),
+				ClosingBehavior: pb.ClosingBehavior_EMIT_IF_NONEMPTY,
+				AllowedLateness: 0,
+				OnTimeBehavior:  pb.OnTimeBehavior_FIRE_ALWAYS,
+			})
+	}
+
+	// Inputs (i)
+
+	inputID := fmt.Sprintf("%v_reifyts", id)
+	payload := &pb.ParDoPayload{
+		DoFn: &pb.FunctionSpec{
+			Urn: URNReshuffleInput,
+			Payload: []byte(protox.MustEncodeBase64(&v1.TransformPayload{
+				Urn: URNReshuffleInput,
+			})),
+		},
+	}
+	input := &pb.PTransform{
+		UniqueName: inputID,
+		Spec: &pb.FunctionSpec{
+			Urn:     URNParDo,
+			Payload: protox.MustEncode(payload),
+		},
+		Inputs:        map[string]string{"i0": nodeID(in.From)},
+		Outputs:       map[string]string{"i0": postReify},
+		EnvironmentId: m.addDefaultEnv(),
+	}
+	m.transforms[inputID] = input
+	subtransforms = append(subtransforms, inputID)
+
+	outNode := edge.Edge.Output[0].To
+
+	// GBK
+
+	gbkOut := fmt.Sprintf("%v_out", nodeID(outNode))
+	m.makeNode(gbkOut, gbkCoderID, outNode)
+
+	gbkID := fmt.Sprintf("%v_gbk", id)
+	gbk := &pb.PTransform{
+		UniqueName: gbkID,
+		Spec:       &pb.FunctionSpec{Urn: URNGBK},
+		Inputs:     map[string]string{"i0": postReify},
 
 Review comment:
  Is this input supposed to be `postReify`? I would've expected `inputID` from the previous step.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message