rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-streams] branch main updated: add bitset cache to optimize select rocketmq builder compatible metaq fixed message systemmessage not set in deepcopy fixed script compile script expression bugs window fire support test model
Date Thu, 02 Sep 2021 05:45:04 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new f261840  add bitset cache to optimize select rocketmq builder compatible metaq fixed message systemmessage not set in deepcopy fixed script compile script expression bugs window fire support test model
     new 2532777  Merge pull request #46 from yuanxiaodong/window
f261840 is described below

commit f261840faea3866e36dcdf5fc42098320eb26285
Author: yuanxiaodongn <yuan_xiao_dong@163.com>
AuthorDate: Thu Sep 2 10:36:37 2021 +0800

    add bitset cache to optimize select
    rocketmq builder compatible metaq
    fixed message systemmessage not set in deepcopy
    fixed script compile script expression bugs
    window fire support test model
---
 .../rocketmq/streams/RocketMQChannelBuilder.java   |  2 +-
 .../client/strategy/ConfiguableConnector.java      | 32 --------
 .../streams/client/strategy/StateStrategy.java     | 37 ---------
 ...CheckpointStrategy.java => WindowStrategy.java} | 30 +++-----
 .../rocketmq/streams/client/DataStreamTest.java    |  8 +-
 .../streams/common/cache/compress/BitSetCache.java | 87 ++++++++++++++++++++++
 .../common/channel/impl/OutputPrintChannel.java    | 12 +--
 .../channel/sink/AbstractSupportShuffleSink.java   |  7 +-
 .../rocketmq/streams/common/context/Message.java   |  2 +
 .../streams/common/context/MessageHeader.java      |  1 +
 .../rocketmq/streams/common/model/ServiceName.java |  4 +
 .../streams/common/topology/model/Pipeline.java    |  3 +
 .../topology/stages/AbstractWindowStage.java       | 14 +++-
 .../common/topology/stages/OutputChainStage.java   |  2 +-
 .../rocketmq/streams/common/utils/ReflectUtil.java |  8 +-
 .../filter/function/expression/ScriptFunction.java |  1 -
 .../script/function/model/FunctionConfigure.java   |  6 ++
 .../operator/expression/ScriptExpression.java      |  2 +-
 .../optimization/CompileScriptExpression.java      |  2 +-
 .../serviceloader/ServiceLoaderComponent.java      |  4 +
 .../streams/window/operator/AbstractWindow.java    |  4 +-
 .../streams/window/source/WindowRireSource.java    |  8 ++
 22 files changed, 160 insertions(+), 116 deletions(-)

diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
index b077642..6014cf3 100644
--- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
+++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/RocketMQChannelBuilder.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.streams.sink.RocketMQSink;
 import org.apache.rocketmq.streams.source.RocketMQSource;
 
 @AutoService(IChannelBuilder.class)
-@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource")
+@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource",name="metaq")
 public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder {
     public static final String TYPE = "rocketmq";
 
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/ConfiguableConnector.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/ConfiguableConnector.java
deleted file mode 100644
index a268ff3..0000000
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/ConfiguableConnector.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.client.strategy;
-
-import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
-
-public enum ConfiguableConnector {
-
-    DB(IConfigurableService.DEFAULT_SERVICE_NAME),MEMORY(IConfigurableService.MEMORY_SERVICE_NAME),FILE(IConfigurableService.FILE_SERVICE_NAME);
-    private String name;
-    private ConfiguableConnector(String name){
-        this.name=name;
-    }
-
-    public String getName() {
-        return name;
-    }
-}
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/StateStrategy.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/StateStrategy.java
deleted file mode 100644
index c647759..0000000
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/StateStrategy.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.streams.client.strategy;
-
-import java.util.Properties;
-
-public class StateStrategy implements Strategy {
-
-    private Properties properties;
-
-    private StateStrategy() {
-    }
-
-    @Override
-    public Properties getStrategyProperties() {
-        return this.properties;
-    }
-
-    public static Strategy db() {
-        return new StateStrategy();
-    }
-
-}
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/CheckpointStrategy.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
similarity index 56%
rename from rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/CheckpointStrategy.java
rename to rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
index c0c06d2..0204308 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/CheckpointStrategy.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java
@@ -20,32 +20,21 @@ import java.util.Properties;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
 
-public class CheckpointStrategy implements Strategy {
+public class WindowStrategy implements Strategy {
 
     private final Properties properties;
 
-    private CheckpointStrategy(Long pollingTime) {
+    private WindowStrategy() {
         properties = new Properties();
-        properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.MEMORY_SERVICE_NAME);
-        properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
     }
 
-    private CheckpointStrategy(String filePath, Long pollingTime) {
-        properties = new Properties();
-        properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.FILE_SERVICE_NAME);
-        properties.put(IConfigurableService.FILE_PATH_NAME, filePath);
-        properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
-    }
-
-    private CheckpointStrategy(String url, String username, String password, Long pollingTime) {
+    private WindowStrategy(String url, String username, String password) {
         properties = new Properties();
         properties.put(AbstractComponent.JDBC_DRIVER, AbstractComponent.DEFAULT_JDBC_DRIVER);
         properties.put(AbstractComponent.JDBC_URL, url);
         properties.put(AbstractComponent.JDBC_USERNAME, username);
         properties.put(AbstractComponent.JDBC_PASSWORD, password);
         properties.put(AbstractComponent.JDBC_TABLE_NAME, AbstractComponent.DEFAULT_JDBC_TABLE_NAME);
-        properties.put(AbstractComponent.POLLING_TIME, pollingTime + "");
-        properties.put(AbstractComponent.CONNECT_TYPE, IConfigurableService.DEFAULT_SERVICE_NAME);
     }
 
     @Override
@@ -53,16 +42,15 @@ public class CheckpointStrategy implements Strategy {
         return this.properties;
     }
 
-    public static Strategy db(String url, String username, String password, Long pollingTime) {
-        return new CheckpointStrategy(url, username, password, pollingTime);
+    public static Strategy exactlyOnce(String url, String username, String password) {
+        return new WindowStrategy(url, username, password);
     }
 
-    public static Strategy file(String filePath, Long pollingTime) {
-        return new CheckpointStrategy(filePath, pollingTime);
-    }
+    public static Strategy highPerformance() {
 
-    public static Strategy mem(Long pollingTime) {
-        return new CheckpointStrategy(pollingTime);
+        return new WindowStrategy();
     }
 
+
+
 }
diff --git a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
index 6af3ee8..49e8559 100644
--- a/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
+++ b/rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.streams.client;
 
 import com.alibaba.fastjson.JSONObject;
 import org.apache.rocketmq.streams.client.source.DataStreamSource;
-import org.apache.rocketmq.streams.client.strategy.CheckpointStrategy;
+import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
 import org.apache.rocketmq.streams.client.transform.window.Time;
 import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
 import org.apache.rocketmq.streams.common.functions.MapFunction;
@@ -64,7 +64,7 @@ public class DataStreamTest implements Serializable {
                 .fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
                 .map(message -> message + "--")
                 .toPrint(1)
-                .with(CheckpointStrategy.db("", "", "", 0L))
+                .with(WindowStrategy.exactlyOnce("", "", ""))
                 .start();
     }
 
@@ -74,7 +74,7 @@ public class DataStreamTest implements Serializable {
                 .fromFile("/Users/junjie.cheng/text.txt", false)
                 .map(message -> message + "--")
                 .toPrint(1)
-                .with(CheckpointStrategy.mem(0L))
+                .with(WindowStrategy.highPerformance())
                 .start();
     }
 
@@ -98,7 +98,7 @@ public class DataStreamTest implements Serializable {
                 .sum("score", "scoreValue")
                 .toDataSteam()
                 .toPrint(1)
-                .with(CheckpointStrategy.db("", "", "", 1000L))
+                .with(WindowStrategy.exactlyOnce("", "", ""))
                 .start();
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
new file mode 100644
index 0000000..e57ced8
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
@@ -0,0 +1,87 @@
+package org.apache.rocketmq.streams.common.cache.compress;
+
+import org.apache.rocketmq.streams.common.utils.NumberUtils;
+
+public class BitSetCache {
+    protected ByteArrayValueKV cache;
+    protected int byteSetSize;
+    protected int capacity;
+
+    private class BitSet{
+        private byte[] bytes;
+
+        public BitSet(){
+            bytes=new byte[byteSetSize];
+        }
+        public BitSet(byte[] bytes){
+            this.bytes=bytes;
+        }
+        public void set(int index){
+            if(index>byteSetSize){
+                throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
+            }
+            int byteIndex=index/8;
+            int bitIndex=index%8;
+            byte byteElement=bytes[byteIndex];
+            byteElement = (byte) (byteElement|(1 << bitIndex));
+            bytes[byteIndex]=byteElement;
+        }
+        public boolean get(int index){
+            if(index>byteSetSize){
+                throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
+            }
+            int byteIndex=index/8;
+            int bitIndex=index%8;
+            byte byteElement=bytes[byteIndex];
+            boolean isTrue = ((byteElement & (1 << bitIndex)) != 0);
+            return isTrue;
+        }
+
+        public byte[] getBytes(){
+            return bytes;
+        }
+    }
+
+    public BitSet createBitSet(){
+        return new BitSet();
+    }
+
+
+    public BitSetCache(int bitSetSize, int capacity){
+        cache=new ByteArrayValueKV(capacity,true);
+        this.byteSetSize=bitSetSize/8+bitSetSize%8;
+        this.capacity=capacity;
+    }
+
+
+    public void put(String key,BitSet bitSet){
+        if(cache.size>cache.capacity){
+            synchronized (this){
+                if(cache.size>cache.capacity){
+                    cache=new ByteArrayValueKV(capacity,true);
+                }
+            }
+        }
+        cache.put(key,bitSet.getBytes());
+
+    }
+
+    public static void main(String[] args) {
+        BitSetCache bitSetCache=new BitSetCache(150,30000);
+        BitSet bitSet=bitSetCache.createBitSet();
+        bitSet.set(13);
+        bitSetCache.put("fdsdf",bitSet);
+        BitSet bitSet1=bitSetCache.get("fdsdf");
+        System.out.println(bitSet1.get(13));
+    }
+
+    public BitSet get(String key){
+        byte[] bytes=cache.get(key);
+        if(bytes==null){
+            return null;
+        }
+       return new BitSet(bytes);
+
+    }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index d6b09da..1656c65 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -26,20 +26,16 @@ import org.apache.rocketmq.streams.common.utils.PrintUtil;
  */
 public class OutputPrintChannel extends AbstractSink {
 
-    private static int counter = 1;
-    private transient boolean start = false;
-    private static long startTime = System.currentTimeMillis();
-    private static long begin = startTime;
-    private static int step = 40000;
 
     @Override
     protected boolean batchInsert(List<IMessage> messages) {
-        StringBuilder stringBuilder = new StringBuilder();
         for (IMessage msg : messages) {
-            stringBuilder.append(msg.getMessageValue().toString() + PrintUtil.LINE);
+            System.out.println(msg.getMessageBody().toJSONString());
         }
-        System.out.println(stringBuilder.toString());
         return false;
     }
 
+
+
+
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleSink.java
index da37adc..b2a1c46 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleSink.java
@@ -60,7 +60,12 @@ public abstract class AbstractSupportShuffleSink extends AbstractSink {
         if (!hasCreated) {
             synchronized (this) {
                 if (!hasCreated) {
-                    createTopicIfNotExist(splitNum);
+                    try {
+                        createTopicIfNotExist(splitNum);
+                    }catch (Exception e){
+                        e.printStackTrace();
+                    }
+
                     hasCreated = true;
                 }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
index 9868cb5..070bdce 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/Message.java
@@ -94,6 +94,8 @@ public class Message implements IMessage {
             jsonObject.put(key, message.get(key));
         }
         Message message = new Message(jsonObject);
+        message.setSystemMessage(getSystemMessage());
+        message.isJsonMessage=isJsonMessage;
         message.header = getHeader().copy();
         return message;
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
index 500d611..4044b06 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java
@@ -126,6 +126,7 @@ public class MessageHeader {
         header.msgRouteFromLable = msgRouteFromLable;
         header.logFingerprintValue = logFingerprintValue;
         header.messageQueue = messageQueue;
+        header.checkpointQueueIds=checkpointQueueIds;
         return header;
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/model/ServiceName.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/model/ServiceName.java
index 6bf004a..6d3d1ce 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/model/ServiceName.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/model/ServiceName.java
@@ -29,4 +29,8 @@ public @interface ServiceName {
     String value() default "";
 
     String aliasName() default "";
+
+
+
+    String name() default "";
 }
\ No newline at end of file
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
index 8b6af5b..98d3969 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/Pipeline.java
@@ -121,6 +121,9 @@ public class Pipeline<T extends IMessage> extends BasedConfigurable implements I
             } else if (systemMessage instanceof RemoveSplitMessage) {
                 stage.removeSplit(t, context, (RemoveSplitMessage)systemMessage);
             } else {
+                if(systemMessage==null){
+                    return true;
+                }
                 throw new RuntimeException("can not support this system message " + systemMessage.getClass().getName());
             }
             if (stage.isAsyncNode()) {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
index fe364b0..e5dcc6c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/AbstractWindowStage.java
@@ -37,13 +37,19 @@ public abstract class AbstractWindowStage<T extends IMessage> extends ChainStage
 
     @Override
     public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
+       if(window.getWindowCache()==null){//over window windowcache  is null
+           return;
+       }
         if(message.getHeader().isNeedFlush()){
-            if(message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
+            if(window.getWindowCache()!=null&&message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
                 window.getWindowCache().checkpoint(message.getHeader().getCheckpointQueueIds());
             }else {
-                Set<String> queueIds=new HashSet<>();
-                queueIds.add(message.getHeader().getQueueId());
-                window.getWindowCache().checkpoint(queueIds);
+                if(window.getWindowCache()!=null){
+                    Set<String> queueIds=new HashSet<>();
+                    queueIds.add(message.getHeader().getQueueId());
+                    window.getWindowCache().checkpoint(queueIds);
+                }
+
             }
 
         }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 9ffba1a..599d430 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -82,7 +82,7 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
              */
             if(openMockChannel()){
                 if(mockSink!=null){
-                    mockSink.batchAdd(message);
+                    mockSink.batchAdd(message.deepCopy());
                     return message;
                 }
                 return message;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
index 7df7505..23facde 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/ReflectUtil.java
@@ -510,8 +510,12 @@ public class ReflectUtil {
     public static Object invoke(Object object, String methodName, Class[] classes, Object[] objects) {
         try {
             Class clazz = object.getClass();
-            Method method= clazz.getDeclaredMethod(methodName, classes);
-            method.setAccessible(true);
+            Method method=clazz.getMethod(methodName, classes);
+            if(method==null){
+                method= clazz.getDeclaredMethod(methodName, classes);
+                method.setAccessible(true);
+            }
+
             return method.invoke(object, objects);
         } catch (Exception e) {
             throw new RuntimeException(
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/ScriptFunction.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/ScriptFunction.java
index 67d5495..437e4dd 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/ScriptFunction.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/function/expression/ScriptFunction.java
@@ -40,7 +40,6 @@ import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 @Function
 public class ScriptFunction extends AbstractExpressionFunction {
-    ScriptComponent scriptComponent = ScriptComponent.getInstance();
     public static final String SPLIT_SIGN = "######";//对参数进行分隔
     public static final String QUOTATION_CONVERT = "^^^^";//单引号转换
 
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/model/FunctionConfigure.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/model/FunctionConfigure.java
index ec768ed..4c5c259 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/model/FunctionConfigure.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/model/FunctionConfigure.java
@@ -326,6 +326,12 @@ public class FunctionConfigure {
         }
         return parameters;
     }
+    public Object getRealValue(int parameterIndex, Object value) {
+
+        DataType datatype=this.parameterDataTypes[parameterIndex];
+        return getRealValue(datatype,value);
+
+    }
 
     private Object getRealValue(DataType dataType, Object value) {
         try {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
index 230b359..6f43dd6 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/operator/expression/ScriptExpression.java
@@ -123,7 +123,7 @@ public class ScriptExpression implements IScriptExpression {
             System.out.println("");
         }
         Object value = functionConfigure.execute(ps);
-        compileScriptExpression = new CompileScriptExpression(this, functionConfigure);
+            compileScriptExpression = new CompileScriptExpression(this, functionConfigure);
         if (StringUtil.isNotEmpty(newFieldName) && value != null) {
             setValue2Var(message, context, newFieldName, value);
             //message.getMessageBody().put(newFieldName, value);
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/CompileScriptExpression.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/CompileScriptExpression.java
index 397deb6..97f8773 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/CompileScriptExpression.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/CompileScriptExpression.java
@@ -105,7 +105,7 @@ public class CompileScriptExpression {
                 Entry<Integer, CompileParameter> entry = it.next();
                 Integer index = entry.getKey();
                 CompileParameter compileParameter = entry.getValue();
-                parameters[index] = compileParameter.getValue(message, context);
+                parameters[index] = functionConfigure.getRealValue(index,compileParameter.getValue(message, context));
             }
         }
         Object value = functionConfigure.directReflectExecute(parameters);
diff --git a/rocketmq-streams-serviceloader/src/main/java/org/apache/rocketmq/streams/serviceloader/ServiceLoaderComponent.java b/rocketmq-streams-serviceloader/src/main/java/org/apache/rocketmq/streams/serviceloader/ServiceLoaderComponent.java
index 3051aea..2dc4d5f 100644
--- a/rocketmq-streams-serviceloader/src/main/java/org/apache/rocketmq/streams/serviceloader/ServiceLoaderComponent.java
+++ b/rocketmq-streams-serviceloader/src/main/java/org/apache/rocketmq/streams/serviceloader/ServiceLoaderComponent.java
@@ -147,6 +147,10 @@ public class ServiceLoaderComponent<T> extends AbstractComponent<IServiceLoaderS
             serviceNames.add(annotation.aliasName());
         }
 
+        if (StringUtil.isNotEmpty(annotation.name())) {
+            serviceNames.add(annotation.name());
+        }
+
         return serviceNames;
     }
 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index cfa7513..aea126e 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -144,14 +144,14 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     /**
      * 默认为空,窗口的触发类似flink,在测试模式下,因为消息有界,期望当消息发送完成后能触发,可以设置两条消息的最大间隔,超过这个间隔,将直接触发消息
      */
-    protected Long msgMaxGapSecond;
+    protected Long msgMaxGapSecond=10L;
 
     /**
      * 是否支持过期数据的计算 过期:当前时间大于数据所在窗口的触发时间
      */
     protected int fireMode=0;//0:普通触发,firetime后收到数据丢弃;1:多实例多次独立触发,在watermark时间内,同starttime,endtime创建多个实例,多次触发;2.单实例,多次独立触发,每次触发是最新值
 
-    protected boolean isLocalStorageOnly=false;//是否只用本地存储,可以提高性能,但不保证可靠性
+    protected boolean isLocalStorageOnly=true;//是否只用本地存储,可以提高性能,但不保证可靠性
     protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
     protected transient IReducer reducer;
     /**
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/source/WindowRireSource.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/source/WindowRireSource.java
index 3810cbc..fad5ecb 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/source/WindowRireSource.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/source/WindowRireSource.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBac
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMutilSplitMessageCache;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
 import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.context.Message;
@@ -258,6 +259,13 @@ public class WindowRireSource extends AbstractSupportOffsetResetSource implement
             return new FireResult();
         }
         Date fireTime=DateUtil.parseTime(windowInstance.getFireTime());
+        Boolean isTest= ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
+        if(isTest){
+            if(System.currentTimeMillis()-fireTime.getTime()>0){
+                System.out.println("window instance is fired");
+                return new FireResult(true,3);
+            }
+        }
         /**
          * 未到触发时间
          */

Mime
View raw message