rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-streams] branch main updated: Add Sqlmode (#63)
Date Wed, 15 Sep 2021 03:45:45 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 87a3222  Add Sqlmode  (#63)
87a3222 is described below

commit 87a32224d2a874ba50dac4aee881237f0f62c8ae
Author: 零号程序 <jet.j.j.cheng@gmail.com>
AuthorDate: Wed Sep 15 11:45:39 2021 +0800

    Add Sqlmode  (#63)
    
    * add update logic for the DBSinker, upgrade the concat_ws function
    
    * Add the field level cache to reduce duplicate data entry #60
    
    * Sqlmode can be used to write data to the database as specified
    #62
    
    Co-authored-by: junjie.cheng <junjie.cheng@alibaba-inc.com>
---
 .../apache/rocketmq/streams/db/sink/DBSink.java    | 184 ++++++++++++---------
 .../rocketmq/streams/db/sink/DBSinkBuilder.java    |  24 +--
 .../streams/client/transform/DataStream.java       |  70 ++++----
 .../streams/common/metadata/AbstractMetaData.java  |   2 +-
 .../rocketmq/streams/common/metadata/MetaData.java |   2 +-
 .../streams/common/metadata/MetaDataField.java     |  32 ++--
 .../common/topology/stages/OutputChainStage.java   |   5 +-
 7 files changed, 167 insertions(+), 152 deletions(-)

diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
index 1aab2ba..886e23a 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java
@@ -18,40 +18,34 @@ package org.apache.rocketmq.streams.db.sink;
 
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
 import org.apache.rocketmq.streams.common.channel.IChannel;
 import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
 import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
-import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
 import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
 import org.apache.rocketmq.streams.common.component.AbstractComponent;
 import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
 import org.apache.rocketmq.streams.common.context.IMessage;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
+import org.apache.rocketmq.streams.common.metadata.MetaDataField;
 import org.apache.rocketmq.streams.common.utils.SQLUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;
 import org.apache.rocketmq.streams.db.driver.DriverBuilder;
 import org.apache.rocketmq.streams.db.driver.JDBCDriver;
 
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Set;
-
 /**
  * 主要用于写db,输入可以是一个insert/replace 模版,也可以是metadata对象,二者选一即可。都支持批量插入,提高吞吐
sql 模版:insert into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
MetaData:主要是描述每个字段的类型,是否必须 二者选一个即可。sql模式,系统会把一批(batchSize)数据拼成一个大sql。metadata模式,基于字段描述,最终也是拼成一个大sql
  */
 public class DBSink extends AbstractSink {
-
-    protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert
into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
-
-    protected String duplicateSQLTemplate; //通过on duplicate key update 来对已经存在的信息进行更新
-
-    protected MetaData metaData;//可以指定meta data,和insertSQL二选一
-
-    protected String tableName; //指定要插入的数据表
+    public static final String SQL_MODE_DEFAULT = "default";
+    public static final String SQL_MODE_REPLACE = "replace";
+    public static final String SQL_MODE_IGNORE = "ignore";
 
     @ENVDependence
     protected String jdbcDriver = AbstractComponent.DEFAULT_JDBC_DRIVER;
@@ -60,7 +54,15 @@ public class DBSink extends AbstractSink {
     @ENVDependence
     protected String userName;
     @ENVDependence
+    protected String tableName; //指定要插入的数据表
+    @ENVDependence
     protected String password;
+    @ENVDependence
+    protected String sqlMode;
+
+    protected MetaData metaData;//可以指定meta data,和insertSQL二选一
+
+    protected String insertSQLTemplate;//完成插入部分的工作,和metadata二选一。insert
into table(column1,column2,column3)values('#{var1}',#{var2},'#{var3}')
 
     protected boolean openSqlCache = true;
 
@@ -87,60 +89,83 @@ public class DBSink extends AbstractSink {
     }
 
     public DBSink() {
-        setType(IChannel.TYPE);
+        this(null, null, null, null);
     }
 
-    public DBSink(String url, String userName, String password) {
-        setType(IChannel.TYPE);
-        this.url = url;
-        this.userName = userName;
-        this.password = password;
+    public DBSink(String url, String userName, String password, String tableName) {
+        this(url, userName, password, tableName, SQL_MODE_DEFAULT);
     }
 
-    public DBSink(String insertSQL, String url, String userName, String password) {
+    public DBSink(String url, String userName, String password, String tableName, String
sqlMode) {
+        this(url, userName, password, tableName, sqlMode, null);
+    }
+
+    public DBSink(String url, String userName, String password, String tableName, String
sqlMode, MetaData metaData) {
         setType(IChannel.TYPE);
         this.url = url;
         this.userName = userName;
         this.password = password;
-        this.insertSQLTemplate = insertSQL;
+        this.tableName = tableName;
+        this.sqlMode = sqlMode;
+        this.metaData = metaData;
     }
 
     @Override
     protected boolean initConfigurable() {
-        try {
-            Class.forName("com.mysql.jdbc.Driver");
-            if (StringUtil.isNotEmpty(this.tableName)) {
-                Connection connection = DriverManager.getConnection(url, userName, password);
-                DatabaseMetaData metaData = connection.getMetaData();
-                ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%",
this.tableName, null);
-                this.metaData = MetaData.createMetaData(metaResult);
-                this.metaData.setTableName(this.tableName);
-            }
-            sqlCache = new MessageCache<>(new IMessageFlushCallBack<String>()
{
-                @Override
-                public boolean flushMessage(List<String> sqls) {
-                    JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName,
password);
-                    try {
-                        dataSource.executSqls(sqls);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                        throw new RuntimeException(e);
-                    } finally {
-                        if (dataSource != null) {
-                            dataSource.destroy();
-                        }
-                    }
-                    return true;
+        if (this.metaData == null) {
+            try {
+                Class.forName("com.mysql.jdbc.Driver");
+                if (StringUtil.isNotEmpty(this.tableName)) {
+                    Connection connection = DriverManager.getConnection(this.url, this.userName,
this.password);
+                    DatabaseMetaData connectionMetaData = connection.getMetaData();
+                    ResultSet metaResult = connectionMetaData.getColumns(connection.getCatalog(),
"%", this.tableName, null);
+                    this.metaData = MetaData.createMetaData(metaResult);
+                    this.metaData.setTableName(this.tableName);
                 }
-            });
-            ((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
-            ((MessageCache<String>) sqlCache).setAutoFlushSize(50);
-            sqlCache.openAutoFlush();
-            return super.initConfigurable();
-        } catch (ClassNotFoundException | SQLException e) {
-            e.printStackTrace();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        List<MetaDataField> fieldList = this.metaData.getMetaDataFields();
+        List<String> insertFields = Lists.newArrayList();
+        List<String> insertValues = Lists.newArrayList();
+        List<String> duplicateKeys = Lists.newArrayList();
+        fieldList.forEach(field -> {
+            String fieldName = field.getFieldName();
+            insertFields.add(fieldName);
+            insertValues.add("'#{" + fieldName + "}'");
+            duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
+        });
+
+        String sql = "insert";
+        if (sqlMode == null || SQL_MODE_DEFAULT.equals(sqlMode)) {
+            sql = sql + " into ";
+        } else if (SQL_MODE_IGNORE.equals(sqlMode)) {
+            sql = sql + " ignore into ";
+        } else if (SQL_MODE_REPLACE.equals(sqlMode)) {
+            sql = sql + " into ";
         }
-        return false;
+        sql = sql + tableName + "(" + String.join(",", insertFields) + ") values (" + String.join(",",
insertValues) + ")  ";
+        if (SQL_MODE_REPLACE.equals(sqlMode)) {
+            sql = sql + " on duplicate key update " + String.join(",", duplicateKeys);
+        }
+        this.insertSQLTemplate = sql;
+        this.sqlCache = new MessageCache<>(sqls -> {
+            JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName,
password);
+            try {
+                dataSource.executSqls(sqls);
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            } finally {
+                dataSource.destroy();
+            }
+            return true;
+        });
+        ((MessageCache<String>) this.sqlCache).setAutoFlushTimeGap(100000);
+        ((MessageCache<String>) this.sqlCache).setAutoFlushSize(50);
+        this.sqlCache.openAutoFlush();
+        return super.initConfigurable();
     }
 
     @Override
@@ -154,7 +179,6 @@ public class DBSink extends AbstractSink {
             if (StringUtil.isEmpty(insertSQLTemplate) && metaData != null) {
                 String sql = SQLUtil.createInsertSql(metaData, messages.get(0));
                 sql += SQLUtil.createInsertValuesSQL(metaData, messages.subList(1, messages.size()));
-                sql += this.duplicateSQLTemplate;
                 executeSQL(dbDataSource, sql);
                 return true;
             }
@@ -162,7 +186,6 @@ public class DBSink extends AbstractSink {
             if (StringUtil.isEmpty(insertValueSQL) || insertSQLTemplate.replace(insertValueSQL,
"").contains("#{")) {
                 for (JSONObject message : messages) {
                     String sql = parseSQL(message, insertSQLTemplate);
-                    sql += this.duplicateSQLTemplate;
                     executeSQL(dbDataSource, sql);
                 }
                 return true;
@@ -172,7 +195,6 @@ public class DBSink extends AbstractSink {
                     subInsert.add(parseSQL(message, insertValueSQL));
                 }
                 String insertSQL = this.insertSQLTemplate.replace(insertValueSQL, String.join(",",
subInsert));
-                insertSQL += this.duplicateSQLTemplate;
                 executeSQL(dbDataSource, insertSQL);
                 return true;
             }
@@ -195,7 +217,16 @@ public class DBSink extends AbstractSink {
         } else {
             dbDataSource.execute(sql);
         }
+    }
 
+    protected void executeSQL(JDBCDriver dbDataSource, List<String> sqls) {
+        if (isOpenSqlCache()) {
+            for (String sql : sqls) {
+                this.sqlCache.addCache(sql);
+            }
+        } else {
+            dbDataSource.executSqls(sqls);
+        }
     }
 
     /**
@@ -209,7 +240,7 @@ public class DBSink extends AbstractSink {
             return null;
         }
         String valuesSQL = insertSQL.substring(start + VALUES_NAME.length());
-        int end = valuesSQL.toLowerCase().lastIndexOf(")");
+        int end = valuesSQL.toLowerCase().indexOf(")");
         if (end == -1) {
             return null;
         }
@@ -260,14 +291,6 @@ public class DBSink extends AbstractSink {
         this.password = password;
     }
 
-    public MetaData getMetaData() {
-        return metaData;
-    }
-
-    public void setMetaData(MetaData metaData) {
-        this.metaData = metaData;
-    }
-
     public String getTableName() {
         return tableName;
     }
@@ -276,6 +299,22 @@ public class DBSink extends AbstractSink {
         this.tableName = tableName;
     }
 
+    public String getSqlMode() {
+        return sqlMode;
+    }
+
+    public void setSqlMode(String sqlMode) {
+        this.sqlMode = sqlMode;
+    }
+
+    public MetaData getMetaData() {
+        return metaData;
+    }
+
+    public void setMetaData(MetaData metaData) {
+        this.metaData = metaData;
+    }
+
     public boolean isOpenSqlCache() {
         return openSqlCache;
     }
@@ -284,11 +323,4 @@ public class DBSink extends AbstractSink {
         this.openSqlCache = openSqlCache;
     }
 
-    public String getDuplicateSQLTemplate() {
-        return duplicateSQLTemplate;
-    }
-
-    public void setDuplicateSQLTemplate(String duplicateSQLTemplate) {
-        this.duplicateSQLTemplate = duplicateSQLTemplate;
-    }
 }
diff --git a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
index 87f5fb7..1743f62 100644
--- a/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
+++ b/rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSinkBuilder.java
@@ -17,17 +17,13 @@
 package org.apache.rocketmq.streams.db.sink;
 
 import com.google.auto.service.AutoService;
-import com.google.common.collect.Lists;
+import java.util.Properties;
 import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
 import org.apache.rocketmq.streams.common.channel.sink.ISink;
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.metadata.MetaData;
-import org.apache.rocketmq.streams.common.metadata.MetaDataField;
 import org.apache.rocketmq.streams.common.model.ServiceName;
 
-import java.util.List;
-import java.util.Properties;
-
 @AutoService(IChannelBuilder.class)
 @ServiceName(DBSinkBuilder.TYPE)
 public class DBSinkBuilder implements IChannelBuilder {
@@ -39,21 +35,9 @@ public class DBSinkBuilder implements IChannelBuilder {
         sink.setUrl(properties.getProperty("url"));
         sink.setUserName(properties.getProperty("userName"));
         sink.setPassword(properties.getProperty("password"));
-        List<MetaDataField> fieldList = metaData.getMetaDataFields();
-
-        List<String> insertFields = Lists.newArrayList();
-        List<String> insertValues = Lists.newArrayList();
-        List<String> duplicateKeys = Lists.newArrayList();
-        fieldList.forEach(field -> {
-            String fieldName = field.getFieldName();
-            insertFields.add(fieldName);
-            insertValues.add("'#{" + fieldName + "}'");
-            duplicateKeys.add(fieldName + " = VALUES(" + fieldName + ")");
-        });
-
-        String sql = "insert into " + properties.getProperty("tableName") + "(" + String.join(",",
insertFields) + ") values (" + String.join(",", insertValues) + ")  ";
-        sink.setInsertSQLTemplate(sql);
-        sink.setDuplicateSQLTemplate(" on duplicate key update " + String.join(",", duplicateKeys));
+        sink.setTableName(properties.getProperty("tableName"));
+        sink.setSqlMode(properties.getProperty("sqlMode"));
+        sink.setMetaData(metaData);
         return sink;
     }
 
diff --git a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
index f9260ea..08bfe7f 100644
--- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
+++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java
@@ -73,7 +73,8 @@ public class DataStream implements Serializable {
         this.currentChainStage = currentChainStage;
     }
 
-    public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipelineBuilders,
ChainStage<?> currentChainStage) {
+    public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipelineBuilders,
+        ChainStage<?> currentChainStage) {
         this.mainPipelineBuilder = pipelineBuilder;
         this.otherPipelineBuilders = pipelineBuilders;
         this.currentChainStage = currentChainStage;
@@ -97,11 +98,11 @@ public class DataStream implements Serializable {
             @Override
             protected <T> T operate(IMessage message, AbstractContext context) {
                 try {
-                    O o = (O)(message.getMessageValue());
-                    T result = (T)mapFunction.map(o);
+                    O o = (O) (message.getMessageValue());
+                    T result = (T) mapFunction.map(o);
                     if (result != message.getMessageValue()) {
                         if (result instanceof JSONObject) {
-                            message.setMessageBody((JSONObject)result);
+                            message.setMessageBody((JSONObject) result);
                         } else {
                             message.setMessageBody(new UserDefinedMessage(result));
                         }
@@ -118,28 +119,28 @@ public class DataStream implements Serializable {
         return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
     }
 
-
     public <T, O> DataStream flatMap(FlatMapFunction<T, O> mapFunction) {
         StageBuilder stageBuilder = new StageBuilder() {
             @Override
             protected <T> T operate(IMessage message, AbstractContext context) {
                 try {
-                    O o = (O)(message.getMessageValue());
-                    List<T> result =(List<T>)mapFunction.flatMap(o);
-                    if(result==null||result.size()==0){
+                    O o = (O) (message.getMessageValue());
+                    List<T> result = (List<T>) mapFunction.flatMap(o);
+                    if (result == null || result.size() == 0) {
                         context.breakExecute();
                     }
-                    List<IMessage> splitMessages=new ArrayList<>();
-                    for(T t:result){
-                        Message subMessage=null;
+                    List<IMessage> splitMessages = new ArrayList<>();
+                    for (T t : result) {
+                        Message subMessage = null;
                         if (result instanceof JSONObject) {
-                            subMessage=new Message((JSONObject)t);
+                            subMessage = new Message((JSONObject) t);
                         } else {
-                            subMessage=new Message(new UserDefinedMessage(t));
+                            subMessage = new Message(new UserDefinedMessage(t));
                         }
                         splitMessages.add(subMessage);
                     }
-                    context.openSplitModel();;
+                    context.openSplitModel();
+                    ;
                     context.setSplitMessages(splitMessages);
                     return null;
                 } catch (Exception e) {
@@ -159,7 +160,7 @@ public class DataStream implements Serializable {
             @Override
             protected <T> T operate(IMessage message, AbstractContext context) {
                 try {
-                    boolean isFilter = filterFunction.filter((O)message.getMessageValue());
+                    boolean isFilter = filterFunction.filter((O) message.getMessageValue());
                     if (isFilter) {
                         context.breakExecute();
                     }
@@ -232,7 +233,7 @@ public class DataStream implements Serializable {
         Union union = new Union();
 
         //处理左流,做流的isMain设置成true
-        UDFUnionChainStage chainStage = (UDFUnionChainStage)this.mainPipelineBuilder.createStage(union);
+        UDFUnionChainStage chainStage = (UDFUnionChainStage) this.mainPipelineBuilder.createStage(union);
         chainStage.setMainStream(true);
         this.mainPipelineBuilder.setTopologyStages(currentChainStage, chainStage);
 
@@ -272,7 +273,8 @@ public class DataStream implements Serializable {
      * @param sqlOrTableName
      * @return
      */
-    public JoinStream join(String url, String userName, String password, String sqlOrTableName,
long pollingTimeMintue) {
+    public JoinStream join(String url, String userName, String password, String sqlOrTableName,
+        long pollingTimeMintue) {
         return join(url, userName, password, sqlOrTableName, null, pollingTimeMintue);
     }
 
@@ -285,7 +287,8 @@ public class DataStream implements Serializable {
      * @param sqlOrTableName
      * @return
      */
-    public JoinStream join(String url, String userName, String password, String sqlOrTableName,
String jdbcDriver, long pollingTimeMinute) {
+    public JoinStream join(String url, String userName, String password, String sqlOrTableName,
String jdbcDriver,
+        long pollingTimeMinute) {
         DBDim dbDim = new DBDim();
         dbDim.setUrl(url);
         dbDim.setUserName(userName);
@@ -308,7 +311,7 @@ public class DataStream implements Serializable {
         StageBuilder selfChainStage = new StageBuilder() {
             @Override
             protected <T> T operate(IMessage message, AbstractContext context) {
-                forEachFunction.foreach((O)message.getMessageValue());
+                forEachFunction.foreach((O) message.getMessageValue());
                 return null;
             }
         };
@@ -367,7 +370,6 @@ public class DataStream implements Serializable {
             return;
         }
 
-
         ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(),
ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory");
         ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
         pipeline.startChannel();
@@ -403,21 +405,23 @@ public class DataStream implements Serializable {
         this.otherPipelineBuilders.addAll(rightSource.otherPipelineBuilders);
     }
 
-    public DataStreamAction toFile(String filePath,int batchSize,boolean isAppend) {
-        FileSink fileChannel = new FileSink(filePath,isAppend);
-        if(batchSize>0){
+    public DataStreamAction toFile(String filePath, int batchSize, boolean isAppend) {
+        FileSink fileChannel = new FileSink(filePath, isAppend);
+        if (batchSize > 0) {
             fileChannel.setBatchSize(batchSize);
         }
         ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
         mainPipelineBuilder.setTopologyStages(currentChainStage, output);
         return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders,
output);
     }
-    public DataStreamAction toFile(String filePath,boolean isAppend) {
-        FileSink fileChannel = new FileSink(filePath,isAppend);
+
+    public DataStreamAction toFile(String filePath, boolean isAppend) {
+        FileSink fileChannel = new FileSink(filePath, isAppend);
         ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
         mainPipelineBuilder.setTopologyStages(currentChainStage, output);
         return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders,
output);
     }
+
     public DataStreamAction toFile(String filePath) {
         FileSink fileChannel = new FileSink(filePath);
         ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
@@ -440,28 +444,26 @@ public class DataStream implements Serializable {
     }
 
     public DataStreamAction toDB(String url, String userName, String password, String tableName)
{
-        DBSink dbChannel = new DBSink(url, userName, password);
-        dbChannel.setTableName(tableName);
+        DBSink dbChannel = new DBSink(url, userName, password, tableName);
         ChainStage<?> output = this.mainPipelineBuilder.createStage(dbChannel);
         this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
         return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders,
output);
     }
 
     public DataStreamAction toRocketmq(String topic) {
-        return toRocketmq(topic, "*", null,-1, null);
+        return toRocketmq(topic, "*", null, -1, null);
     }
 
-
-    public DataStreamAction toRocketmq(String topic,String namesrvAddr) {
-        return toRocketmq(topic, "*", null,-1, namesrvAddr);
+    public DataStreamAction toRocketmq(String topic, String namesrvAddr) {
+        return toRocketmq(topic, "*", null, -1, namesrvAddr);
     }
 
     public DataStreamAction toRocketmq(String topic, String tags,
-                                       String namesrvAddr) {
-        return toRocketmq(topic, tags,null,-1, namesrvAddr);
+        String namesrvAddr) {
+        return toRocketmq(topic, tags, null, -1, namesrvAddr);
     }
 
-    public DataStreamAction toRocketmq(String topic, String tags,String groupName, int batchSize,
 String namesrvAddr) {
+    public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize,
String namesrvAddr) {
         RocketMQSink rocketMQSink = new RocketMQSink();
         rocketMQSink.setTopic(topic);
         rocketMQSink.setTags(tags);
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
index 7efd6d3..59f7464 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/AbstractMetaData.java
@@ -197,7 +197,7 @@ public abstract class AbstractMetaData<T> extends BasedConfigurable
         Iterator i$ = this.metaDataFields.iterator();
 
         while (i$.hasNext()) {
-            MetaDataField<T> field = (MetaDataField)i$.next();
+            MetaDataField<T> field = (MetaDataField) i$.next();
             jsonArray.add(field.toJson());
         }
 
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
index 3deec90..b4f04d3 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaData.java
@@ -164,7 +164,7 @@ public class MetaData extends AbstractMetaData {
     }
 
     public static String getTableName(Class clazz) {
-        TableClassName tableClass = (TableClassName)clazz.getAnnotation(TableClassName.class);
+        TableClassName tableClass = (TableClassName) clazz.getAnnotation(TableClassName.class);
         if (tableClass != null) {
             String className = tableClass.value();
             if (StringUtil.isNotEmpty(className)) {
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
index 9ec882b..5a035ae 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/metadata/MetaDataField.java
@@ -116,7 +116,7 @@ public class MetaDataField<T> extends Entity implements IJsonable
{
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("fieldName", fieldName);
         if (dataType == null) {
-            dataType = (DataType)new StringDataType();
+            dataType = (DataType) new StringDataType();
         }
         jsonObject.put("dataType", dataType.toJson());
         jsonObject.put("isRequired", isRequired);
@@ -134,8 +134,8 @@ public class MetaDataField<T> extends Entity implements IJsonable
{
         this.isPrimary = jsonObject.getBoolean("isPrimary");
     }
 
-    public static DataType getDataTypeByStr(String dataType) {
-        DataType dt = null;
+    public static DataType<?> getDataTypeByStr(String dataType) {
+        DataType<?> dt = null;
         if ("String".equals(dataType)) {
             dt = new StringDataType();
         } else if ("long".equals(dataType)) {
@@ -152,22 +152,20 @@ public class MetaDataField<T> extends Entity implements IJsonable
{
         return dt;
     }
 
-    public static String getDataTypeStrByType(DataType dataType) {
-        String dataTypestr = "";
-        if (StringDataType.class.isInstance(dataType)) {
-            dataTypestr = "String";
-        } else if (LongDataType.class.isInstance(dataType)) {
-            dataTypestr = "long";
-        } else if (IntDataType.class.isInstance(dataType)) {
-            dataTypestr = "int";
-        } else if (FloatDataType.class.isInstance(dataType)) {
-            dataTypestr = "float";
-        } else if (Boolean.class.isInstance(dataType)) {
-            dataTypestr = "boolean";
+    public static String getDataTypeStrByType(DataType<?> dataType) {
+        String dataTypeStr = "";
+        if (dataType instanceof StringDataType) {
+            dataTypeStr = "String";
+        } else if (dataType instanceof LongDataType) {
+            dataTypeStr = "long";
+        } else if (dataType instanceof IntDataType) {
+            dataTypeStr = "int";
+        } else if (dataType instanceof FloatDataType) {
+            dataTypeStr = "float";
         } else {
-            dataTypestr = "String";
+            dataTypeStr = "String";
         }
-        return dataTypestr;
+        return dataTypeStr;
     }
 
 }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
index 5cbac0f..a715ca3 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/OutputChainStage.java
@@ -82,13 +82,12 @@ public class OutputChainStage<T extends IMessage> extends ChainStage<T>
implemen
              */
             if (openMockChannel()) {
                 if (mockSink != null) {
-                    mockSink.batchAdd(message.deepCopy());
+                    mockSink.batchAdd(message);
                     return message;
                 }
                 return message;
             }
-            sink.batchAdd(message.deepCopy());
-
+            sink.batchAdd(message);
             return message;
         }
 

Mime
View raw message