rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-streams] branch main updated: a runnable window example reading data from rocketmq. (#54)
Date Tue, 07 Sep 2021 01:37:21 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new a897b47  a runnable window example reading data from rocketmq. (#54)
a897b47 is described below

commit a897b47f801457220bb99eb35fb0717cd896a692
Author: Ni Ze <31175234+ni-ze@users.noreply.github.com>
AuthorDate: Tue Sep 7 09:37:16 2021 +0800

    a runnable window example reading data from rocketmq. (#54)
    
    Co-authored-by: nize <unclerangoo@gmail.com>
---
 .../rocketmq/streams/RocketMQChannelBuilder.java   |  36 ++--
 .../apache/rocketmq/streams/sink/RocketMQSink.java | 111 +++++++------
 .../rocketmq/streams/source/RocketMQSource.java    |   2 +-
 .../streams/client/source/DataStreamSource.java    |   4 +-
 .../streams/client/transform/DataStream.java       |   2 +-
 .../common/channel/impl/OutputPrintChannel.java    |   2 +-
 .../channel/sinkcache/impl/MessageCache.java       |   1 -
 .../topology/stages/AbstractWindowStage.java       |  13 +-
 .../rocketmqsource/RocketMQSourceExample2.java     |  27 ++-
 ...ceExample2.java => RocketMQSourceExample3.java} |  24 ++-
 ...SourceExample2.java => RocketmqWindowTest.java} |  31 +++-
 .../streams/window/operator/AbstractWindow.java    |   3 +-
 .../window/shuffle/AbstractSystemChannel.java      |  11 +-
 .../streams/window/shuffle/ShuffleCache.java       |   4 +-
 .../streams/window/shuffle/ShuffleChannel.java     | 185 ++++++++++-----------
 .../rocketmq/streams/window/sqlcache/SQLCache.java | 110 ++++++------
 .../streams/window/state/impl/WindowValue.java     |  13 +-
 17 files changed, 300 insertions(+), 279 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
index 6014cf3..012a06e 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.streams;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.auto.service.AutoService;
+
 import java.util.Properties;
+
 import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
@@ -31,32 +33,32 @@ import org.apache.rocketmq.streams.sink.RocketMQSink;
 import org.apache.rocketmq.streams.source.RocketMQSource;
 
 @AutoService(IChannelBuilder.class)
-@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource",name="metaq")
+@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource", name = "metaq")
 public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder {
     public static final String TYPE = "rocketmq";
 
     @Override
     public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
 
-        RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(),namespace,name,createFormatProperty(properties),null);
+        RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(), namespace, name, createFormatProperty(properties), null);
         return rocketMQSource;
     }
 
-    protected JSONObject createFormatProperty(Properties properties){
-        JSONObject formatProperties=new JSONObject();
-        for(Object object:properties.keySet()){
-            String key=(String)object;
+    protected JSONObject createFormatProperty(Properties properties) {
+        JSONObject formatProperties = new JSONObject();
+        for (Object object : properties.keySet()) {
+            String key = (String) object;
             if ("type".equals(key)) {
                 continue;
             }
-            formatProperties.put(key,properties.getProperty(key));
+            formatProperties.put(key, properties.getProperty(key));
         }
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"topic","topic");
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"tags","tag");
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"maxThread","thread.max.count");
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"pullIntervalMs","pullIntervalMs");
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"offsetTime","offsetTime");
-        IChannelBuilder.formatPropertiesName(formatProperties,properties,"namesrvAddr","namesrvAddr");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "topic", "topic");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "tags", "tag");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "pullIntervalMs", "pullIntervalMs");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "offsetTime", "offsetTime");
+        IChannelBuilder.formatPropertiesName(formatProperties, properties, "namesrvAddr", "namesrvAddr");
         if (properties.getProperty("group") != null) {
             String group = properties.getProperty("group");
             if (group.startsWith("GID_")) {
@@ -78,16 +80,16 @@ public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder
 
     @Override
     public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
-        RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(),namespace,name,createFormatProperty(properties),null);
+        RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(), namespace, name, createFormatProperty(properties), null);
         return rocketMQSink;
     }
 
     @Override
     public ISink createBySource(ISource pipelineSource) {
-        RocketMQSource source = (RocketMQSource)pipelineSource;
-        String topic = source.getTopic();
+        RocketMQSource source = (RocketMQSource) pipelineSource;
         RocketMQSink sink = new RocketMQSink();
-        sink.setTopic(topic);
+        sink.setNamesrvAddr(source.getNamesrvAddr());
+        sink.setTopic(source.getTopic());
         sink.setTags(source.getTags());
         return sink;
     }
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
index 773633e..a8293ec 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -44,21 +45,27 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 
 public class RocketMQSink extends AbstractSupportShuffleSink {
 
-    protected static final Log LOG = LogFactory.getLog(RocketMQSink.class);
+    private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
     @ENVDependence
-    protected String tags = "*";
+    private String tags = "*";
 
-    protected String topic;
-    protected String groupName;
+    private String topic;
+    private String groupName;
 
-    private transient List<DefaultMQPushConsumer> consumers=new ArrayList<>();
-    protected transient DefaultMQProducer producer;
+    private transient List<DefaultMQPushConsumer> consumers = new ArrayList<>();
+    private transient DefaultMQProducer producer;
 
-    protected Long pullIntervalMs;
-    protected String namesrvAddr;
+    private Long pullIntervalMs;
+    private String namesrvAddr;
 
+    public RocketMQSink() {
+    }
 
-    public RocketMQSink(){}
+    public RocketMQSink(String namesrvAddr, String topic, String groupName) {
+        this.namesrvAddr = namesrvAddr;
+        this.topic = topic;
+        this.groupName = groupName;
+    }
 
 
     @Override
@@ -81,45 +88,45 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
         initProducer();
 
         try {
-            Map<String,List<Message>> msgsByQueueId=new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
-            Map<String, MessageQueue> messageQueueMap=new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
-            String defaultQueueId="<null>";//message is not contains queue ,use default
-            for(IMessage msg:messages){
+            Map<String, List<Message>> msgsByQueueId = new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
+            Map<String, MessageQueue> messageQueueMap = new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
+            String defaultQueueId = "<null>";//message is not contains queue ,use default
+            for (IMessage msg : messages) {
                 ISplit<RocketMQMessageQueue, MessageQueue> channelQueue = getSplit(msg);
-                String queueId=defaultQueueId;
+                String queueId = defaultQueueId;
                 if (channelQueue != null) {
-                    queueId=channelQueue.getQueueId();
-                    RocketMQMessageQueue metaqMessageQueue=(RocketMQMessageQueue)channelQueue;
-                    messageQueueMap.put(queueId,metaqMessageQueue.getQueue());
+                    queueId = channelQueue.getQueueId();
+                    RocketMQMessageQueue metaqMessageQueue = (RocketMQMessageQueue) channelQueue;
+                    messageQueueMap.put(queueId, metaqMessageQueue.getQueue());
                 }
-                List<Message> messageList=msgsByQueueId.get(queueId);
-                if(messageList==null){
-                    messageList=new ArrayList<>();
-                    msgsByQueueId.put(queueId,messageList);
+                List<Message> messageList = msgsByQueueId.get(queueId);
+                if (messageList == null) {
+                    messageList = new ArrayList<>();
+                    msgsByQueueId.put(queueId, messageList);
                 }
                 messageList.add(new Message(topic, tags, null, msg.getMessageBody().toJSONString().getBytes("UTF-8")));
             }
-            List<Message> messageList=msgsByQueueId.get(defaultQueueId);
-            if(messageList!=null){
-                for(Message message:messageList){
+            List<Message> messageList = msgsByQueueId.get(defaultQueueId);
+            if (messageList != null) {
+                for (Message message : messageList) {
                     producer.sendOneway(message);
                 }
                 messageQueueMap.remove(defaultQueueId);
             }
-            if(messageQueueMap.size()<=0){
+            if (messageQueueMap.size() <= 0) {
                 return true;
             }
-            for(String queueId:msgsByQueueId.keySet()){
-                messageList=msgsByQueueId.get(queueId);
-                for(Message message:messageList){
-                    MessageQueue queue=messageQueueMap.get(queueId);
-                    producer.send(message,queue);
+            for (String queueId : msgsByQueueId.keySet()) {
+                messageList = msgsByQueueId.get(queueId);
+                for (Message message : messageList) {
+                    MessageQueue queue = messageQueueMap.get(queueId);
+                    producer.send(message, queue);
                 }
 
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
-            throw new RuntimeException("batch insert error ",e);
+            throw new RuntimeException("batch insert error ", e);
         }
 
         return true;
@@ -127,19 +134,21 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
 
 
     protected void initProducer() {
-        if(producer==null){
-            synchronized (this){
-                if(producer==null){
+        if (producer == null) {
+            synchronized (this) {
+                if (producer == null) {
                     destroy();
                     producer = new DefaultMQProducer(groupName + "producer", true, null);
                     try {
-                        if (this.namesrvAddr != null && !"".equalsIgnoreCase(this.namesrvAddr)) {
-                            producer.setNamesrvAddr(this.namesrvAddr);
+                        if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
+                            throw new RuntimeException("namesrvAddr can not be null.");
                         }
+
+                        producer.setNamesrvAddr(this.namesrvAddr);
                         producer.start();
                     } catch (Exception e) {
                         setInitSuccess(false);
-                        throw new RuntimeException("创建队列失败," + topic + ",msg=" + e.getMessage(), e);
+                        throw new RuntimeException("create producer failed," + topic + ",msg=" + e.getMessage(), e);
                     }
                 }
             }
@@ -151,7 +160,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
         if (producer != null) {
             try {
                 producer.shutdown();
-                producer=null;
+                producer = null;
             } catch (Throwable t) {
                 if (LOG.isWarnEnabled()) {
                     LOG.warn(t.getMessage(), t);
@@ -160,14 +169,6 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
         }
     }
 
-    public static void main(String[] args) {
-        String topic = "shuffle_TOPIC_DIPPER_SYSTEM_MSG_6_namespace_name1";
-        RocketMQSink metaqSink = new RocketMQSink();
-        metaqSink.setTopic(topic);
-        metaqSink.setSplitNum(5);
-        metaqSink.init();
-        System.out.println(metaqSink.getSplitList().size());
-    }
 
     @Override
     public void destroy() {
@@ -195,7 +196,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
                 }
             }
 
-            //defaultMQAdminExt.createTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
+
             defaultMQAdminExt.createTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
         } catch (Exception e) {
             e.printStackTrace();
@@ -209,7 +210,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
     @Override
     public List<ISplit> getSplitList() {
         initProducer();
-        List<ISplit> messageQueues=new ArrayList<>();
+        List<ISplit> messageQueues = new ArrayList<>();
         try {
 
             if (messageQueues == null || messageQueues.size() == 0) {
@@ -223,7 +224,7 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
                 Collections.sort(queueList);
                 messageQueues = queueList;
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
 
@@ -232,13 +233,13 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
 
     @Override
     public int getSplitNum() {
-        List<ISplit> splits=getSplitList();
-        if(splits==null||splits.size()==0){
+        List<ISplit> splits = getSplitList();
+        if (splits == null || splits.size() == 0) {
             return 0;
         }
-        Set<Integer> splitNames=new HashSet<>();
-        for(ISplit split:splits){
-            MessageQueue messageQueue= (MessageQueue)split.getQueue();
+        Set<Integer> splitNames = new HashSet<>();
+        for (ISplit split : splits) {
+            MessageQueue messageQueue = (MessageQueue) split.getQueue();
             splitNames.add(messageQueue.getQueueId());
         }
         return splitNames.size();
diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
index 2d35f80..b45df86 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java
@@ -291,7 +291,7 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
                     ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
                     DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
                 }
-                LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
+//                LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
                 super.updateConsumeOffsetToBroker(mq, offset, isOneway);
             }
         };
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
index 7797832..e30acdb 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java
@@ -23,7 +23,9 @@ import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
 import org.apache.rocketmq.streams.source.RocketMQSource;
 
-public class DataStreamSource {
+import java.io.Serializable;
+
+public class DataStreamSource implements Serializable {
     protected PipelineBuilder mainPipelineBuilder;
 
     public DataStreamSource(String namespace, String pipelineName) {
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index 22d139a..f9260ea 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -135,7 +135,7 @@ public class DataStream implements Serializable {
                         if (result instanceof JSONObject) {
                             subMessage=new Message((JSONObject)t);
                         } else {
-                            subMessage=new Message(new UserDefinedMessage(result));
+                            subMessage=new Message(new UserDefinedMessage(t));
                         }
                         splitMessages.add(subMessage);
                     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index 1656c65..bd7029f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -30,7 +30,7 @@ public class OutputPrintChannel extends AbstractSink {
     @Override
     protected boolean batchInsert(List<IMessage> messages) {
         for (IMessage msg : messages) {
-            System.out.println(msg.getMessageBody().toJSONString());
+            System.out.println(msg.getMessageValue());
         }
         return false;
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
index b5171bc..04dc08f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java
@@ -42,7 +42,6 @@ public class MessageCache<R> implements IMessageCache<R> {
     protected volatile int autoFlushSize=300;
     protected volatile int autoFlushTimeGap=1000;
 
-    protected transient AtomicBoolean LOCK=new AtomicBoolean(false);
 
     public MessageCache(IMessageFlushCallBack<R> flushCallBack) {
         this.flushCallBack = flushCallBack;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index e5dcc6c..0d71b6b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -61,21 +61,10 @@ public abstract class AbstractWindowStage<T extends IMessage> extends ChainStage
     @Override
     public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {
 
-
-        //do nothigh
     }
     @Override
     public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
-        //if(message.getHeader().isNeedFlush()){
-        //    if(message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
-        //        window.getWindowCache().flush(message.getHeader().getCheckpointQueueIds());
-        //    }else {
-        //        Set<String> queueIds=new HashSet<>();
-        //        queueIds.add(message.getHeader().getQueueId());
-        //        window.getWindowCache().flush(queueIds);
-        //    }
-        //
-        //}
+
     }
 
     @Override
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
index 387152b..c9d9bda 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
@@ -18,12 +18,16 @@ package org.apache.rocketmq.streams.examples.rocketmqsource;
 
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
+import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
+
+import java.util.Arrays;
 
 public class RocketMQSourceExample2 {
     public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "topic_tiger_0901_01";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+    public static final String RMQ_TOPIC = "NormalTestTopic";
+    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01";
     public static final String TAGS = "*";
 
     public static void main(String[] args) {
@@ -34,9 +38,26 @@ public class RocketMQSourceExample2 {
                 RMQ_CONSUMER_GROUP_NAME,
                 false,
                 NAMESRV_ADDRESS)
+                .forEach((message)->{
+                    System.out.println("forEach: before===========");
+                    System.out.println("forEach: "+message);
+                    System.out.println("forEach: after===========");
+                })
                 .map(message -> message)
+                .filter((value) -> {
+                    System.out.println("filter: ===========");
+                    String messageValue = (String)value;
+                    return !messageValue.contains("RocketMQ");
+                })
+                .flatMap((message)->{
+                    String value = (String) message;
+                    String[] result = value.split(" ");
+                    return Arrays.asList(result);
+                })
                 .toPrint(1)
                 .start();
 
     }
+
+
 }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
similarity index 70%
copy from rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
copy to rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
index 387152b..1af6206 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample3.java
@@ -14,16 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
 
-public class RocketMQSourceExample2 {
+import java.util.Arrays;
+
+public class RocketMQSourceExample3 {
     public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "topic_tiger_0901_01";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+    public static final String RMQ_TOPIC = "NormalTestTopic";
+    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-03";
     public static final String TAGS = "*";
 
     public static void main(String[] args) {
@@ -34,9 +36,23 @@ public class RocketMQSourceExample2 {
                 RMQ_CONSUMER_GROUP_NAME,
                 false,
                 NAMESRV_ADDRESS)
+                .forEach((message) -> {
+                    System.out.println("forEach: " + message);
+                })
                 .map(message -> message)
+                .filter((value) -> {
+                    String messageValue = (String) value;
+                    return messageValue.contains("RocketMQ");
+                })
+                .flatMap((message) -> {
+                    String value = (String) message;
+                    String[] result = value.split(" ");
+                    return Arrays.asList(result);
+                })
                 .toPrint(1)
                 .start();
 
     }
+
+
 }
diff --git a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
similarity index 55%
copy from rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
copy to rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
index 387152b..a50d6cb 100644
--- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketMQSourceExample2.java
+++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/RocketmqWindowTest.java
@@ -14,16 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.streams.examples.rocketmqsource;
 
+import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.client.StreamBuilder;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.source.RocketMQSource;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
+import org.apache.rocketmq.streams.client.transform.window.Time;
+import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 
-public class RocketMQSourceExample2 {
+public class RocketmqWindowTest {
     public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
-    public static final String RMQ_TOPIC = "topic_tiger_0901_01";
-    public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
+    public static final String RMQ_TOPIC = "NormalTestTopic";
+    public static final String RMQ_CONSUMER_GROUP_NAME = "group-03";
     public static final String TAGS = "*";
 
     public static void main(String[] args) {
@@ -34,9 +38,26 @@ public class RocketMQSourceExample2 {
                 RMQ_CONSUMER_GROUP_NAME,
                 false,
                 NAMESRV_ADDRESS)
-                .map(message -> message)
+                .filter((message) -> {
+                    try {
+                        JSONObject.parseObject((String) message);
+                    } catch (Throwable t) {
+                        // if can not convert to json, discard it.because all operator are base on json.
+                        return true;
+                    }
+                    return false;
+                })
+                .map(message -> JSONObject.parseObject((String) message))
+                .window(TumblingWindow.of(Time.seconds(1)))
+                .groupBy("ProjectName", "LogStore")
+                .count("total")
+                .waterMark(1)
+                .setLocalStorageOnly(true)
+                .toDataSteam()
                 .toPrint(1)
+                .with(WindowStrategy.highPerformance())
                 .start();
 
     }
+
 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index aea126e..6b05827 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -619,8 +619,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     }
 
     @Override
-    public void setFireReceiver(
-        PiplineRecieverAfterCurrentNode fireReceiver) {
+    public void setFireReceiver(PiplineRecieverAfterCurrentNode fireReceiver) {
         this.fireReceiver = fireReceiver;
     }
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
index 6d1df78..09b6555 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java
@@ -77,22 +77,22 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
             synchronized (this) {
                 if (!hasCreateShuffleChannel) {
                     ISource piplineSource = pipeline.getSource();
-                    ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(
-                        IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
+                    ServiceLoaderComponent serviceLoaderComponent = ComponentCreator.getComponent(IChannelBuilder.class.getName(), ServiceLoaderComponent.class);
+
                     IChannelBuilder builder = (IChannelBuilder)serviceLoaderComponent.loadService(piplineSource.getClass().getSimpleName());
                     if (builder == null) {
                         throw new RuntimeException("can not create shuffle channel, not find channel builder " + piplineSource.toJson());
                     }
-                    if (!IShuffleChannelBuilder.class.isInstance(builder)) {
+                    if (!(builder instanceof IShuffleChannelBuilder)) {
                         throw new RuntimeException("can not create shuffle channel, builder not imp IShuffleChannelBuilder " + piplineSource.toJson());
                     }
                     IShuffleChannelBuilder shuffleChannelBuilder = (IShuffleChannelBuilder)builder;
                     ISink sink = shuffleChannelBuilder.createBySource(piplineSource);
-                    if (!MemoryChannel.class.isInstance(sink) && !AbstractSupportShuffleSink.class.isInstance(sink)) {
+                    if (!(sink instanceof MemoryChannel) && !(sink instanceof AbstractSupportShuffleSink)) {
                         throw new RuntimeException("can not create shuffle channel, sink not extends AbstractSupportShuffleSink " + piplineSource.toJson());
                     }
                     ISource source = null;
-                    if (MemoryChannel.class.isInstance(sink)) {
+                    if (sink instanceof MemoryChannel) {
                         MemoryCache memoryCache = new MemoryCache();
                         memoryCache.setNameSpace(createShuffleChannelNameSpace(pipeline));
                         memoryCache.setConfigureName(createShuffleChannelName(pipeline));
@@ -108,6 +108,7 @@ public abstract class AbstractSystemChannel implements IConfigurableIdentificati
                     putDynamicPropertyValue(new HashSet<>(), properties);
 
                     AbstractSupportShuffleSink shuffleSink = (AbstractSupportShuffleSink)sink;
+                    //todo 为什么这里的分区数量要和源头topic的分区数量一直?
                     shuffleSink.setSplitNum(getShuffleSplitCount(shuffleSink));
                     shuffleSink.setNameSpace(createShuffleChannelNameSpace(pipeline));
                     shuffleSink.setConfigureName(createShuffleChannelName(pipeline));
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
index e1f3e62..1ea07b3 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java
@@ -85,14 +85,14 @@ public class ShuffleCache extends WindowCache {
      */
     protected void saveSplitProgress(String queueId, List<IMessage> messages) {
         Map<String,String> queueId2OrigOffset=new HashMap<>();
-        Set<String> oriQueueIds=new HashSet<>();
+//        Set<String> oriQueueIds=new HashSet<>();
         Boolean isLong=false;
         for(IMessage message:messages){
             isLong=message.getMessageBody().getBoolean(ORIGIN_QUEUE_IS_LONG);
             String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
             String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
             queueId2OrigOffset.put(oriQueueId,oriOffset);
-            oriQueueIds.add(oriQueueId);
+//            oriQueueIds.add(oriQueueId);
         }
         Map<String,WindowMaxValue> windowMaxValueMap=window.getWindowMaxValueManager().saveMaxOffset(isLong,window.getConfigureName(),queueId,queueId2OrigOffset);
         window.getSqlCache().addCache(new SplitSQLElement(queueId,ORMUtil.createBatchReplacetSQL(new ArrayList<>(windowMaxValueMap.values()))));
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index 922de4b..cb6cb4e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 
 
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
@@ -76,7 +77,6 @@ public class ShuffleChannel extends AbstractSystemChannel {
     protected ShuffleCache shuffleCache;
 
 
-
     protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>();
     protected List<ISplit> queueList;//所有的分片
 
@@ -87,28 +87,28 @@ public class ShuffleChannel extends AbstractSystemChannel {
     /**
      * 每个分片,已经确定处理的最大offset
      */
-    protected transient Map<String,String> split2MaxOffsets=new HashMap<>();
+    protected transient Map<String, String> split2MaxOffsets = new HashMap<>();
 
     public ShuffleChannel(AbstractShuffleWindow window) {
         this.window = window;
         channelConfig = new HashMap<>();
         channelConfig.put(CHANNEL_PROPERTY_KEY_PREFIX, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX);
         channelConfig.put(CHANNEL_TYPE, ConfigureFileKey.WINDOW_SHUFFLE_CHANNEL_TYPE);
-        this.consumer = createSource(window.getNameSpace(),window.getConfigureName());
+        this.consumer = createSource(window.getNameSpace(), window.getConfigureName());
 
-        this.producer = createSink(window.getNameSpace(),window.getConfigureName());
-        if(this.consumer==null||this.producer==null){
+        this.producer = createSink(window.getNameSpace(), window.getConfigureName());
+        if (this.consumer == null || this.producer == null) {
             autoCreateShuffleChannel(window.getFireReceiver().getPipeline());
         }
-        if(this.consumer instanceof AbstractSource){
-            ((AbstractSource)this.consumer).setJsonData(true);
+        if (this.consumer instanceof AbstractSource) {
+            ((AbstractSource) this.consumer).setJsonData(true);
         }
 
         this.shuffleCache = new ShuffleCache(window);
         this.shuffleCache.init();
         this.shuffleCache.openAutoFlush();
 
-        if (producer!=null&&(queueList == null  || queueList.size() == 0) ){
+        if (producer != null && (queueList == null || queueList.size() == 0)) {
             queueList = producer.getSplitList();
             Map<String, ISplit> tmp = new ConcurrentHashMap<>();
             for (ISplit queue : queueList) {
@@ -128,24 +128,25 @@ public class ShuffleChannel extends AbstractSystemChannel {
      * @return
      */
 
-    protected transient AtomicLong COUNT=new AtomicLong(0);
+    protected transient AtomicLong COUNT = new AtomicLong(0);
+
     @Override
     public Object doMessage(IMessage oriMessage, AbstractContext context) {
         if (oriMessage.getHeader().isSystemMessage()) {
-            doSystemMessage(oriMessage,context);
+            doSystemMessage(oriMessage, context);
             return null;
 
         }
         /**
          * 过滤不是这个window的消息,一个shuffle通道,可能多个window共享,这里过滤掉非本window的消息
          */
-        boolean isFilter=filterNotOwnerMessage(oriMessage);
-        if(isFilter){
+        boolean isFilter = filterNotOwnerMessage(oriMessage);
+        if (isFilter) {
             return null;
         }
-        String queueId=oriMessage.getHeader().getQueueId();
+        String queueId = oriMessage.getHeader().getQueueId();
         JSONArray messages = oriMessage.getMessageBody().getJSONArray(SHUFFLE_MESSAGES);
-        if(messages==null){
+        if (messages == null) {
             return null;
         }
 
@@ -154,23 +155,23 @@ public class ShuffleChannel extends AbstractSystemChannel {
             TraceUtil.debug(traceId, "shuffle message in", "received message size:" + messages.size());
         }
 
-        for (Object obj: messages) {
+        for (Object obj : messages) {
             IMessage message = new Message((JSONObject) obj);
             message.getHeader().setQueueId(queueId);
             window.updateMaxEventTime(message);
-            if(isRepeateMessage(message,queueId)){
+            if (isRepeateMessage(message, queueId)) {
                 continue;
             }
-            List<WindowInstance> windowInstances=window.queryOrCreateWindowInstance(message,queueId);
-            if(windowInstances==null||windowInstances.size()==0){
+            List<WindowInstance> windowInstances = window.queryOrCreateWindowInstance(message, queueId);
+            if (windowInstances == null || windowInstances.size() == 0) {
                 LOG.warn("the message is out of window instance, the message is discard");
                 continue;
             }
-            for(WindowInstance windowInstance:windowInstances){
+            for (WindowInstance windowInstance : windowInstances) {
                 String windowInstanceId = windowInstance.createWindowInstanceId();
                 //new instance, not need load data from remote
-                if(windowInstance.isNewWindowInstance()){
-                    window.getSqlCache().addCache(new SQLElement(windowInstance.getSplitId(),windowInstanceId, ORMUtil.createBatchReplacetSQL(windowInstance)));
+                if (windowInstance.isNewWindowInstance()) {
+                    window.getSqlCache().addCache(new SQLElement(windowInstance.getSplitId(), windowInstanceId, ORMUtil.createBatchReplacetSQL(windowInstance)));
                     windowInstance.setNewWindowInstance(false);
                     ShufflePartitionManager.getInstance().setWindowInstanceFinished(windowInstance.createWindowInstanceId());
                 }
@@ -178,21 +179,17 @@ public class ShuffleChannel extends AbstractSystemChannel {
 
             message.getMessageBody().put(WindowInstance.class.getSimpleName(), windowInstances);
             message.getMessageBody().put(AbstractWindow.class.getSimpleName(), window);
-            long count=COUNT.incrementAndGet();
-//            if(count>25000){
-//                System.out.println("shufffle reciever is "+count);
-//            }
 
-            if(DebugWriter.getDebugWriter(window.getConfigureName()).isOpenDebug()){
-                List<IMessage> msgs=new ArrayList<>();
+            if (DebugWriter.getDebugWriter(window.getConfigureName()).isOpenDebug()) {
+                List<IMessage> msgs = new ArrayList<>();
                 msgs.add(message);
-                DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window,msgs,queueId);
+                DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceiveBeforeCache(window, msgs, queueId);
             }
 
 
-            beforeBatchAdd(oriMessage,message);
+            beforeBatchAdd(oriMessage, message);
 
-            for(WindowInstance windowInstance:windowInstances){
+            for (WindowInstance windowInstance : windowInstances) {
                 window.getWindowFireSource().updateWindowInstanceLastUpdateTime(windowInstance);
             }
             shuffleCache.batchAdd(message);
@@ -203,38 +200,39 @@ public class ShuffleChannel extends AbstractSystemChannel {
 
     @Override
     public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {
-        this.currentQueueIds=newSplitMessage.getCurrentSplitIds();
+        this.currentQueueIds = newSplitMessage.getCurrentSplitIds();
         loadSplitProgress(newSplitMessage);
 
-        List<WindowInstance> allWindowInstances=WindowInstance.queryAllWindowInstance(DateUtil.getCurrentTimeString(),window,newSplitMessage.getSplitIds());
-        if(CollectionUtil.isNotEmpty(allWindowInstances)){
-            Map<String,Set<WindowInstance>> queueId2WindowInstances=new HashMap<>();
-            for(WindowInstance windowInstance:allWindowInstances){
+        List<WindowInstance> allWindowInstances = WindowInstance.queryAllWindowInstance(DateUtil.getCurrentTimeString(), window, newSplitMessage.getSplitIds());
+        if (CollectionUtil.isNotEmpty(allWindowInstances)) {
+            Map<String, Set<WindowInstance>> queueId2WindowInstances = new HashMap<>();
+            for (WindowInstance windowInstance : allWindowInstances) {
                 windowInstance.setNewWindowInstance(false);
-                window.getWindowInstanceMap().putIfAbsent(windowInstance.createWindowInstanceTriggerId(),windowInstance);
-                window.getWindowFireSource().registFireWindowInstanceIfNotExist(windowInstance,window);
-                String queueId=windowInstance.getSplitId();
-                window.getStorage().loadSplitData2Local(queueId,windowInstance.createWindowInstanceId(),window.getWindowBaseValueClass(),new WindowRowOperator(windowInstance,queueId,window));
+                window.getWindowInstanceMap().putIfAbsent(windowInstance.createWindowInstanceTriggerId(), windowInstance);
+                window.getWindowFireSource().registFireWindowInstanceIfNotExist(windowInstance, window);
+                String queueId = windowInstance.getSplitId();
+                window.getStorage().loadSplitData2Local(queueId, windowInstance.createWindowInstanceId(), window.getWindowBaseValueClass(), new WindowRowOperator(windowInstance, queueId, window));
                 window.initWindowInstanceMaxSplitNum(windowInstance);
             }
 
 
-        }else {
-            for(String queueId:newSplitMessage.getSplitIds()){
+        } else {
+            for (String queueId : newSplitMessage.getSplitIds()) {
                 ShufflePartitionManager.getInstance().setSplitFinished(queueId);
             }
         }
-        window.getFireReceiver().doMessage(message,context);
+        window.getFireReceiver().doMessage(message, context);
     }
 
     /**
      * load ori split consume offset
+     *
      * @param newSplitMessage
      */
     protected void loadSplitProgress(NewSplitMessage newSplitMessage) {
-        for(String queueId:newSplitMessage.getSplitIds()){
-            Map<String,String> result=window.getWindowMaxValueManager().loadOffsets(window.getConfigureName(),queueId);
-            if(result!=null){
+        for (String queueId : newSplitMessage.getSplitIds()) {
+            Map<String, String> result = window.getWindowMaxValueManager().loadOffsets(window.getConfigureName(), queueId);
+            if (result != null) {
                 this.split2MaxOffsets.putAll(result);
             }
         }
@@ -242,10 +240,10 @@ public class ShuffleChannel extends AbstractSystemChannel {
 
     @Override
     public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
-        this.currentQueueIds=removeSplitMessage.getCurrentSplitIds();
-        Set<String> queueIds=removeSplitMessage.getSplitIds();
-        if(queueIds!=null){
-            for(String queueId:queueIds){
+        this.currentQueueIds = removeSplitMessage.getCurrentSplitIds();
+        Set<String> queueIds = removeSplitMessage.getSplitIds();
+        if (queueIds != null) {
+            for (String queueId : queueIds) {
                 ShufflePartitionManager.getInstance().setSplitInValidate(queueId);
                 window.clearCache(queueId);
 
@@ -253,55 +251,57 @@ public class ShuffleChannel extends AbstractSystemChannel {
             window.getWindowMaxValueManager().removeKeyPrefixFromLocalCache(queueIds);
             //window.getWindowFireSource().removeSplit(queueIds);
         }
-        window.getFireReceiver().doMessage(message,context);
+        window.getFireReceiver().doMessage(message, context);
     }
 
     @Override
     public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
-        if(message.getHeader().isNeedFlush()){
+        if (message.getHeader().isNeedFlush()) {
             this.flush(message.getHeader().getCheckpointQueueIds());
             window.getSqlCache().flush(message.getHeader().getCheckpointQueueIds());
         }
-        CheckPointState checkPointState=  new CheckPointState();
+        CheckPointState checkPointState = new CheckPointState();
         checkPointState.setQueueIdAndOffset(this.shuffleCache.getFinishedQueueIdAndOffsets(checkPointMessage));
         checkPointMessage.reply(checkPointState);
     }
 
     /**
      * do system message
+     *
      * @param oriMessage
      * @param context
      */
     protected void doSystemMessage(IMessage oriMessage, AbstractContext context) {
-        ISystemMessage systemMessage=oriMessage.getSystemMessage();
-        if(systemMessage instanceof CheckPointMessage){
-            this.checkpoint(oriMessage, context,(CheckPointMessage)systemMessage);
-        }else if(systemMessage instanceof NewSplitMessage){
-            this.addNewSplit(oriMessage,context,(NewSplitMessage)systemMessage);
-        }else if(systemMessage instanceof RemoveSplitMessage){
-            this.removeSplit(oriMessage,context,(RemoveSplitMessage)systemMessage);
-        }else {
-            throw new RuntimeException("can not support this system message "+systemMessage.getClass().getName());
+        ISystemMessage systemMessage = oriMessage.getSystemMessage();
+        if (systemMessage instanceof CheckPointMessage) {
+            this.checkpoint(oriMessage, context, (CheckPointMessage) systemMessage);
+        } else if (systemMessage instanceof NewSplitMessage) {
+            this.addNewSplit(oriMessage, context, (NewSplitMessage) systemMessage);
+        } else if (systemMessage instanceof RemoveSplitMessage) {
+            this.removeSplit(oriMessage, context, (RemoveSplitMessage) systemMessage);
+        } else {
+            throw new RuntimeException("can not support this system message " + systemMessage.getClass().getName());
         }
-        afterFlushCallback(oriMessage,context);
+        afterFlushCallback(oriMessage, context);
     }
 
 
     /**
      * if the message offset is old filter the repeate message
+     *
      * @param message
      * @param queueId
      * @return
      */
     protected boolean isRepeateMessage(IMessage message, String queueId) {
-        boolean isOrigOffsetLong=message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG);
+        boolean isOrigOffsetLong = message.getMessageBody().getBoolean(WindowCache.ORIGIN_QUEUE_IS_LONG);
         String oriQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
         String oriOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
-        String key=MapKeyUtil.createKey(window.getConfigureName(),queueId,oriQueueId);
-        String offset=this.split2MaxOffsets.get(key);
-        if(offset!=null){
-            MessageOffset messageOffset=new MessageOffset(oriOffset,isOrigOffsetLong);
-            if(!messageOffset.greateThan(offset)){
+        String key = MapKeyUtil.createKey(window.getConfigureName(), queueId, oriQueueId);
+        String offset = this.split2MaxOffsets.get(key);
+        if (offset != null) {
+            MessageOffset messageOffset = new MessageOffset(oriOffset, isOrigOffsetLong);
+            if (!messageOffset.greateThan(offset)) {
                 System.out.println("the message offset is old, the message is discard ");
                 return true;
             }
@@ -315,13 +315,13 @@ public class ShuffleChannel extends AbstractSystemChannel {
     }
 
     @Override
-    protected void putDynamicPropertyValue(Set<String> dynamiPropertySet,Properties properties){
-        String groupName="groupName";
-        if(!dynamiPropertySet.contains(groupName)){
-            properties.put(groupName,getDynamicPropertyValue());
+    protected void putDynamicPropertyValue(Set<String> dynamiPropertySet, Properties properties) {
+        String groupName = "groupName";
+        if (!dynamiPropertySet.contains(groupName)) {
+            properties.put(groupName, getDynamicPropertyValue());
         }
-        if(!dynamiPropertySet.contains("tags")){
-            properties.put("tags",getDynamicPropertyValue());
+        if (!dynamiPropertySet.contains("tags")) {
+            properties.put("tags", getDynamicPropertyValue());
         }
     }
 
@@ -335,7 +335,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
     @Override
     protected String createShuffleTopic(String topic, ChainPipeline pipeline) {
         return "shuffle_" + topic + "_" + pipeline.getSource().getNameSpace().replaceAll("\\.", "_") + "_" + pipeline
-            .getConfigureName().replaceAll("\\.", "_").replaceAll(";", "_");
+                .getConfigureName().replaceAll("\\.", "_").replaceAll(";", "_");
     }
 
     /**
@@ -361,11 +361,9 @@ public class ShuffleChannel extends AbstractSystemChannel {
     }
 
 
-
-
     @Override
     public String getConfigureName() {
-        return window.getConfigureName()+"_shuffle";
+        return window.getConfigureName() + "_shuffle";
     }
 
     @Override
@@ -379,19 +377,17 @@ public class ShuffleChannel extends AbstractSystemChannel {
     }
 
 
-
-
-    public ISplit getSplit(Integer index){
+    public ISplit getSplit(Integer index) {
         return queueList.get(index);
     }
 
-    public JSONObject createMsg(JSONArray messages,ISplit split) {
+    public JSONObject createMsg(JSONArray messages, ISplit split) {
 
         JSONObject msg = new JSONObject();
 
         msg.put(SHUFFLE_QUEUE_ID, split.getQueueId());//分片id
         msg.put(SHUFFLE_MESSAGES, messages);//合并的消息
-        msg.put(MSG_OWNER,getDynamicPropertyValue());//消息owner
+        msg.put(MSG_OWNER, getDynamicPropertyValue());//消息owner
 
         StringBuilder traceIds = new StringBuilder();
         for (int i = 0; i < messages.size(); i++) {
@@ -406,18 +402,18 @@ public class ShuffleChannel extends AbstractSystemChannel {
         return msg;
     }
 
-    public JSONArray getMsgs(JSONObject msg){
+    public JSONArray getMsgs(JSONObject msg) {
         return msg.getJSONArray(SHUFFLE_MESSAGES);
     }
 
-    public ISplit getChannelQueue(String key){
-        int index=hash(key);
+    public ISplit getChannelQueue(String key) {
+        int index = hash(key);
         ISplit targetQueue = queueList.get(index);
         return targetQueue;
     }
 
-    public  int hash(Object key) {
-        int mValue=queueList.size();
+    public int hash(Object key) {
+        int mValue = queueList.size();
         int h = 0;
         if (key != null) {
             h = key.hashCode() ^ (h >>> 16);
@@ -457,23 +453,24 @@ public class ShuffleChannel extends AbstractSystemChannel {
      * @return
      */
     protected boolean filterNotOwnerMessage(IMessage oriMessage) {
-        String owner=oriMessage.getMessageBody().getString(MSG_OWNER);
-        if(owner!=null&&owner.equals(getDynamicPropertyValue())){
+        String owner = oriMessage.getMessageBody().getString(MSG_OWNER);
+        if (owner != null && owner.equals(getDynamicPropertyValue())) {
             return false;
         }
         return true;
     }
+
     @Override
     protected String getDynamicPropertyValue() {
-        String dynamicPropertyValue= MapKeyUtil.createKey(window.getNameSpace(),window.getConfigureName());
-        dynamicPropertyValue = dynamicPropertyValue.replaceAll("\\.", "_").replaceAll(";","_");
+        String dynamicPropertyValue = MapKeyUtil.createKey(window.getNameSpace(), window.getConfigureName());
+        dynamicPropertyValue = dynamicPropertyValue.replaceAll("\\.", "_").replaceAll(";", "_");
         return dynamicPropertyValue;
     }
 
     @Override
     protected int getShuffleSplitCount(AbstractSupportShuffleSink shuffleSink) {
-        int splitNum=shuffleSink.getSplitNum();
-        return splitNum>0?splitNum:32;
+        int splitNum = shuffleSink.getSplitNum();
+        return splitNum > 0 ? splitNum : 32;
     }
 
     public Set<String> getCurrentQueueIds() {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index 8e1b363..03e4224 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -32,106 +32,90 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
 /**
  * cache sql, async and batch commit
  */
+
 public class SQLCache extends AbstractMultiSplitMessageCache<ISQLElement> {
     protected Boolean isOpenCache=true;//if false,then execute sql when receive sql
     protected Set<String> firedWindowInstances=new HashSet<>();//fired window instance ,if the owned sqls have not commit, can cancel the sqls
     protected Map<String,Integer> windowInstance2Index=new HashMap<>();//set index to ISQLElement group by window instance
+
     protected boolean isLocalOnly;
-    public SQLCache(boolean isLocalOnly ){
+
+    public SQLCache(boolean isLocalOnly) {
         super(null);
-        this.isLocalOnly=isLocalOnly;
+        this.isLocalOnly = isLocalOnly;
         this.flushCallBack = new MessageFlushCallBack(new SQLCacheCallback());
         this.setBatchSize(1000);
-        this.setAutoFlushTimeGap(30*1000);
+        this.setAutoFlushTimeGap(30 * 1000);
         this.setAutoFlushSize(100);
         this.openAutoFlush();
     }
 
-    @Override public int addCache(ISQLElement isqlElement) {
-        if(isLocalOnly){
+    @Override
+    public int addCache(ISQLElement isqlElement) {
+        if (isLocalOnly) {
             return 0;
         }
-        if(isOpenCache==false){
+        if (isOpenCache == false) {
             DriverBuilder.createDriver().execute(isqlElement.getSQL());
             return 1;
         }
-        if(isqlElement.isFireNotify()){
+        if (isqlElement.isFireNotify()) {
             firedWindowInstances.add(isqlElement.getWindowInstanceId());
-        }else if(isqlElement.isWindowInstanceSQL()){
-            Integer index=windowInstance2Index.get(isqlElement.getWindowInstanceId());
-            if(index==null){
-                index=0;
+        } else if (isqlElement.isWindowInstanceSQL()) {
+            Integer index = windowInstance2Index.get(isqlElement.getWindowInstanceId());
+            if (index == null) {
+                index = 0;
             }
             index++;
             isqlElement.setIndex(index);
-            windowInstance2Index.put(isqlElement.getWindowInstanceId(),index);
+            windowInstance2Index.put(isqlElement.getWindowInstanceId(), index);
         }
 
         return super.addCache(isqlElement);
     }
 
-    @Override protected String createSplitId(ISQLElement msg) {
+    @Override
+    protected String createSplitId(ISQLElement msg) {
         return msg.getQueueId();
     }
 
-    protected AtomicInteger executeSQLCount=new AtomicInteger(0);
-    protected AtomicInteger cancelQLCount=new AtomicInteger(0);
-    protected class SQLCacheCallback implements IMessageFlushCallBack<ISQLElement> {
-        Set<String> canCancelWindowIntances=new HashSet<>();
-        @Override public boolean flushMessage(List<ISQLElement> messages) {
-                    List<String> sqls=new ArrayList<>();
+    protected AtomicInteger executeSQLCount = new AtomicInteger(0);
+    protected AtomicInteger cancelQLCount = new AtomicInteger(0);
 
-                    for(ISQLElement isqlElement:messages){
-                        if(isqlElement.isSplitSQL()){
-                            sqls.add(isqlElement.getSQL());
-                        }else if(isqlElement.isWindowInstanceSQL()){
-//                            if(canCancel(isqlElement)){
-//                                cancelQLCount.incrementAndGet();
-//                                continue;
-//                            }else {
-                                sqls.add(isqlElement.getSQL());
-//                            }
-                        }else if(isqlElement.isFireNotify()){
-                            windowInstance2Index.remove(isqlElement.getWindowInstanceId());
-                            firedWindowInstances.remove(isqlElement.getWindowInstanceId());
+    protected class SQLCacheCallback implements IMessageFlushCallBack<ISQLElement> {
 
-                        }
-                    }
-                    if(sqls.size()==0){
-                        return true;
-                    }
-                    JDBCDriver dataSource = DriverBuilder.createDriver();
-                    try {
-                        executeSQLCount.addAndGet(sqls.size());
-                        dataSource.executSqls(sqls);
-                        System.out.println("execute sql count is "+executeSQLCount.get()+";  cancel sql count is "+cancelQLCount.get());
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        throw new RuntimeException(e);
-                    } finally {
-                        if (dataSource != null) {
-                            dataSource.destroy();
-                        }
-                    }
-                    return true;
-                    }
+        @Override
+        public boolean flushMessage(List<ISQLElement> messages) {
+            List<String> sqls = new ArrayList<>();
 
+            for (ISQLElement isqlElement : messages) {
+                if (isqlElement.isSplitSQL()) {
+                    sqls.add(isqlElement.getSQL());
+                } else if (isqlElement.isWindowInstanceSQL()) {
+                    sqls.add(isqlElement.getSQL());
+                } else if (isqlElement.isFireNotify()) {
+                    windowInstance2Index.remove(isqlElement.getWindowInstanceId());
+                    firedWindowInstances.remove(isqlElement.getWindowInstanceId());
 
-        protected boolean canCancel(ISQLElement element) {
-            String windowInstanceId=element.getWindowInstanceId();
-            if(!firedWindowInstances.contains(windowInstanceId)){
-                return false;
+                }
             }
-            if(canCancelWindowIntances.contains(windowInstanceId)){
+            if (sqls.size() == 0) {
                 return true;
             }
-            if(element.getIndex()==1){
-                canCancelWindowIntances.add(windowInstanceId);
-                return true;
+            JDBCDriver dataSource = DriverBuilder.createDriver();
+            try {
+                executeSQLCount.addAndGet(sqls.size());
+                dataSource.executSqls(sqls);
+                System.out.println("execute sql count is " + executeSQLCount.get() + ";  cancel sql count is " + cancelQLCount.get());
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                if (dataSource != null) {
+                    dataSource.destroy();
+                }
             }
-            return false;
+            return true;
         }
     }
-
-
 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index 202f2eb..b4ae904 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -506,18 +506,7 @@ public class WindowValue extends WindowBaseValue implements Serializable {
             StringUtil.createMD5Str(clonedValue.getWindowInstancePartitionId()));
         return clonedValue;
     }
-    //
-    //public WindowValue toOriginValue(boolean supportOutDate) {
-    //    WindowValue clonedValue = clone();
-    //    String windowInstanceId = WindowInstance.getWindowInstanceId(getNameSpace(), getConfigureName(), getStartTime(),
-    //        getEndTime(), getFireTime(), supportOutDate);
-    //    clonedValue.setMsgKey(MapKeyUtil
-    //        .createKey(getPartition(), windowInstanceId, getGroupBy()));
-    //    clonedValue.setWindowInstanceId(windowInstanceId);
-    //    clonedValue.setWindowInstancePartitionId(
-    //        MapKeyUtil.createKey(windowInstanceId, getPartition()));
-    //    return clonedValue;
-    //}
+
 
     public Long getLastUpdateTime() {
         return lastUpdateTime;

Mime
View raw message