beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [beam] branch master updated: [BEAM-3306] Hash keys when lifting
Date Tue, 12 Feb 2019 00:56:13 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b5877a  [BEAM-3306] Hash keys when lifting
     new 1f2b3f7  Merge pull request #7812 from lostluck/keylift
0b5877a is described below

commit 0b5877a79f0956b656261f2270e167329ec7b341
Author: Robert Burke <robert@frantil.com>
AuthorDate: Mon Feb 11 20:42:09 2019 +0000

    [BEAM-3306] Hash keys when lifting
---
 sdks/go/pkg/beam/core/runtime/exec/combine.go      |  26 +++-
 sdks/go/pkg/beam/core/runtime/exec/combine_test.go | 108 ++++++++++----
 sdks/go/pkg/beam/core/runtime/exec/hash.go         | 149 +++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/exec/hash_test.go    | 158 +++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/exec/translate.go    |  14 +-
 5 files changed, 422 insertions(+), 33 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index 067f852..3464444 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
@@ -258,20 +259,31 @@ func (n *Combine) String() string {
 // FinishBundle step.
 type LiftedCombine struct {
 	*Combine
+	KeyCoder *coder.Coder
 
-	cache map[interface{}]FullValue
+	keyHash elementHasher
+	cache   map[uint64]FullValue
 }
 
 func (n *LiftedCombine) String() string {
 	return fmt.Sprintf("LiftedCombine[%v] Keyed:%v Out:%v", path.Base(n.Fn.Name()), n.UsesKey,
n.Out.ID())
 }
 
+// Up initializes the embedded Combine, then builds keyHash from KeyCoder.
+func (n *LiftedCombine) Up(ctx context.Context) error {
+	err := n.Combine.Up(ctx)
+	if err != nil {
+		return err
+	}
+	n.keyHash = makeElementHasher(n.KeyCoder)
+	return nil
+}
+
 // StartBundle initializes the in memory cache of keys to accumulators.
 func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data DataContext) error
{
 	if err := n.Combine.StartBundle(ctx, id, data); err != nil {
 		return err
 	}
-	n.cache = make(map[interface{}]FullValue)
+	n.cache = make(map[uint64]FullValue)
 	return nil
 }
 
@@ -282,10 +294,14 @@ func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue,
val
 		return fmt.Errorf("invalid status for precombine %v: %v", n.UID, n.status)
 	}
 
+	key, err := n.keyHash.Hash(value.Elm)
+	if err != nil {
+		return n.fail(err)
+	}
 	// Value is a KV so Elm & Elm2 are populated.
 	// Check the cache for an already present accumulator
 
-	afv, notfirst := n.cache[value.Elm]
+	afv, notfirst := n.cache[key]
 	var a interface{}
 	if notfirst {
 		a = afv.Elm2
@@ -297,13 +313,13 @@ func (n *LiftedCombine) ProcessElement(ctx context.Context, value FullValue,
val
 		a = b
 	}
 
-	a, err := n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
+	a, err = n.addInput(ctx, a, value.Elm, value.Elm2, value.Timestamp, !notfirst)
 	if err != nil {
 		return n.fail(err)
 	}
 
 	// Cache the accumulator with the key
-	n.cache[value.Elm] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp:
value.Timestamp}
+	n.cache[key] = FullValue{Windows: value.Windows, Elm: value.Elm, Elm2: a, Timestamp: value.Timestamp}
 
 	return nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index de26391..e1b4be7 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -17,6 +17,7 @@ package exec
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"reflect"
 	"runtime"
@@ -61,7 +62,7 @@ func fnName(x interface{}) string {
 func TestCombine(t *testing.T) {
 	for _, test := range tests {
 		t.Run(fnName(test.Fn), func(t *testing.T) {
-			edge := getCombineEdge(t, test.Fn, test.AccumCoder)
+			edge := getCombineEdge(t, test.Fn, reflectx.Int, test.AccumCoder)
 
 			out := &CaptureNode{UID: 1}
 			combine := &Combine{UID: 2, Fn: edge.CombineFn, Out: out}
@@ -80,27 +81,75 @@ func TestCombine(t *testing.T) {
 // TestLiftedCombine verifies that the LiftedCombine, MergeAccumulators, and
 // ExtractOutput nodes work correctly after the lift has been performed.
 func TestLiftedCombine(t *testing.T) {
-	for _, test := range tests {
-		t.Run(fnName(test.Fn), func(t *testing.T) {
-			edge := getCombineEdge(t, test.Fn, test.AccumCoder)
+	withCoder := func(t *testing.T, suffix string, key interface{}, keyCoder *coder.Coder) {
+		for _, test := range tests {
+			t.Run(fnName(test.Fn)+"_"+suffix, func(t *testing.T) {
+				edge := getCombineEdge(t, test.Fn, reflectx.Int, test.AccumCoder)
+
+				out := &CaptureNode{UID: 1}
+				extract := &ExtractOutput{Combine: &Combine{UID: 2, Fn: edge.CombineFn, Out:
out}}
+				merge := &MergeAccumulators{Combine: &Combine{UID: 3, Fn: edge.CombineFn, Out:
extract}}
+				gbk := &simpleGBK{UID: 4, KeyCoder: keyCoder, Out: merge}
+				precombine := &LiftedCombine{Combine: &Combine{UID: 5, Fn: edge.CombineFn, Out:
gbk}, KeyCoder: keyCoder}
+				n := &FixedRoot{UID: 6, Elements: makeKVInput(key, test.Input...), Out: precombine}
+
+				constructAndExecutePlan(t, []Unit{n, precombine, gbk, merge, extract, out})
+				expected := makeKV(key, test.Expected)
+				if !equalList(out.Elements, expected) {
+					t.Errorf("liftedCombineChain(%s) = %v, want %v", edge.CombineFn.Name(), extractKeyedValues(out.Elements...),
extractKeyedValues(expected...))
+				}
+			})
+		}
+	}
+	withCoder(t, "intKeys", 42, intCoder(reflectx.Int))
+	withCoder(t, "int64Keys", int64(42), intCoder(reflectx.Int64))
 
-			out := &CaptureNode{UID: 1}
-			extract := &ExtractOutput{Combine: &Combine{UID: 2, Fn: edge.CombineFn, Out: out}}
-			merge := &MergeAccumulators{Combine: &Combine{UID: 3, Fn: edge.CombineFn, Out:
extract}}
-			gbk := &simpleGBK{UID: 4, Out: merge}
-			precombine := &LiftedCombine{Combine: &Combine{UID: 5, Fn: edge.CombineFn, Out:
gbk}}
-			n := &FixedRoot{UID: 6, Elements: makeKVInput(42, test.Input...), Out: precombine}
+	cc, err := coder.NewCustomCoder("codable", myCodableType, codableEncoder, codableDecoder)
+	if err != nil {
+		t.Fatalf("%v", err)
+	}
+	withCoder(t, "pointerKeys", &myCodable{42}, &coder.Coder{Kind: coder.Custom, T:
typex.New(myCodableType), Custom: cc})
 
-			constructAndExecutePlan(t, []Unit{n, precombine, gbk, merge, extract, out})
-			expected := makeKV(42, test.Expected)
-			if !equalList(out.Elements, expected) {
-				t.Errorf("liftedCombineChain(%s) = %v, want %v", edge.CombineFn.Name(), extractKeyedValues(out.Elements...),
extractKeyedValues(expected...))
-			}
-		})
+}
+
+// codable is implemented by test key types that serialize themselves; it is
+// the value type handled by codableEncoder and codableDecoder.
+type codable interface {
+	EncodeMe() []byte
+	DecodeMe(data []byte)
+}
+
+// codableEncoder is the encoding half of the test custom coder: it delegates
+// to the value's own EncodeMe.
+func codableEncoder(c codable) []byte {
+	data := c.EncodeMe()
+	return data
+}
+
+// myCodableType is the reflect.Type of *myCodable, the key type registered
+// with the custom coder in the pointerKeys lifted-combine case.
+var myCodableType = reflect.TypeOf(&myCodable{})
+
+// codableDecoder is the decoding half of the test custom coder built with
+// coder.NewCustomCoder in TestLiftedCombine: it allocates the concrete
+// codable for type t and fills it from b via DecodeMe.
+// It panics for any type other than myCodableType.
+func codableDecoder(t reflect.Type, b []byte) codable {
+	var v codable
+	switch t {
+	case myCodableType:
+		v = &myCodable{}
+	default:
+		panic("don't know this type" + t.String())
 	}
+	v.DecodeMe(b)
+	return v
 }
 
-func getCombineEdge(t *testing.T, cfn interface{}, ac *coder.Coder) *graph.MultiEdge {
+// myCodable is a pointer-typed test key whose encoding is its val field as
+// 8 little-endian bytes. Distinct *myCodable pointers holding the same val
+// therefore encode identically.
+type myCodable struct {
+	val uint64
+}
+
+// EncodeMe returns val as an 8-byte little-endian slice.
+func (c *myCodable) EncodeMe() []byte {
+	var buf [8]byte
+	binary.LittleEndian.PutUint64(buf[:], c.val)
+	return buf[:]
+}
+
+// DecodeMe sets val from the first 8 bytes of b (little-endian).
+func (c *myCodable) DecodeMe(b []byte) {
+	decoded := binary.LittleEndian.Uint64(b)
+	c.val = decoded
+}
+
+func getCombineEdge(t *testing.T, cfn interface{}, kt reflect.Type, ac *coder.Coder) *graph.MultiEdge
{
 	t.Helper()
 	fn, err := graph.NewCombineFn(cfn)
 	if err != nil {
@@ -115,7 +164,7 @@ func getCombineEdge(t *testing.T, cfn interface{}, ac *coder.Coder) *graph.Multi
 	} else {
 		vtype = fn.MergeAccumulatorsFn().Param[1].T
 	}
-	inT := typex.NewCoGBK(typex.New(reflectx.Int), typex.New(vtype))
+	inT := typex.NewCoGBK(typex.New(kt), typex.New(vtype))
 	in := g.NewNode(inT, window.DefaultWindowingStrategy(), true)
 
 	edge, err := graph.NewCombine(g, g.Root(), fn, in, ac)
@@ -269,11 +318,12 @@ func intCoder(t reflect.Type) *coder.Coder {
 
 // simpleGBK buffers all input and continues on FinishBundle. Use with small single-bundle
data only.
 type simpleGBK struct {
-	UID  UnitID
-	Edge *graph.MultiEdge
-	Out  Node
+	UID      UnitID
+	Out      Node
+	KeyCoder *coder.Coder
 
-	m map[interface{}]*group
+	hasher elementHasher
+	m      map[uint64]*group
 }
 
 type group struct {
@@ -286,7 +336,8 @@ func (n *simpleGBK) ID() UnitID {
 }
 
 func (n *simpleGBK) Up(ctx context.Context) error {
-	n.m = make(map[interface{}]*group)
+	n.m = make(map[uint64]*group)
+	n.hasher = makeElementHasher(n.KeyCoder)
 	return nil
 }
 
@@ -295,16 +346,19 @@ func (n *simpleGBK) StartBundle(ctx context.Context, id string, data
DataContext
 }
 
 func (n *simpleGBK) ProcessElement(ctx context.Context, elm FullValue, _ ...ReStream) error
{
-	key := elm.Elm.(int)
+	key := elm.Elm
 	value := elm.Elm2
-
-	g, ok := n.m[key]
+	keyHash, err := n.hasher.Hash(key)
+	if err != nil {
+		return err
+	}
+	g, ok := n.m[keyHash]
 	if !ok {
 		g = &group{
 			key:    FullValue{Elm: key, Timestamp: elm.Timestamp, Windows: elm.Windows},
 			values: make([]FullValue, 0),
 		}
-		n.m[key] = g
+		n.m[keyHash] = g
 	}
 	g.values = append(g.values, FullValue{Elm: value, Timestamp: elm.Timestamp})
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/hash.go b/sdks/go/pkg/beam/core/runtime/exec/hash.go
new file mode 100644
index 0000000..77c45d6
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/hash.go
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"fmt"
+	"hash"
+	"hash/fnv"
+	"math"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+// Infrastructure for hashing values for lifted combines.
+
+// elementHasher maps a key element to a uint64 so keys can be grouped in a
+// map[uint64] instead of using the element itself as the map key. Distinct
+// elements may share a hash (e.g. FNV-1a collisions) and are then grouped together.
+type elementHasher interface {
+	Hash(elm interface{}) (uint64, error)
+}
+
+// makeElementHasher returns the elementHasher used to bucket keys encoded
+// with coder c:
+//   - Bytes coders hash the raw []byte with FNV-1a;
+//   - VarInt coders, and Custom coders whose Type is one of the reflectx
+//     numeric types (Int..Uint64, Float32, Float64), hash the number by value;
+//   - Custom coders whose Type is reflectx.String hash the string bytes with FNV-1a;
+//   - any other Custom coder hashes the bytes produced by its encoder.
+// It panics for any other coder kind.
+func makeElementHasher(c *coder.Coder) elementHasher {
+	// TODO(lostluck): move to a faster hashing library once we can take dependencies easily.
+	h := fnv.New64a()
+	switch {
+	case c.Kind == coder.Bytes:
+		return &bytesHasher{hash: h}
+	case c.Kind == coder.VarInt:
+		return &numberHasher{}
+	case c.Kind != coder.Custom:
+		panic(fmt.Sprintf("Unexpected coder for hashing: %v", c))
+	}
+
+	// Custom coder: primitives get a dedicated hasher, which avoids running
+	// the encoder on every element.
+	switch c.Custom.Type {
+	case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64,
+		reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64,
+		reflectx.Float32, reflectx.Float64:
+		return &numberHasher{}
+	case reflectx.String:
+		return &stringHasher{hash: h}
+	}
+	// TODO(lostluck): 2019.02.07 - consider supporting encoders that
+	// take in a io.Writer instead.
+	return &customEncodedHasher{hash: h, t: c.Custom.Type, coder: makeEncoder(c.Custom.Enc.Fn)}
+}
+
+// bytesHasher hashes []byte keys by feeding the raw bytes to hash. The hash
+// state is reset and reused on every call, so one bytesHasher must not be
+// used from multiple goroutines at once.
+type bytesHasher struct {
+	hash hash.Hash64
+}
+
+// Hash returns the hash of element, which must be a []byte (it panics otherwise).
+func (h *bytesHasher) Hash(element interface{}) (uint64, error) {
+	hs := h.hash
+	hs.Reset()
+	hs.Write(element.([]byte))
+	sum := hs.Sum64()
+	return sum, nil
+}
+
+// stringHasher hashes string keys by streaming their bytes into hash through
+// a fixed 64-byte buffer, rather than converting the whole string to a
+// []byte first. The hash state is reset and reused on every call.
+type stringHasher struct {
+	hash hash.Hash64
+}
+
+// Hash returns the hash of element's bytes; element must be a string (it
+// panics otherwise). The result equals hashing []byte(element) in one Write.
+func (h *stringHasher) Hash(element interface{}) (uint64, error) {
+	h.hash.Reset()
+	rest := element.(string)
+	var chunk [64]byte
+	// Write full chunks while more than one chunk's worth remains; the final
+	// (possibly full, possibly empty) piece is written after the loop.
+	for len(rest) > len(chunk) {
+		copy(chunk[:], rest)
+		h.hash.Write(chunk[:])
+		rest = rest[len(chunk):]
+	}
+	tail := copy(chunk[:], rest)
+	h.hash.Write(chunk[:tail])
+	return h.hash.Sum64(), nil
+}
+
+// numberHasher hashes Go numeric keys by value, with no intermediate
+// encoding: integers are converted to uint64 and floats contribute their
+// IEEE 754 bit pattern. The zero value is ready to use.
+type numberHasher struct {
+}
+
+// Hash returns the uint64 image of element. Non-numeric types are reported
+// as an error rather than a panic, so callers can return it through their
+// normal error path (e.g. LiftedCombine.ProcessElement passes it to n.fail).
+func (h *numberHasher) Hash(element interface{}) (uint64, error) {
+	switch n := element.(type) {
+	case int:
+		return uint64(n), nil
+	case int8:
+		return uint64(n), nil
+	case int16:
+		return uint64(n), nil
+	case int32:
+		return uint64(n), nil
+	case int64:
+		return uint64(n), nil
+	case uint:
+		return uint64(n), nil
+	case uint8:
+		return uint64(n), nil
+	case uint16:
+		return uint64(n), nil
+	case uint32:
+		return uint64(n), nil
+	case uint64:
+		return n, nil
+	case float64:
+		return math.Float64bits(n), nil
+	case float32:
+		// Widening float32 to float64 is exact, so non-NaN float32 values
+		// keep distinct bit patterns.
+		return math.Float64bits(float64(n)), nil
+	default:
+		return 0, fmt.Errorf("received unknown value type: want a number, got %T", element)
+	}
+}
+
+// customEncodedHasher hashes a key by encoding it with coder (for type t)
+// and hashing the encoded bytes, so keys with byte-identical encodings always
+// hash the same. The hash state is reset and reused on every call.
+type customEncodedHasher struct {
+	hash  hash.Hash64
+	t     reflect.Type
+	coder Encoder
+}
+
+// Hash encodes element and returns the hash of the encoding, or the
+// encoder's error unchanged.
+func (h *customEncodedHasher) Hash(element interface{}) (uint64, error) {
+	var (
+		encoded []byte
+		err     error
+	)
+	h.hash.Reset()
+	if encoded, err = h.coder.Encode(h.t, element); err != nil {
+		return 0, err
+	}
+	h.hash.Write(encoded)
+	return h.hash.Sum64(), nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/hash_test.go b/sdks/go/pkg/beam/core/runtime/exec/hash_test.go
new file mode 100644
index 0000000..babf7d7
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/hash_test.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"encoding/json"
+	"fmt"
+	"hash/fnv"
+	"reflect"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+)
+
+// BenchmarkPrimitives compares, for int, float32, string and struct keys, a
+// read-then-write on a map keyed by the native Go type ("native", where one
+// exists), by interface{} ("interface"), by the hash of the coder-encoded key
+// ("encodedHash"), and by a type-specific hasher ("dedicatedHash", where one
+// exists). The last three variants are run by hashbench.
+func BenchmarkPrimitives(b *testing.B) {
+	// value receives each map read so it can be written back to the map.
+	var value FullValue
+	// One FNV-1a hash is shared by every hasher below. Sub-benchmarks run
+	// sequentially and each hasher Resets it before use.
+	myHash := fnv.New64a()
+	b.Run("int", func(b *testing.B) {
+		test := interface{}(int(42424242))
+		b.Run("native", func(b *testing.B) {
+			m := make(map[int]FullValue)
+			for i := 0; i < b.N; i++ {
+				k := test.(int)
+				value = m[k]
+				m[k] = value
+			}
+		})
+		cc, err := coderx.NewVarIntZ(reflectx.Int)
+		if err != nil {
+			b.Fatal(err)
+		}
+		// The encoded hasher's t field is left nil here.
+		encoded := &customEncodedHasher{hash: myHash, coder: makeEncoder(cc.Enc.Fn)}
+		dedicated := &numberHasher{}
+		hashbench(b, test, encoded, dedicated)
+	})
+	b.Run("float32", func(b *testing.B) {
+		test := interface{}(float32(42424242.242424))
+		b.Run("native", func(b *testing.B) {
+			m := make(map[float32]FullValue)
+			for i := 0; i < b.N; i++ {
+				k := test.(float32)
+				value = m[k]
+				m[k] = value
+			}
+		})
+		cc, err := coderx.NewFloat(reflectx.Float32)
+		if err != nil {
+			b.Fatal(err)
+		}
+		encoded := &customEncodedHasher{hash: myHash, coder: makeEncoder(cc.Enc.Fn)}
+		dedicated := &numberHasher{}
+		hashbench(b, test, encoded, dedicated)
+	})
+
+	// Strings of increasing length; sub-benchmarks are named by byte length.
+	b.Run("string", func(b *testing.B) {
+		tests := []interface{}{
+			"I am the very model of a modern major string.",
+			"this is 10",
+			strings.Repeat("100 chars!", 10),   // 100
+			strings.Repeat("1k chars!!", 100),  // 1000
+			strings.Repeat("10k chars!", 1000), // 10000
+		}
+		for _, test := range tests {
+			b.Run(fmt.Sprint(len(test.(string))), func(b *testing.B) {
+				b.Run("native", func(b *testing.B) {
+					m := make(map[string]FullValue)
+					for i := 0; i < b.N; i++ {
+						k := test.(string)
+						value = m[k]
+						m[k] = value
+					}
+				})
+
+				cc, err := coderx.NewString()
+				if err != nil {
+					b.Fatal(err)
+				}
+				encoded := &customEncodedHasher{hash: myHash, coder: makeEncoder(cc.Enc.Fn)}
+				dedicated := &stringHasher{hash: myHash}
+				hashbench(b, test, encoded, dedicated)
+			})
+		}
+	})
+	// Structs have no native or dedicated variant, so only the JSON-encoded
+	// hash is compared against interface{} keys.
+	b.Run("struct", func(b *testing.B) {
+		tests := []interface{}{
+			struct {
+				A      int
+				B      string
+				Foobar [4]int
+			}{A: 56, B: "stringtastic", Foobar: [4]int{4, 2, 3, 1}},
+		}
+		for _, test := range tests {
+			typ := reflect.TypeOf(test)
+			b.Run(fmt.Sprint(typ.String()), func(b *testing.B) {
+				encoded := &customEncodedHasher{hash: myHash, coder: &jsonEncoder{}}
+				hashbench(b, test, encoded, nil)
+			})
+		}
+	})
+}
+
+// jsonEncoder is a benchmark-only Encoder that serializes elements with
+// encoding/json; the type argument is ignored.
+type jsonEncoder struct{}
+
+// Encode returns the JSON encoding of element and any marshalling error.
+func (*jsonEncoder) Encode(_ reflect.Type, element interface{}) ([]byte, error) {
+	data, err := json.Marshal(element)
+	return data, err
+}
+
+// hashbench runs sub-benchmarks that read then write a map entry for test:
+// "interface" keys the map by test itself, "encodedHash" by encoded's hash of
+// test, and "dedicatedHash" (skipped when dedicated is nil) by dedicated's hash.
+func hashbench(b *testing.B, test interface{}, encoded, dedicated elementHasher) {
+	var value FullValue
+	b.Run("interface", func(b *testing.B) {
+		m := make(map[interface{}]FullValue)
+		for i := 0; i < b.N; i++ {
+			value = m[test]
+			m[test] = value
+		}
+	})
+
+	// byHash builds the body of a uint64-keyed sub-benchmark for hasher.
+	byHash := func(hasher elementHasher) func(*testing.B) {
+		return func(b *testing.B) {
+			m := make(map[uint64]FullValue)
+			for i := 0; i < b.N; i++ {
+				k, err := hasher.Hash(test)
+				if err != nil {
+					b.Fatal(err)
+				}
+				value = m[k]
+				m[k] = value
+			}
+		}
+	}
+	b.Run("encodedHash", byHash(encoded))
+	if dedicated != nil {
+		b.Run("dedicatedHash", byHash(dedicated))
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 2829b25..031ac82 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -412,9 +412,21 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 					return nil, err
 				}
 				cn.UsesKey = typex.IsKV(in[0].Type)
+
 				switch urn {
 				case urnPerKeyCombinePre:
-					u = &LiftedCombine{Combine: cn}
+					inputs := unmarshalKeyedValues(transform.GetInputs())
+					if len(inputs) != 1 {
+						return nil, fmt.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
+					}
+					ec, _, err := b.makeCoderForPCollection(inputs[0])
+					if err != nil {
+						return nil, err
+					}
+					if !coder.IsKV(ec) {
+						return nil, fmt.Errorf("unexpected non-KV coder PCollection input to combine: %v",
ec)
+					}
+					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0]}
 				case urnPerKeyCombineMerge:
 					u = &MergeAccumulators{Combine: cn}
 				case urnPerKeyCombineExtract:


Mime
View raw message