rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/2] incubator-rocketmq-externals git commit: [ROCKETMQ-198] Go-Client's incomplete implement.
Date Fri, 07 Jul 2017 01:56:42 GMT
[ROCKETMQ-198] Go-Client's incomplete implement.

Author: tangjie <styletang.me@gmail.com>

Closes #22 from StyleTang/go-client-all.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/28b98b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/28b98b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/28b98b09

Branch: refs/heads/master
Commit: 28b98b096f7104f08658a116525b0812b9a14367
Parents: c98a770
Author: tangjie <styletang.me@gmail.com>
Authored: Fri Jul 7 09:56:15 2017 +0800
Committer: yukon <yukon@apache.org>
Committed: Fri Jul 7 09:56:15 2017 +0800

----------------------------------------------------------------------
 rocketmq-go/clean_expire_msg_controller.go      |  49 +++
 rocketmq-go/docs/roadmap.md                     | 105 +++--
 rocketmq-go/example/consumer_example.go         |  50 ++-
 .../example/producer_consumer_example.go        |  77 ++++
 rocketmq-go/example/producer_example.go         |  40 ++
 rocketmq-go/model/config/consumer_config.go     |  58 ++-
 rocketmq-go/model/config/producer_config.go     |  46 ++
 rocketmq-go/model/config/rocketmq_config.go     |  20 -
 rocketmq-go/model/constant/config.go            |   2 +-
 rocketmq-go/model/constant/mix_all.go           |   3 -
 .../model/header/pull_message_request_header.go |  12 +-
 .../model/header/send_message_request_header.go |  29 +-
 rocketmq-go/model/message/message_queue.go      |  84 ----
 rocketmq-go/model/process_queue.go              | 421 -------------------
 rocketmq-go/model/process_queue_info.go         |   8 -
 rocketmq-go/model/response_code.go              |   4 +-
 rocketmq-go/model/send_result.go                |  13 +-
 rocketmq-go/model/topic_publishInfo.go          |  76 ----
 rocketmq-go/model/topic_publish_info.go         |   6 -
 rocketmq-go/model/topic_route_data.go           | 105 -----
 rocketmq-go/mq_client_manage.go                 | 261 ++++++++++++
 rocketmq-go/mq_client_manager.go                |  90 ----
 rocketmq-go/mq_consumer.go                      |  74 ----
 rocketmq-go/mq_producer.go                      |  53 ++-
 rocketmq-go/mq_push_consumer.go                 | 153 +++++++
 rocketmq-go/pull_message_controller.go          | 329 +++++++++++++++
 rocketmq-go/rebalance_controller.go             |  33 ++
 .../allocate_message_averagely.go               |  80 ++++
 .../allocate_message_averagely_by_circle.go     |  79 ++++
 .../allocate_message_by_config.go               |  27 ++
 .../allocate_message_by_machine_room.go         |  80 ++++
 .../allocate_message_queue_strategy.go          |  27 ++
 rocketmq-go/service/consume_message_service.go  |  24 +-
 rocketmq-go/service/mq_client.go                |  12 +-
 rocketmq-go/service/mq_fault_strategy.go        |  49 +++
 rocketmq-go/service/offset_store.go             | 163 +++++++
 rocketmq-go/service/offset_store_service.go     |  21 -
 rocketmq-go/service/producer_service.go         | 222 +++++++++-
 .../service/producer_service_for_send_back.go   | 115 +++++
 rocketmq-go/service/rebalance.go                | 307 ++++++++++++++
 rocketmq-go/service/rebalance_service.go        |  25 --
 rocketmq-go/tasks.go                            |  68 +++
 rocketmq-go/util/compress_util.go               |  63 +++
 rocketmq-go/util/concurrent_map.go              |  16 +
 rocketmq-go/util/message_client_id_generator.go |   4 +-
 rocketmq-go/util/message_properties.go          |  47 +++
 rocketmq-go/util/regex_util.go                  |  33 ++
 47 files changed, 2572 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/clean_expire_msg_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/clean_expire_msg_controller.go b/rocketmq-go/clean_expire_msg_controller.go
new file mode 100644
index 0000000..73bb35b
--- /dev/null
+++ b/rocketmq-go/clean_expire_msg_controller.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+	"time"
+)
+
+type CleanExpireMsgController struct {
+	mqClient      service.RocketMqClient
+	clientFactory *ClientFactory
+}
+
+func NewCleanExpireMsgController(mqClient service.RocketMqClient, clientFactory *ClientFactory) *CleanExpireMsgController {
+	return &CleanExpireMsgController{
+		mqClient:      mqClient,
+		clientFactory: clientFactory,
+	}
+}
+
+func (self *CleanExpireMsgController) Start() {
+	for _, consumer := range self.clientFactory.ConsumerTable {
+		go func() {
+			cleanExpireMsgTimer := time.NewTimer(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * 1000 * 60 * time.Millisecond)
+			//cleanExpireMsgTimer := time.NewTimer(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * time.Millisecond)
+			for {
+				<-cleanExpireMsgTimer.C
+				consumer.CleanExpireMsg()
+				cleanExpireMsgTimer.Reset(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * 1000 * 60 * time.Millisecond)
+				//cleanExpireMsgTimer.Reset(time.Duration(consumer.ConsumerConfig.ConsumeTimeout) * time.Millisecond)
+			}
+		}()
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/docs/roadmap.md
----------------------------------------------------------------------
diff --git a/rocketmq-go/docs/roadmap.md b/rocketmq-go/docs/roadmap.md
index 0db9033..a69c561 100644
--- a/rocketmq-go/docs/roadmap.md
+++ b/rocketmq-go/docs/roadmap.md
@@ -1,45 +1,64 @@
 # RoadMap-Milestone1
 
 ## Consumer
-- [ ] ConsumerType
-    - [ ] PushConsumer
-- [ ] MessageListener
-    - [ ] Concurrently
-- [ ] MessageModel
-    - [ ] CLUSTERING
-- [ ] OffsetStore
-    - [ ] RemoteBrokerOffsetStore
-- [ ] RebalanceService
-- [ ] PullMessageService
-- [ ] ConsumeMessageService
-- [ ] AllocateMessageQueueStrategy
-    - [ ] AllocateMessageQueueAveragely
-- [ ] Other
-    - [ ] Config
-    - [ ] ZIP
-    - [ ] ConsumeFromWhere
-        - [ ] CONSUME_FROM_LAST_OFFSET
-        - [ ] CONSUME_FROM_FIRST_OFFSET
-        - [ ] CONSUME_FROM_TIMESTAMP
-    - [ ] Retry(sendMessageBack)
-    - [ ] TimeOut(clearExpiredMessage)
-    - [ ] ACK(partSuccess)
-    - [ ] FlowControl(messageCanNotConsume)
+- [x] ConsumerType
+    - [x] PushConsumer
+- [x] MessageListener
+    - [x] Concurrently
+- [x] MessageModel
+    - [x] CLUSTERING
+- [x] OffsetStore
+    - [x] RemoteBrokerOffsetStore
+- [x] RebalanceService
+- [x] PullMessageService
+- [x] ConsumeMessageService
+- [x] AllocateMessageQueueStrategy
+    - [x] AllocateMessageQueueAveragely
+- [x] Other
+    - [x] Config
+    - [x] ZIP
+    - [x] ConsumeFromWhere
+        - [x] CONSUME_FROM_LAST_OFFSET
+        - [x] CONSUME_FROM_FIRST_OFFSET
+        - [x] CONSUME_FROM_TIMESTAMP
+    - [x] Retry(sendMessageBack)
+    - [x] TimeOut(clearExpiredMessage)
+    - [x] ACK(partSuccess)
+    - [x] FlowControl(messageCanNotConsume)
+    
+## Producer
+- [x] ProducerType
+    - [x] DefaultProducer
+- [x] API
+    - [x] Send
+        - [x] Sync
+- [x] Other
+    - [x] DelayMessage
+    - [x] Config
+    - [x] MessageId Generate
+    - [x] CompressMsg
+    - [x] TimeOut
+    - [x] LoadBalance
+    - [x] DefaultTopic
+    - [x] VipChannel
+    - [x] MQFaultStrategy
+
 ## Manager
-- [ ] Controller
-    - [ ] PullMessageController
-- [ ] Task
-    - [ ] Heartbeat
-    - [ ] UpdateTopicRouteInfoFromNameServer
-    - [ ] PersistAllConsumerOffset
-    - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-- [ ] ClientRemotingProcessor
-    - [ ] CHECK_TRANSACTION_STATE
-    - [ ] NOTIFY_CONSUMER_IDS_CHANGED
-    - [ ] RESET_CONSUMER_CLIENT_OFFSET
-    - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
-    - [ ] GET_CONSUMER_RUNNING_INFO
-    - [ ] CONSUME_MESSAGE_DIRECTLY
+- [x] Controller
+    - [x] PullMessageController
+- [x] Task
+    - [x] UpdateTopicRouteInfo
+    - [x] Heartbeat
+    - [x] Rebalance
+    - [x] PullMessage
+    - [x] CleanExpireMsg
+- [x] ClientRemotingProcessor
+    - [x] CHECK_TRANSACTION_STATE
+    - [x] NOTIFY_CONSUMER_IDS_CHANGED
+    - [x] RESET_CONSUMER_CLIENT_OFFSET
+    - [x] GET_CONSUMER_STATUS_FROM_CLIENT
+    - [x] GET_CONSUMER_RUNNING_INFO
+    - [x] CONSUME_MESSAGE_DIRECTLY
 
 ## Remoting
 - [x] MqClientRequest
@@ -122,13 +141,11 @@
     - [ ] RebalanceController
     - [ ] PullMessageController
 - [ ] Task
-    - [ ] PollNameServer
+    - [ ] UpdateTopicRouteInfo
     - [ ] Heartbeat
-    - [ ] UpdateTopicRouteInfoFromNameServer
-    - [ ] CleanOfflineBroker
-    - [ ] PersistAllConsumerOffset
-    - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-    - [ ] UploadFilterClassSource(FromHeartBeat/But Golang Not Easy To do this(Java Source))
+    - [ ] Rebalance
+    - [ ] PullMessage
+    - [ ] CleanExpireMsg
 - [ ] ClientRemotingProcessor
     - [ ] CHECK_TRANSACTION_STATE
     - [ ] NOTIFY_CONSUMER_IDS_CHANGED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/consumer_example.go b/rocketmq-go/example/consumer_example.go
index af74c01..7c94e58 100644
--- a/rocketmq-go/example/consumer_example.go
+++ b/rocketmq-go/example/consumer_example.go
@@ -17,39 +17,37 @@
 package main
 
 import (
-	"errors"
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go"
 	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
 	"github.com/golang/glog"
+	"time"
 )
 
 func main() {
 
-	// create a mqClientManager instance
-	var mqClientConfig = &rocketmq.MqClientConfig{}
-	var mqClientManager = rocketmq.NewMqClientManager(mqClientConfig)
-
-	// create rocketMq consumer
-	var consumerConfig = &rocketmq.MqConsumerConfig{}
-	var consumer1 = rocketmq.NewDefaultMQPushConsumer("testGroup", consumerConfig)
-	consumer1.Subscribe("testTopic", "*")
-	consumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
-		var index = -1
-		for i, msg := range msgs {
-			// your code here,for example,print msg
-			glog.Info(msg)
-			var err = errors.New("error")
-			if err != nil {
-				break
-			}
-			index = i
+	var (
+		testTopic = "GoLang"
+	)
+	var comsumer1 = rocketmq.NewDefaultMQPushConsumer(testTopic + "-StyleTang")
+	comsumer1.ConsumerConfig.PullInterval = 0
+	comsumer1.ConsumerConfig.ConsumeTimeout = 1
+	comsumer1.ConsumerConfig.ConsumeMessageBatchMaxSize = 16
+	comsumer1.ConsumerConfig.ConsumeFromWhere = "CONSUME_FROM_TIMESTAMP"
+	comsumer1.ConsumerConfig.ConsumeTimestamp = time.Now()
+	comsumer1.Subscribe(testTopic, "*")
+	comsumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+		for _, msg := range msgs {
+			glog.Info(msg.BornTimestamp)
 		}
-		return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: index}
+		glog.Info("look message len ", len(msgs))
+		return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: len(msgs)}
 	})
-
-	//register consumer to mqClientManager
-	mqClientManager.RegisterConsumer(consumer1)
-
-	//start it
-	mqClientManager.Start()
+	var clienConfig = &config.ClientConfig{}
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+	rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+	rocketMqManager.RegistConsumer(comsumer1)
+	rocketMqManager.Start()
+	select {}
+	rocketMqManager.ShutDown()
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/producer_consumer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/producer_consumer_example.go b/rocketmq-go/example/producer_consumer_example.go
new file mode 100644
index 0000000..0d8e455
--- /dev/null
+++ b/rocketmq-go/example/producer_consumer_example.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package main
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go" //todo todo  I want only import this
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/golang/glog"
+	"net/http"
+	_ "net/http/pprof"
+	"time"
+)
+
+func main() {
+	go func() {
+		http.ListenAndServe("localhost:6060", nil)
+	}()
+	var (
+		testTopic = "GoLang"
+	)
+	var producer1 = rocketmq.NewDefaultMQProducer("Test1")
+	producer1.ProducerConfig.CompressMsgBodyOverHowMuch = 1
+	var producer2 = rocketmq.NewDefaultMQProducer("Test2")
+	var comsumer1 = rocketmq.NewDefaultMQPushConsumer(testTopic + "-StyleTang")
+	comsumer1.ConsumerConfig.PullInterval = 0
+	comsumer1.ConsumerConfig.ConsumeTimeout = 1
+	comsumer1.ConsumerConfig.ConsumeMessageBatchMaxSize = 16
+	comsumer1.ConsumerConfig.ConsumeFromWhere = "CONSUME_FROM_TIMESTAMP"
+	comsumer1.ConsumerConfig.ConsumeTimestamp = time.Now()
+	comsumer1.Subscribe(testTopic, "*")
+	comsumer1.RegisterMessageListener(func(msgs []model.MessageExt) model.ConsumeConcurrentlyResult {
+		for _, msg := range msgs {
+			glog.Info(msg.BornTimestamp)
+		}
+		glog.Info("look message len ", len(msgs))
+		return model.ConsumeConcurrentlyResult{ConsumeConcurrentlyStatus: model.CONSUME_SUCCESS, AckIndex: len(msgs)}
+	})
+	var clienConfig = &config.ClientConfig{}
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+	rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+	rocketMqManager.RegistProducer(producer1)
+	rocketMqManager.RegistProducer(producer2)
+	rocketMqManager.RegistConsumer(comsumer1)
+	rocketMqManager.Start()
+	for i := 0; i < 10000000; i++ {
+		var message = &model.Message{}
+		message.Topic = testTopic
+		message.SetKeys([]string{"xxx"})
+		message.SetTag("1122")
+		message.Body = []byte("hellAXXWord" + util.IntToString(i))
+
+		xx, ee := producer1.Send(message)
+		if ee != nil {
+			glog.Error(ee)
+			continue
+		}
+		glog.V(0).Infof("sendMessageResutl messageId[%s] err[%s]", xx.MsgID(), ee)
+	}
+	select {}
+	rocketMqManager.ShutDown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/example/producer_example.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/example/producer_example.go b/rocketmq-go/example/producer_example.go
index bda2941..acc2011 100644
--- a/rocketmq-go/example/producer_example.go
+++ b/rocketmq-go/example/producer_example.go
@@ -15,3 +15,43 @@
  *  limitations under the License.
  */
 package main
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/golang/glog"
+	_ "net/http/pprof"
+)
+
+func main() {
+	var (
+		testTopic = "GoLang"
+	)
+	var producer1 = rocketmq.NewDefaultMQProducer("Test1")
+	producer1.ProducerConfig.CompressMsgBodyOverHowMuch = 1
+	var producer2 = rocketmq.NewDefaultMQProducer("Test2")
+	var clienConfig = &config.ClientConfig{}
+	clienConfig.SetNameServerAddress("120.55.113.35:9876")
+	rocketMqManager := rocketmq.MqClientManagerInit(clienConfig)
+	rocketMqManager.RegistProducer(producer1)
+	rocketMqManager.RegistProducer(producer2)
+	rocketMqManager.Start()
+	for i := 0; i < 1000; i++ {
+		var message = &model.Message{}
+		message.Topic = testTopic
+		message.SetKeys([]string{"xxx"})
+		message.SetTag("1122")
+		message.Body = []byte("hellAXXWord" + util.IntToString(i))
+
+		xx, ee := producer1.Send(message)
+		if ee != nil {
+			glog.Error(ee)
+			continue
+		}
+		glog.V(0).Infof("sendMessageResutl messageId[%s] err[%s]", xx.MsgID(), ee)
+	}
+	select {}
+	rocketMqManager.ShutDown()
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/consumer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/consumer_config.go b/rocketmq-go/model/config/consumer_config.go
index 25f7585..b6a6f32 100644
--- a/rocketmq-go/model/config/consumer_config.go
+++ b/rocketmq-go/model/config/consumer_config.go
@@ -18,22 +18,30 @@ package config
 
 import "time"
 
-type RocketMqConsumerConfig struct {
-	ConsumeFromWhere string
-	/**
-	 * Minimum consumer thread number
-	 */
-	//consumeThreadMin                  int
-	//					/**
-	//					 * Max consumer thread number
-	//					 */
-	//consumeThreadMax                  int
+/**
+ * Delay for some time when an exception occurs
+ */
+const PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION int64 = 3000
 
-	/**
-	 * Threshold for dynamic adjustment of the number of thread pool
-	 */
-	//adjustThreadPoolNumsThreshold     int   // = 100000;
+/**
+ * Flow control interval
+ */
+const PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL int64 = 50
 
+// Values for the ConsumeFromWhere setting.
+// On first consumption, start from the last offset.
+const CONSUME_FROM_LAST_OFFSET string = "CONSUME_FROM_LAST_OFFSET"
+
+// On first consumption, start from the first offset.
+const CONSUME_FROM_FIRST_OFFSET string = "CONSUME_FROM_FIRST_OFFSET"
+
+// On first consumption, start from a given point in time.
+const CONSUME_FROM_TIMESTAMP string = "CONSUME_FROM_TIMESTAMP"
+
+// End of the ConsumeFromWhere values.
+
+type RocketMqConsumerConfig struct {
+	ConsumeFromWhere string
 	/**
 	 * Concurrently max span offset.it has no effect on sequential consumption
 	 */
@@ -94,6 +102,26 @@ type RocketMqConsumerConfig struct {
 }
 
 func NewRocketMqConsumerConfig() (consumerConfig *RocketMqConsumerConfig) {
-	consumerConfig = &RocketMqConsumerConfig{}
+	consumerConfig = &RocketMqConsumerConfig{
+		ConsumeFromWhere:              CONSUME_FROM_LAST_OFFSET,
+		ConsumeConcurrentlyMaxSpan:    2000,
+		PullThresholdForQueue:         1000,
+		PullInterval:                  0,
+		ConsumeMessageBatchMaxSize:    1,
+		PullBatchSize:                 32,
+		PostSubscriptionWhenPull:      false,
+		UnitMode:                      false,
+		MaxReconsumeTimes:             16,
+		SuspendCurrentQueueTimeMillis: 1000,
+		ConsumeTimeout:                15,
+		ConsumeTimestamp:              time.Now().Add(-30 * time.Minute),
+
+		// use custom or constants.don't suggest to change
+		PullTimeDelayMillsWhenException:   PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION,
+		PullTimeDelayMillsWhenFlowControl: PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL,
+		PullTimeDelayMillsWhenSuspend:     1000,
+		BrokerSuspendMaxTimeMillis:        1000 * 15,
+		ConsumerTimeoutMillisWhenSuspend:  1000 * 30,
+	}
 	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/producer_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/producer_config.go b/rocketmq-go/model/config/producer_config.go
index ce109fb..3bf4844 100644
--- a/rocketmq-go/model/config/producer_config.go
+++ b/rocketmq-go/model/config/producer_config.go
@@ -17,4 +17,50 @@
 package config
 
 type RocketMqProducerConfig struct {
+	SendMsgTimeout int64 //done
+	//private int sendMsgTimeout = 3000;
+	CompressMsgBodyOverHowMuch int //done
+	//private int compressMsgBodyOverHowmuch = 1024 * 4;
+	ZipCompressLevel int //done
+	//private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+	/**
+	 * Just for testing or demo program
+	 */
+	//	private String createTopicKey = MixAll.DEFAULT_TOPIC;
+
+	//DefaultTopicQueueNums            int
+	////private volatile int defaultTopicQueueNums = 4;
+
+	RetryTimesWhenSendFailed int
+	//private int retryTimesWhenSendFailed = 2;
+	RetryTimesWhenSendAsyncFailed int
+	//private int retryTimesWhenSendAsyncFailed = 2;
+	//
+	RetryAnotherBrokerWhenNotStoreOK bool
+	//private boolean retryAnotherBrokerWhenNotStoreOK = false;
+	MaxMessageSize int
+	//private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
+	//for MQFaultStrategy todo to be done
+	SendLatencyFaultEnable bool    //false
+	LatencyMax             []int64 //=             {50L,   100L,   550L,       1000L,  2000L,      3000L,      15000L};
+	NotAvailableDuration   []int64 //   {0L,    0L,     30000L,     60000L, 120000L,    180000L,    600000L};
+}
+
+// NewProducerConfig returns a RocketMqProducerConfig populated with default values.
+func NewProducerConfig() (producerConfig *RocketMqProducerConfig) {
+	producerConfig = &RocketMqProducerConfig{
+		SendMsgTimeout:             3000,
+		CompressMsgBodyOverHowMuch: 1024 * 4,
+		ZipCompressLevel:           5,
+		MaxMessageSize:             1024 * 1024 * 4, // 4M
+
+		RetryTimesWhenSendFailed:         2,
+		RetryTimesWhenSendAsyncFailed:    2, //
+		RetryAnotherBrokerWhenNotStoreOK: false,
+		SendLatencyFaultEnable:           false,
+		LatencyMax:                       []int64{50, 100, 550, 1000, 2000, 3000, 15000},
+		NotAvailableDuration:             []int64{0, 0, 30000, 60000, 120000, 180000, 600000},
+	}
+	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/config/rocketmq_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/config/rocketmq_config.go b/rocketmq-go/model/config/rocketmq_config.go
deleted file mode 100644
index 56e89b9..0000000
--- a/rocketmq-go/model/config/rocketmq_config.go
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package config
-
-type RocketMqClientConfig struct {
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/constant/config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/config.go b/rocketmq-go/model/constant/config.go
index c48dfa5..5f7f5db 100644
--- a/rocketmq-go/model/constant/config.go
+++ b/rocketmq-go/model/constant/config.go
@@ -26,4 +26,4 @@ var USE_HEADER_SERIALIZETYPE = JSON_SERIALIZE
 
 var REMOTING_COMMAND_FLAG = 0
 var REMOTING_COMMAND_LANGUAGE = "OTHER"
-var REMOTING_COMMAND_VERSION int16 = 137
+var REMOTING_COMMAND_VERSION int16 = 213

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/constant/mix_all.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/constant/mix_all.go b/rocketmq-go/model/constant/mix_all.go
index 6abaabe..2c8f0a7 100644
--- a/rocketmq-go/model/constant/mix_all.go
+++ b/rocketmq-go/model/constant/mix_all.go
@@ -44,9 +44,6 @@ const (
 	CID_ONSAPI_PULL_GROUP       = "CID_ONSAPI_PULL"
 	CID_RMQ_SYS_PREFIX          = "CID_RMQ_SYS_"
 
-	//public static final List<String> LocalInetAddrs = getLocalInetAddress()
-	//Localhost = localhost()
-	//DEFAULT_CHARSET = "UTF-8"
 	MASTER_ID int64 = 0
 	CURRENT_JVM_PID
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/header/pull_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/pull_message_request_header.go b/rocketmq-go/model/header/pull_message_request_header.go
index 0133796..f341b49 100644
--- a/rocketmq-go/model/header/pull_message_request_header.go
+++ b/rocketmq-go/model/header/pull_message_request_header.go
@@ -17,8 +17,18 @@
 package header
 
 type PullMessageRequestHeader struct {
+	ConsumerGroup        string `json:"consumerGroup"`
+	Topic                string `json:"topic"`
+	QueueId              int32  `json:"queueId"`
+	QueueOffset          int64  `json:"queueOffset"`
+	MaxMsgNums           int32  `json:"maxMsgNums"`
+	SysFlag              int32  `json:"sysFlag"`
+	CommitOffset         int64  `json:"commitOffset"`
+	SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
+	Subscription         string `json:"subscription"`
+	SubVersion           int64  `json:"subVersion"`
 }
 
-func (header *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
+func (self *PullMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
 	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/header/send_message_request_header.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/header/send_message_request_header.go b/rocketmq-go/model/header/send_message_request_header.go
index 5c828a8..80f17cc 100644
--- a/rocketmq-go/model/header/send_message_request_header.go
+++ b/rocketmq-go/model/header/send_message_request_header.go
@@ -17,21 +17,20 @@
 package header
 
 type SendMessageRequestHeader struct {
-	//CommandCustomHeader
-	ProducerGroup        string
-	Topic                string
-	DefaultTopic         string
-	DefaultTopicQueueNum int
-	QueueID              int
-	SysFlag              int
-	BornTimestamp        int
-	Flag                 int
-	Properties           string
-	ReconsumeTimes       int
-	UnitMode             bool
-	MaxReconsumeTimes    int
+	ProducerGroup         string `json:"producerGroup"`
+	Topic                 string `json:"topic"`
+	DefaultTopic          string `json:"defaultTopic"`
+	DefaultTopicQueueNums int    `json:"defaultTopicQueueNums"`
+	QueueId               int32  `json:"queueId"`
+	SysFlag               int    `json:"sysFlag"`
+	BornTimestamp         int64  `json:"bornTimestamp"`
+	Flag                  int    `json:"flag"`
+	Properties            string `json:"properties"`
+	ReconsumeTimes        int    `json:"reconsumeTimes"`
+	UnitMode              bool   `json:"unitMode"`
+	MaxReconsumeTimes     int    `json:"maxReconsumeTimes"`
 }
 
-func (header *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
-	//TODO
+func (self *SendMessageRequestHeader) FromMap(headerMap map[string]interface{}) {
+	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/message/message_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/message/message_queue.go b/rocketmq-go/model/message/message_queue.go
deleted file mode 100644
index 20b47be..0000000
--- a/rocketmq-go/model/message/message_queue.go
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package message
-
-type MessageQueue struct {
-	topic      string
-	brokerName string
-	queueId    int32
-}
-
-func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue {
-	return &MessageQueue{
-		topic:      topic,
-		brokerName: brokerName,
-		queueId:    queueId,
-	}
-}
-
-func (queue *MessageQueue) clone() *MessageQueue {
-	no := new(MessageQueue)
-	no.topic = queue.topic
-	no.queueId = queue.queueId
-	no.brokerName = queue.brokerName
-	return no
-}
-
-func (queue MessageQueue) BrokerName() string {
-	return queue.brokerName
-}
-
-func (queue *MessageQueue) QueueID() int32 {
-	return queue.queueId
-}
-
-type MessageQueues []*MessageQueue
-
-func (queues MessageQueues) Less(i, j int) bool {
-	imq := queues[i]
-	jmq := queues[j]
-
-	if imq.topic < jmq.topic {
-		return true
-	}
-
-	if imq.topic < jmq.topic {
-		return false
-	}
-
-	if imq.brokerName < jmq.brokerName {
-		return true
-	}
-
-	if imq.brokerName < jmq.brokerName {
-		return false
-	}
-
-	if imq.queueId < jmq.queueId {
-		return true
-	}
-
-	return false
-}
-
-func (queues MessageQueues) Swap(i, j int) {
-	queues[i], queues[j] = queues[j], queues[i]
-}
-
-func (queues MessageQueues) Len() int {
-	return len(queues)
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/process_queue.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue.go b/rocketmq-go/model/process_queue.go
index 285cbda..e0075e0 100644
--- a/rocketmq-go/model/process_queue.go
+++ b/rocketmq-go/model/process_queue.go
@@ -191,424 +191,3 @@ func (self *ProcessQueue) PutMessage(msgs []MessageExt) (dispatchToConsume bool)
 	}
 	return
 }
-
-//func (self *ProcessQueue) TakeMessages(batchSize int) (messageToConsumeList  []MessageExt) {
-//	defer self.lockTreeMap.Unlock()
-//	self.lockTreeMap.Lock()
-//	self.lastConsumeTimestamp = time.Now()
-//	it := self.msgTreeMap.Iterator()
-//	nowIndex := 0
-//	for it.Next() {
-//		offset, message := it.Key(), it.Value()
-//		if (nowIndex >= batchSize) {
-//			break
-//		}
-//		self.msgTreeMap.Remove(offset)
-//		self.msgTreeMapToBeConsume.Put(offset, message)
-//		//messageToConsumeList = append(messageToConsumeList, message)
-//	}
-//	if (len(messageToConsumeList) == 0) {
-//		self.consuming = false
-//	}
-//	return
-//}
-
-/**
-#
-public final static long RebalanceLockMaxLiveTime =Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
-public final static long RebalanceLockInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
-#并发消费过期的
-        case CONSUME_PASSIVELY:
-                            pq.setDropped(true);
-                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
-                                it.remove();
-                                changed = true;
-                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
-                                        consumerGroup, mq);
-                            }
-                            break;
-private final static long PullMaxIdleTime = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
-private final Logger log = ClientLogger.getLog();
-private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
-
-private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
-private final AtomicLong msgCount = new AtomicLong();
-private final Lock lockConsume = new ReentrantLock();
-private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
-private final AtomicLong tryUnlockTimes = new AtomicLong(0);
-private volatile long queueOffsetMax = 0L;
-private volatile boolean dropped = false;
-private volatile long lastPullTimestamp = System.currentTimeMillis();
-private volatile long lastConsumeTimestamp = System.currentTimeMillis();
-private volatile boolean locked = false;
-private volatile long lastLockTimestamp = System.currentTimeMillis();
-private volatile boolean consuming = false;
-private volatile long msgAccCnt = 0;
-
-  public boolean isLockExpired() {
-        boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > RebalanceLockMaxLiveTime;
-        return result;
-    }
-
-
-    public boolean isPullExpired() {
-        boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PullMaxIdleTime;
-        return result;
-    }
-
-param pushConsumer
-cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
-if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
-return;
-}
-
-int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
-for (int i = 0; i < loop; i++) {
-MessageExt msg = null;
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
-msg = msgTreeMap.firstEntry().getValue();
-} else {
-
-break;
-}
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getExpiredMsg exception", e);
-}
-
-try {
-
-pushConsumer.sendMessageBack(msg, 3);
-log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
-try {
-msgTreeMap.remove(msgTreeMap.firstKey());
-} catch (Exception e) {
-log.error("send expired msg exception", e);
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getExpiredMsg exception", e);
-}
-} catch (Exception e) {
-log.error("send expired msg exception", e);
-}
-}
-}
-
-
-public boolean putMessage(final List<MessageExt> msgs) {
-boolean dispatchToConsume = false;
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-int validMsgCnt = 0;
-for (MessageExt msg : msgs) {
-MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
-if (null == old) {
-validMsgCnt++;
-this.queueOffsetMax = msg.getQueueOffset();
-}
-}
-msgCount.addAndGet(validMsgCnt);
-
-if (!msgTreeMap.isEmpty() && !this.consuming) {
-dispatchToConsume = true;
-this.consuming = true;
-}
-
-if (!msgs.isEmpty()) {
-MessageExt messageExt = msgs.get(msgs.size() - 1);
-String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
-if (property != null) {
-long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
-if (accTotal > 0) {
-this.msgAccCnt = accTotal;
-}
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("putMessage exception", e);
-}
-
-return dispatchToConsume;
-}
-
-
-public long getMaxSpan() {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-if (!this.msgTreeMap.isEmpty()) {
-return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
-}
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("getMaxSpan exception", e);
-}
-
-return 0;
-}
-
-
-public long removeMessage(final List<MessageExt> msgs) { //treeMap是维护了没有消费的 为了处理过期使用
-long result = -1;
-final long now = System.currentTimeMillis();
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-this.lastConsumeTimestamp = now;
-try {
-if (!msgTreeMap.isEmpty()) {
-result = this.queueOffsetMax + 1;
-int removedCnt = 0;
-for (MessageExt msg : msgs) {
-MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
-if (prev != null) {
-removedCnt--;
-}
-}
-msgCount.addAndGet(removedCnt);
-
-if (!msgTreeMap.isEmpty()) {
-result = msgTreeMap.firstKey();
-}
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (Throwable t) {
-log.error("removeMessage exception", t);
-}
-
-return result;
-}
-
-
-public TreeMap<Long, MessageExt> getMsgTreeMap() {
-return msgTreeMap;
-}
-
-
-public AtomicLong getMsgCount() {
-return msgCount;
-}
-
-
-public boolean isDropped() {
-return dropped;
-}
-
-
-public void setDropped(boolean dropped) {
-this.dropped = dropped;
-}
-
-public boolean isLocked() {
-return locked;
-}
-
-public void setLocked(boolean locked) {
-this.locked = locked;
-}
-
-public void rollback() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-this.msgTreeMap.putAll(this.msgTreeMapTemp);
-this.msgTreeMapTemp.clear();
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("rollback exception", e);
-}
-}
-
-
-public long commit() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-Long offset = this.msgTreeMapTemp.lastKey();
-msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
-this.msgTreeMapTemp.clear();
-if (offset != null) {
-return offset + 1;
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("commit exception", e);
-}
-
-return -1;
-}
-
-
-public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-for (MessageExt msg : msgs) {
-this.msgTreeMapTemp.remove(msg.getQueueOffset());
-this.msgTreeMap.put(msg.getQueueOffset(), msg);
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("makeMessageToCosumeAgain exception", e);
-}
-}
-
-
-public List<MessageExt> takeMessags(final int batchSize) {
-List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
-final long now = System.currentTimeMillis();
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-this.lastConsumeTimestamp = now;
-try {
-if (!this.msgTreeMap.isEmpty()) {
-for (int i = 0; i < batchSize; i++) {
-Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
-if (entry != null) {
-result.add(entry.getValue());
-msgTreeMapTemp.put(entry.getKey(), entry.getValue());
-} else {
-break;
-}
-}
-}
-
-if (result.isEmpty()) {
-consuming = false;
-}
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("take Messages exception", e);
-}
-
-return result;
-}
-
-
-public boolean hasTempMessage() {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-try {
-return !this.msgTreeMap.isEmpty();
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-} catch (InterruptedException e) {
-}
-
-return true;
-}
-
-
-public void clear() {
-try {
-this.lockTreeMap.writeLock().lockInterruptibly();
-try {
-this.msgTreeMap.clear();
-this.msgTreeMapTemp.clear();
-this.msgCount.set(0);
-this.queueOffsetMax = 0L;
-} finally {
-this.lockTreeMap.writeLock().unlock();
-}
-} catch (InterruptedException e) {
-log.error("rollback exception", e);
-}
-}
-
-
-
-
-public void setLastLockTimestamp(long lastLockTimestamp) {
-this.lastLockTimestamp = lastLockTimestamp;
-}
-
-
-public Lock getLockConsume() {
-return lockConsume;
-}
-
-
-
-
-public void setLastPullTimestamp(long lastPullTimestamp) {
-this.lastPullTimestamp = lastPullTimestamp;
-}
-
-
-public long getMsgAccCnt() {
-return msgAccCnt;
-}
-
-
-
-public long getTryUnlockTimes() {
-return this.tryUnlockTimes.get();
-}
-
-
-public void incTryUnlockTimes() {
-this.tryUnlockTimes.incrementAndGet();
-}
-
-
-public void fillProcessQueueInfo(final ProcessQueueInfo info) {
-try {
-this.lockTreeMap.readLock().lockInterruptibly();
-
-if (!this.msgTreeMap.isEmpty()) {
-info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
-info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
-info.setCachedMsgCount(this.msgTreeMap.size());
-}
-
-if (!this.msgTreeMapTemp.isEmpty()) {
-info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
-info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
-info.setTransactionMsgCount(this.msgTreeMapTemp.size());
-}
-
-info.setLocked(this.locked);
-info.setTryUnlockTimes(this.tryUnlockTimes.get());
-info.setLastLockTimestamp(this.lastLockTimestamp);
-
-info.setDroped(this.dropped);
-info.setLastPullTimestamp(this.lastPullTimestamp);
-info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
-} catch (Exception e) {
-} finally {
-this.lockTreeMap.readLock().unlock();
-}
-}
-
-
-
-*/

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/process_queue_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/process_queue_info.go b/rocketmq-go/model/process_queue_info.go
index 6bd71bd..c221ef8 100644
--- a/rocketmq-go/model/process_queue_info.go
+++ b/rocketmq-go/model/process_queue_info.go
@@ -35,11 +35,3 @@ type ProcessQueueInfo struct {
 	LastPullTimestamp    int64 `json:"lastPullTimestamp"`
 	LastConsumeTimestamp int64 `json:"lastConsumeTimestamp"`
 }
-
-//func (self ProcessQueueInfo) BuildFromProcessQueue(processQueue ProcessQueue) (processQueueInfo ProcessQueueInfo) {
-//	processQueueInfo = ProcessQueueInfo{}
-//	//processQueueInfo.CommitOffset =
-//	processQueueInfo.CachedMsgCount = processQueue.GetMsgCount()
-//	processQueueInfo.CachedMsgCount
-//	return
-//}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/response_code.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/response_code.go b/rocketmq-go/model/response_code.go
index ed40a6d..a42120e 100644
--- a/rocketmq-go/model/response_code.go
+++ b/rocketmq-go/model/response_code.go
@@ -28,11 +28,11 @@ const (
 	// transaction failed, because of add db failed
 	TransactionFailed = 4
 	// Broker flush disk timeout
-	FlushDiskTimeout = 10
+	//FlushDiskTimeout = 10
 	// Broker slave unavailable, just for sync double write
 	SlaveNotAvailable = 11
 	// Broker write slave timeout, just for sync double write
-	FlushSlaveTimeout = 12
+	//FlushSlaveTimeout = 12
 	// Broker illegal message
 	MessageIllegal = 13
 	// Broker, Namesrv not available,maybe service is closing or incorrect permission

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/send_result.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/send_result.go b/rocketmq-go/model/send_result.go
index 4d3b31f..857b6c4 100644
--- a/rocketmq-go/model/send_result.go
+++ b/rocketmq-go/model/send_result.go
@@ -18,22 +18,21 @@ package model
 
 import (
 	"fmt"
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
 )
 
 type SendStatus int
 
 const (
 	SendOK SendStatus = iota
-	//FlushDiskTimeout
-	//FlushSlaveTimeout
+	FlushDiskTimeout
+	FlushSlaveTimeout
 	SlaveNotAvaliable
 )
 
 type SendResult struct {
 	sendStatus    SendStatus
 	msgID         string
-	messageQueue  *message.MessageQueue
+	messageQueue  MessageQueue
 	queueOffset   int64
 	transactionID string
 	offsetMsgID   string
@@ -41,7 +40,7 @@ type SendResult struct {
 	traceOn       bool
 }
 
-func NewSendResult(status SendStatus, msgID, offsetID string, queue *message.MessageQueue, queueOffset int64) *SendResult {
+func NewSendResult(status SendStatus, msgID, offsetID string, queue MessageQueue, queueOffset int64) *SendResult {
 	return &SendResult{
 		sendStatus:   status,
 		msgID:        msgID,
@@ -87,11 +86,11 @@ func (result *SendResult) SetSendStatus(status SendStatus) {
 	result.sendStatus = status
 }
 
-func (result *SendResult) MessageQueue() *message.MessageQueue {
+func (result *SendResult) MessageQueue() MessageQueue {
 	return result.messageQueue
 }
 
-func (result *SendResult) SetMessageQueue(queue *message.MessageQueue) {
+func (result *SendResult) SetMessageQueue(queue MessageQueue) {
 	result.messageQueue = queue
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_publishInfo.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publishInfo.go b/rocketmq-go/model/topic_publishInfo.go
deleted file mode 100644
index b5f9e37..0000000
--- a/rocketmq-go/model/topic_publishInfo.go
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package model
-
-import (
-//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
-)
-
-//type TopicPublishInfo struct {
-//	orderTopic         bool
-//	havaTopicRouteInfo bool
-//	messageQueueList   []*message.MessageQueue
-//	topicRouteData     *TopicRouteData
-//}
-//
-//func (info *TopicPublishInfo) SetOrderTopic(b bool) {
-//	info.orderTopic = b
-//}
-//
-//func (info *TopicPublishInfo) Ok() bool {
-//	return false
-//}
-//
-//func (info *TopicPublishInfo) MessageQueueList() []*message.MessageQueue {
-//	return info.messageQueueList
-//}
-//
-//func (info *TopicPublishInfo) HaveTopicRouteInfo() bool {
-//	return info.havaTopicRouteInfo
-//}
-//
-//func (info *TopicPublishInfo) SetHaveTopicRouteInfo(b bool) {
-//	info.havaTopicRouteInfo = b
-//}
-//
-//func (info *TopicPublishInfo) TopicRouteData() *TopicRouteData {
-//	return info.topicRouteData
-//}
-//
-//func (info *TopicPublishInfo) SetTopicRouteData(routeDate *TopicRouteData) {
-//	info.topicRouteData = routeDate
-//}
-//
-//func (info *TopicPublishInfo) SelectOneMessageQueue() *message.MessageQueue {
-//	return nil //TODO
-//}
-//
-//func (info *TopicPublishInfo) selectOneMessageQueueWithBroker(brokerName string) *message.MessageQueue {
-//	if brokerName == "" {
-//		return info.SelectOneMessageQueue()
-//	}
-//	return nil //TODO
-//}
-//
-//func (info *TopicPublishInfo) QueueIdByBroker(brokerName string) int {
-//	return 0 //TODO
-//}
-//
-//func (info *TopicPublishInfo) String() string {
-//	return ""
-//}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_publish_info.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_publish_info.go b/rocketmq-go/model/topic_publish_info.go
index 14ec088..26a541c 100644
--- a/rocketmq-go/model/topic_publish_info.go
+++ b/rocketmq-go/model/topic_publish_info.go
@@ -29,12 +29,6 @@ type TopicPublishInfo struct {
 	topicQueueIndex        int32
 }
 
-//private boolean orderTopic = false;
-//private boolean haveTopicRouterInfo = false;
-//private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
-//private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0); // todo
-//private TopicRouteData topicRouteData;
-
 func (self *TopicPublishInfo) JudgeTopicPublishInfoOk() (bIsTopicOk bool) {
 	bIsTopicOk = (len(self.MessageQueueList) > 0)
 	return

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/model/topic_route_data.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/model/topic_route_data.go b/rocketmq-go/model/topic_route_data.go
index 348479f..9c1ab27 100644
--- a/rocketmq-go/model/topic_route_data.go
+++ b/rocketmq-go/model/topic_route_data.go
@@ -18,114 +18,9 @@
 package model
 
 import (
-	//"fmt"
-	//"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/message"
 	"sync"
 )
 
-//
-//type BrokerData struct {
-//}
-//
-//type TopicRouteData struct {
-//	orderTopicConf    string
-//	queueDatas        []*message.MessageQueue
-//	brokerDatas       []*BrokerData
-//	filterServerTable map[string][]string
-//}
-//
-//func NewTopicRouteData() *TopicRouteData {
-//	return &TopicRouteData{}
-//}
-//
-//func (route *TopicRouteData) CloneTopicRouteData() (clonedRouteData *TopicRouteData) {
-//	clonedRouteData = &TopicRouteData{
-//		route.orderTopicConf,
-//		route.queueDatas,
-//		route.brokerDatas,
-//		route.filterServerTable,
-//	}
-//	// TODO: to complete
-//	return
-//}
-//
-//func (route *TopicRouteData) QueueDatas() []*message.MessageQueue {
-//	return route.queueDatas
-//}
-//
-//func (route *TopicRouteData) SetQueueDatas(data []*message.MessageQueue) {
-//	route.queueDatas = data
-//}
-//
-//func (route *TopicRouteData) BrokerDatas() []*BrokerData {
-//	return route.brokerDatas
-//}
-//
-//func (route *TopicRouteData) SetBrokerDatas(data []*BrokerData) {
-//	route.brokerDatas = data
-//}
-//
-//func (route *TopicRouteData) FilterServerTable() map[string][]string {
-//	return route.filterServerTable
-//}
-//
-//func (route *TopicRouteData) SetFilterServerTable(data map[string][]string) {
-//	route.filterServerTable = data
-//}
-//
-//func (route *TopicRouteData) OrderTopicConf() string {
-//	return route.orderTopicConf
-//}
-//
-//func (route *TopicRouteData) SetOrderTopicConf(s string) {
-//	route.orderTopicConf = s
-//}
-//
-//func (route *TopicRouteData) HashCode() (result int) {
-//	prime := 31
-//	result = 1
-//	result *= prime
-//	// TODO
-//
-//	return
-//}
-//
-//func (route *TopicRouteData) Equals(route1 interface{}) bool {
-//	if route == nil {
-//		return true
-//	}
-//	if route1 == nil {
-//		return false
-//	}
-//	//value, ok := route1.(TopicRouteData)
-//	//if !ok {
-//	//	return false
-//	//}
-//	// TODO
-//	//if route.brokerDatas == nil && value.brokerDatas != nil || len(route.brokerDatas) != len(value.brokerDatas) {
-//	//	return false
-//	//}
-//	//
-//	//if route.orderTopicConf == "" && value.orderTopicConf != "" || route.orderTopicConf != value.orderTopicConf {
-//	//	return false
-//	//}
-//	//
-//	//if route.queueDatas == nil && value.queueDatas != nil || route.queueDatas != value.queueDatas {
-//	//	return false
-//	//}
-//	//
-//	//if route.filterServerTable == nil && value.filterServerTable != nil ||
-//	//	route.filterServerTable != value.filterServerTable {
-//	//	return false
-//	//}
-//	return true
-//}
-//
-//func (route *TopicRouteData) String() string {
-//	return fmt.Sprintf("TopicRouteData [orderTopicConf=%s, queueDatas=%s, brokerDatas=%s, filterServerTable=%s]",
-//		route.orderTopicConf, route.queueDatas, route.brokerDatas, route.filterServerTable)
-//}
-
 type TopicRouteData struct {
 	OrderTopicConf string
 	QueueDatas     []*QueueData

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_client_manage.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manage.go b/rocketmq-go/mq_client_manage.go
new file mode 100644
index 0000000..7903116
--- /dev/null
+++ b/rocketmq-go/mq_client_manage.go
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util/structs"
+	"github.com/golang/glog"
+	"strings"
+	"sync"
+	"time"
+)
+
+// MqClientManager holds the client state shared by all producers and consumers.
+// See com.alibaba.rocketmq.client.impl.factory.MQClientInstance in the Java client.
+// It contains sync.Mutex fields, so it must be used through a pointer and never copied.
+type MqClientManager struct {
+	rocketMqManagerLock sync.Mutex
+	//ClientId            string
+	// BootTimestamp is set to time.Now().Unix() (seconds) by MqClientManagerInit.
+	BootTimestamp int64
+
+	clientFactory *ClientFactory
+
+	// NOTE(review): no code visible in this part of the patch acquires these two locks — confirm.
+	NamesrvLock   sync.Mutex
+	HeartBeatLock sync.Mutex
+	// mqClient is shared by every producer and consumer.
+	mqClient service.RocketMqClient
+	// Fields of the Java MQClientInstance noted by the author for reference: clientRemotingProcessor,
+	// pullMessageService, rebalanceService, consumerStatsManager, storeTimesTotal.
+	ServiceState int
+
+	// The controllers live here because they need access to every producer and consumer.
+	pullMessageController    *PullMessageController
+	cleanExpireMsgController *CleanExpireMsgController
+	rebalanceControllr       *RebalanceController
+	// Kept on the manager because all producers and consumers need it.
+	defaultProducerService *service.DefaultProducerService // used to send messages back
+}
+
+// MqClientManagerInit builds a manager: it stamps the boot time, creates the client
+// factory and the shared mqClient (fed by InitClientRequestProcessor), and then the
+// pull-message, clean-expire-message and rebalance controllers.
+func MqClientManagerInit(clientConfig *config.ClientConfig) (rocketMqManager *MqClientManager) {
+	manager := &MqClientManager{BootTimestamp: time.Now().Unix(), clientFactory: ClientFactoryInit()}
+	manager.mqClient = service.MqClientInit(clientConfig, manager.InitClientRequestProcessor())
+	manager.pullMessageController = NewPullMessageController(manager.mqClient, manager.clientFactory)
+	manager.cleanExpireMsgController = NewCleanExpireMsgController(manager.mqClient, manager.clientFactory)
+	manager.rebalanceControllr = NewRebalanceController(manager.clientFactory)
+	return manager
+}
+
+// InitClientRequestProcessor returns the callback that serves broker-initiated requests:
+// CHECK_TRANSACTION_STATE, NOTIFY_CONSUMER_IDS_CHANGED, RESET_CONSUMER_CLIENT_OFFSET,
+// GET_CONSUMER_STATUS_FROM_CLIENT, GET_CONSUMER_RUNNING_INFO and CONSUME_MESSAGE_DIRECTLY.
+// The callback's response stays nil unless a running-info or direct-consume result was encoded.
+func (self *MqClientManager) InitClientRequestProcessor() (clientRequestProcessor remoting.ClientRequestProcessor) {
+	clientRequestProcessor = func(cmd *remoting.RemotingCommand) (response *remoting.RemotingCommand) {
+		switch cmd.Code {
+		case remoting.CHECK_TRANSACTION_STATE:
+			glog.V(2).Info("receive_request_code CHECK_TRANSACTION_STATE")
+			// TODO: not implemented in this version.
+		case remoting.NOTIFY_CONSUMER_IDS_CHANGED:
+			glog.V(1).Info("receive_request_code NOTIFY_CONSUMER_IDS_CHANGED")
+			self.rebalanceControllr.doRebalance()
+		case remoting.RESET_CONSUMER_CLIENT_OFFSET:
+			glog.V(2).Info("receive_request_code RESET_CONSUMER_CLIENT_OFFSET")
+			glog.V(2).Info("op=look cmd body", string(cmd.Body))
+			var resetOffsetRequestHeader = &header.ResetOffsetRequestHeader{}
+			if cmd.ExtFields != nil {
+				resetOffsetRequestHeader.FromMap(cmd.ExtFields) // copy ExtFields into the typed header
+				glog.V(2).Info("op=look ResetOffsetRequestHeader", resetOffsetRequestHeader)
+				resetOffsetBody := &model.ResetOffsetBody{}
+				if err := resetOffsetBody.Decode(cmd.Body); err != nil {
+					glog.Error(err) // previously dropped silently
+					return
+				}
+				glog.V(2).Info("op=look resetOffsetBody xxxxx", resetOffsetBody)
+				self.resetConsumerOffset(resetOffsetRequestHeader.Topic, resetOffsetRequestHeader.Group, resetOffsetBody.OffsetTable)
+			}
+		case remoting.GET_CONSUMER_STATUS_FROM_CLIENT: // redundant: GET_CONSUMER_RUNNING_INFO can be used instead
+			glog.V(2).Info("receive_request_code GET_CONSUMER_STATUS_FROM_CLIENT")
+		case remoting.GET_CONSUMER_RUNNING_INFO:
+			glog.V(2).Info("receive_request_code GET_CONSUMER_RUNNING_INFO")
+			var getConsumerRunningInfoRequestHeader = &header.GetConsumerRunningInfoRequestHeader{}
+			if cmd.ExtFields != nil {
+				getConsumerRunningInfoRequestHeader.FromMap(cmd.ExtFields) // copy ExtFields into the typed header
+				consumerRunningInfo := model.ConsumerRunningInfo{}
+				consumerRunningInfo.Properties = map[string]string{}
+				defaultMQPushConsumer := self.clientFactory.ConsumerTable[getConsumerRunningInfoRequestHeader.ConsumerGroup]
+				if defaultMQPushConsumer == nil { // unknown group: the map yields nil, which used to be dereferenced
+					glog.Error("GET_CONSUMER_RUNNING_INFO for unregistered consumer group=", getConsumerRunningInfoRequestHeader.ConsumerGroup)
+					return
+				}
+				consumerConfigMap := structs.Map(defaultMQPushConsumer.ConsumerConfig) // todo test
+				for key, value := range consumerConfigMap {
+					consumerRunningInfo.Properties[key] = fmt.Sprintf("%v", value)
+				}
+				consumerRunningInfo.Properties["PROP_NAMESERVER_ADDR"] = strings.Join(defaultMQPushConsumer.mqClient.GetRemotingClient().GetNamesrvAddrList(), ";")
+				consumerRunningInfo.MqTable = defaultMQPushConsumer.rebalance.GetMqTableInfo()
+				glog.V(2).Info("op=look consumerRunningInfo", consumerRunningInfo)
+				jsonByte, err := consumerRunningInfo.Encode()
+				if err != nil {
+					glog.Error(err)
+					return
+				}
+				glog.V(2).Info("op=enCode jsonByte", string(jsonByte)) // logged only once encoding succeeded
+				response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
+			}
+		case remoting.CONSUME_MESSAGE_DIRECTLY:
+			glog.V(2).Info("receive_request_code CONSUME_MESSAGE_DIRECTLY")
+			var consumeMessageDirectlyResultRequestHeader = &header.ConsumeMessageDirectlyResultRequestHeader{}
+			if cmd.ExtFields != nil {
+				consumeMessageDirectlyResultRequestHeader.FromMap(cmd.ExtFields)
+				messages := DecodeMessage(cmd.Body)
+				defaultMQPushConsumer := self.clientFactory.ConsumerTable[consumeMessageDirectlyResultRequestHeader.ConsumerGroup]
+				if len(messages) == 0 || defaultMQPushConsumer == nil { // both cases used to panic
+					glog.Error("CONSUME_MESSAGE_DIRECTLY without a message or for unregistered consumer group=", consumeMessageDirectlyResultRequestHeader.ConsumerGroup)
+					return
+				}
+				messageExt := &messages[0]
+				glog.V(2).Info("op=look", messageExt)
+				consumeResult, err := defaultMQPushConsumer.consumeMessageService.ConsumeMessageDirectly(messageExt, consumeMessageDirectlyResultRequestHeader.BrokerName)
+				if err != nil {
+					glog.Error(err) // previously dropped silently
+					return
+				}
+				jsonByte, err := json.Marshal(consumeResult)
+				if err != nil {
+					glog.Error(err)
+					return
+				}
+				response = remoting.NewRemotingCommandWithBody(remoting.SUCCESS, nil, jsonByte)
+			}
+		default:
+			glog.Error("illeage requestCode ", cmd.Code)
+		}
+		return
+	}
+	return
+}
+// RegistProducer gives the producer a producer service bound to the shared mqClient and records it under its group.
+func (self *MqClientManager) RegistProducer(producer *DefaultMQProducer) {
+	producer.producerService = service.NewDefaultProducerService(producer.producerGroup, producer.ProducerConfig, self.mqClient)
+	self.clientFactory.ProducerTable[producer.producerGroup] = producer
+}
+
+// resetConsumerOffset hands offsetTable to the consumer registered for group; topic is not used here.
+func (self *MqClientManager) resetConsumerOffset(topic, group string, offsetTable map[model.MessageQueue]int64) {
+	if consumer := self.clientFactory.ConsumerTable[group]; consumer != nil {
+		consumer.resetOffset(offsetTable)
+		return
+	}
+	glog.Error("resetConsumerOffset beacuse consumer not online,group=", group)
+}
+// RegistConsumer wires a push consumer to the shared client: it creates the inner producer
+// service on first use, attaches the offset store and rebalancer, records the consumer in
+// the consumer table under its group, and initialises its consume-message service.
+func (self *MqClientManager) RegistConsumer(consumer *DefaultMQPushConsumer) {
+	if self.defaultProducerService == nil {
+		self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, config.NewProducerConfig(), self.mqClient)
+	}
+	consumer.mqClient = self.mqClient
+	consumer.offsetStore = service.RemoteOffsetStoreInit(consumer.consumerGroup, self.mqClient)
+	self.clientFactory.ConsumerTable[consumer.consumerGroup] = consumer
+	consumer.rebalance = service.NewRebalance(consumer.consumerGroup, consumer.subscription, consumer.mqClient, consumer.offsetStore, consumer.ConsumerConfig)
+	glog.V(2).Info("op=look consumeMessageService ", consumer.consumeMessageService) // was an unconditional fmt.Println to stdout
+	consumer.consumeMessageService.Init(consumer.consumerGroup, self.mqClient, consumer.offsetStore, self.defaultProducerService, consumer.ConsumerConfig)
+}
+
+func (self *MqClientManager) Start() {
+	// NOTE(review): the author left an initial SendHeartbeatToAllBrokerWithLock() call disabled here ("we should send heartbeat first").
+	self.StartAllScheduledTask()
+}
+
+func (self *MqClientManager) ShutDown() {
+	// Not implemented yet: nothing is stopped or released here. Pointer receiver so the mutex fields are not copied.
+}
+
+type ClientFactory struct {
+	ProducerTable map[string]*DefaultMQProducer     // producer group name -> producer
+	ConsumerTable map[string]*DefaultMQPushConsumer // consumer group name -> push consumer
+}
+
+// ClientFactoryInit returns a ClientFactory whose producer and consumer tables are empty, writable maps.
+func ClientFactoryInit() (clientFactory *ClientFactory) {
+	producers := make(map[string]*DefaultMQProducer)
+	consumers := make(map[string]*DefaultMQPushConsumer)
+	return &ClientFactory{ProducerTable: producers, ConsumerTable: consumers}
+}
+
+// SendHeartbeatToAllBrokerWithLock heartbeats for every registered producer and consumer; it fails only when none exist.
+func (self *MqClientManager) SendHeartbeatToAllBrokerWithLock() error {
+	heartbeatData := self.prepareHeartbeatData()
+	if len(heartbeatData.ConsumerDataSet) == 0 && len(heartbeatData.ProducerDataSet) == 0 {
+		return errors.New("send heartbeat error")
+	}
+	self.mqClient.SendHeartbeatToAllBroker(heartbeatData)
+	return nil
+}
+
+// UpdateTopicRouteInfoFromNameServer refreshes route info for every subscribed topic and every published topic.
+func (self *MqClientManager) UpdateTopicRouteInfoFromNameServer() {
+	var topicSet []string
+	for _, consumer := range self.clientFactory.ConsumerTable {
+		for key := range consumer.subscription {
+			topicSet = append(topicSet, key)
+		}
+	}
+	topicSet = append(topicSet, self.mqClient.GetPublishTopicList()...)
+	for _, topic := range topicSet {
+		// topicSet is a plain slice and may hold duplicates; each entry triggers its own refresh.
+		self.mqClient.UpdateTopicRouteInfoFromNameServer(topic)
+	}
+}
+
+// prepareHeartbeatData snapshots the client id plus every registered consumer and producer group.
+func (self *MqClientManager) prepareHeartbeatData() *model.HeartbeatData {
+	heartbeatData := &model.HeartbeatData{
+		ClientId:        self.mqClient.GetClientId(),
+		ConsumerDataSet: make([]*model.ConsumerData, 0),
+		ProducerDataSet: make([]*model.ProducerData, 0),
+	}
+	for group, consumer := range self.clientFactory.ConsumerTable {
+		heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, &model.ConsumerData{
+			GroupName:           group,
+			ConsumeType:         consumer.consumeType,
+			ConsumeFromWhere:    consumer.ConsumerConfig.ConsumeFromWhere,
+			MessageModel:        consumer.messageModel,
+			SubscriptionDataSet: consumer.Subscriptions(),
+			UnitMode:            consumer.unitMode,
+		})
+	}
+	for group := range self.clientFactory.ProducerTable {
+		heartbeatData.ProducerDataSet = append(heartbeatData.ProducerDataSet, &model.ProducerData{GroupName: group})
+	}
+	return heartbeatData
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_client_manager.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_client_manager.go b/rocketmq-go/mq_client_manager.go
deleted file mode 100644
index 731158f..0000000
--- a/rocketmq-go/mq_client_manager.go
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package rocketmq
-
-import (
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
-	"sync"
-	"time"
-)
-
-type MqClientManager struct {
-	clientFactory          *ClientFactory
-	rocketMqClient         service.RocketMqClient
-	pullMessageController  *PullMessageController
-	defaultProducerService RocketMQProducer //for send back message
-
-	rocketMqManagerLock sync.Mutex
-	//ClientId            string
-	BootTimestamp int64
-
-	NamesrvLock   sync.Mutex
-	HeartBeatLock sync.Mutex
-	//rebalanceControllr       *RebalanceController
-}
-
-type MqClientConfig struct {
-}
-
-func NewMqClientManager(clientConfig *MqClientConfig) (rocketMqManager *MqClientManager) {
-	rocketMqManager = &MqClientManager{}
-	rocketMqManager.BootTimestamp = time.Now().Unix()
-	rocketMqManager.clientFactory = clientFactoryInit()
-	//rocketMqManager.rocketMqClient =
-	//rocketMqManager.pullMessageController = NewPullMessageController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
-	//rocketMqManager.cleanExpireMsgController = NewCleanExpireMsgController(rocketMqManager.mqClient, rocketMqManager.clientFactory)
-	//rocketMqManager.rebalanceControllr = NewRebalanceController(rocketMqManager.clientFactory)
-
-	return
-}
-
-func (self *MqClientManager) RegisterProducer(producer *DefaultMQProducer) {
-	return
-}
-
-func (self *MqClientManager) RegisterConsumer(consumer RocketMQConsumer) {
-	// todo check config
-	//if (self.defaultProducerService == nil) {
-	//	self.defaultProducerService = service.NewDefaultProducerService(constant.CLIENT_INNER_PRODUCER_GROUP, mq_config.NewProducerConfig(), self.mqClient)
-	//}
-	return
-}
-
-func (self *MqClientManager) Start() {
-	//self.SendHeartbeatToAllBrokerWithLock()//we should send heartbeat first
-	self.startAllScheduledTask()
-}
-func (manager *MqClientManager) startAllScheduledTask() {
-
-}
-
-func clientFactoryInit() (clientFactory *ClientFactory) {
-	clientFactory = &ClientFactory{}
-	clientFactory.ProducerTable = make(map[string]RocketMQProducer)
-	clientFactory.ConsumerTable = make(map[string]RocketMQConsumer)
-	return
-}
-
-type ClientFactory struct {
-	ProducerTable map[string]RocketMQProducer //group|RocketMQProducer
-	ConsumerTable map[string]RocketMQConsumer //group|Consumer
-}
-
-type PullMessageController struct {
-	rocketMqClient service.RocketMqClient
-	clientFactory  *ClientFactory
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_consumer.go b/rocketmq-go/mq_consumer.go
deleted file mode 100644
index 7112537..0000000
--- a/rocketmq-go/mq_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package rocketmq
-
-import (
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
-	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
-)
-
-type RocketMQConsumer interface {
-}
-
-type MqConsumerConfig struct {
-}
-type DefaultMQPushConsumer struct {
-	offsetStore           service.OffsetStore //for consumer's offset
-	mqClient              service.RocketMqClient
-	rebalance             *service.Rebalance //Rebalance's impl depend on offsetStore
-	consumeMessageService service.ConsumeMessageService
-	consumerConfig        *MqConsumerConfig
-
-	consumerGroup string
-	//consumeFromWhere      string
-	consumeType  string
-	messageModel string
-	unitMode     bool
-
-	subscription    map[string]string   //topic|subExpression
-	subscriptionTag map[string][]string // we use it filter again
-	// 分配策略
-	pause bool //when reset offset we need pause
-}
-
-func NewDefaultMQPushConsumer(consumerGroup string, mqConsumerConfig *MqConsumerConfig) (defaultMQPushConsumer *DefaultMQPushConsumer) {
-	defaultMQPushConsumer = &DefaultMQPushConsumer{}
-	defaultMQPushConsumer.consumerConfig = mqConsumerConfig
-	return
-}
-
-func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
-	self.consumeMessageService = service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
-}
-func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
-	//self.subscription[topic] = subExpression
-	//if len(subExpression) == 0 || subExpression == "*" {
-	//	return
-	//}
-	//tags := strings.Split(subExpression, "||")
-	//tagsList := []string{}
-	//for _, tag := range tags {
-	//	t := strings.TrimSpace(tag)
-	//	if len(t) == 0 {
-	//		continue
-	//	}
-	//	tagsList = append(tagsList, t)
-	//}
-	//if len(tagsList) > 0 {
-	//	self.subscriptionTag[topic] = tagsList
-	//}
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_producer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_producer.go b/rocketmq-go/mq_producer.go
index d1a011b..098377d 100644
--- a/rocketmq-go/mq_producer.go
+++ b/rocketmq-go/mq_producer.go
@@ -1,31 +1,40 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
 package rocketmq
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+import ()
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+)
 
 type RocketMQProducer interface {
+	Send(message *model.Message) (sendResult *model.SendResult, err error)
+	SendWithTimeout(message *model.Message, timeout int) (sendResult *model.SendResult, err error)
+	//SendAsync(message *model.Message) (sendResult *model.SendResult,err error)
+	//SendAsyncWithTimeout(message *model.Message) (sendResult *model.SendResult,err error)
+	//SendOneWay(message *model.Message) (sendResult *model.SendResult,err error)
 }
+// DefaultMQProducer sends messages for one producer group through a
+// service.ProducerService.
+type DefaultMQProducer struct {
+	producerGroup  string
+	ProducerConfig *config.RocketMqProducerConfig
 
-type MqProducerConfig struct {
+	// producerService performs the actual send. NewDefaultMQProducer does not
+	// set it, so it has to be assigned before Send/SendWithTimeout is called.
+	producerService service.ProducerService
 }
 
-type DefaultMQProducer struct {
-	producerGroup    string
-	mqProducerConfig *MqProducerConfig
-	producerService  service.ProducerService
+// NewDefaultMQProducer builds a producer for producerGroup configured with
+// config.NewProducerConfig. The producerService field is left unset here.
+func NewDefaultMQProducer(producerGroup string) (rocketMQProducer *DefaultMQProducer) {
+	rocketMQProducer = new(DefaultMQProducer)
+	rocketMQProducer.producerGroup = producerGroup
+	rocketMQProducer.ProducerConfig = config.NewProducerConfig()
+	return rocketMQProducer
+}
+
+// Send sends message in synchronous communication mode, using the timeout
+// configured in ProducerConfig.SendMsgTimeout.
+func (self *DefaultMQProducer) Send(message *model.Message) (sendResult *model.SendResult, err error) {
+	return self.producerService.SendDefaultImpl(message, constant.COMMUNICATIONMODE_SYNC, "", self.ProducerConfig.SendMsgTimeout)
+}
+// SendWithTimeout sends message in synchronous communication mode, using the
+// caller-supplied timeout instead of the configured one.
+func (self *DefaultMQProducer) SendWithTimeout(message *model.Message, timeout int64) (sendResult *model.SendResult, err error) {
+	return self.producerService.SendDefaultImpl(message, constant.COMMUNICATIONMODE_SYNC, "", timeout)
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/mq_push_consumer.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/mq_push_consumer.go b/rocketmq-go/mq_push_consumer.go
new file mode 100644
index 0000000..245bbe4
--- /dev/null
+++ b/rocketmq-go/mq_push_consumer.go
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import (
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+	"github.com/golang/glog"
+	"strings"
+	"time"
+)
+
+// Consumer is the subscription API of a message consumer.
+type Consumer interface {
+	// RegisterMessageListener installs the listener used to consume messages.
+	RegisterMessageListener(listener model.MessageListener)
+	// Subscribe subscribes to topic. subExpression selects the messages of
+	// interest; DefaultMQPushConsumer treats "" and "*" as "no tag filter"
+	// and otherwise splits the expression into tags on "||".
+	Subscribe(topic string, subExpression string)
+}
+
+// DefaultMQPushConsumer holds the state of one push consumer: its group,
+// its subscriptions and the services it delegates to.
+type DefaultMQPushConsumer struct {
+	consumerGroup string
+	//consumeFromWhere      string
+	// consumeType and messageModel are initialised by NewDefaultMQPushConsumer
+	// to "CONSUME_PASSIVELY" and "CLUSTERING".
+	consumeType  string
+	messageModel string
+	unitMode     bool
+
+	// subscription keeps the raw expression per topic; subscriptionTag keeps
+	// the trimmed, non-empty tags obtained by splitting that expression on "||".
+	subscription          map[string]string   //topic|subExpression
+	subscriptionTag       map[string][]string // we use it filter again
+	offsetStore           service.OffsetStore
+	mqClient              service.RocketMqClient
+	rebalance             *service.Rebalance
+	// pause is set by resetOffset while offsets are being reset and cleared
+	// again once the reset has finished.
+	pause                 bool //when reset offset we need pause
+	// consumeMessageService is created by RegisterMessageListener.
+	consumeMessageService service.ConsumeMessageService
+	ConsumerConfig        *config.RocketMqConsumerConfig
+}
+
+// NewDefaultMQPushConsumer builds a push consumer for consumerGroup with
+// empty subscription tables, the default consumer configuration, consume type
+// "CONSUME_PASSIVELY" and message model "CLUSTERING".
+func NewDefaultMQPushConsumer(consumerGroup string) (defaultMQPushConsumer *DefaultMQPushConsumer) {
+	defaultMQPushConsumer = &DefaultMQPushConsumer{
+		consumerGroup: consumerGroup,
+		//consumeFromWhere:"CONSUME_FROM_FIRST_OFFSET", //todo  use config
+		consumeType:     "CONSUME_PASSIVELY",
+		messageModel:    "CLUSTERING",
+		pause:           false,
+		subscription:    make(map[string]string),
+		subscriptionTag: make(map[string][]string),
+		ConsumerConfig:  config.NewRocketMqConsumerConfig(),
+	}
+	return defaultMQPushConsumer
+}
+// Subscribe records subExpression as the subscription for topic and derives
+// the tag list used for filtering.
+//
+// An empty expression or "*" means "no tag filter". Any other expression is
+// split on "||"; each tag is trimmed and blank tags are dropped.
+//
+// The previous tag list of the topic is always discarded first, so
+// re-subscribing a topic with "*" (or with an expression that yields no tags)
+// no longer leaves the old tags behind in subscriptionTag.
+func (self *DefaultMQPushConsumer) Subscribe(topic string, subExpression string) {
+	self.subscription[topic] = subExpression
+	delete(self.subscriptionTag, topic)
+	if subExpression == "" || subExpression == "*" {
+		return
+	}
+	var tagsList []string
+	for _, rawTag := range strings.Split(subExpression, "||") {
+		if tag := strings.TrimSpace(rawTag); tag != "" {
+			tagsList = append(tagsList, tag)
+		}
+	}
+	if len(tagsList) > 0 {
+		self.subscriptionTag[topic] = tagsList
+	}
+}
+
+// RegisterMessageListener wraps messageListener in the service created by
+// service.NewConsumeMessageConcurrentlyServiceImpl and stores it as this
+// consumer's consumeMessageService.
+func (self *DefaultMQPushConsumer) RegisterMessageListener(messageListener model.MessageListener) {
+	concurrentlyService := service.NewConsumeMessageConcurrentlyServiceImpl(messageListener)
+	self.consumeMessageService = concurrentlyService
+}
+
+// resetOffset resets the consume offsets of the queues in offsetTable.
+//
+// The consumer is paused and the affected process queues are cleared right
+// away. A background goroutine then waits 10 seconds, stores each
+// non-negative offset for every queue that still has a process queue, removes
+// that process queue, and finally un-pauses the consumer and triggers a
+// rebalance.
+func (self *DefaultMQPushConsumer) resetOffset(offsetTable map[model.MessageQueue]int64) {
+	self.pause = true
+	glog.Info("now we ClearProcessQueue 0 ", offsetTable)
+
+	self.rebalance.ClearProcessQueue(offsetTable)
+	glog.Info("now we ClearProcessQueue", offsetTable)
+	go func() {
+		time.Sleep(10 * time.Second)
+		defer func() {
+			self.pause = false
+			self.rebalance.DoRebalance()
+		}()
+
+		for messageQueue, offset := range offsetTable {
+			// Work on a per-iteration copy: before Go 1.22 the range variable
+			// is reused across iterations, so handing out its address would
+			// make every callee that keeps the pointer see the last queue.
+			mq := messageQueue
+			processQueue := self.rebalance.GetProcessQueue(mq)
+			if processQueue == nil || offset < 0 {
+				continue
+			}
+			glog.Info("now we UpdateOffset", mq, offset)
+			self.offsetStore.UpdateOffset(&mq, offset, false)
+			self.rebalance.RemoveProcessQueue(&mq)
+		}
+	}()
+}
+
+// Subscriptions returns the subscription data currently held in
+// rebalance.SubscriptionInner as a slice. The result is never nil.
+func (self *DefaultMQPushConsumer) Subscriptions() []*model.SubscriptionData {
+	inner := self.rebalance.SubscriptionInner
+	result := make([]*model.SubscriptionData, 0, len(inner))
+	for _, data := range inner {
+		result = append(result, data)
+	}
+	return result
+}
+
+// CleanExpireMsg sends messages that have been in consumption for too long
+// back to the broker and removes them from their process queue.
+//
+// For every process queue it inspects at most 16 messages, always looking at
+// the current minimum message of the queue, and stops with that queue as soon
+// as the minimum message has not yet exceeded the consume timeout.
+func (self *DefaultMQPushConsumer) CleanExpireMsg() {
+	const maxCleanPerQueue = 16
+
+	// The timestamp (in milliseconds) is taken once for the whole pass, so the
+	// difference to a message's consume start time can come out negative;
+	// such a message is simply treated as not expired.
+	nowTime := int64(time.Now().UnixNano()) / 1000000
+	messageQueueList, processQueueList := self.rebalance.GetProcessQueueList()
+	for queueIndex, processQueue := range processQueueList {
+		attempts := processQueue.GetMsgCount()
+		if attempts > maxCleanPerQueue {
+			attempts = maxCleanPerQueue
+		}
+		for ; attempts > 0; attempts-- {
+			_, message := processQueue.GetMinMessageInTree()
+			if message == nil {
+				break
+			}
+			consumeStartTime := message.GetConsumeStartTime()
+			// Presumably ConsumeTimeout is in minutes and is converted to
+			// milliseconds here - confirm against the consumer config.
+			maxDiffTime := self.ConsumerConfig.ConsumeTimeout * 1000 * 60
+			elapsed := nowTime - consumeStartTime
+			glog.V(2).Info("look message.GetConsumeStartTime()", consumeStartTime)
+			glog.V(2).Infof("look diff %d  %d", elapsed, maxDiffTime)
+			if elapsed < maxDiffTime {
+				break
+			}
+			glog.Info("look now we send expire message back", message.Topic, message.MsgId)
+			brokerName := messageQueueList[queueIndex].BrokerName
+			if err := self.consumeMessageService.SendMessageBack(message, 3, brokerName); err != nil {
+				// The message stays in the queue, so the next attempt of this
+				// pass picks it up again.
+				glog.Error("op=send_expire_message_back_error", err)
+				continue
+			}
+			processQueue.DeleteExpireMsg(int(message.QueueOffset))
+		}
+	}
+}


Mime
View raw message