rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-streams] branch main updated: add update logic for the DBSinker 、 upgrade the concat_ws function (#57)
Date Wed, 15 Sep 2021 03:37:02 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 7752a44  add update logic for the DBSinker 、 upgrade the concat_ws function (#57)
7752a44 is described below

commit 7752a44aa45a2a174d71cfe62db053630043354c
Author: 零号程序 <jet.j.j.cheng@gmail.com>
AuthorDate: Wed Sep 15 11:36:58 2021 +0800

    add update logic for the DBSinker 、 upgrade the concat_ws function (#57)
    
    * add update logic for the DBSinker 、 upgrade the concat_ws function
    
    * Add the field level cache to reduce duplicate data entry #60
    
    Co-authored-by: junjie.cheng <junjie.cheng@alibaba-inc.com>
---
 .../apache/rocketmq/streams/db/sink/DBSink.java    |  96 ++++++++--------
 .../rocketmq/streams/db/sink/DBSinkBuilder.java    |  41 +++----
 .../rocketmq/streams/client/DataStreamAction.java  |   6 +-
 .../client/transform/window/HoppingWindow.java     |   5 +-
 .../common/cache/compress/AdditionStore.java       |   7 +-
 .../streams/common/cache/compress/BitSetCache.java |  88 +++++++-------
 .../streams/common/cache/compress/ByteArray.java   |  13 ++-
 .../common/cache/compress/ByteArrayValueKV.java    |   9 +-
 .../streams/common/cache/compress/CacheKV.java     |  18 +--
 .../streams/common/cache/compress/ICacheKV.java    |   1 -
 .../streams/common/cache/compress/KVElement.java   |   2 +-
 .../cache/compress/impl/FixedLenRowCacheKV.java    |   4 +-
 .../common/cache/compress/impl/IntValueKV.java     |   8 +-
 .../common/cache/compress/impl/LongValueKV.java    |  67 +++++++++++
 .../common/cache/compress/impl/MutilValueKV.java   |  46 ++++----
 .../AbstractSupportShuffleChannelBuilder.java      |   2 +-
 .../common/channel/impl/OutputPrintChannel.java    |   7 +-
 .../streams/common/channel/sink/AbstractSink.java  |  35 +++---
 .../streams/common/context/AbstractContext.java    |  19 ++--
 .../streams/common/topology/ChainPipeline.java     |  95 +++++++++++++---
 .../common/topology/stages/OutputChainStage.java   |  52 ++++-----
 .../rocketmq/streams/common/utils/SQLUtil.java     |  54 +++++----
 .../service/AbstractConfigurableService.java       |   6 +-
 .../streams/filter/context/RuleContext.java        |   2 +-
 .../lease/service/storages/DBLeaseStorage.java     |   1 +
 .../streams/script/context/FunctionContext.java    |   2 +-
 .../script/function/impl/field/FieldFunction.java  |  48 ++++----
 .../function/impl/flatmap/SplitArrayFunction.java  |   4 +-
 .../performance/AbstractScriptProxy.java           |  11 +-
 .../performance/CaseScriptExpressionProxy.java     |  31 ++---
 .../optimization/performance/EqualsProxy.java      |   5 +-
 .../optimization/performance/RegexProxy.java       |   8 +-
 .../performance/ScriptExpressionGroupsProxy.java   |  26 +++--
 .../performance/ScriptOptimization.java            |  62 ++++------
 .../performance/ScriptProxyFactory.java            |  26 ++---
 .../performance/SimpleScriptExpressionProxy.java   |  26 ++---
 .../streams/window/operator/AbstractWindow.java    | 126 +++++++++++----------
 .../rocketmq/streams/window/sqlcache/SQLCache.java |   9 +-
 38 files changed, 594 insertions(+), 474 deletions(-)

diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index 4a2b50e..1aab2ba 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -17,13 +17,7 @@
 package org.apache.rocketmq.streams.db.sink;
 
 import com.alibaba.fastjson.JSONObject;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Set;
+import com.google.common.collect.Lists;
 import org.apache.rocketmq.streams.common.channel.IChannel;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -38,6 +32,14 @@ import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.DriverBuilder;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
 
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+
 /**
  * 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐 sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}') MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
  */
@@ -45,6 +47,8 @@ public class DBSink extends AbstractSink {
 
     protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
 
+    protected String duplicateSQLTemplate; //通过on duplicate key update 来对已经存在的信息进行更新
+
     protected MetaData metaData;//可以指定meta data,和insertSQL二选一
 
     protected String tableName; //指定要插入的数据表
@@ -58,7 +62,7 @@ public class DBSink extends AbstractSink {
     @ENVDependence
     protected String password;
 
-    protected boolean openSqlCache=false;
+    protected boolean openSqlCache = true;
 
     protected transient IMessageCache<String> sqlCache;//cache sql, batch submit sql
 
@@ -111,26 +115,27 @@ public class DBSink extends AbstractSink {
                 ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
                 this.metaData = MetaData.createMetaData(metaResult);
                 this.metaData.setTableName(this.tableName);
-                sqlCache=new MessageCache<>(new IMessageFlushCallBack<String>() {
-                    @Override public boolean flushMessage(List<String> sqls) {
-                        JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
-                        try {
-                            dataSource.executSqls(sqls);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                            throw new RuntimeException(e);
-                        } finally {
-                            if (dataSource != null) {
-                                dataSource.destroy();
-                            }
+            }
+            sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>() {
+                @Override
+                public boolean flushMessage(List<String> sqls) {
+                    JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
+                    try {
+                        dataSource.executSqls(sqls);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        throw new RuntimeException(e);
+                    } finally {
+                        if (dataSource != null) {
+                            dataSource.destroy();
                         }
-                        return true;
                     }
-                });
-                ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
-                ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
-                sqlCache.openAutoFlush();
-            }
+                    return true;
+                }
+            });
+            ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
+            ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
+            sqlCache.openAutoFlush();
             return super.initConfigurable();
         } catch (ClassNotFoundException | SQLException e) {
             e.printStackTrace();
@@ -148,7 +153,8 @@ public class DBSink extends AbstractSink {
             List<JSONObject> messages = convertJsonObjectFromMessage(messageList);
             if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
                 String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
-                sql = sql + SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
+                sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
+                sql += this.duplicateSQLTemplate;
                 executeSQL(dbDataSource, sql);
                 return true;
             }
@@ -156,26 +162,17 @@ public class DBSink extends AbstractSink {
             if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL, "").contains("#{")) {
                 for (JSONObject message : messages) {
                     String sql = parseSQL(message, insertSQLTemplate);
+                    sql += this.duplicateSQLTemplate;
                     executeSQL(dbDataSource, sql);
                 }
                 return true;
             } else {
-                StringBuilder sb = new StringBuilder();
-                String insertSQL;
-                boolean isFirst = true;
-                int i = 0;
+                List<String> subInsert = Lists.newArrayList();
                 for (JSONObject message : messages) {
-                    insertSQL = parseSQL(message, insertValueSQL);
-                    if (isFirst) {
-                        isFirst = false;
-                    } else {
-                        sb.append(",");
-                    }
-                    i++;
-
-                    sb.append(insertSQL);
+                    subInsert.add(parseSQL(message, insertValueSQL));
                 }
-                insertSQL = this.insertSQLTemplate.replace(insertValueSQL, sb.toString());
+                String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",", subInsert));
+                insertSQL += this.duplicateSQLTemplate;
                 executeSQL(dbDataSource, insertSQL);
                 return true;
             }
@@ -184,17 +181,18 @@ public class DBSink extends AbstractSink {
         }
     }
 
-    @Override public boolean checkpoint(Set<String> splitIds) {
-        if(sqlCache!=null){
+    @Override
+    public boolean checkpoint(Set<String> splitIds) {
+        if (sqlCache != null) {
             sqlCache.flush(splitIds);
         }
         return true;
     }
 
     protected void executeSQL(JDBCDriver dbDataSource, String sql) {
-        if(isOpenSqlCache()){
+        if (isOpenSqlCache()) {
             this.sqlCache.addCache(sql);
-        }else {
+        } else {
             dbDataSource.execute(sql);
         }
 
@@ -285,4 +283,12 @@ public class DBSink extends AbstractSink {
     public void setOpenSqlCache(boolean openSqlCache) {
         this.openSqlCache = openSqlCache;
     }
+
+    public String getDuplicateSQLTemplate() {
+        return duplicateSQLTemplate;
+    }
+
+    public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
+        this.duplicateSQLTemplate = duplicateSQLTemplate;
+    }
 }
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
index c960bae..87f5fb7 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -17,15 +17,16 @@
 package org.apache.rocketmq.streams.db.sink;
 
 import com.google.auto.service.AutoService;
-import java.util.List;
-import java.util.Properties;
+import com.google.common.collect.Lists;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
 import org.apache.rocketmq.streams.common.metadata.MetaDataField;
 import org.apache.rocketmq.streams.common.model.ServiceName;
-import org.apache.rocketmq.streams.common.utils.DataTypeUtil;
+
+import java.util.List;
+import java.util.Properties;
 
 @AutoService(IChannelBuilder.class)
 @ServiceName(DBSinkBuilder.TYPE)
@@ -36,29 +37,23 @@ public class DBSinkBuilder implements IChannelBuilder {
     public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
         DBSink sink = new DBSink();
         sink.setUrl(properties.getProperty("url"));
-        sink.setUserName("userName");
-        sink.setPassword("password");
+        sink.setUserName(properties.getProperty("userName"));
+        sink.setPassword(properties.getProperty("password"));
         List<MetaDataField> fieldList = metaData.getMetaDataFields();
-        StringBuilder insertSQL = new StringBuilder();
-        StringBuilder insertValueSQL = new StringBuilder();
-        boolean isFirst = true;
-        for (MetaDataField field : fieldList) {
+
+        List<String> insertFields = Lists.newArrayList();
+        List<String> insertValues = Lists.newArrayList();
+        List<String> duplicateKeys = Lists.newArrayList();
+        fieldList.forEach(field -> {
             String fieldName = field.getFieldName();
-            if (isFirst) {
-                isFirst = false;
-            } else {
-                insertSQL.append(",");
-                insertValueSQL.append(",");
-            }
-            insertSQL.append(fieldName);
-            if (DataTypeUtil.isNumber(field.getDataType())) {
-                insertValueSQL.append(fieldName);
-            } else {
-                insertValueSQL.append("'#{" + fieldName + "}'");
-            }
-        }
-        String sql = "insert into " + properties.getProperty("tableName") + "(" + insertSQL.toString() + ")values(" + insertValueSQL.toString() + ")";
+            insertFields.add(fieldName);
+            insertValues.add("'#{" + fieldName + "}'");
+            duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
+        });
+
+        String sql = "insert into " + properties.getProperty("tableName") + "(" + String.join(",", insertFields) + ") values (" + String.join(",", insertValues) + ")  ";
         sink.setInsertSQLTemplate(sql);
+        sink.setDuplicateSQLTemplate(" on duplicate key update " + String.join(",", duplicateKeys));
         return sink;
     }
 
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
index 91f16fa..0a052fa 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java
@@ -18,9 +18,11 @@
 package org.apache.rocketmq.streams.client;
 
 import com.google.common.collect.Maps;
+
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
 import org.apache.rocketmq.streams.client.strategy.Strategy;
 import org.apache.rocketmq.streams.client.transform.DataStream;
 import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -79,11 +81,11 @@ public class DataStreamAction extends DataStream {
         }
 
         ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, kvs);
-        ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
+        ChainPipeline<?> pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
         pipeline.startChannel();
         if (this.otherPipelineBuilders != null) {
             for (PipelineBuilder builder : otherPipelineBuilders) {
-                ChainPipeline otherPipeline = builder.build(configurableComponent.getService());
+                ChainPipeline<?> otherPipeline = builder.build(configurableComponent.getService());
                 otherPipeline.startChannel();
             }
         }
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
index 0c945f0..9078b16 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/window/HoppingWindow.java
@@ -20,10 +20,11 @@ package org.apache.rocketmq.streams.client.transform.window;
 public class HoppingWindow {
     /**
      * 滑动窗口信息
+     *
      * @return
      */
-    public static WindowInfo of(Time windowSize,Time windowSlide){
-        WindowInfo windowInfo=new WindowInfo();
+    public static WindowInfo of(Time windowSize, Time windowSlide) {
+        WindowInfo windowInfo = new WindowInfo();
         windowInfo.setType(WindowInfo.HOPPING_WINDOW);
         windowInfo.setWindowSize(windowSize);
         windowInfo.setWindowSlide(windowSlide);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
index 0b441bf..e8485b7 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/AdditionStore.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.streams.common.cache.compress;
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class AdditionStore {
 
     /**
@@ -104,8 +103,8 @@ public class AdditionStore {
         CacheKV.MapAddress address = new CacheKV.MapAddress(conflictIndex, conflictOffset);
         if (isVarLen) {
             int size = value.length;
-            bytes[conflictOffset] = (byte)(size & 0xff);
-            bytes[conflictOffset + 1] = (byte)(size >> 8 & 0xff);
+            bytes[conflictOffset] = (byte) (size & 0xff);
+            bytes[conflictOffset + 1] = (byte) (size >> 8 & 0xff);
             conflictOffset = conflictOffset + 2;
         }
         for (int i = 0; i < value.length; i++) {
@@ -131,7 +130,7 @@ public class AdditionStore {
         if (bytes == null) {
             return null;
         }
-        if (isVarLen == false) {
+        if (!isVarLen) {
             return new ByteArray(bytes, mapAddress.offset, elementSize);
         } else {
             int len = new ByteArray(bytes, mapAddress.offset, 2).castInt(0, 2);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
index 7a4c5fb..bc4a6f1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java
@@ -26,81 +26,81 @@ public class BitSetCache {
     protected int capacity;
     protected int bitSetSize;
 
-    public class BitSet{
+    public class BitSet {
         private byte[] bytes;
 
-        public BitSet(){
-            bytes=new byte[byteSetSize];
+        public BitSet() {
+            bytes = new byte[byteSetSize];
         }
-        public BitSet(byte[] bytes){
-            this.bytes=bytes;
+
+        public BitSet(byte[] bytes) {
+            this.bytes = bytes;
         }
-        public void set(int index){
-            if(index>bitSetSize){
-                throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
+
+        public void set(int index) {
+            if (index > bitSetSize) {
+                throw new RuntimeException("the index exceed max index, max index is " + byteSetSize + ", real is " + index);
             }
-            int byteIndex=index/8;
-            int bitIndex=index%8;
-            byte byteElement=bytes[byteIndex];
-            byteElement = (byte) (byteElement|(1 << bitIndex));
-            bytes[byteIndex]=byteElement;
+            int byteIndex = index / 8;
+            int bitIndex = index % 8;
+            byte byteElement = bytes[byteIndex];
+            byteElement = (byte) (byteElement | (1 << bitIndex));
+            bytes[byteIndex] = byteElement;
         }
-        public boolean get(int index){
-            if(index>bitSetSize){
-                throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
+
+        public boolean get(int index) {
+            if (index > bitSetSize) {
+                throw new RuntimeException("the index exceed max index, max index is " + byteSetSize + ", real is " + index);
             }
-            int byteIndex=index/8;
-            int bitIndex=index%8;
-            byte byteElement=bytes[byteIndex];
-            boolean isTrue = ((byteElement & (1 << bitIndex)) != 0);
-            return isTrue;
+            int byteIndex = index / 8;
+            int bitIndex = index % 8;
+            byte byteElement = bytes[byteIndex];
+            return ((byteElement & (1 << bitIndex)) != 0);
         }
 
-        public byte[] getBytes(){
+        public byte[] getBytes() {
             return bytes;
         }
     }
 
-    public BitSet createBitSet(){
+    public BitSet createBitSet() {
         return new BitSet();
     }
 
-
-    public BitSetCache(int bitSetSize, int capacity){
-        cache=new ByteArrayValueKV(capacity,true);
-        this.byteSetSize=bitSetSize/8+bitSetSize%8;
-        this.capacity=capacity;
-        this.bitSetSize=bitSetSize;
+    public BitSetCache(int bitSetSize, int capacity) {
+        cache = new ByteArrayValueKV(capacity, true);
+        this.byteSetSize = bitSetSize / 8 + bitSetSize % 8;
+        this.capacity = capacity;
+        this.bitSetSize = bitSetSize;
     }
 
-
-    public void put(String key,BitSet bitSet){
-        if(cache.size>cache.capacity){
-            synchronized (this){
-                if(cache.size>cache.capacity){
-                    cache=new ByteArrayValueKV(capacity,true);
+    public void put(String key, BitSet bitSet) {
+        if (cache.size > cache.capacity) {
+            synchronized (this) {
+                if (cache.size > cache.capacity) {
+                    cache = new ByteArrayValueKV(capacity, true);
                 }
             }
         }
-        cache.put(key,bitSet.getBytes());
+        cache.put(key, bitSet.getBytes());
 
     }
 
     public static void main(String[] args) {
-        BitSetCache bitSetCache=new BitSetCache(150,30000);
-        BitSet bitSet=bitSetCache.createBitSet();
+        BitSetCache bitSetCache = new BitSetCache(150, 30000);
+        BitSet bitSet = bitSetCache.createBitSet();
         bitSet.set(13);
-        bitSetCache.put("fdsdf",bitSet);
-        BitSet bitSet1=bitSetCache.get("fdsdf");
+        bitSetCache.put("fdsdf", bitSet);
+        BitSet bitSet1 = bitSetCache.get("fdsdf");
         System.out.println(bitSet1.get(13));
     }
 
-    public BitSet get(String key){
-        byte[] bytes=cache.get(key);
-        if(bytes==null){
+    public BitSet get(String key) {
+        byte[] bytes = cache.get(key);
+        if (bytes == null) {
             return null;
         }
-       return new BitSet(bytes);
+        return new BitSet(bytes);
 
     }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
index 969d299..cc48286 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArray.java
@@ -59,6 +59,15 @@ public class ByteArray {
         return res;
     }
 
+    public long castLong(int offset, int size) {
+        int index = startIndex + offset;
+        long res = 0L;
+        for (int i = 0; i < size; i++) {
+            res += (long) (bytes[i + index] & 0xff) << (i * 8);
+        }
+        return res;
+    }
+
     public byte getByte(int offset) {
         int index = startIndex + offset;
         return bytes[index];
@@ -94,9 +103,9 @@ public class ByteArray {
     protected void flush(int value) {
         for (int i = 0; i < 4; i++) {
             if (i == 0) {
-                this.bytes[i + this.startIndex] = (byte)(value & 0xff);
+                this.bytes[i + this.startIndex] = (byte) (value & 0xff);
             } else {
-                this.bytes[i + this.startIndex] = (byte)(value >> (i * 8) & 0xff);
+                this.bytes[i + this.startIndex] = (byte) (value >> (i * 8) & 0xff);
             }
         }
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
index 0e3d789..6d3bd38 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ByteArrayValueKV.java
@@ -89,18 +89,13 @@ public class ByteArrayValueKV extends CacheKV<byte[]> {
 
     @Override
     public int calMemory() {
-        int value = super.calMemory() + (this.conflicts.getConflictIndex() + 1) * this.conflicts
-            .getBlockSize();
-        return value;
+        return super.calMemory() + (this.conflicts.getConflictIndex() + 1) * this.conflicts.getBlockSize();
     }
 
     @Override
     public boolean contains(String key) {
         byte[] bytes = get(key);
-        if (bytes == null) {
-            return false;
-        }
-        return true;
+        return bytes != null;
     }
 
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
index 08bb056..4af9ba1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/CacheKV.java
@@ -75,7 +75,9 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
     public abstract void put(String key, T value);
 
     public T remove(String key) {
-        if (StringUtil.isEmpty(key)) { return null; }
+        if (StringUtil.isEmpty(key)) {
+            return null;
+        }
         MapElementContext context = queryMapElementByHashCode(key);
         /**
          * TODO:
@@ -113,11 +115,11 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
 
         KVElement mapElement = context.mapElement;
         //如果没有发生冲突,说明当前节点无被占用,直接写入
-        if (context.isOccurConflict == false) {
+        if (!context.isOccurConflict) {
             size++;
 
             mapElement.keyHashCode.flush(mapElement.getKeyHashCode());
-            if (mapElement.isNoValue() == false) {
+            if (!mapElement.isNoValue()) {
                 mapElement.value.flush(value);
             }
 
@@ -125,7 +127,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
         } else {
             //如果key已经存在,覆盖value
             if (context.isMatchKey) {
-                if (mapElement.isNoValue() == false) {
+                if (!mapElement.isNoValue()) {
                     if (!supportUpdate) {
                         return false;
                     }
@@ -280,7 +282,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
             this.mapAddress = mapAddress;
             this.mapElement = mapElement;
             this.isMatchKey = isMatchKey;
-            if (mapElement.isEmpty() == false) {
+            if (!mapElement.isEmpty()) {
                 isOccurConflict = true;
             }
         }
@@ -327,7 +329,7 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
         }
 
         public boolean isEmpty() {
-            return isConflict == false && conflictIndex == 0 && offset == 0;
+            return !isConflict && conflictIndex == 0 && offset == 0;
         }
 
         /**
@@ -356,14 +358,14 @@ public abstract class CacheKV<T> implements ICacheKV<T> {
         public byte[] createBytes() {
             byte[] bytes = NumberUtils.toByte(offset);
             int value = 0;
-            byte fisrtByte = (byte)(conflictIndex & 0xff);
+            byte fisrtByte = (byte) (conflictIndex & 0xff);
             if (isConflict) {
                 value = (fisrtByte | (1 << 7));//把第一位变成1
             } else {
                 return bytes;
             }
 
-            bytes[bytes.length - 1] = (byte)(value & 0xff);
+            bytes[bytes.length - 1] = (byte) (value & 0xff);
             return bytes;
         }
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
index 9a766e0..885affc 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/ICacheKV.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.streams.common.cache.compress;
 
 /**
  * kv提供的对外接口,通过二进制实现存储,减少java对象带来的头部开销。 需要指定初始容量,会在创建对象时分配内存。
- *
  */
 public interface ICacheKV<T> {
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
index 60f2f0b..d494882 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/KVElement.java
@@ -51,7 +51,7 @@ public class KVElement {
     }
 
     public static byte[] createByteArray(CacheKV.MapAddress nextAddress, byte[] keyHashCode, int value,
-                                         int elementSize) {
+        int elementSize) {
         KVElement element = new KVElement(nextAddress, keyHashCode, value);
         element.setElementSize(elementSize);
         return element.getBytes();
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
index 56a300f..d595da3 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/FixedLenRowCacheKV.java
@@ -133,8 +133,8 @@ public class FixedLenRowCacheKV {
             Object o = values[i];
             byte[] byteValue = null;
             if (dataType instanceof SetDataType) {
-                byteValue = new byte[] {(byte)i};
-                add2Set(i, (Set<String>)o);
+                byteValue = new byte[] {(byte) i};
+                add2Set(i, (Set<String>) o);
             } else {
                 byteValue = dataType.toBytes(o, false);
             }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
index a7577d3..23b354e 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/IntValueKV.java
@@ -35,7 +35,6 @@ import org.junit.Assert;
 
 /**
  * 支持key是string,value是int的场景,支持size不大于10000000.只支持int,long,boolean,string类型
- *
  */
 public class IntValueKV extends CacheKV<Integer> {
 
@@ -79,9 +78,7 @@ public class IntValueKV extends CacheKV<Integer> {
 
     @Override
     public int calMemory() {
-        int value = super.calMemory() + (this.conflicts.getConflictIndex() + 1) * this.conflicts
-            .getBlockSize();
-        return value;
+        return super.calMemory() + (this.conflicts.getConflictIndex() + 1) * this.conflicts.getBlockSize();
     }
 
     /**
@@ -89,9 +86,6 @@ public class IntValueKV extends CacheKV<Integer> {
      *
      * @return
      */
-    //public Integer remove(String key) {
-    //    return null;
-    //}
     public IntValueKV(int capacity) {
         super(capacity);
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java
new file mode 100644
index 0000000..78bd032
--- /dev/null
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/LongValueKV.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.common.cache.compress.impl;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.streams.common.cache.compress.ByteArrayValueKV;
+import org.apache.rocketmq.streams.common.cache.compress.CacheKV;
+
+public class LongValueKV extends CacheKV<Long> {
+
+    private final ByteArrayValueKV byteArrayValueKV;
+
+    public LongValueKV(int capacity) {
+        super(capacity, 8);
+        byteArrayValueKV = new ByteArrayValueKV(capacity, true);
+    }
+
+    @Override
+    public void put(String key, Long value) {
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        buffer.putLong(0, value);
+        byte[] bytes = buffer.array();
+        byteArrayValueKV.put(key, bytes);
+    }
+
+    @Override
+    public boolean contains(String key) {
+        return byteArrayValueKV.contains(key);
+    }
+
+    @Override
+    public int getSize() {
+        return byteArrayValueKV.getSize();
+    }
+
+    @Override
+    public int calMemory() {
+        return byteArrayValueKV.calMemory();
+    }
+
+    @Override
+    public Long get(String key) {
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        byte[] bytes = byteArrayValueKV.get(key);
+        if (bytes == null) {
+            return null;
+        }
+        buffer.put(bytes, 0, 8);
+        buffer.flip();
+        return buffer.getLong();
+    }
+
+}
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
index 3065a46..0a6009a 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/impl/MutilValueKV.java
@@ -23,26 +23,25 @@ import org.apache.rocketmq.streams.common.cache.compress.ICacheKV;
 
 public abstract class MutilValueKV<T> implements ICacheKV<T> {
     //按固定大小分割存储
-    protected List<ICacheKV<T>> valueKVS=new ArrayList<>();
+    protected List<ICacheKV<T>> valueKVS = new ArrayList<>();
     //当前存储的索引
-    protected int currentIndex=0;
+    protected int currentIndex = 0;
     //每个分片的大小
     protected int capacity;
 
-    public MutilValueKV(int capacity){
-        this.capacity=capacity;
+    public MutilValueKV(int capacity) {
+        this.capacity = capacity;
     }
 
-
     @Override
     public T get(String key) {
-        if(valueKVS==null){
+        if (valueKVS == null) {
             return null;
         }
-        for(ICacheKV<T> cacheKV:valueKVS){
-            if(cacheKV!=null){
-               T value=cacheKV.get(key);
-                if(value!=null){
+        for (ICacheKV<T> cacheKV : valueKVS) {
+            if (cacheKV != null) {
+                T value = cacheKV.get(key);
+                if (value != null) {
                     return value;
                 }
             }
@@ -52,32 +51,32 @@ public abstract class MutilValueKV<T> implements ICacheKV<T> {
 
     @Override
     public void put(String key, T value) {
-        if(valueKVS==null){
+        if (valueKVS == null) {
             return;
         }
-        ICacheKV<T> cacheKV= valueKVS.get(currentIndex);
-        if(cacheKV.getSize()>=capacity){
-            synchronized (this){
-                cacheKV= valueKVS.get(currentIndex);
-                if(cacheKV.getSize()>=capacity){
-                    cacheKV=create();
+        ICacheKV<T> cacheKV = valueKVS.get(currentIndex);
+        if (cacheKV.getSize() >= capacity) {
+            synchronized (this) {
+                cacheKV = valueKVS.get(currentIndex);
+                if (cacheKV.getSize() >= capacity) {
+                    cacheKV = create();
                     valueKVS.add(cacheKV);
                     currentIndex++;
                 }
             }
         }
-        cacheKV.put(key,value);
+        cacheKV.put(key, value);
     }
 
     @Override
     public boolean contains(String key) {
-        if(valueKVS==null){
+        if (valueKVS == null) {
             return false;
         }
-        for(ICacheKV<T> cacheKV:valueKVS){
-            if(cacheKV!=null){
-                boolean isMatch=cacheKV.contains(key);
-                if(isMatch){
+        for (ICacheKV<T> cacheKV : valueKVS) {
+            if (cacheKV != null) {
+                boolean isMatch = cacheKV.contains(key);
+                if (isMatch) {
                     return true;
                 }
             }
@@ -85,7 +84,6 @@ public abstract class MutilValueKV<T> implements ICacheKV<T> {
         return false;
     }
 
-
     protected abstract ICacheKV<T> create();
 
     @Override
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
index 2119a6b..211fc17 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/builder/AbstractSupportShuffleChannelBuilder.java
@@ -25,6 +25,6 @@ public abstract class AbstractSupportShuffleChannelBuilder implements IChannelBu
     @Override
     public ISource copy(ISource pipelineSource) {
         JSONObject jsonObject = JSONObject.parseObject(pipelineSource.toJson());
-        return (ISource)ConfigurableUtil.create(pipelineSource.getNameSpace(), pipelineSource.getConfigureName(), jsonObject, pipelineSource.getClass().getName());
+        return ConfigurableUtil.create(pipelineSource.getNameSpace(), pipelineSource.getConfigureName(), jsonObject, pipelineSource.getClass().getName());
     }
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
index bd7029f..8c4f63c 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.streams.common.channel.impl;
 
 import java.util.List;
+
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.utils.PrintUtil;
@@ -26,16 +27,12 @@ import org.apache.rocketmq.streams.common.utils.PrintUtil;
  */
 public class OutputPrintChannel extends AbstractSink {
 
-
     @Override
     protected boolean batchInsert(List<IMessage> messages) {
         for (IMessage msg : messages) {
-            System.out.println(msg.getMessageValue());
+            //System.out.println(msg.getMessageValue());
         }
         return false;
     }
 
-
-
-
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
index e56c30a..82218c2 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java
@@ -17,12 +17,14 @@
 package org.apache.rocketmq.streams.common.channel.sink;
 
 import com.alibaba.fastjson.JSONObject;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
@@ -72,13 +74,12 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
     }
 
     public ISplit getSplit(IMessage message) {
-        return (ISplit)message.getMessageBody().get(TARGET_QUEUE);
+        return (ISplit) message.getMessageBody().get(TARGET_QUEUE);
     }
 
     @Override
     public boolean batchAdd(IMessage fieldName2Value) {
         messageCache.addCache(fieldName2Value);
-
         return true;
     }
 
@@ -127,7 +128,7 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
     public boolean flush(Set<String> splitIds) {
         int size = messageCache.flush(splitIds);
         if (size > 0) {
-            System.out.println(this.getClass().getSimpleName()+ " finish flush data " + size);
+            System.out.println(this.getClass().getSimpleName() + " finish flush data " + size);
         }
 
         return size > 0;
@@ -135,11 +136,11 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
 
     @Override
     public boolean flush(String... splitIds) {
-        if(splitIds==null){
+        if (splitIds == null) {
             return true;
         }
-        Set<String> splitIdSet =new HashSet<>();
-        for(String splitId:splitIds){
+        Set<String> splitIdSet = new HashSet<>();
+        for (String splitId : splitIds) {
             splitIdSet.add(splitId);
         }
         return flush(splitIdSet);
@@ -172,16 +173,18 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
         return success;
     }
 
-    @Override public boolean checkpoint(Set<String> splitIds) {
+    @Override
+    public boolean checkpoint(Set<String> splitIds) {
         return flush(splitIds);
     }
 
-    @Override public boolean checkpoint(String... splitIds) {
-        if(splitIds==null){
+    @Override
+    public boolean checkpoint(String... splitIds) {
+        if (splitIds == null) {
             return false;
         }
-        Set<String> splitSet=new HashSet<>();
-        for(String splitId: splitIds){
+        Set<String> splitSet = new HashSet<>();
+        for (String splitId : splitIds) {
             splitSet.add(splitId);
         }
 
@@ -242,13 +245,13 @@ public abstract class AbstractSink extends BasedConfigurable implements ISink<Ab
 
     @Override
     public Map<String, MessageOffset> getFinishedQueueIdAndOffsets(CheckPointMessage checkPointMessage) {
-        String piplineName = null;
-        if (IConfigurableIdentification.class.isInstance(checkPointMessage.getStreamOperator())) {
-            IConfigurableIdentification configurable = (IConfigurableIdentification)checkPointMessage.getStreamOperator();
-            piplineName = configurable.getConfigureName();
+        String pipelineName = null;
+        if (checkPointMessage.getStreamOperator() instanceof IConfigurableIdentification) {
+            IConfigurableIdentification configurable = (IConfigurableIdentification) checkPointMessage.getStreamOperator();
+            pipelineName = configurable.getConfigureName();
         }
         SourceState sourceState = this.sourceName2State.get(
-            CheckPointManager.createSourceName(checkPointMessage.getSource(), piplineName));
+            CheckPointManager.createSourceName(checkPointMessage.getSource(), pipelineName));
         if (sourceState != null) {
             return sourceState.getQueueId2Offsets();
         }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
index 955e950..4e49b63 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/AbstractContext.java
@@ -150,23 +150,24 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
     /**
      * cache filter(regex,like,equals)result
      */
-    private static String FILTER_CACHE_PREPIX="__filter_cache_prefix";
-    public void setFilterCache(String expressionStr,String varValue, boolean result){
-        this.put(MapKeyUtil.createKey(FILTER_CACHE_PREPIX,expressionStr,varValue),result);
+    private static String FILTER_CACHE_PREPIX = "__filter_cache_prefix";
+
+    public void setFilterCache(String expressionStr, String varValue, boolean result) {
+        this.put(MapKeyUtil.createKey(FILTER_CACHE_PREPIX, expressionStr, varValue), result);
     }
 
     /**
      * get cache result
+     *
      * @param expressionStr
      * @param varValue
      * @return
      */
-    public Boolean getFilterCache(String expressionStr,String varValue){
-        String key=MapKeyUtil.createKey(FILTER_CACHE_PREPIX,expressionStr,varValue);
+    public Boolean getFilterCache(String expressionStr, String varValue) {
+        String key = MapKeyUtil.createKey(FILTER_CACHE_PREPIX, expressionStr, varValue);
         return (Boolean) this.get(key);
     }
 
-
     /**
      * 获取基于字段缓存的某些值
      *
@@ -176,7 +177,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
      */
     @Deprecated
     public <T> T getValue(String fieldName) {
-        return (T)values.get(fieldName);
+        return (T) values.get(fieldName);
     }
 
     /**
@@ -222,7 +223,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
     }
 
     public static <R, C extends AbstractContext> List<IMessage> executeScript(IMessage channelMessage, C context,
-                                                                              List<? extends IBaseStreamOperator<IMessage, R, C>> scriptExpressions) {
+        List<? extends IBaseStreamOperator<IMessage, R, C>> scriptExpressions) {
         List<IMessage> messages = new ArrayList<>();
         if (scriptExpressions == null) {
             return messages;
@@ -356,7 +357,7 @@ public abstract class AbstractContext<T extends IMessage> extends HashMap {
         context.setSplitModel(this.isSplitModel());
         List<T> messages = new ArrayList<>();
         for (T tmp : this.getSplitMessages()) {
-            messages.add(tmp.copy());
+            messages.add(tmp.deepCopy());
         }
         context.setSplitMessages(messages);
         context.monitor = this.monitor;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
index 13f1312..244ed3b 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java
@@ -17,13 +17,18 @@
 package org.apache.rocketmq.streams.common.topology;
 
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.streams.common.cache.compress.impl.LongValueKV;
 import org.apache.rocketmq.streams.common.channel.IChannel;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.component.ComponentCreator;
 import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
@@ -48,6 +53,12 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
 
     private static final long serialVersionUID = -5189371682717444347L;
 
+    private final transient int duplicateCacheSize = 1000000;
+    private transient LongValueKV duplicateCache;
+    //    private transient Map<String, Long> duplicateCache;
+    private transient List<String> duplicateFields;
+    private transient int duplicateCacheExpirationTime;
+
     /**
      * 是否自动启动channel
      */
@@ -91,6 +102,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
     /**
      * 启动一个channel,并给channel应用pipeline
      */
+
     public void startChannel() {
         final String monitorName = createPipelineMonitorName();
         if (isInitSuccess()) {
@@ -104,8 +116,9 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
                 pipelineMonitorForChannel = IMonitor.createMonitor(this);
             }
             try {
-                source.start((IStreamOperator<T, T>)(message, context) -> {
+                source.start((IStreamOperator<T, T>) (message, context) -> {
                     //每条消息一个,监控整个链路
+
                     IMonitor pipelineMonitorForStage = context.startMonitor(monitorName);
                     pipelineMonitorForStage.setType(IMonitor.TYPE_DATAPROCESS);
                     message.getHeader().setPiplineName(this.getConfigureName());
@@ -127,10 +140,21 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
 
     }
 
+    private String createDuplicateKey(IMessage message) {
+        List<String> duplicateValues = Lists.newArrayList();
+        for (String field : duplicateFields) {
+            duplicateValues.add(message.getMessageBody().getString(field));
+        }
+        return StringUtil.createMD5Str(String.join("", duplicateValues));
+    }
+
     private String createPipelineMonitorName() {
         return MapKeyUtil.createKeyBySign(".", getType(), getNameSpace(), getConfigureName());
     }
 
+    private static AtomicInteger total = new AtomicInteger(0);
+    private static AtomicInteger hitCache = new AtomicInteger(0);
+
     /**
      * 可以替换某个阶段的阶段,而不用配置的阶段
      *
@@ -141,6 +165,26 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
      */
     @Override
     protected T doMessageInner(T t, AbstractContext context, AbstractStage... replaceStage) {
+        if (this.duplicateCache != null && this.duplicateFields != null && !this.duplicateFields.isEmpty() && !t.getHeader().isSystemMessage()) {
+            total.incrementAndGet();
+            String duplicateKey = createDuplicateKey(t);
+            Long cacheTime = this.duplicateCache.get(duplicateKey);
+            Long currentTime = System.currentTimeMillis();
+            if (cacheTime != null && currentTime - cacheTime < this.duplicateCacheExpirationTime) {
+                hitCache.incrementAndGet();
+                context.breakExecute();
+                return t;
+            } else {
+                this.duplicateCache.put(duplicateKey, currentTime);
+                if (this.duplicateCache.getSize() > duplicateCacheSize) {
+                    this.duplicateCache = new LongValueKV(this.duplicateCacheSize);
+                }
+            }
+            if (total.get() % 5000 == 0) {
+                System.out.printf("total: %s, hit: %s%n", total.get(), hitCache.get());
+            }
+        }
+
         if (!t.getHeader().isSystemMessage()) {
             MessageGloableTrace.joinMessage(t);//关联全局监控器
         }
@@ -164,7 +208,8 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
         return isTopology(this.channelNextStageLabel);
     }
 
-    public void doNextStages(AbstractContext context, String msgPrewSourceName, List<String> nextStageLabel, String prewSQLNodeName, AbstractStage... replaceStage) {
+    public void doNextStages(AbstractContext context, String msgPrewSourceName, List<String> nextStageLabel,
+        String prewSQLNodeName, AbstractStage... replaceStage) {
 
         if (!isTopology(nextStageLabel)) {
             return;
@@ -178,7 +223,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
             if (size > 1) {
                 copyContext = context.copy();
             }
-            T msg = (T)copyContext.getMessage();
+            T msg = (T) copyContext.getMessage();
             AbstractStage oriStage = stageMap.get(lable);
             if (oriStage == null) {
                 if (stages != null && stages.size() > 0) {
@@ -224,7 +269,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
                 continue;
             } else {
                 if (ChainStage.class.isInstance(stage)) {
-                    ChainStage chainStage = (ChainStage)stage;
+                    ChainStage chainStage = (ChainStage) stage;
                     String msgSourceName = chainStage.getMsgSourceName();
                     if (StringUtil.isNotEmpty(msgSourceName)) {
                         msgPrewSourceName = msgSourceName;
@@ -286,7 +331,7 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
         JSONObject jsonObject = null;
         if (stage instanceof ChainStage) {
             jsonObject = new JSONObject();
-            ChainStage chainStage = (ChainStage)stage;
+            ChainStage chainStage = (ChainStage) stage;
             return chainStage.toJsonObject();
             //String entityName = chainStage.getEntityName();
             ////todo 需要改写
@@ -324,18 +369,15 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
     public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
         for (AbstractStage stage : getStages()) {
             stage.setPipeline(this);
-            if (IAfterConfigurableRefreshListener.class.isInstance(stage)) {
-                if (AbstractConfigurable.class.isInstance(stage)) {
-                    AbstractConfigurable abstractConfigurable = (AbstractConfigurable)stage;
-                    if (abstractConfigurable.isInitSuccess() == false && this.isInitSuccess() == false) {
-                        this.setInitSuccess(false);
-                        return;
-                    }
+            if (stage instanceof IAfterConfigurableRefreshListener) {
+                if (!stage.isInitSuccess() && !this.isInitSuccess()) {
+                    this.setInitSuccess(false);
+                    return;
                 }
-                IAfterConfigurableRefreshListener afterConfiguableRefreshListerner =
-                    (IAfterConfigurableRefreshListener)stage;
+                IAfterConfigurableRefreshListener afterConfigurableRefreshListener =
+                    (IAfterConfigurableRefreshListener) stage;
 
-                afterConfiguableRefreshListerner.doProcessAfterRefreshConfigurable(configurableService);
+                afterConfigurableRefreshListener.doProcessAfterRefreshConfigurable(configurableService);
 
             }
         }
@@ -349,9 +391,9 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
             startChannel();
         }
         this.source = source;
-        if (AbstractConfigurable.class.isInstance(source)) {
-            AbstractConfigurable abstractConfigurable = (AbstractConfigurable)source;
-            if (abstractConfigurable.isInitSuccess() == false && this.isInitSuccess()) {
+        if (source instanceof AbstractConfigurable) {
+            AbstractConfigurable abstractConfigurable = (AbstractConfigurable) source;
+            if (!abstractConfigurable.isInitSuccess() && this.isInitSuccess()) {
                 this.setInitSuccess(false);
                 return;
             }
@@ -361,6 +403,23 @@ public class ChainPipeline<T extends IMessage> extends Pipeline<T> implements IA
         if ((isAutoStart || isPublish()) && isInitSuccess()) {
             startChannel();
         }
+
+        //增加去重的逻辑
+        String duplicateFieldNameStr = ComponentCreator.getProperties().getProperty(getConfigureName() + ".duplicate.fields.names");
+        if (duplicateFieldNameStr != null && !duplicateFieldNameStr.isEmpty()) {
+            this.duplicateFields = Lists.newArrayList();
+            this.duplicateFields.addAll(Arrays.asList(duplicateFieldNameStr.split(";")));
+        }
+        if (this.duplicateCache == null && this.duplicateFields != null) {
+            this.duplicateCache = new LongValueKV(this.duplicateCacheSize);
+        }
+        String duplicateCacheExpirationStr = ComponentCreator.getProperties().getProperty(getConfigureName() + ".duplicate.expiration.time");
+        if (duplicateCacheExpirationStr != null && !duplicateCacheExpirationStr.isEmpty()) {
+            this.duplicateCacheExpirationTime = Integer.parseInt(duplicateCacheExpirationStr);
+        } else {
+            this.duplicateCacheExpirationTime = 86400000;
+        }
+
     }
 
     public Map<String, AbstractStage> createStageMap() {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 4dff54f..5cbac0f 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -80,14 +80,14 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
             /**
              * 主要是输出可能影响线上数据,可以通过配置文件的开关,把所有的输出,都指定到一个其他输出中
              */
-            if(openMockChannel()){
-                if(mockSink!=null){
+            if (openMockChannel()) {
+                if (mockSink != null) {
                     mockSink.batchAdd(message.deepCopy());
                     return message;
                 }
                 return message;
             }
-            sink.batchAdd(message);
+            sink.batchAdd(message.deepCopy());
 
             return message;
         }
@@ -100,23 +100,23 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
 
     @Override
     public void checkpoint(IMessage message, AbstractContext context, CheckPointMessage checkPointMessage) {
-        ISink realSink=null;
-        if(openMockChannel()&&mockSink!=null){
-            realSink=mockSink;
-        }else {
-            realSink=sink;
+        ISink realSink = null;
+        if (openMockChannel() && mockSink != null) {
+            realSink = mockSink;
+        } else {
+            realSink = sink;
         }
-        if(message.getHeader().isNeedFlush()){
-            Set<String> queueIds=new HashSet<>();
-            if(message.getHeader().getCheckpointQueueIds()!=null){
+        if (message.getHeader().isNeedFlush()) {
+            Set<String> queueIds = new HashSet<>();
+            if (message.getHeader().getCheckpointQueueIds() != null) {
                 queueIds.addAll(message.getHeader().getCheckpointQueueIds());
             }
-            if(StringUtil.isNotEmpty(message.getHeader().getQueueId())){
+            if (StringUtil.isNotEmpty(message.getHeader().getQueueId())) {
                 queueIds.add(message.getHeader().getQueueId());
             }
             realSink.checkpoint(queueIds);
         }
-        CheckPointState checkPointState=  new CheckPointState();
+        CheckPointState checkPointState = new CheckPointState();
         checkPointState.setQueueIdAndOffset(realSink.getFinishedQueueIdAndOffsets(checkPointMessage));
         checkPointMessage.reply(checkPointState);
 
@@ -182,33 +182,33 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T> implemen
 
     @Override
     public void doProcessAfterRefreshConfigurable(IConfigurableService configurableService) {
-        sink=configurableService.queryConfigurable(ISink.TYPE, sinkName);
-        if(sink==null){
+        sink = configurableService.queryConfigurable(ISink.TYPE, sinkName);
+        if (sink == null) {
             sink = configurableService.queryConfigurable(IChannel.TYPE, sinkName);
         }
 
         metaData = configurableService.queryConfigurable(MetaData.TYPE, metaDataName);
-        mockSink=getMockChannel(configurableService,sink.getNameSpace());
+        mockSink = getMockChannel(configurableService, sink.getNameSpace());
     }
 
-    private ISink getMockChannel(IConfigurableService configurableService,String nameSpace) {
-        String type=ComponentCreator.getProperties().getProperty("out.mock.type");
-        if(type==null){
+    private ISink getMockChannel(IConfigurableService configurableService, String nameSpace) {
+        String type = ComponentCreator.getProperties().getProperty("out.mock.type");
+        if (type == null) {
             return null;
         }
-        ISink mockSink= configurableService.queryConfigurable(ISink.TYPE,OUT_MOCK_SWITCH+"_"+type);
-        if(mockSink==null){
-            mockSink= configurableService.queryConfigurable(IChannel.TYPE,OUT_MOCK_SWITCH+"_"+type);
+        ISink mockSink = configurableService.queryConfigurable(ISink.TYPE, OUT_MOCK_SWITCH + "_" + type);
+        if (mockSink == null) {
+            mockSink = configurableService.queryConfigurable(IChannel.TYPE, OUT_MOCK_SWITCH + "_" + type);
         }
         return mockSink;
     }
 
-    protected boolean openMockChannel(){
-        String swtich=ComponentCreator.getProperties().getProperty(OUT_MOCK_SWITCH);
-        if(swtich==null){
+    protected boolean openMockChannel() {
+        String swtich = ComponentCreator.getProperties().getProperty(OUT_MOCK_SWITCH);
+        if (swtich == null) {
             return false;
         }
-        if("true".equals(swtich)){
+        if ("true".equals(swtich)) {
             return true;
         }
         return false;
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
index 9f8f46d..fe8a3bb 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/utils/SQLUtil.java
@@ -34,7 +34,8 @@ public class SQLUtil {
     private static final String INSERT_IGNORE = "INSERT IGNORE INTO";
     private static final String REPLACE = "REPLACE INTO";
 
-    public static String createReplacesInsertSql(MetaData metaData, Map<String, Object> fieldName2Value, Boolean containsIdField) {
+    public static String createReplacesInsertSql(MetaData metaData, Map<String, Object> fieldName2Value,
+        Boolean containsIdField) {
         String insertSQL = createInsertSql(metaData, fieldName2Value, containsIdField);
         insertSQL = insertSQL.replaceFirst(INSERT, REPLACE);
         return insertSQL;
@@ -60,7 +61,8 @@ public class SQLUtil {
         return stringBuilder.toString();
     }
 
-    public static String createIgnoreInsertSql(MetaData metaData, Map<String, Object> fieldName2Value, Boolean containsIdField) {
+    public static String createIgnoreInsertSql(MetaData metaData, Map<String, Object> fieldName2Value,
+        Boolean containsIdField) {
         String insertSQL = createInsertSql(metaData, fieldName2Value, containsIdField);
         insertSQL = insertSQL.replaceFirst(INSERT, INSERT_IGNORE);
         return insertSQL;
@@ -70,7 +72,8 @@ public class SQLUtil {
         return createInsertSql(metaData, fieldName2Value, null);
     }
 
-    public static String createInsertSql(MetaData metaData, Map<String, Object> fieldName2Value, Boolean containsIdField) {
+    public static String createInsertSql(MetaData metaData, Map<String, Object> fieldName2Value,
+        Boolean containsIdField) {
 
         StringBuilder sql = new StringBuilder(INSERT + " " + metaData.getTableName() + "(");
         StringBuilder fieldSql = new StringBuilder();
@@ -106,7 +109,8 @@ public class SQLUtil {
         return stringBuilder.toString();
     }
 
-    protected static String createInsertValuesSQL(MetaData metaData, Map<String, Object> fieldName2Value, StringBuilder fieldSql, StringBuilder valueSql) {
+    protected static String createInsertValuesSQL(MetaData metaData, Map<String, Object> fieldName2Value,
+        StringBuilder fieldSql, StringBuilder valueSql) {
         boolean isIncrement = true;
         if (fieldName2Value.containsKey(metaData.getIdFieldName())) {
             isIncrement = false;
@@ -114,7 +118,8 @@ public class SQLUtil {
         return createInsertValuesSQL(metaData, fieldName2Value, fieldSql, valueSql, isIncrement);
     }
 
-    protected static String createInsertValuesSQL(MetaData metaData, Map<String, Object> fieldName2Value, StringBuilder fieldSql, StringBuilder valueSql, boolean containsIdField) {
+    protected static String createInsertValuesSQL(MetaData metaData, Map<String, Object> fieldName2Value,
+        StringBuilder fieldSql, StringBuilder valueSql, boolean containsIdField) {
         boolean isFirst = true;
         valueSql.append("(");
         //if (fieldName2Value.containsKey(metaData.getIdFieldName())) {
@@ -172,16 +177,16 @@ public class SQLUtil {
             // }
             String result = null;
             if (DataTypeUtil.isDate(field.getDataType().getDataClass())) {
-                result = DateUtil.format((Date)value);
+                result = DateUtil.format((Date) value);
             } else if (JSONObject.class.isInstance(value)) {
-                result = ((JSONObject)value).toJSONString();
+                result = ((JSONObject) value).toJSONString();
             } else {
                 result = value.toString();
             }
             return "'" + handleSpecialCharInSql(result) + "'";
         } else {
             if (DataTypeUtil.isBoolean(field.getDataType().getDataClass())) {
-                boolean boolValue = (Boolean)value;
+                boolean boolValue = (Boolean) value;
                 return boolValue ? "1" : "0";
             }
             return value + "";
@@ -261,12 +266,12 @@ public class SQLUtil {
         if (object == null) {
             return ibatisSQL;
         }
-        if (object == null || StringUtil.isEmpty(ibatisSQL)) {
+        if (StringUtil.isEmpty(ibatisSQL)) {
             return null;
         }
 
         List<String> vars = parseIbatisSQLVars(ibatisSQL);
-        if (vars == null || vars.size() == 0) {
+        if (vars.size() == 0) {
             return ibatisSQL;
         }
         String sql = ibatisSQL;
@@ -274,19 +279,22 @@ public class SQLUtil {
             Object value = getBeanFieldValue(object, varName);
             String valueSQL = null;
 
-            if (value != null & !String.class.isInstance(value) && !Date.class.isInstance(value)) {
+            if (value != null & !(value instanceof String) && !Date.class.isInstance(value)) {
                 valueSQL = value.toString();
             }
-            if (value != null && String.class.isInstance(value)) {
+            if (value instanceof String) {
                 value = value.toString().replace("'", "''");
+                if (value.toString().contains("\\")) {
+                    value = value.toString().replaceAll("\\\\", "\\\\\\\\");
+                }
                 if (containsQuotation && (ibatisSQL.indexOf("'#{" + varName + "}'") > -1 || ibatisSQL.indexOf("`#{" + varName + "}`") > -1)) {
                     valueSQL = value + "";
                 } else {
                     valueSQL = "'" + value + "'";
                 }
             }
-            if (value != null && Date.class.isInstance(value)) {
-                String valueDate = DateUtil.format((Date)value);
+            if (value instanceof Date) {
+                String valueDate = DateUtil.format((Date) value);
                 if (containsQuotation && ibatisSQL.indexOf("'#{" + varName + "}'") > -1) {
                     valueSQL = valueDate;
                 } else {
@@ -314,20 +322,20 @@ public class SQLUtil {
     }
 
     protected static Object getBeanFieldValue(Object object, String varName) {
-        if (JSONObject.class.isInstance(object)) {
-            JSONObject jsonObject = (JSONObject)object;
+        if (object instanceof JSONObject) {
+            JSONObject jsonObject = (JSONObject) object;
             return jsonObject.get(varName);
-        } else if (Map.class.isInstance(object)) {
-            Map<String, Object> paras = (Map)object;
+        } else if (object instanceof Map) {
+            Map<String, Object> paras = (Map) object;
             return paras.get(varName);
         } else {
 
-            if (IConfigurable.class.isInstance(object) && varName.equals(IConfigurable.JSON_PROPERTY)) {
-                IConfigurable configurable = (IConfigurable)object;
+            if (object instanceof IConfigurable && varName.equals(IConfigurable.JSON_PROPERTY)) {
+                IConfigurable configurable = (IConfigurable) object;
                 return configurable.toJson();
             }
-            if (BasedConfigurable.class.isInstance(object) && varName.equals(IConfigurable.STATUS_PROPERTY)) {
-                BasedConfigurable basedConfigurable = (BasedConfigurable)object;
+            if (object instanceof BasedConfigurable && varName.equals(IConfigurable.STATUS_PROPERTY)) {
+                BasedConfigurable basedConfigurable = (BasedConfigurable) object;
                 return basedConfigurable.getStatus();
             }
             return ReflectUtil.getBeanFieldValue(object, varName);
@@ -406,7 +414,7 @@ public class SQLUtil {
                 if (!isString) {
                     stringBuilder.append(value);
                 } else {
-                    stringBuilder.append("'" + value + "'");
+                    StringBuilder append = stringBuilder.append("'" + value + "'");
                 }
 
             }
diff --git a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
index 896e621..25c5bee 100644
--- a/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
+++ b/rocketmq-streams-configurable/src/main/java/org/apache/rocketmq/streams/configurable/service/AbstractConfigurableService.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.streams.configurable.service;
 
 import com.alibaba.fastjson.JSONObject;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Properties;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
@@ -133,9 +135,7 @@ public abstract class AbstractConfigurableService implements IConfigurableServic
         if (configures != null && configures.isQuerySuccess() && configures.getConfigurables() != null) {
             // List<Configure> configureList = filterConfigure(configures.getConfigure());
             List<IConfigurable> configurables = configures.getConfigurables();
-            List<IConfigurable> configurableList = checkAndUpdateConfigurables(namespace, configurables,
-                tempType2ConfigurableMap, tempName2ConfigurableMap,
-                configures.getConfigurables());
+            List<IConfigurable> configurableList = checkAndUpdateConfigurables(namespace, configurables, tempType2ConfigurableMap, tempName2ConfigurableMap, configures.getConfigurables());
             // this.namespace2ConfigurableMap = namespace2ConfigurableMap;
             for (IConfigurable configurable : configurableList) {
                 if (configurable instanceof IAfterConfigurableRefreshListener) {
diff --git a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
index 9080479..e51e94f 100644
--- a/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
+++ b/rocketmq-streams-filter/src/main/java/org/apache/rocketmq/streams/filter/context/RuleContext.java
@@ -317,7 +317,7 @@ public class RuleContext extends AbstractContext<Message> implements Serializabl
 
     @Override
     public AbstractContext copy() {
-        IMessage message = this.message.copy();
+        IMessage message = this.message.deepCopy();
         RuleContext context = new RuleContext(nameSpace, message.getMessageBody(), rule, contextConfigure);
         super.copyProperty(context);
         context.actionExecutor = actionExecutor;
diff --git a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
index bd7ef82..c5a0c27 100644
--- a/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
+++ b/rocketmq-streams-lease/src/main/java/org/apache/rocketmq/streams/lease/service/storages/DBLeaseStorage.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.utils.DateUtil;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
index 2f2ccf1..3255a69 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/context/FunctionContext.java
@@ -48,7 +48,7 @@ public class FunctionContext<T extends IMessage>
 
     @Override
     public AbstractContext copy() {
-        IMessage message = this.message.copy();
+        IMessage message = this.message.deepCopy();
         FunctionContext context = new FunctionContext(message);
         super.copyProperty(context);
         context.setFunctionService(this.functionService);
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
index 2541a82..6d375be 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/field/FieldFunction.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.streams.script.function.impl.field;
 
+import com.google.common.collect.Lists;
+import java.util.List;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.script.annotation.Function;
@@ -29,17 +31,17 @@ public class FieldFunction {
 
     @FunctionMethod(value = "field", alias = "get", comment = "获取字段值")
     public <T> T getFieldValue(IMessage message, FunctionContext context,
-                               @FunctionParamter(value = "string", comment = "字段的名称,不需要引号") String fieldName) {
+        @FunctionParamter(value = "string", comment = "字段的名称,不需要引号") String fieldName) {
         String name = FunctionUtils.getValueString(message, context, fieldName);
         if (StringUtil.isEmpty(name)) {
             name = fieldName;
         }
-        return (T)message.getMessageBody().get(name);
+        return (T) message.getMessageBody().get(name);
     }
 
     @FunctionMethod(value = "char_length", alias = "len", comment = "求字段代码字符串或常量的长度")
     public int len(IMessage message, FunctionContext context,
-                   @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String fieldName) {
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String fieldName) {
         String value = FunctionUtils.getValueString(message, context, fieldName);
         if (StringUtil.isEmpty(value)) {
             return 0;
@@ -49,7 +51,7 @@ public class FieldFunction {
 
     @FunctionMethod(value = "lower", alias = "low", comment = "把字符串转换称小写")
     public String lower(IMessage message, FunctionContext context,
-                        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String fieldName) {
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String fieldName) {
         String value = FunctionUtils.getValueString(message, context, fieldName);
         if (StringUtil.isEmpty(value)) {
             return null;
@@ -59,7 +61,7 @@ public class FieldFunction {
 
     @FunctionMethod(value = "concat", comment = "连接字符串")
     public String concat(IMessage message, FunctionContext context,
-                         @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String... fieldNames) {
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String... fieldNames) {
         StringBuilder sb = new StringBuilder();
         for (String fieldName : fieldNames) {
             String value = FunctionUtils.getValueString(message, context, fieldName);
@@ -70,28 +72,30 @@ public class FieldFunction {
 
     @FunctionMethod(value = "concat_ws", alias = "concat_sign", comment = "通过分隔符把字符串拼接在一起")
     public String concat_ws(IMessage message, FunctionContext context,
-                            @FunctionParamter(value = "string", comment = "代表分隔符的字段名或常量") String sign,
-                            @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String... fieldNames) {
-        StringBuilder sb = new StringBuilder();
-        boolean isFirst = true;
+        @FunctionParamter(value = "string", comment = "代表分隔符的字段名或常量") String sign,
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String... fieldNames) {
         sign = FunctionUtils.getValueString(message, context, sign);
+        if (sign == null) {
+            sign = ",";
+        }
+        List<String> values = Lists.newArrayList();
         for (String fieldName : fieldNames) {
-            if (isFirst) {
-                isFirst = false;
-            } else {
-                sb.append(sign);
-            }
             String value = FunctionUtils.getValueString(message, context, fieldName);
-            sb.append(value);
+            if (value != null) {
+                values.add(value);
+            }
         }
-        return sb.toString();
+        if (values.isEmpty()) {
+            return null;
+        }
+        return String.join(sign, values);
     }
 
     @FunctionMethod(value = "lpad", comment = "在原串左边补n个pad字符串,如果原串长度小于len,则截断,使得整个字符串长度为len")
     public String lpad(IMessage message, FunctionContext context,
-                       @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String ori,
-                       @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量") String lenStr,
-                       @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String pad) {
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String ori,
+        @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量") String lenStr,
+        @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String pad) {
         if (StringUtil.isEmpty(ori) || pad == null) {
             return null;
         }
@@ -121,9 +125,9 @@ public class FieldFunction {
 
     @FunctionMethod(value = "rpad", comment = "在原串左边补n个pad字符串,如果原串长度小于len,则截断,使得整个字符串长度为len")
     public String rpad(IMessage message, FunctionContext context,
-                       @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String ori,
-                       @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量") String lenStr,
-                       @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String pad) {
+        @FunctionParamter(value = "string", comment = "代表字符串的字段名或常量") String ori,
+        @FunctionParamter(value = "string", comment = "代表字符串长度字段名,数字或常量") String lenStr,
+        @FunctionParamter(value = "string", comment = "代表补齐字符串的字段名或常量") String pad) {
         if (StringUtil.isEmpty(ori) || pad == null) {
             return null;
         }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
index f076696..85f5727 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/flatmap/SplitArrayFunction.java
@@ -18,8 +18,10 @@ package org.apache.rocketmq.streams.script.function.impl.flatmap;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.rocketmq.streams.common.context.IMessage;
@@ -132,7 +134,7 @@ public class SplitArrayFunction {
         context.openSplitModel();
         for (int i = 0; i < values.length; i++) {
             String value = values[i];
-            if("null".equals(value.toLowerCase())){
+            if ("null".equalsIgnoreCase(value)) {
                 continue;
             }
             IMessage newMessage = channelMessage.deepCopy();
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
index 6bdfc5a..5071a02 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/AbstractScriptProxy.java
@@ -29,27 +29,26 @@ import org.apache.rocketmq.streams.script.utils.FunctionUtils;
 
 public abstract class AbstractScriptProxy implements IScriptExpression {
     protected IScriptExpression origExpression;
+
     public AbstractScriptProxy(IScriptExpression origExpression) {
-        this.origExpression=origExpression;
+        this.origExpression = origExpression;
     }
 
     public abstract List<ICacheFilter> getCacheFilters();
 
+    public abstract boolean supportOptimization(IScriptExpression scriptExpression);
 
-
-    public abstract boolean supportOptimization(IScriptExpression scriptExpression) ;
     protected String getParameterValue(IScriptParamter scriptParamter) {
         if (!ScriptParameter.class.isInstance(scriptParamter)) {
             return null;
         }
-        ScriptParameter parameter = (ScriptParameter)scriptParamter;
+        ScriptParameter parameter = (ScriptParameter) scriptParamter;
         if (parameter.getRigthVarName() != null) {
             return null;
         }
         return FunctionUtils.getConstant(parameter.getLeftVarName());
     }
 
-
     @Override public List<IScriptParamter> getScriptParamters() {
         return this.origExpression.getScriptParamters();
     }
@@ -63,7 +62,7 @@ public abstract class AbstractScriptProxy implements IScriptExpression {
     }
 
     @Override public Object getScriptParamter(IMessage message, FunctionContext context) {
-        return this.origExpression.getScriptParamter(message,context);
+        return this.origExpression.getScriptParamter(message, context);
     }
 
     public void setOrigExpression(IScriptExpression origExpression) {
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
index 6f6140c..4b507c6 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/CaseScriptExpressionProxy.java
@@ -31,39 +31,40 @@ public class CaseScriptExpressionProxy extends AbstractScriptProxy {
     }
 
     @Override public List<ICacheFilter> getCacheFilters() {
-        List<ICacheFilter> result=new ArrayList<>();
-        GroupScriptExpression groupScriptExpression=(GroupScriptExpression)this.origExpression;
-        recursion(groupScriptExpression,result);
+        List<ICacheFilter> result = new ArrayList<>();
+        GroupScriptExpression groupScriptExpression = (GroupScriptExpression) this.origExpression;
+        recursion(groupScriptExpression, result);
         return result;
     }
 
     /**
      * recursion else if GroupScriptExpression list
+     *
      * @param groupScriptExpression
      * @param cacheFilters
      */
-    protected void recursion(GroupScriptExpression groupScriptExpression,List<ICacheFilter> cacheFilters){
-        IScriptExpression scriptExpression= groupScriptExpression.getIfExpresssion();
-        AbstractScriptProxy abstractExpressionProxy= ScriptProxyFactory.getInstance().create(scriptExpression);
-        if(abstractExpressionProxy!=null){
+    protected void recursion(GroupScriptExpression groupScriptExpression, List<ICacheFilter> cacheFilters) {
+        IScriptExpression scriptExpression = groupScriptExpression.getIfExpresssion();
+        AbstractScriptProxy abstractExpressionProxy = ScriptProxyFactory.getInstance().create(scriptExpression);
+        if (abstractExpressionProxy != null) {
             groupScriptExpression.setIfExpresssion(abstractExpressionProxy);
             cacheFilters.addAll(abstractExpressionProxy.getCacheFilters());
         }
-        if(groupScriptExpression.getElseIfExpressions()!=null){
-            for(GroupScriptExpression expression:groupScriptExpression.getElseIfExpressions()){
-                recursion(expression,cacheFilters);
+        if (groupScriptExpression.getElseIfExpressions() != null) {
+            for (GroupScriptExpression expression : groupScriptExpression.getElseIfExpressions()) {
+                recursion(expression, cacheFilters);
             }
         }
     }
 
     @Override public boolean supportOptimization(IScriptExpression scriptExpression) {
-         if(scriptExpression instanceof GroupScriptExpression){
-             return true;
-         }
-         return false;
+        if (scriptExpression instanceof GroupScriptExpression) {
+            return true;
+        }
+        return false;
     }
 
     @Override public Object executeExpression(IMessage message, FunctionContext context) {
-        return this.origExpression.executeExpression(message,context);
+        return this.origExpression.executeExpression(message, context);
     }
 }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
index ba1d06d..5f9aca9 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/EqualsProxy.java
@@ -17,10 +17,7 @@
 
 package org.apache.rocketmq.streams.script.optimization.performance;
 
-import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilterBulider;
 import org.apache.rocketmq.streams.script.function.impl.condition.EqualsFunction;
-import org.apache.rocketmq.streams.script.function.impl.string.RegexFunction;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
 
@@ -40,7 +37,7 @@ public class EqualsProxy extends SimpleScriptExpressionProxy {
     }
 
     @Override protected String getVarName() {
-       return getParameterValue((IScriptParamter)this.origExpression.getScriptParamters().get(0));
+        return getParameterValue((IScriptParamter) this.origExpression.getScriptParamters().get(0));
     }
 
 }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
index d3cc1b2..30c214a 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/RegexProxy.java
@@ -17,14 +17,11 @@
 
 package org.apache.rocketmq.streams.script.optimization.performance;
 
-import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilterBulider;
-import org.apache.rocketmq.streams.script.function.impl.condition.EqualsFunction;
 import org.apache.rocketmq.streams.script.function.impl.string.RegexFunction;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
 
-public class RegexProxy extends SimpleScriptExpressionProxy  {
+public class RegexProxy extends SimpleScriptExpressionProxy {
     public RegexProxy(IScriptExpression origExpression) {
         super(origExpression);
     }
@@ -40,8 +37,7 @@ public class RegexProxy extends SimpleScriptExpressionProxy  {
     }
 
     @Override protected String getVarName() {
-       return getParameterValue((IScriptParamter)this.origExpression.getScriptParamters().get(0));
+        return getParameterValue((IScriptParamter) this.origExpression.getScriptParamters().get(0));
     }
 
-
 }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
index 220350c..5433e60 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptExpressionGroupsProxy.java
@@ -30,28 +30,31 @@ import org.apache.rocketmq.streams.script.service.IScriptExpression;
 import org.apache.rocketmq.streams.script.service.IScriptParamter;
 
 public class ScriptExpressionGroupsProxy extends CacheFilterManager implements IScriptExpression {
-    protected List<IScriptExpression> scriptExpressions=new ArrayList<>();
+    protected List<IScriptExpression> scriptExpressions = new ArrayList<>();
 
     public ScriptExpressionGroupsProxy(int elementCount, int capacity) {
         super(elementCount, capacity);
     }
+
     public void removeLessCount() {
-        Map<String, CacheFilterGroup> newFilterOptimizationMap=new HashMap<>();
-        for(String varName:this.filterOptimizationMap.keySet()){
-            CacheFilterGroup cacheFilterGroup =this.filterOptimizationMap.get(varName);
-            if(cacheFilterGroup.getSize()>5){
-                newFilterOptimizationMap.put(varName,cacheFilterGroup);
+        Map<String, CacheFilterGroup> newFilterOptimizationMap = new HashMap<>();
+        for (String varName : this.filterOptimizationMap.keySet()) {
+            CacheFilterGroup cacheFilterGroup = this.filterOptimizationMap.get(varName);
+            if (cacheFilterGroup.getSize() > 5) {
+                newFilterOptimizationMap.put(varName, cacheFilterGroup);
             }
         }
-        this.filterOptimizationMap=newFilterOptimizationMap;
+        this.filterOptimizationMap = newFilterOptimizationMap;
     }
-    public void addScriptExpression(IScriptExpression scriptExpression){
+
+    public void addScriptExpression(IScriptExpression scriptExpression) {
         this.scriptExpressions.add(scriptExpression);
     }
+
     @Override public Object executeExpression(IMessage message, FunctionContext context) {
-        this.execute(message,context);
-        for(IScriptExpression scriptExpression:scriptExpressions){
-            scriptExpression.executeExpression(message,context);
+        this.execute(message, context);
+        for (IScriptExpression scriptExpression : scriptExpressions) {
+            scriptExpression.executeExpression(message, context);
         }
         return null;
     }
@@ -84,5 +87,4 @@ public class ScriptExpressionGroupsProxy extends CacheFilterManager implements I
         return null;
     }
 
-
 }
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
index ae147ee..9d43e2b 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptOptimization.java
@@ -16,41 +16,33 @@
  */
 package org.apache.rocketmq.streams.script.optimization.performance;
 
+import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
+import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
+import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
+import org.apache.rocketmq.streams.script.service.IScriptExpression;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.streams.common.context.IMessage;
-import org.apache.rocketmq.streams.common.optimization.HyperscanRegex;
-import org.apache.rocketmq.streams.common.optimization.cachefilter.ICacheFilter;
-import org.apache.rocketmq.streams.script.context.FunctionContext;
-import org.apache.rocketmq.streams.script.operator.expression.GroupScriptExpression;
-import org.apache.rocketmq.streams.script.operator.expression.ScriptExpression;
-import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
-import org.apache.rocketmq.streams.script.service.IScriptExpression;
 
 public class ScriptOptimization {
     protected String name;//function script namespace,name
 
-
     /**
      * optimizate expression
      */
-    protected ScriptExpressionGroupsProxy scriptExpressionGroupsProxy =new ScriptExpressionGroupsProxy(160,1000000);
+    protected ScriptExpressionGroupsProxy scriptExpressionGroupsProxy = new ScriptExpressionGroupsProxy(160, 1000000);
 
     //the optimizated script
     protected List<IScriptExpression> scriptExpressions;
 
-
     //newFieldName created in the script
     protected Map<String, IScriptExpression> newFieldName2Expressions = new HashMap<>();
 
-
     /**
      * Optimization once
      */
@@ -61,8 +53,8 @@ public class ScriptOptimization {
      *
      * @param scriptExpressions
      */
-    public ScriptOptimization(String name,List<IScriptExpression> scriptExpressions) {
-        this.name=name;
+    public ScriptOptimization(String name, List<IScriptExpression> scriptExpressions) {
+        this.name = name;
         this.scriptExpressions = scriptExpressions;
 
         /**
@@ -91,8 +83,6 @@ public class ScriptOptimization {
         return false;
     }
 
-
-
     /**
      * 把表达式拆成3段,创建变量的,正则类,其他。正则类用HyperscanRegex做优化
      */
@@ -100,7 +90,7 @@ public class ScriptOptimization {
         if (!startOptimization.compareAndSet(false, true)) {
             return this.scriptExpressions;
         }
-        Set<String> newVarNames=new HashSet<>();
+        Set<String> newVarNames = new HashSet<>();
         List<IScriptExpression> allScriptExpressions = new ArrayList<>();//最终输出的表达式列表
         List<IScriptExpression> proxyExpressions = new ArrayList<>();//最后执行的脚本,在执行完正则后执行的部分
         List<IScriptExpression> lastExpressions = new ArrayList<>();//最后执行的脚本,在执行完正则后执行的部分
@@ -108,46 +98,39 @@ public class ScriptOptimization {
         for (IScriptExpression scriptExpression : scriptExpressions) {
 
             Set<String> newFieldNames = scriptExpression.getNewFieldNames();
-            if (newFieldNames != null&&newFieldNames.size() > 0) {
+            if (newFieldNames != null && newFieldNames.size() > 0) {
                 String newFieldName = newFieldNames.iterator().next();
                 newVarNames.add(newFieldName);
             }
 
-
-
-
-
-            IScriptExpression scriptExpressionProxy = createProxy(scriptExpression,newVarNames);
+            IScriptExpression scriptExpressionProxy = createProxy(scriptExpression, newVarNames);
             String functionName = scriptExpressionProxy.getFunctionName();
-            if(scriptExpressionProxy instanceof AbstractScriptProxy){
+            if (scriptExpressionProxy instanceof AbstractScriptProxy) {
                 proxyExpressions.add(scriptExpressionProxy);
-            }else if("trim".equals(functionName) || "lower".equals(functionName) || "concat".equals(functionName)){
+            } else if ("trim".equals(functionName) || "lower".equals(functionName) || "concat".equals(functionName)) {
                 mapExpressions.add(scriptExpressionProxy);
-            }else {
+            } else {
                 lastExpressions.add(scriptExpressionProxy);
             }
         }
         allScriptExpressions.addAll(mapExpressions);//把优先执行的表达式添加上
-        if(this.scriptExpressionGroupsProxy.scriptExpressions.size()>0){
+        if (this.scriptExpressionGroupsProxy.scriptExpressions.size() > 0) {
             allScriptExpressions.add(this.scriptExpressionGroupsProxy);
         }
         allScriptExpressions.addAll(lastExpressions);//把剩余的表达式增加到list中
 
-        this.scriptExpressions=allScriptExpressions;
+        this.scriptExpressions = allScriptExpressions;
         this.scriptExpressionGroupsProxy.removeLessCount();
         return this.scriptExpressions;
     }
 
-
-
-
     /**
      * 如果脚本中有较多的正则表达式,则统一注册到正则库,并行执行。
      *
      * @param scriptExpression
      * @return
      */
-    protected IScriptExpression createProxy(IScriptExpression scriptExpression,Set<String> newVarNames) {
+    protected IScriptExpression createProxy(IScriptExpression scriptExpression, Set<String> newVarNames) {
         AbstractScriptProxy scriptProxy = ScriptProxyFactory.getInstance().create(scriptExpression);
         if (scriptProxy == null) {
             return scriptExpression;
@@ -158,22 +141,21 @@ public class ScriptOptimization {
          * 如果依赖的字段是其他脚本产生的,则不做优化
          */
         for (String fieldName : dependentFields) {
-            if (newFieldName2Expressions.containsKey(fieldName)&&!newVarNames.contains(fieldName)) {
+            if (newFieldName2Expressions.containsKey(fieldName) && !newVarNames.contains(fieldName)) {
                 return scriptExpression;
             }
         }
 
         this.scriptExpressionGroupsProxy.addScriptExpression(scriptProxy);
-        List<ICacheFilter> cacheFilters=scriptProxy.getCacheFilters();
-        if(cacheFilters!=null){
-            for(ICacheFilter cacheFilter:cacheFilters){
-                this.scriptExpressionGroupsProxy.addOptimizationExpression(this.name,cacheFilter);
+        List<ICacheFilter> cacheFilters = scriptProxy.getCacheFilters();
+        if (cacheFilters != null) {
+            for (ICacheFilter cacheFilter : cacheFilters) {
+                this.scriptExpressionGroupsProxy.addOptimizationExpression(this.name, cacheFilter);
             }
         }
         return scriptProxy;
     }
 
-
     public static void main(String[] args) {
         String scriptValue = "source='netstat_ob';\n"
             + "____regex_10001=regex(std_cmdline,'^(((/?([a-zA-Z0-9_\\.\\-]+/){1,20})bin/)|/bin/|/|-)?"
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
index 514d6dd..5118c1d 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/ScriptProxyFactory.java
@@ -26,33 +26,31 @@ import org.apache.rocketmq.streams.common.utils.ReflectUtil;
 import org.apache.rocketmq.streams.script.service.IScriptExpression;
 
 public class ScriptProxyFactory {
-    protected List<AbstractScriptProxy> expressionProxies=new ArrayList<>();
-    protected static ScriptProxyFactory expressionProxyFactory=new ScriptProxyFactory();
-    protected static AtomicBoolean isFinishScan=new AtomicBoolean(false);
-    protected AbstractScan scan=new AbstractScan() {
+    protected List<AbstractScriptProxy> expressionProxies = new ArrayList<>();
+    protected static ScriptProxyFactory expressionProxyFactory = new ScriptProxyFactory();
+    protected static AtomicBoolean isFinishScan = new AtomicBoolean(false);
+    protected AbstractScan scan = new AbstractScan() {
         @Override protected void doProcessor(Class clazz) {
-            if(AbstractScriptProxy.class.isAssignableFrom(clazz)&&!Modifier.isAbstract(clazz.getModifiers())){
-                AbstractScriptProxy abstractExpressionProxy=(AbstractScriptProxy)ReflectUtil.forInstance(clazz,new Class[]{IScriptExpression.class},new Object[]{null});
+            if (AbstractScriptProxy.class.isAssignableFrom(clazz) && !Modifier.isAbstract(clazz.getModifiers())) {
+                AbstractScriptProxy abstractExpressionProxy = (AbstractScriptProxy) ReflectUtil.forInstance(clazz, new Class[] {IScriptExpression.class}, new Object[] {null});
                 expressionProxies.add(abstractExpressionProxy);
             }
         }
     };
 
-
-
-    public static ScriptProxyFactory getInstance(){
-        if(isFinishScan.compareAndSet(false,true)){
+    public static ScriptProxyFactory getInstance() {
+        if (isFinishScan.compareAndSet(false, true)) {
             expressionProxyFactory.scan.scanPackages("org.apache.rocketmq.streams.script.optimization.performance");
             expressionProxyFactory.scan.scanPackages("org.apache.rocketmq.streams.filter.optimization");
         }
         return expressionProxyFactory;
     }
 
-    public AbstractScriptProxy create(IScriptExpression oriScriptExpression){
-        for(AbstractScriptProxy abstractExpressionProxy: expressionProxies){
+    public AbstractScriptProxy create(IScriptExpression oriScriptExpression) {
+        for (AbstractScriptProxy abstractExpressionProxy : expressionProxies) {
             abstractExpressionProxy.setOrigExpression(oriScriptExpression);
-            if(abstractExpressionProxy.supportOptimization(oriScriptExpression)){
-                return (AbstractScriptProxy)ReflectUtil.forInstance(abstractExpressionProxy.getClass(),new Class[]{IScriptExpression.class},new Object[]{oriScriptExpression});
+            if (abstractExpressionProxy.supportOptimization(oriScriptExpression)) {
+                return (AbstractScriptProxy) ReflectUtil.forInstance(abstractExpressionProxy.getClass(), new Class[] {IScriptExpression.class}, new Object[] {oriScriptExpression});
             }
         }
         return null;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
index 249b10a..84bf529 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/optimization/performance/SimpleScriptExpressionProxy.java
@@ -31,21 +31,23 @@ public abstract class SimpleScriptExpressionProxy extends AbstractScriptProxy {
     public SimpleScriptExpressionProxy(IScriptExpression origExpression) {
         super(origExpression);
     }
-    protected List<ICacheFilter> optimizationExpressions=null;
+
+    protected List<ICacheFilter> optimizationExpressions = null;
+
     @Override
     public List<ICacheFilter> getCacheFilters() {
-        IScriptExpression scriptExpression=this.origExpression;
-        if(this.optimizationExpressions==null){
-            synchronized (this){
-                if(this.optimizationExpressions==null){
-                    List<ICacheFilter> optimizationExpressions=new ArrayList<>();
-                    optimizationExpressions.add(new AbstractCacheFilter(getVarName(),this.origExpression) {
+        IScriptExpression scriptExpression = this.origExpression;
+        if (this.optimizationExpressions == null) {
+            synchronized (this) {
+                if (this.optimizationExpressions == null) {
+                    List<ICacheFilter> optimizationExpressions = new ArrayList<>();
+                    optimizationExpressions.add(new AbstractCacheFilter(getVarName(), this.origExpression) {
                         @Override public boolean executeOrigExpression(IMessage message, AbstractContext context) {
                             FunctionContext functionContext = new FunctionContext(message);
                             if (context != null) {
                                 context.syncSubContext(functionContext);
                             }
-                            Boolean isMatch=(Boolean)scriptExpression.executeExpression(message,functionContext);
+                            Boolean isMatch = (Boolean) scriptExpression.executeExpression(message, functionContext);
 
                             if (context != null) {
                                 context.syncContext(functionContext);
@@ -53,7 +55,7 @@ public abstract class SimpleScriptExpressionProxy extends AbstractScriptProxy {
                             return isMatch;
                         }
                     });
-                    this.optimizationExpressions=optimizationExpressions;
+                    this.optimizationExpressions = optimizationExpressions;
                 }
             }
         }
@@ -61,15 +63,13 @@ public abstract class SimpleScriptExpressionProxy extends AbstractScriptProxy {
 
     }
 
-
     @Override public Object executeExpression(IMessage message, FunctionContext context) {
-        Boolean value= this.optimizationExpressions.get(0).execute(message,context);
-        if(this.origExpression.getNewFieldNames()!=null&&this.origExpression.getNewFieldNames().size()>0){
+        Boolean value = this.optimizationExpressions.get(0).execute(message, context);
+        if (this.origExpression.getNewFieldNames() != null && this.origExpression.getNewFieldNames().size() > 0) {
             message.getMessageBody().put(this.origExpression.getNewFieldNames().iterator().next(), value);
         }
         return value;
     }
 
-
     protected abstract String getVarName();
 }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
index 6b05827..a75d300 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+
 import com.alibaba.fastjson.JSONObject;
 
 import org.apache.commons.lang3.StringUtils;
@@ -87,7 +88,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     /**
      * 用消息中的哪个字段做时间字段
      */
-    protected String timeFieldName ;
+    protected String timeFieldName;
 
     /**
      * having column in having clause eg: key:'having_sum_0001' value:'having_sum_0001=SUM(OrderPrice)<2000' note: here ignore the logical relation value may be multi expression which split by ${SCRIPT_SPLIT_CHAR} update: change sql(move the function into select clause) to escape function in having clause
@@ -102,7 +103,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     /**
      * SQL中group by的字段,使用;拼接,如"name;age"
      */
-    protected String groupByFieldName ;
+    protected String groupByFieldName;
 
     /**
      * 意义同blink中,允许最晚的消息到达时间,单位是分钟
@@ -120,7 +121,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     /**
      * 主要是做兼容,以前设计的窗口时间是分钟为单位,如果有秒作为窗口时间的,通过设置timeUntiAdjust=1来实现。 后续需要调整成直接秒级窗口
      */
-    protected  int timeUnitAdjust=60;
+    protected int timeUnitAdjust = 60;
     /**
      * the variable name of window size which can be got from message
      */
@@ -144,14 +145,14 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     /**
      * 默认为空,窗口的触发类似flink,在测试模式下,因为消息有界,期望当消息发送完成后能触发,可以设置两条消息的最大间隔,超过这个间隔,将直接触发消息
      */
-    protected Long msgMaxGapSecond=10L;
+    protected Long msgMaxGapSecond = 10L;
 
     /**
      * 是否支持过期数据的计算 过期:当前时间大于数据所在窗口的触发时间
      */
-    protected int fireMode=0;//0:普通触发,firetime后收到数据丢弃;1:多实例多次独立触发,在watermark时间内,同starttime,endtime创建多个实例,多次触发;2.单实例,多次独立触发,每次触发是最新值
+    protected int fireMode = 0;//0:普通触发,firetime后收到数据丢弃;1:多实例多次独立触发,在watermark时间内,同starttime,endtime创建多个实例,多次触发;2.单实例,多次独立触发,每次触发是最新值
 
-    protected boolean isLocalStorageOnly=true;//是否只用本地存储,可以提高性能,但不保证可靠性
+    protected boolean isLocalStorageOnly = true;//是否只用本地存储,可以提高性能,但不保证可靠性
     protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
     protected transient IReducer reducer;
     /**
@@ -205,12 +206,12 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
         /**
          * 如果没有db配置,不开启远程存储服务
          */
-        if(!ORMUtil.hasConfigueDB()){
-            isLocalStorageOnly=true;
+        if (!ORMUtil.hasConfigueDB()) {
+            isLocalStorageOnly = true;
         }
-        sqlCache=new SQLCache(isLocalStorageOnly);
-        AbstractWindow window=this;
-        windowCache=new WindowCache(){
+        sqlCache = new SQLCache(isLocalStorageOnly);
+        AbstractWindow window = this;
+        windowCache = new WindowCache() {
 
             @Override
             protected String generateShuffleKey(IMessage message) {
@@ -226,12 +227,12 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
         initFunctionExecutor();
         //启动shuffle channel 实现消息shuffle以及接收shuffle消息并处理
         // FireManager.getInstance().startFireCheck();
-        if(StringUtil.isNotEmpty(this.reduceSerializeValue)){
-            byte[] bytes= Base64Utils.decode(  this.reduceSerializeValue);
+        if (StringUtil.isNotEmpty(this.reduceSerializeValue)) {
+            byte[] bytes = Base64Utils.decode(this.reduceSerializeValue);
             reducer = InstantiationUtil.deserializeObject(bytes);
         }
-        eventTimeManager=new EventTimeManager();
-        windowMaxValueManager = new WindowMaxValueManager(this,sqlCache);
+        eventTimeManager = new EventTimeManager();
+        windowMaxValueManager = new WindowMaxValueManager(this, sqlCache);
         return success;
     }
 
@@ -282,22 +283,22 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     }
     */
 
-    public  WindowInstance createWindowInstance(String startTime, String endTime, String fireTime,String splitId) {
-        WindowInstance windowInstance =new WindowInstance();
+    public WindowInstance createWindowInstance(String startTime, String endTime, String fireTime, String splitId) {
+        WindowInstance windowInstance = new WindowInstance();
         windowInstance.setFireTime(fireTime);
         windowInstance.setStartTime(startTime);
         windowInstance.setEndTime(endTime);
         windowInstance.setSplitId(splitId);
         windowInstance.setGmtCreate(new Date());
         windowInstance.setGmtModified(new Date());
-        windowInstance.setWindowInstanceName(createWindowInstanceName(startTime,endTime,fireTime));
+        windowInstance.setWindowInstanceName(createWindowInstanceName(startTime, endTime, fireTime));
         windowInstance.setWindowName(getConfigureName());
         windowInstance.setWindowNameSpace(getNameSpace());
-        String windowInstanceId =windowInstance.createWindowInstanceId();
+        String windowInstanceId = windowInstance.createWindowInstanceId();
         String dbWindowInstanceId = StringUtil.createMD5Str(windowInstanceId);
         windowInstance.setWindowInstanceKey(dbWindowInstanceId);
 
-        windowInstance.setWindowInstanceSplitName(StringUtil.createMD5Str(MapKeyUtil.createKey(getNameSpace(), getConfigureName(),splitId)));
+        windowInstance.setWindowInstanceSplitName(StringUtil.createMD5Str(MapKeyUtil.createKey(getNameSpace(), getConfigureName(), splitId)));
         windowInstance.setNewWindowInstance(true);
         return windowInstance;
     }
@@ -310,8 +311,8 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      * @param fireTime
      * @return
      */
-    public String createWindowInstanceName(String startTime, String endTime, String fireTime){
-        return fireMode==0?getConfigureName():fireTime;
+    public String createWindowInstanceName(String startTime, String endTime, String fireTime) {
+        return fireMode == 0 ? getConfigureName() : fireTime;
     }
 
     /**
@@ -322,14 +323,14 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      * @return
      */
 
-    public long incrementAndGetSplitNumber(WindowInstance instance,String shuffleId){
-        long maxValue= windowMaxValueManager.incrementAndGetSplitNumber(instance,shuffleId);
+    public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) {
+        long maxValue = windowMaxValueManager.incrementAndGetSplitNumber(instance, shuffleId);
         return maxValue;
     }
 
     public abstract Class getWindowBaseValueClass();
 
-    public abstract int fireWindowInstance(WindowInstance windowInstance,Map<String,String>queueId2Offset) ;
+    public abstract int fireWindowInstance(WindowInstance windowInstance, Map<String, String> queueId2Offset);
 
     /**
      * 计算每条记录的group by值,对于groupby分组,里面任何字段不能为null值,如果为null值,这条记录会被忽略
@@ -337,21 +338,21 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      * @param message
      * @return
      */
-    protected String generateShuffleKey(IMessage message){
+    protected String generateShuffleKey(IMessage message) {
         if (StringUtil.isEmpty(groupByFieldName)) {
             return null;
         }
-        JSONObject msg=message.getMessageBody();
+        JSONObject msg = message.getMessageBody();
         String[] fieldNames = groupByFieldName.split(";");
-        String[] values=new String[fieldNames.length];
+        String[] values = new String[fieldNames.length];
         boolean isFirst = true;
-        int i=0;
+        int i = 0;
         for (String filedName : fieldNames) {
             if (isFirst) {
                 isFirst = false;
             }
             String value = msg.getString(filedName);
-            values[i]=value;
+            values[i] = value;
             i++;
         }
         return MapKeyUtil.createKey(values);
@@ -359,8 +360,8 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
 
     public abstract void clearFireWindowInstance(WindowInstance windowInstance);
 
-    public void clearFire(WindowInstance windowInstance){
-        if(windowInstance==null){
+    public void clearFire(WindowInstance windowInstance) {
+        if (windowInstance == null) {
             return;
         }
         clearFireWindowInstance(windowInstance);
@@ -412,7 +413,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
                             scriptBuilder = new StringBuilder();
                         }
                         String[] functionParameterNames = scriptParameterList.stream().map(
-                            scriptParameter -> scriptParameter.getScriptParameterStr()).collect(Collectors.toList())
+                                scriptParameter -> scriptParameter.getScriptParameterStr()).collect(Collectors.toList())
                             .toArray(new String[0]);
                         AggregationScript accEngine = new AggregationScript(
                             ((ScriptExpression)expression).getNewFieldName(), functionName,
@@ -460,22 +461,23 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      * @param message
      * @return
      */
-    public List<WindowInstance> queryOrCreateWindowInstance(IMessage message,String queueId) {
-        return  WindowInstance.getOrCreateWindowInstance(this, WindowInstance.getOccurTime(this, message), timeUnitAdjust,
+    public List<WindowInstance> queryOrCreateWindowInstance(IMessage message, String queueId) {
+        return WindowInstance.getOrCreateWindowInstance(this, WindowInstance.getOccurTime(this, message), timeUnitAdjust,
             queueId);
     }
 
     /**
      * 获取window处理的消息中最大的时间
+     *
      * @param msg
      * @return
      */
-    public void updateMaxEventTime(IMessage msg){
-        eventTimeManager.updateEventTime(msg,this);
+    public void updateMaxEventTime(IMessage msg) {
+        eventTimeManager.updateEventTime(msg, this);
     }
 
     public Long getMaxEventTime(String queueId) {
-       return this.eventTimeManager.getMaxEventTime(queueId);
+        return this.eventTimeManager.getMaxEventTime(queueId);
     }
 
     /**
@@ -483,15 +485,15 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
      *
      * @param windowValueList
      */
-    public void sendFireMessage(List<WindowValue> windowValueList,String queueId) {
+    public void sendFireMessage(List<WindowValue> windowValueList, String queueId) {
         int count = 0;
-        List<IMessage> msgs=new ArrayList<>();
+        List<IMessage> msgs = new ArrayList<>();
         for (WindowValue windowValue : windowValueList) {
             JSONObject message = new JSONObject();
 
-            if(JSONObject.class.isInstance(windowValue.getcomputedResult())){
-                message=(JSONObject)windowValue.getcomputedResult();
-            }else {
+            if (JSONObject.class.isInstance(windowValue.getcomputedResult())) {
+                message = (JSONObject)windowValue.getcomputedResult();
+            } else {
                 Iterator<Entry<String, Object>> it = windowValue.iteratorComputedColumnResult();
                 while (it.hasNext()) {
                     Entry<String, Object> entry = it.next();
@@ -499,23 +501,23 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
                 }
             }
 
-            Long fireTime=DateUtil.parseTime(windowValue.getFireTime()).getTime();
-            long baseTime= 1577808000000L  ;//set base time from 2021-01-01 00:00:00
-            int sameFireCount=0;
-            if(fireMode!=0){
-                Long endTime=DateUtil.parseTime(windowValue.getEndTime()).getTime();
-                sameFireCount=(int)((fireTime-endTime)/1000)/sizeInterval*timeUnitAdjust;
-                if(sameFireCount>=1){
-                    sameFireCount=1;
+            Long fireTime = DateUtil.parseTime(windowValue.getFireTime()).getTime();
+            long baseTime = 1577808000000L;//set base time from 2021-01-01 00:00:00
+            int sameFireCount = 0;
+            if (fireMode != 0) {
+                Long endTime = DateUtil.parseTime(windowValue.getEndTime()).getTime();
+                sameFireCount = (int)((fireTime - endTime) / 1000) / sizeInterval * timeUnitAdjust;
+                if (sameFireCount >= 1) {
+                    sameFireCount = 1;
                 }
             }
             //can keep offset in order
-            Long offset=((fireTime-baseTime)/1000*10+sameFireCount)*100000000+windowValue.getPartitionNum();
-            message.put("windowInstanceId",windowValue.getWindowInstancePartitionId());
-            message.put("start_time",windowValue.getStartTime());
-            message.put("end_time",windowValue.getEndTime());
-            message.put("offset",offset);
-            Message newMessage=windowFireSource.createMessage(message,queueId,offset+"",false);
+            Long offset = ((fireTime - baseTime) / 1000 * 10 + sameFireCount) * 100000000 + windowValue.getPartitionNum();
+            message.put("windowInstanceId", windowValue.getWindowInstancePartitionId());
+            message.put("start_time", windowValue.getStartTime());
+            message.put("end_time", windowValue.getEndTime());
+            message.put("offset", offset);
+            Message newMessage = windowFireSource.createMessage(message, queueId, offset + "", false);
             newMessage.getHeader().setOffsetIsLong(true);
             if (count == windowValueList.size() - 1) {
                 newMessage.getHeader().setNeedFlush(true);
@@ -526,8 +528,8 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
             count++;
         }
 
-        if(DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()){
-            DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowFire(this,msgs,queueId);
+        if (DebugWriter.getDebugWriter(this.getConfigureName()).isOpenDebug()) {
+            DebugWriter.getDebugWriter(this.getConfigureName()).writeWindowFire(this, msgs, queueId);
         }
     }
 
@@ -687,7 +689,7 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
     public void setReducer(IReducer reducer) {
         this.reducer = reducer;
         byte[] bytes = InstantiationUtil.serializeObject(reducer);
-        this.reduceSerializeValue=Base64Utils.encode(bytes);
+        this.reduceSerializeValue = Base64Utils.encode(bytes);
     }
 
     public int getTimeUnitAdjust() {
@@ -752,8 +754,8 @@ public abstract class AbstractWindow extends BasedConfigurable implements IWindo
         return sqlCache;
     }
 
-    public void initWindowInstanceMaxSplitNum(WindowInstance instance){
-        getWindowMaxValueManager().initMaxSplitNum(instance,queryWindowInstanceMaxSplitNum(instance));
+    public void initWindowInstanceMaxSplitNum(WindowInstance instance) {
+        getWindowMaxValueManager().initMaxSplitNum(instance, queryWindowInstanceMaxSplitNum(instance));
     }
 
     protected abstract Long queryWindowInstanceMaxSplitNum(WindowInstance instance);
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
index 03e4224..4bcfe3a 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/sqlcache/SQLCache.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache;
 import org.apache.rocketmq.streams.db.driver.DriverBuilder;
@@ -34,9 +35,9 @@ import org.apache.rocketmq.streams.db.driver.JDBCDriver;
  */
 
 public class SQLCache extends AbstractMultiSplitMessageCache<ISQLElement> {
-    protected Boolean isOpenCache=true;//if false,then execute sql when receive sql
-    protected Set<String> firedWindowInstances=new HashSet<>();//fired window instance ,if the owned sqls have not commit, can cancel the sqls
-    protected Map<String,Integer> windowInstance2Index=new HashMap<>();//set index to ISQLElement group by window instance
+    protected Boolean isOpenCache = true;//if false,then execute sql when receive sql
+    protected Set<String> firedWindowInstances = new HashSet<>();//fired window instance ,if the owned sqls have not commit, can cancel the sqls
+    protected Map<String, Integer> windowInstance2Index = new HashMap<>();//set index to ISQLElement group by window instance
 
     protected boolean isLocalOnly;
 
@@ -45,7 +46,7 @@ public class SQLCache extends AbstractMultiSplitMessageCache<ISQLElement> {
         this.isLocalOnly = isLocalOnly;
         this.flushCallBack = new MessageFlushCallBack(new SQLCacheCallback());
         this.setBatchSize(1000);
-        this.setAutoFlushTimeGap(30 * 1000);
+        this.setAutoFlushTimeGap(10 * 1000);
         this.setAutoFlushSize(100);
         this.openAutoFlush();
     }

Mime
View raw message