rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq-client-go] branch master updated: [ISSUE #514] feature: add admintool, createTopic & deleteTopic (#515)
Date Mon, 19 Oct 2020 12:10:16 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cf97e1  [ISSUE #514] feature: add admintool, createTopic & deleteTopic (#515)
9cf97e1 is described below

commit 9cf97e115816a37c3426552df9601f50d3637639
Author: Shuyang Wu <wosoyoung@gmail.com>
AuthorDate: Mon Oct 19 20:09:55 2020 +0800

    [ISSUE #514] feature: add admintool, createTopic & deleteTopic (#515)
    
    * feature: createTopic & deleteTopic, fix #514, fix #502
    
    * update deleteTopic to adapt sarama
    
    * move test to example; would mock and test in new PR
    
    * fix typo
    
    * rm commented code
    
    * update intro.md
    
    * rm commented code
    
    * rm unused code
    
    * add info/error to rlog
---
 admin/admin.go               | 202 +++++++++++++++++++++++++++++++++++++++++++
 admin/option.go              | 131 ++++++++++++++++++++++++++++
 docs/Introduction.md         |  25 ++++++
 docs/feature.md              |  15 +++-
 examples/admin/topic/main.go |  64 ++++++++++++++
 internal/request.go          | 127 +++++++++++++++++++++++----
 6 files changed, 544 insertions(+), 20 deletions(-)

diff --git a/admin/admin.go b/admin/admin.go
new file mode 100644
index 0000000..f45f39a
--- /dev/null
+++ b/admin/admin.go
@@ -0,0 +1,202 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+// Admin is the administrative client interface of this package; the value
+// returned by NewAdmin implements it.
+type Admin interface {
+	// CreateTopic creates a topic configured by the supplied create options.
+	CreateTopic(ctx context.Context, opts ...OptionCreate) error
+	// DeleteTopic deletes a topic as described by the supplied delete options.
+	DeleteTopic(ctx context.Context, opts ...OptionDelete) error
+	//TODO
+	//TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
+	//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
+	// Close shuts the admin client down.
+	Close() error
+}
+
+// adminOptions carries the configuration of an admin instance. It currently
+// only embeds the shared internal.ClientOptions.
+// TODO: move outdated context to ctx
+type adminOptions struct {
+	internal.ClientOptions
+}
+
+// AdminOption adjusts an adminOptions value; NewAdmin applies the options it
+// receives in order on top of the defaults.
+type AdminOption func(options *adminOptions)
+
+// defaultAdminOptions builds the baseline admin configuration: the default
+// client options with the group name "TOOLS_ADMIN" and an instance name taken
+// from the string form of the current time.
+func defaultAdminOptions() *adminOptions {
+	clientOpts := internal.DefaultClientOptions()
+	clientOpts.GroupName = "TOOLS_ADMIN"
+	clientOpts.InstanceName = time.Now().String()
+	return &adminOptions{ClientOptions: clientOpts}
+}
+
+// WithResolver returns an AdminOption that installs the given name server
+// resolver, which is used to fetch the name server addresses.
+func WithResolver(resolver primitive.NsResolver) AdminOption {
+	return func(o *adminOptions) {
+		o.Resolver = resolver
+	}
+}
+
+// admin is the Admin implementation returned by NewAdmin.
+type admin struct {
+	// cli sends the synchronous requests and is shut down by Close.
+	cli internal.RMQClient
+	// namesrv is consulted for topic routes, broker addresses and the name
+	// server address list when the caller does not supply them.
+	namesrv internal.Namesrvs
+
+	// opts is the configuration the instance was created with.
+	opts *adminOptions
+
+	// closeOnce guards the client shutdown performed by Close.
+	closeOnce sync.Once
+}
+
+// NewAdmin builds an Admin from the default options adjusted by opts, applied
+// in order. It obtains a client for the resulting client options and creates a
+// name server handle from the configured resolver; an error from creating that
+// handle is returned as is, together with a nil Admin.
+func NewAdmin(opts ...AdminOption) (Admin, error) {
+	cfg := defaultAdminOptions()
+	for i := range opts {
+		opts[i](cfg)
+	}
+
+	client := internal.GetOrNewRocketMQClient(cfg.ClientOptions, nil)
+	ns, err := internal.NewNamesrv(cfg.Resolver)
+	if err != nil {
+		return nil, err
+	}
+
+	a := &admin{cli: client, namesrv: ns, opts: cfg}
+	return a, nil
+}
+
+// CreateTopic sends a create-topic request to the broker named in the options.
+//
+// The configuration starts from defaultTopicConfigCreate and is then adjusted
+// by each OptionCreate in order. The outcome is logged, and any error from the
+// synchronous invocation is returned unchanged.
+// TODO: another implementation like sarama, without brokerAddr as input
+func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
+	topicCfg := defaultTopicConfigCreate()
+	for _, opt := range opts {
+		opt(&topicCfg)
+	}
+
+	header := &internal.CreateTopicRequestHeader{
+		Topic:           topicCfg.Topic,
+		DefaultTopic:    topicCfg.DefaultTopic,
+		ReadQueueNums:   topicCfg.ReadQueueNums,
+		WriteQueueNums:  topicCfg.WriteQueueNums,
+		Perm:            topicCfg.Perm,
+		TopicFilterType: topicCfg.TopicFilterType,
+		TopicSysFlag:    topicCfg.TopicSysFlag,
+		Order:           topicCfg.Order,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, header, nil)
+
+	if _, err := a.cli.InvokeSync(ctx, topicCfg.BrokerAddr, cmd, 5*time.Second); err != nil {
+		rlog.Error("create topic error", map[string]interface{}{
+			rlog.LogKeyTopic:         topicCfg.Topic,
+			rlog.LogKeyBroker:        topicCfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
+		return err
+	}
+
+	rlog.Info("create topic success", map[string]interface{}{
+		rlog.LogKeyTopic:  topicCfg.Topic,
+		rlog.LogKeyBroker: topicCfg.BrokerAddr,
+	})
+	return nil
+}
+
+// deleteTopicInBroker sends a delete-topic request for topic to the broker at
+// brokerAddr and returns the response command, or the error reported by the
+// synchronous invocation.
+func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) {
+	request := &internal.DeleteTopicRequestHeader{
+		Topic: topic,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, request, nil)
+	return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
+}
+
+// deleteTopicInNameServer sends a delete-topic request for topic to the name
+// server at nameSrvAddr and returns the response command, or the error
+// reported by the synchronous invocation.
+func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) {
+	request := &internal.DeleteTopicRequestHeader{
+		Topic: topic,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, request, nil)
+	return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopic deletes a topic from a broker and then from the name servers.
+//
+// When no broker address is configured, the topic route is refreshed and the
+// broker address is looked up through the name server handle. When no name
+// server addresses are configured, the address list known to that handle is
+// used. The first failure is logged and returned; name servers after a
+// failing one are not contacted.
+func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
+	cfg := defaultTopicConfigDelete()
+	for _, apply := range opts {
+		apply(&cfg)
+	}
+
+	// Delete the topic in the broker first.
+	if cfg.BrokerAddr == "" {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+	}
+
+	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
+		rlog.Error("delete topic in broker error", map[string]interface{}{
+			rlog.LogKeyTopic:         cfg.Topic,
+			rlog.LogKeyBroker:        cfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
+		return err
+	}
+
+	// Then delete the topic in every name server.
+	if len(cfg.NameSrvAddr) == 0 {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.NameSrvAddr = a.namesrv.AddrList()
+	}
+
+	for _, nameSrvAddr := range cfg.NameSrvAddr {
+		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
+			rlog.Error("delete topic in name server error", map[string]interface{}{
+				rlog.LogKeyTopic:         cfg.Topic,
+				"nameServer":             nameSrvAddr,
+				rlog.LogKeyUnderlayError: err,
+			})
+			return err
+		}
+	}
+
+	rlog.Info("delete topic success", map[string]interface{}{
+		rlog.LogKeyTopic:  cfg.Topic,
+		rlog.LogKeyBroker: cfg.BrokerAddr,
+		"nameServer":      cfg.NameSrvAddr,
+	})
+	return nil
+}
+
+// Close shuts down the underlying client. The shutdown runs at most once no
+// matter how often Close is called, and Close always returns nil.
+func (a *admin) Close() error {
+	a.closeOnce.Do(func() {
+		a.cli.Shutdown()
+	})
+	return nil
+}
diff --git a/admin/option.go b/admin/option.go
new file mode 100644
index 0000000..d5a648e
--- /dev/null
+++ b/admin/option.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+// defaultTopicConfigCreate returns the create-topic configuration used before
+// any OptionCreate is applied. Topic and BrokerAddr are left empty.
+func defaultTopicConfigCreate() TopicConfigCreate {
+	return TopicConfigCreate{
+		DefaultTopic:    "defaultTopic",
+		ReadQueueNums:   8,
+		WriteQueueNums:  8,
+		Perm:            6,
+		TopicFilterType: "SINGLE_TAG",
+		// TopicSysFlag and Order keep their zero values (0 and false).
+	}
+}
+
+// TopicConfigCreate collects the settings of a CreateTopic call. Defaults come
+// from defaultTopicConfigCreate and are overridden through OptionCreate values.
+type TopicConfigCreate struct {
+	// Topic is the topic name placed in the create request.
+	Topic string
+	// BrokerAddr is the broker address the create request is sent to.
+	BrokerAddr string
+	// DefaultTopic is forwarded in the request; it defaults to "defaultTopic".
+	DefaultTopic string
+	// ReadQueueNums is forwarded in the request; it defaults to 8.
+	ReadQueueNums int
+	// WriteQueueNums is forwarded in the request; it defaults to 8.
+	WriteQueueNums int
+	// Perm is forwarded in the request; it defaults to 6.
+	// NOTE(review): presumably a permission bit mask — confirm against the broker.
+	Perm int
+	// TopicFilterType is forwarded in the request; it defaults to "SINGLE_TAG".
+	TopicFilterType string
+	// TopicSysFlag is forwarded in the request; it defaults to 0.
+	TopicSysFlag int
+	// Order is forwarded in the request; it defaults to false.
+	Order bool
+}
+
+// OptionCreate adjusts a TopicConfigCreate; CreateTopic applies the options it
+// receives in order on top of the defaults.
+type OptionCreate func(*TopicConfigCreate)
+
+// WithTopicCreate returns an OptionCreate that sets the name of the topic to
+// create.
+func WithTopicCreate(Topic string) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.Topic = Topic
+	}
+}
+
+// WithBrokerAddrCreate returns an OptionCreate that sets the address of the
+// broker the create request is sent to.
+func WithBrokerAddrCreate(BrokerAddr string) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.BrokerAddr = BrokerAddr
+	}
+}
+
+// WithReadQueueNums returns an OptionCreate that overrides the read queue
+// count of the topic.
+func WithReadQueueNums(ReadQueueNums int) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.ReadQueueNums = ReadQueueNums
+	}
+}
+
+// WithWriteQueueNums returns an OptionCreate that overrides the write queue
+// count of the topic.
+func WithWriteQueueNums(WriteQueueNums int) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.WriteQueueNums = WriteQueueNums
+	}
+}
+
+// WithPerm returns an OptionCreate that overrides the permission value sent
+// for the topic.
+func WithPerm(Perm int) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.Perm = Perm
+	}
+}
+
+// WithTopicFilterType returns an OptionCreate that overrides the topic filter
+// type sent for the topic.
+func WithTopicFilterType(TopicFilterType string) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.TopicFilterType = TopicFilterType
+	}
+}
+
+// WithTopicSysFlag returns an OptionCreate that overrides the system flag sent
+// for the topic.
+func WithTopicSysFlag(TopicSysFlag int) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.TopicSysFlag = TopicSysFlag
+	}
+}
+
+// WithOrder returns an OptionCreate that overrides the order flag sent for the
+// topic.
+func WithOrder(Order bool) OptionCreate {
+	return func(cfg *TopicConfigCreate) {
+		cfg.Order = Order
+	}
+}
+
+// defaultTopicConfigDelete returns the delete-topic configuration used before
+// any OptionDelete is applied: every field holds its zero value.
+func defaultTopicConfigDelete() TopicConfigDelete {
+	return TopicConfigDelete{}
+}
+
+// TopicConfigDelete collects the settings of a DeleteTopic call; it is filled
+// in through OptionDelete values.
+type TopicConfigDelete struct {
+	// Topic is the name of the topic to delete.
+	Topic string
+	// ClusterName can be set through WithClusterName, but DeleteTopic does not
+	// read it yet.
+	ClusterName string
+	// NameSrvAddr lists the name servers to delete the topic from. When empty,
+	// DeleteTopic uses the addresses known to its name server handle.
+	NameSrvAddr []string
+	// BrokerAddr is the broker to delete the topic from. When empty,
+	// DeleteTopic looks the broker up by topic.
+	BrokerAddr string
+}
+
+// OptionDelete adjusts a TopicConfigDelete; DeleteTopic applies the options it
+// receives in order.
+type OptionDelete func(*TopicConfigDelete)
+
+// WithTopicDelete returns an OptionDelete that sets the name of the topic to
+// delete.
+func WithTopicDelete(Topic string) OptionDelete {
+	return func(cfg *TopicConfigDelete) {
+		cfg.Topic = Topic
+	}
+}
+
+// WithBrokerAddrDelete returns an OptionDelete that sets the broker address
+// the topic is deleted from, skipping the lookup by topic.
+func WithBrokerAddrDelete(BrokerAddr string) OptionDelete {
+	return func(cfg *TopicConfigDelete) {
+		cfg.BrokerAddr = BrokerAddr
+	}
+}
+
+// WithClusterName returns an OptionDelete that records the cluster name.
+// DeleteTopic does not use this value yet.
+func WithClusterName(ClusterName string) OptionDelete {
+	return func(cfg *TopicConfigDelete) {
+		cfg.ClusterName = ClusterName
+	}
+}
+
+// WithNameSrvAddr returns an OptionDelete that sets the name server addresses
+// the topic is deleted from. The slice is stored as passed, not copied.
+func WithNameSrvAddr(NameSrvAddr []string) OptionDelete {
+	return func(cfg *TopicConfigDelete) {
+		cfg.NameSrvAddr = NameSrvAddr
+	}
+}
diff --git a/docs/Introduction.md b/docs/Introduction.md
index f5b8e12..2132b91 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -101,3 +101,28 @@ err = c.Start()
 ```
 
 Full examples: [consumer](../examples/consumer)
+
+
+### Admin: Topic Operation
+
+#### Examples
+- create topic
+```
+testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
+err = testAdmin.CreateTopic(
+	context.Background(),
+	admin.WithTopicCreate("newTopic"),
+	admin.WithBrokerAddrCreate("127.0.0.1:10911"),
+)
+```
+
+- delete topic
+Note: the `ClusterName` option is not supported yet.
+```
+err = testAdmin.DeleteTopic(
+	context.Background(),
+	admin.WithTopicDelete("newTopic"),
+	//admin.WithBrokerAddrDelete("127.0.0.1:10911"),	//optional
+	//admin.WithNameSrvAddr(nameSrvAddr),				//optional
+)
+```
\ No newline at end of file
diff --git a/docs/feature.md b/docs/feature.md
index 795ffac..eed3e17 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -70,4 +70,17 @@
 - [ ] Other
     - [ ] VIPChannel
     - [ ] RPCHook
-    
\ No newline at end of file
+    
+## Admin
+
+### Topic/Cluster
+- [x] updateTopic
+- [x] deleteTopic
+- [ ] updateSubGroup
+- [ ] deleteSubGroup
+- [ ] updateBrokerConfig
+- [ ] updateTopicPerm
+- [ ] listTopic
+- [ ] topicRoute
+- [ ] topicStatus
+- [ ] topicClusterList
\ No newline at end of file
diff --git a/examples/admin/topic/main.go b/examples/admin/topic/main.go
new file mode 100644
index 0000000..ef9a536
--- /dev/null
+++ b/examples/admin/topic/main.go
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/rocketmq-client-go/v2/admin"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+// main demonstrates the admin client: it creates a topic on a local broker,
+// deletes it again and then closes the client. Failures of the individual
+// operations are printed and do not stop the example; a failure to build the
+// client does, because no operation can run without it.
+func main() {
+	topic := "newOne"
+	//clusterName := "DefaultCluster"
+	nameSrvAddr := []string{"127.0.0.1:9876"}
+	brokerAddr := "127.0.0.1:10911"
+
+	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
+	if err != nil {
+		fmt.Println(err.Error())
+		// NewAdmin returns a nil Admin on error; continuing would panic on
+		// the first method call.
+		return
+	}
+
+	// create topic
+	err = testAdmin.CreateTopic(
+		context.Background(),
+		admin.WithTopicCreate(topic),
+		admin.WithBrokerAddrCreate(brokerAddr),
+	)
+	if err != nil {
+		fmt.Println("Create topic error:", err.Error())
+	}
+
+	// delete topic; broker and name server addresses are optional and are
+	// looked up when omitted
+	err = testAdmin.DeleteTopic(
+		context.Background(),
+		admin.WithTopicDelete(topic),
+		//admin.WithBrokerAddrDelete(brokerAddr),
+		//admin.WithNameSrvAddr(nameSrvAddr),
+	)
+	if err != nil {
+		fmt.Println("Delete topic error:", err.Error())
+	}
+
+	err = testAdmin.Close()
+	if err != nil {
+		fmt.Printf("Shutdown admin error: %s", err.Error())
+	}
+}
diff --git a/internal/request.go b/internal/request.go
index 5438790..fa88efe 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -24,25 +24,33 @@ import (
 )
 
 const (
-	ReqSendMessage              = int16(10)
-	ReqPullMessage              = int16(11)
-	ReqQueryConsumerOffset      = int16(14)
-	ReqUpdateConsumerOffset     = int16(15)
-	ReqSearchOffsetByTimestamp  = int16(29)
-	ReqGetMaxOffset             = int16(30)
-	ReqHeartBeat                = int16(34)
-	ReqConsumerSendMsgBack      = int16(36)
-	ReqENDTransaction           = int16(37)
-	ReqGetConsumerListByGroup   = int16(38)
-	ReqLockBatchMQ              = int16(41)
-	ReqUnlockBatchMQ            = int16(42)
-	ReqGetRouteInfoByTopic      = int16(105)
-	ReqSendBatchMessage         = int16(320)
-	ReqCheckTransactionState    = int16(39)
-	ReqNotifyConsumerIdsChanged = int16(40)
-	ReqResetConsuemrOffset      = int16(220)
-	ReqGetConsumerRunningInfo   = int16(307)
-	ReqConsumeMessageDirectly   = int16(309)
+	ReqSendMessage                   = int16(10)
+	ReqPullMessage                   = int16(11)
+	ReqQueryMessage                  = int16(12)
+	ReqQueryConsumerOffset           = int16(14)
+	ReqUpdateConsumerOffset          = int16(15)
+	ReqCreateTopic                   = int16(17)
+	ReqSearchOffsetByTimestamp       = int16(29)
+	ReqGetMaxOffset                  = int16(30)
+	ReqGetMinOffset                  = int16(31)
+	ReqViewMessageByID               = int16(33)
+	ReqHeartBeat                     = int16(34)
+	ReqConsumerSendMsgBack           = int16(36)
+	ReqENDTransaction                = int16(37)
+	ReqGetConsumerListByGroup        = int16(38)
+	ReqLockBatchMQ                   = int16(41)
+	ReqUnlockBatchMQ                 = int16(42)
+	ReqGetRouteInfoByTopic           = int16(105)
+	ReqGetBrokerClusterInfo          = int16(106)
+	ReqSendBatchMessage              = int16(320)
+	ReqCheckTransactionState         = int16(39)
+	ReqNotifyConsumerIdsChanged      = int16(40)
+	ReqGetAllTopicListFromNameServer = int16(206)
+	ReqDeleteTopicInBroker           = int16(215)
+	ReqDeleteTopicInNameSrv          = int16(216)
+	ReqResetConsuemrOffset           = int16(220)
+	ReqGetConsumerRunningInfo        = int16(307)
+	ReqConsumeMessageDirectly        = int16(309)
 )
 
 type SendMessageRequestHeader struct {
@@ -318,3 +326,84 @@ func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string
 		request.clientID = v
 	}
 }
+
+// QueryMessageRequestHeader holds the fields of a query-message request
+// header: the topic and key to search for, the maximum number of results and
+// the time range.
+// NOTE(review): presumably paired with ReqQueryMessage and millisecond
+// timestamps — no caller is visible here, confirm.
+type QueryMessageRequestHeader struct {
+	Topic          string
+	Key            string
+	MaxNum         int
+	BeginTimestamp int64
+	EndTimestamp   int64
+}
+
+// Encode returns the header as a string map with the keys "topic", "key",
+// "maxNum", "beginTimestamp" and "endTimestamp"; numeric fields are rendered
+// in base 10.
+func (request *QueryMessageRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic":          request.Topic,
+		"key":            request.Key,
+		"maxNum":         fmt.Sprintf("%d", request.MaxNum),
+		"beginTimestamp": strconv.FormatInt(request.BeginTimestamp, 10),
+		"endTimestamp":   fmt.Sprintf("%d", request.EndTimestamp),
+	}
+}
+
+// Decode is currently a no-op: it ignores properties, leaves the header
+// untouched and always returns nil.
+func (request *QueryMessageRequestHeader) Decode(properties map[string]string) error {
+	return nil
+}
+
+// ViewMessageRequestHeader holds the single field of a view-message request
+// header: the offset of the message.
+// NOTE(review): presumably paired with ReqViewMessageByID — confirm.
+type ViewMessageRequestHeader struct {
+	Offset int64
+}
+
+// Encode returns the header as a string map with the single key "offset",
+// rendered in base 10.
+func (request *ViewMessageRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"offset": strconv.FormatInt(request.Offset, 10),
+	}
+}
+
+// CreateTopicRequestHeader holds the fields of a create-topic request header.
+// The admin package fills it from a TopicConfigCreate and sends it with
+// ReqCreateTopic.
+type CreateTopicRequestHeader struct {
+	Topic           string
+	DefaultTopic    string
+	ReadQueueNums   int
+	WriteQueueNums  int
+	Perm            int
+	TopicFilterType string
+	TopicSysFlag    int
+	Order           bool
+}
+
+// Encode returns the header as a string map keyed by the lower-camel-case
+// field names; integers are rendered in base 10 and Order as "true" or
+// "false".
+func (request *CreateTopicRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic":           request.Topic,
+		"defaultTopic":    request.DefaultTopic,
+		"readQueueNums":   fmt.Sprintf("%d", request.ReadQueueNums),
+		"writeQueueNums":  fmt.Sprintf("%d", request.WriteQueueNums),
+		"perm":            fmt.Sprintf("%d", request.Perm),
+		"topicFilterType": request.TopicFilterType,
+		"topicSysFlag":    fmt.Sprintf("%d", request.TopicSysFlag),
+		"order":           strconv.FormatBool(request.Order),
+	}
+}
+
+// TopicListRequestHeader holds the single field of a topic-list request
+// header: a topic name.
+// NOTE(review): presumably paired with ReqGetAllTopicListFromNameServer — no
+// caller is visible here, confirm.
+type TopicListRequestHeader struct {
+	Topic string
+}
+
+// Encode returns the header as a string map with the single key "topic".
+func (request *TopicListRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic": request.Topic,
+	}
+}
+
+// DeleteTopicRequestHeader holds the single field of a delete-topic request
+// header: the topic to delete. The admin package sends it with both
+// ReqDeleteTopicInBroker and ReqDeleteTopicInNameSrv.
+type DeleteTopicRequestHeader struct {
+	Topic string
+}
+
+// Encode returns the header as a string map with the single key "topic".
+func (request *DeleteTopicRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic": request.Topic,
+	}
+}


Mime
View raw message