rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: add producer async method. resolve #98 (#100)
Date Mon, 08 Jul 2019 10:54:59 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 45cb340  add producer async method. resolve #98 (#100)
45cb340 is described below

commit 45cb34063443322ab550790eb15c5236bbfa335c
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Mon Jul 8 18:54:55 2019 +0800

    add producer async method. resolve #98 (#100)
---
 examples/producer/async/main.go       | 65 +++++++++++++++++++++++++++
 internal/kernel/client.go             |  7 +++
 internal/producer/producer.go         | 63 +++++++++++++++++++++++----
 internal/remote/future.go             | 82 +++++++++++++++++++++++++++++++++++
 internal/remote/remote_client.go      | 67 ++++------------------------
 internal/remote/remote_client_test.go | 49 +++++++++++++++++++++
 primitive/ctx.go                      |  1 +
 7 files changed, 268 insertions(+), 66 deletions(-)

diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
new file mode 100644
index 0000000..3888e32
--- /dev/null
+++ b/examples/producer/async/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/internal/producer"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+// Package main implements a async producer to send message.
+func main() {
+	nameServerAddr := []string{"127.0.0.1:9876"}
+	p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
+	err := p.Start()
+	if err != nil {
+		fmt.Printf("start producer error: %s", err.Error())
+		os.Exit(1)
+	}
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		err := p.SendAsync(context.Background(), &primitive.Message{
+			Topic:      "TopicTest",
+			Body:       []byte("Hello RocketMQ Go Client!"),
+			Properties: map[string]string{"id": strconv.Itoa(i)},
+		}, func(ctx context.Context, result *primitive.SendResult, e error) {
+			if e != nil {
+				fmt.Printf("receive message error: %s\n", err)
+			} else {
+				fmt.Printf("send message success: result=%s\n", result.String())
+			}
+			wg.Done()
+		})
+
+		if err != nil {
+			fmt.Printf("send message error: %s\n", err)
+		}
+	}
+	wg.Wait()
+	err = p.Shutdown()
+	if err != nil {
+		fmt.Printf("shundown producer error: %s", err.Error())
+	}
+}
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 94f45c9..5aa6849 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -178,6 +178,13 @@ func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
 	return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
 }
 
+func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+	timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+	return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, func(future *remote.ResponseFuture)
{
+		f(future.ResponseCommand, future.Err)
+	})
+}
+
 func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration) error {
 	return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 5be553e..f7c87d1 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -31,10 +31,16 @@ import (
 	"github.com/pkg/errors"
 )
 
+var(
+	ErrTopicEmpty = errors.New("topic is nil")
+	ErrMessageEmpty = errors.New("message is nil")
+)
+
 type Producer interface {
 	Start() error
 	Shutdown() error
 	SendSync(context.Context, *primitive.Message) (*primitive.SendResult, error)
+	SendAsync(context.Context, *primitive.Message, func(context.Context, *primitive.SendResult,
error)) error
 	SendOneWay(context.Context, *primitive.Message) error
 }
 
@@ -108,13 +114,20 @@ func (p *defaultProducer) Shutdown() error {
 	return nil
 }
 
-func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult,
error) {
+func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
 	if msg == nil {
-		return nil, errors.New("message is nil")
+		return errors.New("message is nil")
 	}
 
 	if msg.Topic == "" {
-		return nil, errors.New("topic is nil")
+		return errors.New("topic is nil")
+	}
+	return nil
+}
+
+func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult,
error) {
+	if err := p.checkMsg(msg); err != nil {
+		return nil, err
 	}
 
 	resp := new(primitive.SendResult)
@@ -165,13 +178,47 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 	return err
 }
 
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
-	if msg == nil {
-		return errors.New("message is nil")
+func (p *defaultProducer) SendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context,
*primitive.SendResult, error)) error {
+	if err := p.checkMsg(msg); err != nil {
+		return err
 	}
 
-	if msg.Topic == "" {
-		return errors.New("topic is nil")
+	if p.interceptor != nil {
+		primitive.WithMehod(ctx, primitive.SendAsync)
+
+		return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error
{
+			return p.sendAsync(ctx, msg, h)
+		})
+	}
+	return p.sendAsync(ctx, msg, h)
+}
+
+func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context,
*primitive.SendResult, error)) error {
+
+	mq := p.selectMessageQueue(msg.Topic)
+	if mq == nil {
+		return errors.Errorf("the topic=%s route info not found", msg.Topic)
+	}
+
+	addr := kernel.FindBrokerAddrByName(mq.BrokerName)
+	if addr == "" {
+		return errors.Errorf("topic=%s route info not found", mq.Topic)
+	}
+
+	return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command
*remote.RemotingCommand, e error) {
+		if e != nil {
+			h(ctx, nil, e)
+			return
+		}
+		resp := new(primitive.SendResult)
+		p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
+		h(ctx, resp, e)
+	})
+}
+
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
+	if err := p.checkMsg(msg); err != nil {
+		return err
 	}
 
 	if p.interceptor != nil {
diff --git a/internal/remote/future.go b/internal/remote/future.go
new file mode 100644
index 0000000..8f604cc
--- /dev/null
+++ b/internal/remote/future.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"sync"
+	"time"
+)
+
+// ResponseFuture 
+type ResponseFuture struct {
+	ResponseCommand *RemotingCommand
+	SendRequestOK   bool
+	Err             error
+	Opaque          int32
+	TimeoutMillis   time.Duration
+	callback        func(*ResponseFuture)
+	BeginTimestamp  int64
+	Done            chan bool
+	callbackOnce    sync.Once
+}
+
+// NewResponseFuture create ResponseFuture with opaque, timeout and callback
+func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture))
*ResponseFuture {
+	return &ResponseFuture{
+		Opaque:         opaque,
+		Done:           make(chan bool),
+		TimeoutMillis:  timeoutMillis,
+		callback:       callback,
+		BeginTimestamp: time.Now().Unix() * 1000,
+	}
+}
+
+func (r *ResponseFuture) executeInvokeCallback() {
+	r.callbackOnce.Do(func() {
+		if r.callback != nil {
+			r.callback(r)
+		}
+	})
+}
+
+func (r *ResponseFuture) isTimeout() bool {
+	diff := time.Now().Unix()*1000 - r.BeginTimestamp
+	return diff > int64(r.TimeoutMillis)
+}
+
+func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+	var (
+		cmd *RemotingCommand
+		err error
+	)
+	timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
+	for {
+		select {
+		case <-r.Done:
+			cmd, err = r.ResponseCommand, r.Err
+			goto done
+		case <-timer.C:
+			err = ErrRequestTimeout
+			r.Err = err
+			goto done
+		}
+	}
+done:
+	timer.Stop()
+	return cmd, err
+}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index cf71134..fde17e4 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -35,64 +35,6 @@ var (
 	connectionLocker  sync.Mutex
 )
 
-//ResponseFuture for
-type ResponseFuture struct {
-	ResponseCommand *RemotingCommand
-	SendRequestOK   bool
-	Err             error
-	Opaque          int32
-	TimeoutMillis   time.Duration
-	callback        func(*ResponseFuture)
-	BeginTimestamp  int64
-	Done            chan bool
-	callbackOnce    sync.Once
-}
-
-//NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture))
*ResponseFuture {
-	return &ResponseFuture{
-		Opaque:         opaque,
-		Done:           make(chan bool),
-		TimeoutMillis:  timeoutMillis,
-		callback:       callback,
-		BeginTimestamp: time.Now().Unix() * 1000,
-	}
-}
-
-func (r *ResponseFuture) executeInvokeCallback() {
-	r.callbackOnce.Do(func() {
-		if r.callback != nil {
-			r.callback(r)
-		}
-	})
-}
-
-func (r *ResponseFuture) isTimeout() bool {
-	diff := time.Now().Unix()*1000 - r.BeginTimestamp
-	return diff > int64(r.TimeoutMillis)
-}
-
-func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
-	var (
-		cmd *RemotingCommand
-		err error
-	)
-	timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
-	for {
-		select {
-		case <-r.Done:
-			cmd, err = r.ResponseCommand, r.Err
-			goto done
-		case <-timer.C:
-			err = ErrRequestTimeout
-			goto done
-		}
-	}
-done:
-	timer.Stop()
-	return cmd, err
-}
-
 type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
@@ -116,6 +58,7 @@ func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc)
{
 	c.processors[code] = f
 }
 
+// TODO: merge sync and async model. sync should run on async model by blocking on chan
 func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis
time.Duration) (*RemotingCommand, error) {
 	conn, err := c.connect(addr)
 	if err != nil {
@@ -132,6 +75,7 @@ func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand,
timeo
 	return resp.waitResponse()
 }
 
+// InvokeAsync send request witout blocking, just return immediately.
 func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis
time.Duration, callback func(*ResponseFuture)) error {
 	conn, err := c.connect(addr)
 	if err != nil {
@@ -144,8 +88,15 @@ func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand,
time
 		return err
 	}
 	resp.SendRequestOK = true
+	go c.receiveAsync(resp)
 	return nil
+}
 
+func (c *RemotingClient) receiveAsync(f *ResponseFuture) {
+	_, err := f.waitResponse()
+	if err != nil {
+		f.executeInvokeCallback()
+	}
 }
 
 func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration)
error {
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index c999c0e..96e8f1b 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -24,6 +24,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/assert"
 )
 
 func TestNewResponseFuture(t *testing.T) {
@@ -278,6 +280,53 @@ done:
 	wg.Wait()
 }
 
+func TestInvokeAsyncTimeout(t *testing.T) {
+	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+	serverSendRemotingCommand.Flag = ResponseType
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	client := NewRemotingClient()
+
+	var clientSend sync.WaitGroup  // blocking client send message until the server listen success.
+	clientSend.Add(1)
+	go func() {
+		clientSend.Wait()
+		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+			time.Duration(1000), func(r *ResponseFuture) {
+				assert.NotNil(t, r.Err)
+				assert.Equal(t, ErrRequestTimeout, r.Err)
+				wg.Done()
+			})
+		assert.Nil(t, err, "failed to invokeSync.")
+	}()
+
+	l, err := net.Listen("tcp", ":3000")
+	assert.Nil(t, err)
+	defer l.Close()
+	clientSend.Done()
+
+	for {
+		conn, err := l.Accept()
+		assert.Nil(t, err)
+		defer conn.Close()
+
+		scanner := client.createScanner(conn)
+		for scanner.Scan() {
+			t.Logf("receive request.")
+			_, err := decode(scanner.Bytes())
+			assert.Nil(t, err, "failed to decode RemotingCommnad.")
+
+			time.Sleep(5 * time.Second) // force client timeout
+			goto done
+		}
+	}
+done:
+	wg.Wait()
+}
+
 func TestInvokeOneWay(t *testing.T) {
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 
diff --git a/primitive/ctx.go b/primitive/ctx.go
index e74f91a..812ffd6 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -32,6 +32,7 @@ const (
 	// method name in  producer
 	SendSync = "SendSync"
 	SendOneway = "SendOneway"
+	SendAsync = "SendAsync"
 	// method name in consumer
 	ConsumerPush = "ConsumerPush"
 	ConsumerPull = "ConsumerPull"


Mime
View raw message