rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-streams] branch main updated: Joinwindow bug fix (#61)
Date Wed, 15 Sep 2021 03:37:32 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 0742cf4  Joinwindow bug fix (#61)
0742cf4 is described below

commit 0742cf44bd5cf3903461f85fa0c9c4eef2d3c447
Author: YUDA <xstorm@live.cn>
AuthorDate: Wed Sep 15 11:37:25 2021 +0800

    Joinwindow bug fix (#61)
    
    * fix bugs
    
    * fix joinwindow message remove bugs
---
 pom.xml                                            |   2 +-
 .../streams/common/topology/ChainStage.java        |   4 +-
 .../common/topology/model/AbstractStage.java       |   7 +-
 .../impl/string/SubStringIndexFunction.java        |   6 +
 .../streams/window/model/WindowInstance.java       |   4 +
 .../streams/window/operator/impl/OverWindow.java   |   2 +-
 .../streams/window/operator/join/JoinWindow.java   | 157 ++++++++++++++-------
 .../streams/window/shuffle/ShuffleChannel.java     |   3 +-
 .../streams/window/state/impl/WindowValue.java     |   2 +-
 9 files changed, 129 insertions(+), 58 deletions(-)

diff --git a/pom.xml b/pom.xml
index ab8825d..7bbe1f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <spring.version>3.2.13.RELEASE</spring.version>
         <auto-service.version>1.0-rc5</auto-service.version>
         <mysql-connector.version>5.1.40</mysql-connector.version>
-        <fastjson.version>1.2.27</fastjson.version>
+        <fastjson.version>1.2.78</fastjson.version>
         <quartz.version>2.2.1</quartz.version>
         <httpclient.version>4.5.2</httpclient.version>
         <commons-io.version>2.5</commons-io.version>
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
index 1916291..d1b7ab6 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java
@@ -152,7 +152,9 @@ public abstract class ChainStage<T extends IMessage> extends AbstractStage<T>
{
         }
         Set<ChainPipeline> set = new HashSet<>();
         for (Pipeline pipeline : pipelines) {
-            set.add((ChainPipeline)pipeline);
+            if (pipeline != null) {
+                set.add((ChainPipeline)pipeline);
+            }
         }
         sendSystem(message, context, set);
     }
diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
index 9860c51..1a2d8b1 100644
--- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
+++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java
@@ -84,7 +84,12 @@ public abstract class AbstractStage<T extends IMessage> extends BasedConfigurabl
             context.breakExecute();
             return null;
         }
-        TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString());
+        try {
+
+            TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString());
+        } catch (Exception e) {
+            LOG.error("t.getMessageBody() parse error", e);
+        }
         IStageHandle handle = selectHandle(t, context);
         if (handle == null) {
             return t;
diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
index 705a5fc..3d50f01 100644
--- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
+++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java
@@ -85,6 +85,12 @@ public class SubStringIndexFunction {
                                  @FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值",
value = "string") Integer startIndex,
                                  @FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值",
value = "string") Integer endIndex) {
         oriMsg = FunctionUtils.getValueString(message, context, oriMsg);
+        int msgLength = oriMsg.length();
+        if (startIndex >= msgLength) {
+            return "";
+        } else if (endIndex > msgLength) {
+            endIndex = msgLength;
+        }
         return oriMsg.substring(startIndex, endIndex);
     }
 
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
index ebcc0fe..c575976 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java
@@ -109,6 +109,10 @@ public class WindowInstance extends Entity implements Serializable {
         return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName,
startTime, endTime);
     }
 
+    public String createWindowInstanceIdWithoutSplitid() {
+        return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime,
endTime);
+    }
+
     public String createWindowInstanceTriggerId(){
         return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName,
startTime, endTime,fireTime);
     }
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
index b8b74f8..86fa99b 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java
@@ -128,7 +128,7 @@ public class OverWindow extends AbstractWindow {
 
     @Override
     protected boolean initConfigurable() {
-        return true;
+        return super.initConfigurable();
     }
 
     @Override
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
index 6579647..920d525 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.streams.common.utils.TraceUtil;
 import org.apache.rocketmq.streams.common.context.AbstractContext;
 import org.apache.rocketmq.streams.common.context.Context;
 import org.apache.rocketmq.streams.common.context.IMessage;
@@ -40,6 +41,7 @@ import org.apache.rocketmq.streams.window.state.WindowBaseValue;
 import org.apache.rocketmq.streams.window.state.impl.JoinLeftState;
 import org.apache.rocketmq.streams.window.state.impl.JoinRightState;
 import org.apache.rocketmq.streams.window.state.impl.JoinState;
+import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;
 
 public class JoinWindow extends AbstractShuffleWindow {
 
@@ -73,6 +75,7 @@ public class JoinWindow extends AbstractShuffleWindow {
     //
     //    }
 
+
     @Override
     protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<String,
String> queueId2Offsets) {
         clearFire(instance);
@@ -81,7 +84,8 @@ public class JoinWindow extends AbstractShuffleWindow {
 
     @Override
     public void clearCache(String queueId) {
-
+        getStorage().clearCache(shuffleChannel.getChannelQueue(queueId),getWindowBaseValueClass());
+        ShufflePartitionManager.getInstance().clearSplit(queueId);
     }
 
     @Override
@@ -110,8 +114,7 @@ public class JoinWindow extends AbstractShuffleWindow {
         }
 
         for (IMessage msg : messages) {
-            MessageHeader header = JSONObject.parseObject(msg.getMessageBody().
-                getString(WindowCache.ORIGIN_MESSAGE_HEADER), MessageHeader.class);
+            MessageHeader header = msg.getHeader();
             String routeLabel = header.getMsgRouteFromLable();
             //            Map<String,WindowBaseValue> joinMessages = new HashMap<>();
             String storeKeyPrefix = "";
@@ -128,7 +131,11 @@ public class JoinWindow extends AbstractShuffleWindow {
             List<WindowBaseValue> tmpMessages = new ArrayList<>();
             int count = 0;
             while (iterator.hasNext()) {
-                tmpMessages.add(iterator.next());
+                WindowBaseValue windowBaseValue = iterator.next();
+                if (windowBaseValue == null) {
+                    continue;
+                }
+                tmpMessages.add(windowBaseValue);
                 count++;
                 if (count == 100) {
                     sendMessage(msg, tmpMessages);
@@ -146,7 +153,9 @@ public class JoinWindow extends AbstractShuffleWindow {
 
         List<WindowInstance> instances = new ArrayList<>();
         for (Map.Entry<String, WindowInstance> entry : this.windowInstanceMap.entrySet())
{
-            instances.add(entry.getValue());
+            if (queueId.equalsIgnoreCase(entry.getValue().getSplitId())) {
+                instances.add(entry.getValue());
+            }
         }
         Iterator<WindowInstance> windowInstanceIter = instances.iterator();
         return new Iterator<WindowBaseValue>() {
@@ -159,9 +168,9 @@ public class JoinWindow extends AbstractShuffleWindow {
                 if (iterator != null && iterator.hasNext()) {
                     return true;
                 }
-                while (windowInstanceIter.hasNext()) {
+                if (windowInstanceIter.hasNext()) {
                     WindowInstance instance = windowInstanceIter.next();
-                    iterator = storage.loadWindowInstanceSplitData(null, queueId,
+                    iterator = storage.loadWindowInstanceSplitData(null, null,
                         instance.createWindowInstanceId(),
                         keyPrefix,
                         clazz);
@@ -217,17 +226,64 @@ public class JoinWindow extends AbstractShuffleWindow {
 
     }
 
-    public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>>
rows, String joinType, String rightAsName) {
+    public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>>
rows, String joinType,
+        String rightAsName) {
         List<JSONObject> result = new ArrayList<>();
         if (rows.size() <= 0) {
             return result;
         }
         if ("inner".equalsIgnoreCase(joinType)) {
             result = connectInnerJoin(message, rows, rightAsName);
+        } else if ("left".equalsIgnoreCase(joinType)) {
+            result = connectLeftJoin(message, rows, rightAsName);
+        }
+        return result;
+    }
+
+    private List<JSONObject> connectLeftJoin(IMessage message, List<Map<String,
Object>> rows, String rightAsName) {
+
+        List<JSONObject> result = new ArrayList<>();
+        String routeLabel = message.getHeader().getMsgRouteFromLable();
+        JSONObject messageBody = message.getMessageBody();
+        String traceId = message.getHeader().getTraceId();
+        int index = 1;
+        if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() > 0) {
+            for (Map<String, Object> raw : rows) {
+                //                addAsName(raw, rightAsName);
+                JSONObject object = (JSONObject)messageBody.clone();
+                object.fluentPutAll(addAsName(raw, rightAsName));
+                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                index++;
+                result.add(object);
+            }
+        } else if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() <= 0)
{
+            JSONObject object = (JSONObject) messageBody.clone();
+            object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+            result.add(object);
+        } else {
+            messageBody = addAsName(messageBody, rightAsName);
+            for (Map<String, Object> raw : rows) {
+                JSONObject object = (JSONObject)messageBody.clone();
+                object.fluentPutAll(raw);
+                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                index++;
+                result.add(object);
+            }
+        }
+
+
+
+        if (rows != null && rows.size() > 0) {
+            for (Map<String,Object> raw : rows) {
+                JSONObject object = (JSONObject) messageBody.clone();
+                object.fluentPutAll(raw);
+                result.add(object);
+            }
+            return result;
+        }
+        if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
+            result.add(messageBody);
         }
-        //        else if ("left".equalsIgnoreCase(joinType)) {
-        //            result = connectLeftJoin(message, rows, rightAsName);
-        //        }
         return result;
     }
 
@@ -241,12 +297,16 @@ public class JoinWindow extends AbstractShuffleWindow {
     public List<JSONObject> connectInnerJoin(IMessage message, List<Map<String,
Object>> rows, String rightAsName) {
         List<JSONObject> result = new ArrayList<>();
         String routeLabel = message.getHeader().getMsgRouteFromLable();
+        String traceId = message.getHeader().getTraceId();
+        int index = 1;
         if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
             JSONObject messageBody = message.getMessageBody();
             for (Map<String, Object> raw : rows) {
                 //                addAsName(raw, rightAsName);
                 JSONObject object = (JSONObject)messageBody.clone();
                 object.fluentPutAll(addAsName(raw, rightAsName));
+                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                index++;
                 result.add(object);
             }
         } else {
@@ -255,6 +315,8 @@ public class JoinWindow extends AbstractShuffleWindow {
             for (Map<String, Object> raw : rows) {
                 JSONObject object = (JSONObject)messageBody.clone();
                 object.fluentPutAll(raw);
+                object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
+                index++;
                 result.add(object);
             }
         }
@@ -284,14 +346,9 @@ public class JoinWindow extends AbstractShuffleWindow {
      */
     protected String createStoreKey(IMessage message, String routeLabel, WindowInstance windowInstance)
{
         String shuffleKey = message.getMessageBody().getString(WindowCache.SHUFFLE_KEY);
-        String shuffleId = shuffleChannel.getChannelQueue(shuffleKey).getQueueId();
         String orginQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
         String originOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
-        String windowNamespace = getNameSpace();
-        String windowName = getConfigureName();
-        String startTime = windowInstance.getStartTime();
-        String endTime = windowInstance.getEndTime();
-        String storeKey = MapKeyUtil.createKey(shuffleId, windowNamespace, windowName, startTime,
endTime, shuffleKey, routeLabel, orginQueueId, originOffset);
+        String storeKey = MapKeyUtil.createKey(windowInstance.createWindowInstanceId(), shuffleKey,
routeLabel, orginQueueId, originOffset);
         return storeKey;
     }
 
@@ -327,6 +384,8 @@ public class JoinWindow extends AbstractShuffleWindow {
         JSONObject messageBody = (JSONObject)message.getMessageBody().clone();
         messageBody.remove("WindowInstance");
         messageBody.remove("AbstractWindow");
+        messageBody.remove(WindowCache.ORIGIN_MESSAGE_HEADER);
+        messageBody.remove("MessageHeader");
 
         JoinState state = null;
         if ("left".equalsIgnoreCase(routeLabel)) {
@@ -396,47 +455,41 @@ public class JoinWindow extends AbstractShuffleWindow {
         return JoinState.class;
     }
 
-    //    @Override
-    //    public void finishWindowProcessAndSend2Receiver(List<IMessage> messageList,WindowInstance
windowInstance) {
-    //        for (IMessage message : messageList) {
-    //            List<Map<String, Object>> result = joinOperator.dealJoin(message);
-    //            List<Map<String,Object>> rows = matchRows(message.getMessageBody(),
result);
-    //            String rightAsName = message.getMessageBody().getString("rightAsName");
-    //            String joinType = message.getMessageBody().getString("joinType");
-    //            List<JSONObject> connectMsgs = joinOperator.connectJoin(message,
rows, joinType, rightAsName);
-    //            for (int i=0; i < connectMsgs.size(); i++) {
-    //                if (i == connectMsgs.size() -1) {
-    //                    sendMessage(connectMsgs.get(i), true);
-    //                } else {
-    //                    sendMessage(connectMsgs.get(i), false);
-    //                }
-    //
-    //            }
-    //
-    //        }
-    //        //todo 完成处理
-    //        //todo 发送消息到下一个节点 sendFireMessage();
-    //    }
 
     /**
      * window触发后的清理工作
-     * @param windowInstances
-     */
-    /**
-     * 删除掉触发过的数据
-     *
-     * @param instance
+     * @param windowInstance
      */
     @Override
-    public void clearFireWindowInstance(WindowInstance instance) {
-        if(instance==null){
-            return;
+    public void clearFireWindowInstance(WindowInstance windowInstance) {
+//        String partitionNum=(getOrderBypPrefix()+ windowInstance.getSplitId());
+
+        List<WindowInstance> removeInstances = new ArrayList<>();
+
+        Date clearTime = DateUtil.addSecond(DateUtil.parse(windowInstance.getStartTime()),
-sizeInterval * (retainWindowCount - 1) * 60);
+        Iterator<String> iterable = this.windowInstanceMap.keySet().iterator();
+        while (iterable.hasNext()) {
+            WindowInstance instance = this.windowInstanceMap.get(iterable.next());
+            Date startTime = DateUtil.parse(instance.getStartTime());
+            if (DateUtil.dateDiff(clearTime, startTime) >= 0) {
+                removeInstances.add(instance);
+                iterable.remove();
+            }
         }
-        WindowInstance.clearInstance(instance);
-        joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(),
this.getRetainWindowCount(),
-            this.getSizeInterval(), instance.getStartTime());
-        //todo windowinstace
-        //todo left+right
+
+        for (WindowInstance instance : removeInstances) {
+
+            windowMaxValueManager.deleteSplitNum(instance, instance.getSplitId());
+            ShufflePartitionManager.getInstance().clearWindowInstance(instance.createWindowInstanceId());
+            storage.delete(instance.createWindowInstanceId(),null,WindowBaseValue.class,sqlCache);
+            if(!isLocalStorageOnly){
+                WindowInstance.clearInstance(instance,sqlCache);
+                joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(),
this.getRetainWindowCount(),
+                    this.getSizeInterval(), windowInstance.getStartTime());
+            }
+        }
+
+
     }
 
     protected List<Map<String, Object>> matchRows(JSONObject msg, List<Map<String,
Object>> rows) {
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
index cb6cb4e..ba17a44 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java
@@ -67,7 +67,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
     protected static final Log LOG = LogFactory.getLog(ShuffleChannel.class);
 
     protected static final String SHUFFLE_QUEUE_ID = "SHUFFLE_QUEUE_ID";
-
+    protected static final String SHUFFLE_OFFSET = "SHUFFLE_OFFSET";
     protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
     protected String MSG_OWNER = "MSG_OWNER";//消息所属的window
 
@@ -158,6 +158,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
         for (Object obj : messages) {
             IMessage message = new Message((JSONObject) obj);
             message.getHeader().setQueueId(queueId);
+            message.getMessageBody().put(SHUFFLE_OFFSET, oriMessage.getHeader().getOffset());
             window.updateMaxEventTime(message);
             if (isRepeateMessage(message, queueId)) {
                 continue;
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
index b4ae904..e04bd16 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java
@@ -264,7 +264,7 @@ public class WindowValue extends WindowBaseValue implements Serializable
{
             calProjectColumn(window, message);
             String traceId = message.getMessageBody().getString(WindowCache.ORIGIN_MESSAGE_TRACE_ID);
             if (!StringUtil.isEmpty(traceId)) {
-                TraceUtil.debug(traceId, "window value result", getComputedColumnResult());
+                TraceUtil.debug(traceId, "window value result", decodeSQLContent(getComputedColumnResult()));
             }
         } catch (Exception e) {
             LOG.error("failed in calculating the message", e);

Mime
View raw message