rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] 01/03: Add pull schedual service
Date Wed, 17 Jul 2019 06:35:21 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 2aae40f9256cb0a6a2b628f00df7dbe9718a963d
Author: duhenglucky <duheng0522@gmail.com>
AuthorDate: Thu Jun 20 23:32:33 2019 +0800

    Add pull schedual service
---
 .../consumer/MQPullConsumerScheduleService.java    |   2 +-
 .../client/impl/consumer/AssignedMessageQueue.java | 157 +++++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |   6 +-
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 255 +++++++++++++++++++++
 4 files changed, 416 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index 44b864e..685f4c8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -151,7 +151,7 @@ public class MQPullConsumerScheduleService {
         }
     }
 
-    class PullTaskImpl implements Runnable {
+    public class PullTaskImpl implements Runnable {
         private final MessageQueue messageQueue;
         private volatile boolean cancelled = false;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
new file mode 100644
index 0000000..e9623a8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class AssignedMessageQueue {
+
+    private ConcurrentHashMap<MessageQueue, MessageQueueStat> assignedMessageQueueState;
+
+    public AssignedMessageQueue() {
+        assignedMessageQueueState = new ConcurrentHashMap<MessageQueue, MessageQueueStat>();
+    }
+
+    public boolean isPaused(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.isPaused();
+        }
+        return false;
+    }
+
+    public void pause(Collection<MessageQueue> messageQueues) {
+        for (MessageQueue messageQueue : messageQueues) {
+            MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+            if (assignedMessageQueueState.get(messageQueue) != null) {
+                messageQueueStat.setPaused(true);
+            }
+        }
+    }
+
+    public void resume(Collection<MessageQueue> messageQueueCollection) {
+        for (MessageQueue messageQueue : messageQueueCollection) {
+            MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+            if (assignedMessageQueueState.get(messageQueue) != null) {
+                messageQueueStat.setPaused(false);
+            }
+        }
+    }
+
+    public long getNextOffset(MessageQueue messageQueue) throws MQClientException {
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (assignedMessageQueueState.get(messageQueue) != null) {
+            return messageQueueStat.getNextOffset();
+        }
+        return -1;
+    }
+
+    public void updateNextOffset(MessageQueue messageQueue, long offset) throws MQClientException
{
+        MessageQueueStat messageQueueStat = assignedMessageQueueState.get(messageQueue);
+        if (messageQueue == null) {
+            messageQueueStat = new MessageQueueStat(messageQueue, offset);
+            assignedMessageQueueState.putIfAbsent(messageQueue, messageQueueStat);
+        }
+        assignedMessageQueueState.get(messageQueue).setNextOffset(offset);
+    }
+
+    public void updateAssignedMessageQueue(Set<MessageQueue> assigned) {
+        synchronized (this.assignedMessageQueueState) {
+            Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
+                if (!assigned.contains(next.getKey())) {
+                    it.remove();
+                }
+            }
+
+            for (MessageQueue messageQueue : assigned) {
+                if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
+                    MessageQueueStat messageQueueStat = new MessageQueueStat(messageQueue);
+                    this.assignedMessageQueueState.put(messageQueue, messageQueueStat);
+                }
+            }
+        }
+    }
+
+    public void removeAssignedMessageQueue(String topic) {
+        synchronized (this.assignedMessageQueueState) {
+            Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = this.assignedMessageQueueState.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
+                if (next.getKey().getTopic().equals(topic)) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public Map<MessageQueue, Long> getNeedCommitOffsets() {
+        Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
+        Set<Map.Entry<MessageQueue, MessageQueueStat>> entries = this.assignedMessageQueueState.entrySet();
+        for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) {
+            map.put(entry.getKey(), entry.getValue().getNextOffset());
+        }
+        return map;
+    }
+
+    public class MessageQueueStat {
+        private MessageQueue messageQueue;
+        private boolean paused = false;
+        private long nextOffset = -1;
+
+        public MessageQueueStat(MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        public MessageQueueStat(MessageQueue messageQueue, long nextOffset) {
+            this.messageQueue = messageQueue;
+            this.nextOffset = nextOffset;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public void setMessageQueue(MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        public boolean isPaused() {
+            return paused;
+        }
+
+        public void setPaused(boolean paused) {
+            this.paused = paused;
+        }
+
+        public long getNextOffset() {
+            return nextOffset;
+        }
+
+        public void setNextOffset(long nextOffset) {
+            this.nextOffset = nextOffset;
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 8aff14b..bc0884a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -68,16 +68,16 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final InternalLogger log = ClientLogger.getLog();
-    private final DefaultMQPullConsumer defaultMQPullConsumer;
+    protected final DefaultMQPullConsumer defaultMQPullConsumer;
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
     private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
-    private MQClientInstance mQClientFactory;
+    protected MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
-    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
+    protected RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
 
     public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final
RPCHook rpcHook) {
         this.defaultMQPullConsumer = defaultMQPullConsumer;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
new file mode 100644
index 0000000..7332818
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+
+public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
+    private final InternalLogger log = ClientLogger.getLog();
+
+    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
+        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+
+    private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
+
+    private List<ConsumeRequest> allConsumed = new ArrayList<ConsumeRequest>(256);
+
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
+    ;
+
+    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+    public LiteMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final
RPCHook rpcHook) {
+        super(defaultMQPullConsumer, rpcHook);
+    }
+
+    public void updateAssignedMessageQueue(String topic, Set<MessageQueue> assignedMessageQueue)
{
+        this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
+        updatePullTask(topic, assignedMessageQueue);
+    }
+
+    public void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
+        Iterator<Map.Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
+            if (next.getKey().getTopic().equals(topic)) {
+                if (!mqNewSet.contains(next.getKey())) {
+                    next.getValue().setCancelled(true);
+                    it.remove();
+                }
+            }
+        }
+
+        for (MessageQueue messageQueue : mqNewSet) {
+            if (!this.taskTable.containsKey(messageQueue)) {
+                PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+                this.taskTable.put(messageQueue, pullTask);
+                this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    class MessageQueueListenerImpl implements MessageQueueListener {
+        @Override
+        public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue>
mqDivided) {
+            MessageModel messageModel = defaultMQPullConsumer.getMessageModel();
+            switch (messageModel) {
+                case BROADCASTING:
+                    updateAssignedMessageQueue(topic, mqAll);
+                    break;
+                case CLUSTERING:
+                    updateAssignedMessageQueue(topic, mqDivided);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    int nextPullBatchNums() {
+        return Math.min(10, consumeRequestCache.remainingCapacity());
+    }
+
+    @Override
+    public synchronized void start() throws MQClientException {
+        super.start();
+        final String group = this.defaultMQPullConsumer.getConsumerGroup();
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+            10, //this.pullThreadNums,
+            new ThreadFactoryImpl("PullMsgThread-" + group)
+        );
+        this.defaultMQPullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+    }
+
+    public void subscribe(String topic, String subExpression) throws MQClientException {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultMQPullConsumer.getConsumerGroup(),
+                topic, subExpression);
+            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+            if (this.mQClientFactory != null) {
+                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
+    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        try {
+            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+        } catch (MQClientException e) {
+            log.error("A error occurred in update consume: {} offset process.", remoteQueue,
e);
+        }
+    }
+
+    private void addToConsumed(ConsumeRequest consumeRequest) {
+        synchronized (this.allConsumed) {
+            allConsumed.add(consumeRequest);
+        }
+    }
+
+    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+            addToConsumed(consumeRequest);
+        } catch (InterruptedException ex) {
+            log.error("Submit consumeRequest error", ex);
+        }
+    }
+
+    long nextPullOffset(MessageQueue remoteQueue) {
+        long offset = -1;
+        try {
+            offset = assignedMessageQueue.getNextOffset(remoteQueue);
+            if (offset == -1) {
+                offset = this.defaultMQPullConsumer.fetchConsumeOffset(remoteQueue, false);
+                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+            }
+        } catch (MQClientException e) {
+            log.error("An error occurred in fetch consume offset process.", e);
+        }
+        return offset;
+    }
+
+    public class PullTaskImpl implements Runnable {
+        private final MessageQueue messageQueue;
+        private volatile boolean cancelled = false;
+
+        public PullTaskImpl(final MessageQueue messageQueue) {
+            this.messageQueue = messageQueue;
+        }
+
+        @Override
+        public void run() {
+            String topic = this.messageQueue.getTopic();
+            if (!this.isCancelled()) {
+                if (assignedMessageQueue.isPaused(messageQueue)) {
+                    log.debug("Message Queue: {} has been paused!", messageQueue);
+                    return;
+                }
+                SubscriptionData subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
+                long offset = nextPullOffset(messageQueue);
+                try {
+                    PullResult pullResult = defaultMQPullConsumer.pull(messageQueue, subscriptionData.getSubString(),
offset, nextPullBatchNums());
+                    ProcessQueue processQueue = rebalanceImpl.getProcessQueueTable().get(messageQueue);
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            if (processQueue != null) {
+                                processQueue.putMessage(pullResult.getMsgFoundList());
+                                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(),
messageQueue, processQueue));
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                    updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
+                } catch (Exception e) {
+                    log.error("An error occurred in pull message process.", e);
+                }
+            }
+        }
+
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        public void setCancelled(boolean cancelled) {
+            this.cancelled = cancelled;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+    }
+
+    public class ConsumeRequest {
+        private final List<MessageExt> messageExts;
+        private final MessageQueue messageQueue;
+        private final ProcessQueue processQueue;
+        private long startConsumeTimeMillis;
+
+        public ConsumeRequest(final List<MessageExt> messageExts, final MessageQueue
messageQueue,
+            final ProcessQueue processQueue) {
+            this.messageExts = messageExts;
+            this.messageQueue = messageQueue;
+            this.processQueue = processQueue;
+        }
+
+        public List<MessageExt> getMessageExts() {
+            return messageExts;
+        }
+
+        public MessageQueue getMessageQueue() {
+            return messageQueue;
+        }
+
+        public ProcessQueue getProcessQueue() {
+            return processQueue;
+        }
+
+        public long getStartConsumeTimeMillis() {
+            return startConsumeTimeMillis;
+        }
+
+        public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+            this.startConsumeTimeMillis = startConsumeTimeMillis;
+        }
+    }
+}


Mime
View raw message