rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch develop updated: feat(pullconsumer) add pull sys flag (#1658)
Date Fri, 27 Dec 2019 12:11:53 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f1b856a  feat(pullconsumer) add pull sys flag (#1658)
f1b856a is described below

commit f1b856a091a3ee441fb0fe0b5a1d5892b7fafad0
Author: Heng Du <duhengforever@apache.org>
AuthorDate: Fri Dec 27 20:11:43 2019 +0800

    feat(pullconsumer) add pull sys flag (#1658)
    
    feat(lite_pull_consumer) add support for consume from where
    
    test(pull_consumer) add unit test for compute from where
    
    test(PullSysFlag) add lite pull sys flag unit test
    
    feat(pull_consumer) add the verification of ConsumerFromWhere value
---
 .../client/consumer/DefaultLitePullConsumer.java   | 31 ++++++++++
 .../client/impl/consumer/AssignedMessageQueue.java |  2 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 17 +++---
 .../client/impl/consumer/RebalanceImpl.java        |  5 --
 .../impl/consumer/RebalanceLitePullImpl.java       | 71 ++++++++++++++++++++--
 .../consumer/DefaultLitePullConsumerTest.java      | 45 ++++++++++++++
 .../rocketmq/common/sysflag/PullSysFlag.java       | 16 +++++
 .../rocketmq/common/sysflag/PullSysFlagTest.java   | 32 +++++-----
 .../example/simple/LitePullConsumerSubscribe.java  |  4 +-
 9 files changed, 184 insertions(+), 39 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 52060ea..782d29b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -24,6 +24,8 @@ import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -138,6 +140,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
      */
     private long topicMetadataCheckIntervalMillis = 30 * 1000;
 
+    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
+    /**
+     * Backtracking consumption time with second precision. Time format is 20131223171201<br> Implying Seventeen twelve
+     * and 01 seconds on December 23, 2013 year<br> Default backtracking consumption time Half an hour ago.
+     */
+    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
     /**
      * Default constructor.
      */
@@ -431,4 +441,25 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
+            && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
+            && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+            throw new RuntimeException("Invalid ConsumeFromWhere Value", null);
+        }
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public String getConsumeTimestamp() {
+        return consumeTimestamp;
+    }
+
+    public void setConsumeTimestamp(String consumeTimestamp) {
+        this.consumeTimestamp = consumeTimestamp;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index c0c6f60..0b090e3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -90,7 +90,7 @@ public class AssignedMessageQueue {
         }
     }
 
-    public long getConusmerOffset(MessageQueue messageQueue) {
+    public long getConsumerOffset(MessageQueue messageQueue) {
         MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
         if (messageQueueState != null) {
             return messageQueueState.getConsumeOffset();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index f44eea7..cd4d4cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -597,7 +597,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     public synchronized void commitSync() {
         try {
             for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
-                long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+                long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
                 if (consumerOffset != -1) {
                     ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                     long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -618,7 +618,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private synchronized void commitAll() {
         try {
             for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
-                long consumerOffset = assignedMessageQueue.getConusmerOffset(messageQueue);
+                long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
                 if (consumerOffset != -1) {
                     ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
                     long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
@@ -650,9 +650,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    private long fetchConsumeOffset(MessageQueue messageQueue, boolean fromStore) {
+    private long fetchConsumeOffset(MessageQueue messageQueue) {
         checkServiceState();
-        return this.offsetStore.readOffset(messageQueue, fromStore ? ReadOffsetType.READ_FROM_STORE : ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+        long offset = this.rebalanceImpl.computePullFromWhere(messageQueue);
+        return offset;
     }
 
     public long committed(MessageQueue messageQueue) throws MQClientException {
@@ -685,10 +686,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         } else {
             offset = assignedMessageQueue.getPullOffset(messageQueue);
             if (offset == -1) {
-                offset = fetchConsumeOffset(messageQueue, false);
-                if (offset == -1 && defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
-                    offset = 0;
-                }
+                offset = fetchConsumeOffset(messageQueue);
             }
         }
         return offset;
@@ -779,7 +777,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     }
                     PullResult pullResult = pull(messageQueue, subscriptionData, offset, nextPullBatchSize());
 
-
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
                             final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
@@ -850,7 +847,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             throw new MQClientException("maxNums <= 0", null);
         }
 
-        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
+        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);
 
         long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 146fce6..b8972a9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -40,11 +40,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
-/**
- * This class will be removed in 2022, and a better implementation {@link RebalanceLitePullImpl} is recommend to use
- * in the scenario of actively pulling messages.
- */
-@Deprecated
 public abstract class RebalanceImpl {
     protected static final InternalLogger log = ClientLogger.getLog();
     protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 0b8ec67..9d1ea74 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -16,16 +16,20 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
-import java.util.List;
-import java.util.Set;
-
 public class RebalanceLitePullImpl extends RebalanceImpl {
 
     private final DefaultLitePullConsumerImpl litePullConsumerImpl;
@@ -72,7 +76,66 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
 
     @Override
     public long computePullFromWhere(MessageQueue mq) {
-        return 0;
+        ConsumeFromWhere consumeFromWhere = litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeFromWhere();
+        long result = -1;
+        switch (consumeFromWhere) {
+            case CONSUME_FROM_LAST_OFFSET: {
+                long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                } else if (-1 == lastOffset) {
+                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // First start, no offset
+                        result = 0L;
+                    } else {
+                        try {
+                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    }
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+            case CONSUME_FROM_FIRST_OFFSET: {
+                long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                } else if (-1 == lastOffset) {
+                    result = 0L;
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+            case CONSUME_FROM_TIMESTAMP: {
+                long lastOffset = litePullConsumerImpl.getOffsetStore().readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+                if (lastOffset >= 0) {
+                    result = lastOffset;
+                } else if (-1 == lastOffset) {
+                    if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                        try {
+                            result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    } else {
+                        try {
+                            long timestamp = UtilAll.parseDate(this.litePullConsumerImpl.getDefaultLitePullConsumer().getConsumeTimestamp(),
+                                UtilAll.YYYYMMDDHHMMSS).getTime();
+                            result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+                        } catch (MQClientException e) {
+                            result = -1;
+                        }
+                    }
+                } else {
+                    result = -1;
+                }
+                break;
+            }
+        }
+        return result;
     }
 
     @Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index d2cb057..cc8d5e2 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
 import org.apache.rocketmq.client.impl.consumer.RebalanceService;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -419,6 +420,50 @@ public class DefaultLitePullConsumerTest {
 
     }
 
+    @Test
+    public void testComputePullFromWhereReturnedNotFound() throws Exception{
+        DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+        defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        MessageQueue messageQueue = createMessageQueue();
+        when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+        assertThat(offset).isEqualTo(0);
+    }
+
+    @Test
+    public void testComputePullFromWhereReturned() throws Exception{
+        DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+        defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        MessageQueue messageQueue = createMessageQueue();
+        when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(100L);
+        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+        assertThat(offset).isEqualTo(100);
+    }
+
+
+    @Test
+    public void testComputePullFromLast() throws Exception{
+        DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+        defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        MessageQueue messageQueue = createMessageQueue();
+        when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+        when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
+        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+        assertThat(offset).isEqualTo(100);
+    }
+
+    @Test
+    public void testComputePullByTimeStamp() throws Exception{
+        DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+        defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+        defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
+        MessageQueue messageQueue = createMessageQueue();
+        when(offsetStore.readOffset(any(MessageQueue.class), any(ReadOffsetType.class))).thenReturn(-1L);
+        when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),anyLong())).thenReturn(100L);
+        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+        assertThat(offset).isEqualTo(100);
+    }
+
     private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
 
         Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
index d476a35..ce7558f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -21,6 +21,7 @@ public class PullSysFlag {
     private final static int FLAG_SUSPEND = 0x1 << 1;
     private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
     private final static int FLAG_CLASS_FILTER = 0x1 << 3;
+    private final static int FLAG_LITE_PULL_MESSAGE = 0x1 << 4;
 
     public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
         final boolean subscription, final boolean classFilter) {
@@ -45,6 +46,17 @@ public class PullSysFlag {
         return flag;
     }
 
+    public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+        final boolean subscription, final boolean classFilter, final boolean litePull) {
+        int flag = buildSysFlag(commitOffset, suspend, subscription, classFilter);
+
+        if (litePull) {
+            flag |= FLAG_LITE_PULL_MESSAGE;
+        }
+
+        return flag;
+    }
+
     public static int clearCommitOffsetFlag(final int sysFlag) {
         return sysFlag & (~FLAG_COMMIT_OFFSET);
     }
@@ -64,4 +76,8 @@ public class PullSysFlag {
     public static boolean hasClassFilterFlag(final int sysFlag) {
         return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
     }
+
+    public static boolean hasLitePullFlag(final int sysFlag) {
+        return (sysFlag & FLAG_LITE_PULL_MESSAGE) == FLAG_LITE_PULL_MESSAGE;
+    }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
similarity index 50%
copy from example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
copy to common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
index 1bfe49d..60e1812 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++ b/common/src/test/java/org/apache/rocketmq/common/sysflag/PullSysFlagTest.java
@@ -14,27 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.example.simple;
+package org.apache.rocketmq.common.sysflag;
 
-import java.util.List;
-import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Test;
 
-public class LitePullConsumerSubscribe {
+import static org.assertj.core.api.Assertions.assertThat;
 
-    public static volatile boolean running = true;
+public class PullSysFlagTest {
 
-    public static void main(String[] args) throws Exception {
-        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
-        litePullConsumer.subscribe("TopicTest", "*");
-        litePullConsumer.start();
-        try {
-            while (running) {
-                List<MessageExt> messageExts = litePullConsumer.poll();
-                System.out.printf("%s%n", messageExts);
-            }
-        } finally {
-            litePullConsumer.shutdown();
-        }
+    @Test
+    public void testLitePullFlag() {
+        int flag = PullSysFlag.buildSysFlag(false, false, false, false, true);
+        assertThat(PullSysFlag.hasLitePullFlag(flag)).isTrue();
+    }
+
+    @Test
+    public void testLitePullFlagFalse() {
+        int flag = PullSysFlag.buildSysFlag(false, false, false, false, false);
+        assertThat(PullSysFlag.hasLitePullFlag(flag)).isFalse();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
index 1bfe49d..e5c1a61 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerSubscribe.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.example.simple;
 
 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 
 public class LitePullConsumerSubscribe {
@@ -25,7 +26,8 @@ public class LitePullConsumerSubscribe {
     public static volatile boolean running = true;
 
     public static void main(String[] args) throws Exception {
-        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
+        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
+        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         litePullConsumer.subscribe("TopicTest", "*");
         litePullConsumer.start();
         try {


Mime
View raw message