rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq] branch enchanced_acl updated: [ISSUE #1156]new mqadmin API for ACL configuration (#1217)
Date Fri, 14 Jun 2019 04:22:15 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch enchanced_acl
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/enchanced_acl by this push:
     new a2165d8  [ISSUE #1156]new mqadmin API for ACL configuration (#1217)
a2165d8 is described below

commit a2165d80517d46a3f8be62a1a4e3d3a2290afab8
Author: Hu Zongtang <huzongtang@cmss.chinamobile.com>
AuthorDate: Fri Jun 14 12:22:10 2019 +0800

    [ISSUE #1156]new mqadmin API for ACL configuration (#1217)
    
    * [issue#1164]return the codes to original reput method part.
    
    * [issue#1164]fix the issue where a consumer instance can't consume messages from the slave when the cluster is under high TPS and the master has been killed.
    
    * [issue#1164]if the broker is a master node,then modify reputFromOffset correctly.
    
    * [issue#1164]add some coding comments.
    
    * [ISSUE#1156]new mqadmin API for ACL configuration. Add dataVersion and write data to the acl yaml config file in the acl module.
    
    * [ISSUE#1156]add unit test cases for new acl mqadmin API command which adding dataversion part.
    
    * [ISSUE#1156]implement update,delete and query Acl config version in new acl mqadmin API command.
    
    * [ISSUE#1156]polish and optimize the implementation for new acl mqadmin API command.
    
    * [ISSUE#1156]fix the issues where topicPerms and groupPerms can't be updated and an NPE is thrown when querying the acl config version.
    
    * [ISSUE#1156]fix a small issue where the wrong plainAccessConfig attribute was specified; the correct one is AccessKey.
    
    * [ISSUE#1156]add updateGlobalWhiteAddr subcommand codes for mqadmin acl command.
    
    * [ISSUE#1156]adjust some codes for cluster acl config version list in the mqadmin acl commands.
    
    * [ISSUE#1156]add acl mqadmin command part in the acl user_guide docs.
    
    * [ISSUE#1156]polish and optimize some part of acl mqadmin command codes.
    
    * [ISSUE#1156]fix code comments so that the first letter is capitalized, and adjust the contents of the acl user_guide.
---
 .../org/apache/rocketmq/acl/AccessValidator.java   |  31 +++
 .../apache/rocketmq/acl/common/AclConstants.java   |  50 ++++
 .../org/apache/rocketmq/acl/common/AclUtils.java   |  34 ++-
 .../rocketmq/acl/plain/PlainAccessResource.java    |   2 +-
 .../rocketmq/acl/plain/PlainAccessValidator.java   |  28 +-
 ...sionLoader.java => PlainPermissionManager.java} | 282 ++++++++++++++-------
 .../apache/rocketmq/acl/common/AclUtilsTest.java   |  73 +++++-
 .../acl/plain/PlainAccessValidatorTest.java        | 264 +++++++++++++++++++
 ...erTest.java => PlainPermissionManagerTest.java} |  93 +++----
 acl/src/test/resources/conf/plain_acl_correct.yml  |  22 ++
 acl/src/test/resources/conf/plain_acl_delete.yml   |  22 ++
 .../conf/plain_acl_global_white_addrs.yml          |  22 ++
 .../resources/conf/plain_acl_update_create.yml     |  22 ++
 .../resources/conf/plain_acl_with_no_accouts.yml   |   3 +
 .../apache/rocketmq/broker/BrokerController.java   |   8 +-
 .../broker/processor/AdminBrokerProcessor.java     | 149 +++++++++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 105 ++++++++
 .../rocketmq/client/impl/MQClientAPIImplTest.java  | 132 ++++++++++
 .../apache/rocketmq/common/PlainAccessConfig.java  | 102 ++++++++
 .../java/org/apache/rocketmq/common/UtilAll.java   |  28 +-
 .../rocketmq/common/protocol/RequestCode.java      |   8 +
 .../rocketmq/common/protocol/ResponseCode.java     |   7 +
 .../protocol/body/ClusterAclVersionInfo.java       |  64 +++++
 .../header/CreateAccessConfigRequestHeader.java    | 113 +++++++++
 .../header/DeleteAccessConfigRequestHeader.java    |  42 +--
 .../header/GetBrokerAclConfigResponseHeader.java   |  71 ++++++
 .../UpdateGlobalWhiteAddrsConfigRequestHeader.java |  35 +--
 docs/cn/acl/user_guide.md                          |  73 +++++-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  24 ++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  25 ++
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |  14 +
 .../rocketmq/tools/command/MQAdminStartup.java     |  10 +
 .../acl/ClusterAclConfigVersionListSubCommand.java | 131 ++++++++++
 .../command/acl/DeleteAccessConfigSubCommand.java  | 106 ++++++++
 .../command/acl/UpdateAccessConfigSubCommand.java  | 185 ++++++++++++++
 .../acl/UpdateGlobalWhiteAddrSubCommand.java       | 101 ++++++++
 ...ClusterAclConfigVersionListSubCommandTest.java} |  28 +-
 .../DeleteAccessConfigSubCommandTest.java}         |  29 +--
 .../acl/UpdateAccessConfigSubCommandTest.java      |  89 +++++++
 .../UpdateGlobalWhiteAddrSubCommandTest.java}      |  29 +--
 .../command/topic/UpdateTopicSubCommandTest.java   |   2 -
 41 files changed, 2397 insertions(+), 261 deletions(-)

diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
index c915cf3..b87cc2f 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -17,9 +17,12 @@
 
 package org.apache.rocketmq.acl;
 
+import java.util.List;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface AccessValidator {
+
     /**
      * Parse to get the AccessResource(user, resource, needed permission)
      *
@@ -35,4 +38,32 @@ public interface AccessValidator {
      * @param accessResource
      */
     void validate(AccessResource accessResource);
+
+    /**
+     * Update the access resource config
+     *
+     * @param plainAccessConfig
+     * @return
+     */
+    boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);
+
+    /**
+     * Delete the access resource config
+     *
+     * @return
+     */
+    boolean deleteAccessConfig(String accesskey);
+
+    /**
+     * Get the access resource config version information
+     *
+     * @return
+     */
+    String getAclConfigVersion();
+
+    /**
+     * Update globalWhiteRemoteAddresses in acl yaml config file
+     * @return
+     */
+    boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
 }
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java
new file mode 100644
index 0000000..bfe96f5
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+public class AclConstants {
+
+    public static final String CONFIG_GLOBAL_WHITE_ADDRS = "globalWhiteRemoteAddresses";
+
+    public static final String CONFIG_ACCOUNTS = "accounts";
+
+    public static final String CONFIG_ACCESS_KEY = "accessKey";
+
+    public static final String CONFIG_SECRET_KEY = "secretKey";
+
+    public static final String CONFIG_WHITE_ADDR = "whiteRemoteAddress";
+
+    public static final String CONFIG_ADMIN_ROLE = "admin";
+
+    public static final String CONFIG_DEFAULT_TOPIC_PERM = "defaultTopicPerm";
+
+    public static final String CONFIG_DEFAULT_GROUP_PERM = "defaultGroupPerm";
+
+    public static final String CONFIG_TOPIC_PERMS = "topicPerms";
+
+    public static final String CONFIG_GROUP_PERMS = "groupPerms";
+
+    public static final String CONFIG_DATA_VERSION = "dataVersion";
+
+    public static final String CONFIG_COUNTER = "counter";
+
+    public static final String CONFIG_TIME_STAMP = "timestamp";
+
+    public static final int ACCESS_KEY_MIN_LENGTH = 6;
+
+    public static final int SECRET_KEY_MIN_LENGTH = 6;
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
index 39f75a3..20e1cfa 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
@@ -20,7 +20,9 @@ import com.alibaba.fastjson.JSONObject;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Map;
 import java.util.SortedMap;
 import org.apache.commons.lang3.StringUtils;
@@ -48,7 +50,7 @@ public class AclUtils {
 
             return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
         } catch (Exception e) {
-            throw new RuntimeException("incompatible exception.", e);
+            throw new RuntimeException("Incompatible exception.", e);
         }
     }
 
@@ -69,7 +71,7 @@ public class AclUtils {
 
     public static void verify(String netaddress, int index) {
         if (!AclUtils.isScope(netaddress, index)) {
-            throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
+            throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
         }
     }
 
@@ -127,11 +129,11 @@ public class AclUtils {
     }
 
     public static <T> T getYamlDataObject(String path, Class<T> clazz) {
-        Yaml ymal = new Yaml();
+        Yaml yaml = new Yaml();
         FileInputStream fis = null;
         try {
             fis = new FileInputStream(new File(path));
-            return ymal.loadAs(fis, clazz);
+            return yaml.loadAs(fis, clazz);
         } catch (FileNotFoundException ignore) {
             return null;
         } catch (Exception e) {
@@ -146,13 +148,31 @@ public class AclUtils {
         }
     }
 
+    public static boolean writeDataObject(String path, Map<String,Object> dataMap) {
+        Yaml yaml = new Yaml();
+        PrintWriter pw = null;
+        try {
+            pw = new PrintWriter(new FileWriter(path));
+            String dumpAsMap = yaml.dumpAsMap(dataMap);
+            pw.print(dumpAsMap);
+            pw.flush();
+        } catch (Exception e) {
+            throw new AclException(e.getMessage());
+        } finally {
+            if (pw != null) {
+                pw.close();
+            }
+        }
+        return true;
+    }
+
     public static RPCHook getAclRPCHook(String fileName) {
         JSONObject yamlDataObject = null;
         try {
             yamlDataObject = AclUtils.getYamlDataObject(fileName,
                 JSONObject.class);
         } catch (Exception e) {
-            log.error("convert yaml file to data object error, ",e);
+            log.error("Convert yaml file to data object error, ",e);
             return null;
         }
 
@@ -161,8 +181,8 @@ public class AclUtils {
             return null;
         }
         
-        String accessKey = yamlDataObject.getString("accessKey");
-        String secretKey = yamlDataObject.getString("secretKey");
+        String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY);
+        String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY);
 
         if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
             log.warn("AccessKey or secretKey is blank, the acl is not enabled.");
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 00072e8..a0cceed 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -43,7 +43,7 @@ public class PlainAccessResource implements AccessResource {
 
     private int requestCode;
 
-    //the content to calculate the content
+    // The content to calculate the content
     private byte[] content;
 
     private String signature;
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
index 11b2a43..c8ce239 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.acl.plain;
 
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -25,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.acl.common.Permission;
 import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
@@ -38,10 +40,10 @@ import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
 
 public class PlainAccessValidator implements AccessValidator {
 
-    private PlainPermissionLoader aclPlugEngine;
+    private PlainPermissionManager aclPlugEngine;
 
     public PlainAccessValidator() {
-        aclPlugEngine = new PlainPermissionLoader();
+        aclPlugEngine = new PlainPermissionManager();
     }
 
     @Override
@@ -56,8 +58,8 @@ public class PlainAccessValidator implements AccessValidator {
         accessResource.setRequestCode(request.getCode());
 
         if (request.getExtFields() == null) {
-            //If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
-            //The following logic codes depend on the request's extFields not to be null.
+            // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
+            // The following logic codes depend on the request's extFields not to be null.
             return accessResource;
         }
         accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
@@ -135,4 +137,22 @@ public class PlainAccessValidator implements AccessValidator {
         aclPlugEngine.validate((PlainAccessResource) accessResource);
     }
 
+    @Override
+    public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
+        return aclPlugEngine.updateAccessConfig(plainAccessConfig);
+    }
+
+    @Override
+    public boolean deleteAccessConfig(String accesskey) {
+        return aclPlugEngine.deleteAccessConfig(accesskey);
+    }
+
+    @Override public String getAclConfigVersion() {
+        return aclPlugEngine.getAclConfigDataVersion();
+    }
+
+    @Override public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
+        return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
+    }
+
 }
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
similarity index 53%
rename from acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
rename to acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
index e97825d..fc7f0f3 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
@@ -21,27 +21,29 @@ import com.alibaba.fastjson.JSONObject;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclConstants;
 import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.acl.common.Permission;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.srvutil.FileWatchService;
 
-public class PlainPermissionLoader {
+public class PlainPermissionManager {
 
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
 
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
         System.getenv(MixAll.ROCKETMQ_HOME_ENV));
 
@@ -55,7 +57,9 @@ public class PlainPermissionLoader {
 
     private boolean isWatchStart;
 
-    public PlainPermissionLoader() {
+    private final DataVersion dataVersion = new DataVersion();
+
+    public PlainPermissionManager() {
         load();
         watch();
     }
@@ -67,7 +71,6 @@ public class PlainPermissionLoader {
 
         JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
             JSONObject.class);
-
         if (plainAclConfData == null || plainAclConfData.isEmpty()) {
             throw new AclException(String.format("%s file  is not data", fileHome + File.separator + fileName));
         }
@@ -80,7 +83,7 @@ public class PlainPermissionLoader {
             }
         }
 
-        JSONArray accounts = plainAclConfData.getJSONArray("accounts");
+        JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
         if (accounts != null && !accounts.isEmpty()) {
             List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
             for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
@@ -89,10 +92,184 @@ public class PlainPermissionLoader {
             }
         }
 
+        // For loading dataversion part just
+        JSONArray tempDataVersion = plainAclConfData.getJSONArray(AclConstants.CONFIG_DATA_VERSION);
+        if (tempDataVersion != null && !tempDataVersion.isEmpty()) {
+            List<DataVersion> dataVersion = tempDataVersion.toJavaList(DataVersion.class);
+            DataVersion firstElement = dataVersion.get(0);
+            this.dataVersion.assignNewOne(firstElement);
+        }
+
         this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
         this.plainAccessResourceMap = plainAccessResourceMap;
     }
 
+    public String getAclConfigDataVersion() {
+        return this.dataVersion.toJson();
+    }
+
+    private Map<String, Object> updateAclConfigFileVersion(Map<String, Object> updateAclConfigMap) {
+
+        dataVersion.nextVersion();
+        List<Map<String, Object>> versionElement = new ArrayList<Map<String, Object>>();
+        Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
+            {
+                put(AclConstants.CONFIG_COUNTER, dataVersion.getCounter().longValue());
+                put(AclConstants.CONFIG_TIME_STAMP, dataVersion.getTimestamp());
+            }
+        };
+        versionElement.add(accountsMap);
+        updateAclConfigMap.put(AclConstants.CONFIG_DATA_VERSION, versionElement);
+        return updateAclConfigMap;
+    }
+
+    public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
+
+        if (plainAccessConfig == null) {
+            log.error("Parameter value plainAccessConfig is null,Please check your parameter");
+            return false;
+        }
+
+        Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+            Map.class);
+
+        List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+        Map<String, Object> updateAccountMap = null;
+        if (accounts != null) {
+            for (Map<String, Object> account : accounts) {
+                if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+                    // Update acl access config elements
+                    accounts.remove(account);
+                    updateAccountMap = createAclAccessConfigMap(account, plainAccessConfig);
+                    accounts.add(updateAccountMap);
+                    aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+
+                    if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+                        return true;
+                    }
+                    return false;
+                }
+            }
+            // Create acl access config elements
+            accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+            aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+            if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+                return true;
+            }
+            return false;
+        }
+
+        log.error("Users must ensure that the acl yaml config file has accounts node element");
+        return false;
+    }
+
+    private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccoutMap, PlainAccessConfig plainAccessConfig) {
+
+
+        Map<String, Object> newAccountsMap = null;
+        if (existedAccoutMap == null) {
+            newAccountsMap = new LinkedHashMap<String, Object>();
+        } else {
+            newAccountsMap = existedAccoutMap;
+        }
+
+        if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) ||
+            plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH) {
+            throw new AclException(String.format(
+                    "The accessKey=%s cannot be null and length should longer than 6",
+                    plainAccessConfig.getAccessKey()));
+        }
+        newAccountsMap.put(AclConstants.CONFIG_ACCESS_KEY, plainAccessConfig.getAccessKey());
+
+        if (!StringUtils.isEmpty(plainAccessConfig.getSecretKey())) {
+            if (plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
+                throw new AclException(String.format(
+                    "The secretKey=%s value length should longer than 6",
+                    plainAccessConfig.getSecretKey()));
+            }
+            newAccountsMap.put(AclConstants.CONFIG_SECRET_KEY, (String) plainAccessConfig.getSecretKey());
+        }
+        if (!StringUtils.isEmpty(plainAccessConfig.getWhiteRemoteAddress())) {
+            newAccountsMap.put(AclConstants.CONFIG_WHITE_ADDR, plainAccessConfig.getWhiteRemoteAddress());
+        }
+        if (!StringUtils.isEmpty(String.valueOf(plainAccessConfig.isAdmin()))) {
+            newAccountsMap.put(AclConstants.CONFIG_ADMIN_ROLE, plainAccessConfig.isAdmin());
+        }
+        if (!StringUtils.isEmpty(plainAccessConfig.getDefaultTopicPerm())) {
+            newAccountsMap.put(AclConstants.CONFIG_DEFAULT_TOPIC_PERM, plainAccessConfig.getDefaultTopicPerm());
+        }
+        if (!StringUtils.isEmpty(plainAccessConfig.getDefaultGroupPerm())) {
+            newAccountsMap.put(AclConstants.CONFIG_DEFAULT_GROUP_PERM, plainAccessConfig.getDefaultGroupPerm());
+        }
+        if (plainAccessConfig.getTopicPerms() != null && !plainAccessConfig.getTopicPerms().isEmpty()) {
+            newAccountsMap.put(AclConstants.CONFIG_TOPIC_PERMS, plainAccessConfig.getTopicPerms());
+        }
+        if (plainAccessConfig.getGroupPerms() != null && !plainAccessConfig.getGroupPerms().isEmpty()) {
+            newAccountsMap.put(AclConstants.CONFIG_GROUP_PERMS, plainAccessConfig.getGroupPerms());
+        }
+
+        return newAccountsMap;
+    }
+
+    public boolean deleteAccessConfig(String accesskey) {
+        if (StringUtils.isEmpty(accesskey)) {
+            log.error("Parameter value accesskey is null or empty String,Please check your parameter");
+            return false;
+        }
+
+        Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+                    Map.class);
+
+        List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get("accounts");
+        if (accounts != null) {
+            Iterator<Map<String, Object>> itemIterator = accounts.iterator();
+            while (itemIterator.hasNext()) {
+
+                if (itemIterator.next().get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
+                    // Delete the related acl config element
+                    itemIterator.remove();
+                    aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+
+                    if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+                        return true;
+                    }
+                    return false;
+                }
+            }
+        }
+        log.error("Users must ensure that the acl yaml config file has related acl config elements");
+
+        return false;
+    }
+
+    public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
+
+        if (globalWhiteAddrsList == null) {
+            log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter");
+            return false;
+        }
+
+        Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+            Map.class);
+
+        List<String> globalWhiteRemoteAddrList = (List<String>) aclAccessConfigMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
+
+        if (globalWhiteRemoteAddrList != null) {
+            globalWhiteRemoteAddrList.clear();
+            globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
+
+            // Update globalWhiteRemoteAddr element in memeory map firstly
+            aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS,globalWhiteRemoteAddrList);
+            if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+                return true;
+            }
+            return false;
+        }
+
+        log.error("Users must ensure that the acl yaml config file has globalWhiteRemoteAddresses flag firstly");
+        return false;
+    }
+
     private void watch() {
         try {
             String watchFilePath = fileHome + fileName;
@@ -156,8 +333,8 @@ public class PlainPermissionLoader {
     public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
         if (plainAccessConfig.getAccessKey() == null
             || plainAccessConfig.getSecretKey() == null
-            || plainAccessConfig.getAccessKey().length() <= 6
-            || plainAccessConfig.getSecretKey().length() <= 6) {
+            || plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH
+            || plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
             throw new AclException(String.format(
                 "The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
                     plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
@@ -217,89 +394,4 @@ public class PlainPermissionLoader {
     public boolean isWatchStart() {
         return isWatchStart;
     }
-
-    static class PlainAccessConfig {
-
-        private String accessKey;
-
-        private String secretKey;
-
-        private String whiteRemoteAddress;
-
-        private boolean admin;
-
-        private String defaultTopicPerm;
-
-        private String defaultGroupPerm;
-
-        private List<String> topicPerms;
-
-        private List<String> groupPerms;
-
-        public String getAccessKey() {
-            return accessKey;
-        }
-
-        public void setAccessKey(String accessKey) {
-            this.accessKey = accessKey;
-        }
-
-        public String getSecretKey() {
-            return secretKey;
-        }
-
-        public void setSecretKey(String secretKey) {
-            this.secretKey = secretKey;
-        }
-
-        public String getWhiteRemoteAddress() {
-            return whiteRemoteAddress;
-        }
-
-        public void setWhiteRemoteAddress(String whiteRemoteAddress) {
-            this.whiteRemoteAddress = whiteRemoteAddress;
-        }
-
-        public boolean isAdmin() {
-            return admin;
-        }
-
-        public void setAdmin(boolean admin) {
-            this.admin = admin;
-        }
-
-        public String getDefaultTopicPerm() {
-            return defaultTopicPerm;
-        }
-
-        public void setDefaultTopicPerm(String defaultTopicPerm) {
-            this.defaultTopicPerm = defaultTopicPerm;
-        }
-
-        public String getDefaultGroupPerm() {
-            return defaultGroupPerm;
-        }
-
-        public void setDefaultGroupPerm(String defaultGroupPerm) {
-            this.defaultGroupPerm = defaultGroupPerm;
-        }
-
-        public List<String> getTopicPerms() {
-            return topicPerms;
-        }
-
-        public void setTopicPerms(List<String> topicPerms) {
-            this.topicPerms = topicPerms;
-        }
-
-        public List<String> getGroupPerms() {
-            return groupPerms;
-        }
-
-        public void setGroupPerms(List<String> groupPerms) {
-            this.groupPerms = groupPerms;
-        }
-
-    }
-
 }
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
index 4883afa..5b2627d 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
@@ -17,7 +17,11 @@
 package org.apache.rocketmq.acl.common;
 
 import com.alibaba.fastjson.JSONObject;
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -131,11 +135,78 @@ public class AclUtilsTest {
     @Test
     public void getYamlDataObjectTest() {
 
-        Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class);
+        Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_correct.yml", Map.class);
         Assert.assertFalse(map.isEmpty());
     }
 
     @Test
+    public void writeDataObject2YamlFileTest() throws IOException{
+
+        String targetFileName = "src/test/resources/conf/plain_write_acl.yml";
+        File transport = new File(targetFileName);
+        transport.delete();
+        transport.createNewFile();
+
+        Map<String, Object> aclYamlMap = new HashMap<String, Object>();
+
+        // For globalWhiteRemoteAddrs element in acl yaml config file
+        List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
+        globalWhiteRemoteAddrs.add("10.10.103.*");
+        globalWhiteRemoteAddrs.add("192.168.0.*");
+        aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
+
+        // For accounts element in acl yaml config file
+        List<Map<String, Object>> accounts = new ArrayList<Map<String, Object>>();
+        Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
+            {
+                put("accessKey", "RocketMQ");
+                put("secretKey", "12345678");
+                put("whiteRemoteAddress", "whiteRemoteAddress");
+                put("admin", "true");
+            }
+        };
+        accounts.add(accountsMap);
+        aclYamlMap.put("accounts",accounts);
+        Assert.assertTrue(AclUtils.writeDataObject(targetFileName, aclYamlMap));
+
+        transport.delete();
+    }
+
+    @Test
+    public void updateExistedYamlFileTest()  throws IOException{
+
+        String targetFileName = "src/test/resources/conf/plain_update_acl.yml";
+        File transport = new File(targetFileName);
+        transport.delete();
+        transport.createNewFile();
+
+        Map<String, Object> aclYamlMap = new HashMap<String, Object>();
+
+        // For globalWhiteRemoteAddrs element in acl yaml config file
+        List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
+        globalWhiteRemoteAddrs.add("10.10.103.*");
+        globalWhiteRemoteAddrs.add("192.168.0.*");
+        aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
+
+        // Write the map to the yaml file
+        AclUtils.writeDataObject(targetFileName, aclYamlMap);
+
+        Map<String, Object> updatedMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<String> globalWhiteRemoteAddrList = (List<String>) updatedMap.get("globalWhiteRemoteAddrs");
+        globalWhiteRemoteAddrList.clear();
+        globalWhiteRemoteAddrList.add("192.168.1.2");
+
+        // Update file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, updatedMap);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<String> updatedGlobalWhiteRemoteAddrs = (List<String>) readableMap.get("globalWhiteRemoteAddrs");
+        Assert.assertEquals("192.168.1.2",updatedGlobalWhiteRemoteAddrs.get(0));
+
+        transport.delete();
+    }
+
+    @Test
     public void getYamlDataIgnoreFileNotFoundExceptionTest() {
 
         JSONObject yamlDataObject = AclUtils.getYamlDataObject("plain_acl.yml", JSONObject.class);
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index b7cdb69..bca9075 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -16,14 +16,20 @@
  */
 package org.apache.rocketmq.acl.plain;
 
+
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclConstants;
 import org.apache.rocketmq.acl.common.AclException;
 import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.*;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
@@ -297,4 +303,262 @@ public class PlainAccessValidatorTest {
         PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
         plainAccessValidator.validate(accessResource);
     }
+
+    @Test
+    public void updateAccessAclYamlConfigNormalTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+        plainAccessConfig.setAccessKey("RocketMQ");
+        plainAccessConfig.setSecretKey("1234567890");
+        plainAccessConfig.setDefaultGroupPerm("PUB");
+        plainAccessConfig.setDefaultTopicPerm("SUB");
+        List<String> topicPerms = new ArrayList<String>();
+        topicPerms.add("topicC=PUB|SUB");
+        topicPerms.add("topicB=PUB");
+        plainAccessConfig.setTopicPerms(topicPerms);
+        List<String> groupPerms = new ArrayList<String>();
+        groupPerms.add("groupB=PUB|SUB");
+        groupPerms.add("groupC=DENY");
+        plainAccessConfig.setGroupPerms(groupPerms);
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Update acl access yaml config file
+        plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<Map<String, Object>> accounts =  (List<Map<String, Object>>)readableMap.get("accounts");
+        Map<String, Object> verifyMap = null;
+        for (Map<String, Object> account : accounts) {
+            if (account.get("accessKey").equals(plainAccessConfig.getAccessKey())) {
+                verifyMap = account;
+                break;
+            }
+        }
+
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"1234567890");
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"SUB");
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE),false);
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR),"192.168.0.*");
+        Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
+        Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
+
+        // Verify that the dataVersion element holds the expected counter value
+        List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get("dataVersion");
+        Assert.assertEquals(1,dataVersions.get(0).get("counter"));
+
+        // Restore the backup file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+    }
+
+    @Test
+    public void updateAccessAclYamlConfigTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+        plainAccessConfig.setAccessKey("RocketMQ");
+        plainAccessConfig.setSecretKey("123456789111");
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Update element in the acl access yaml config file
+        plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<Map<String, Object>> accounts =  (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+        Map<String, Object> verifyMap = null;
+        for (Map<String, Object> account : accounts) {
+            if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+                verifyMap = account;
+                break;
+            }
+        }
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
+
+        // Restore the backup file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+    }
+
+
+    @Test
+    public void createAndUpdateAccessAclYamlConfigNormalTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+        plainAccessConfig.setAccessKey("RocketMQ33");
+        plainAccessConfig.setSecretKey("123456789111");
+        plainAccessConfig.setDefaultGroupPerm("PUB");
+        plainAccessConfig.setDefaultTopicPerm("DENY");
+        List<String> topicPerms = new ArrayList<String>();
+        topicPerms.add("topicC=PUB|SUB");
+        topicPerms.add("topicB=PUB");
+        plainAccessConfig.setTopicPerms(topicPerms);
+        List<String> groupPerms = new ArrayList<String>();
+        groupPerms.add("groupB=PUB|SUB");
+        groupPerms.add("groupC=DENY");
+        plainAccessConfig.setGroupPerms(groupPerms);
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Create element in the acl access yaml config file
+        plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<Map<String, Object>> accounts =  (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+        Map<String, Object> verifyMap = null;
+        for (Map<String, Object> account : accounts) {
+            if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+                verifyMap = account;
+                break;
+            }
+        }
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"DENY");
+        Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
+        Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
+        Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
+        Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB"));
+        Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB"));
+        Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB"));
+        Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY"));
+
+        // Verify that the dataVersion element holds the expected counter value
+        List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+        Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+
+        // Update element in the acl config yaml file
+        PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
+        plainAccessConfig2.setAccessKey("rocketmq2");
+        plainAccessConfig2.setSecretKey("1234567890123");
+
+        // Update the acl access yaml config file a second time
+        plainAccessValidator.updateAccessConfig(plainAccessConfig2);
+
+        Map<String, Object> readableMap2 = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<Map<String, Object>> accounts2 =  (List<Map<String, Object>>)readableMap2.get(AclConstants.CONFIG_ACCOUNTS);
+        Map<String, Object> verifyMap2 = null;
+        for (Map<String, Object> account : accounts2) {
+            if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig2.getAccessKey())) {
+                verifyMap2 = account;
+                break;
+            }
+        }
+
+        // Verify that the dataVersion counter is correct after the second update
+        List<Map<String, Object>> dataVersions2 = (List<Map<String, Object>>) readableMap2.get(AclConstants.CONFIG_DATA_VERSION);
+        Assert.assertEquals(2,dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER));
+        Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY),"1234567890123");
+
+
+        // Restore the backup file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+    }
+
+    @Test(expected = AclException.class)
+    public void updateAccessAclYamlConfigExceptionTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+        PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+        plainAccessConfig.setAccessKey("RocketMQ");
+        plainAccessConfig.setSecretKey("12345");
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Update acl access yaml config file
+        plainAccessValidator.updateAccessConfig(plainAccessConfig);
+    }
+
+    @Test
+    public void deleteAccessAclYamlConfigNormalTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_delete.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_delete.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+
+        String accessKey = "rocketmq2";
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        plainAccessValidator.deleteAccessConfig(accessKey);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+        List<Map<String, Object>> accounts =  (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+        Map<String, Object> verifyMap = null;
+        for (Map<String, Object> account : accounts) {
+            if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accessKey)) {
+                verifyMap = account;
+                break;
+            }
+        }
+
+        // Verify the specified element is removed or not
+        Assert.assertEquals(verifyMap,null);
+        // Verify that the dataVersion element holds the expected counter value
+        List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+        Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+        
+        // Restore the backup file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+    }
+
+    @Test
+    public void updateAccessAclYamlConfigWithNoAccoutsExceptionTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_with_no_accouts.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_with_no_accouts.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+        plainAccessConfig.setAccessKey("RocketMQ");
+        plainAccessConfig.setSecretKey("1234567890");
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Update acl access yaml config file and verify the return value is false
+        Assert.assertEquals(plainAccessValidator.updateAccessConfig(plainAccessConfig), false);
+    }
+
+    @Test
+    public void updateGlobalWhiteAddrsNormalTest() {
+        System.setProperty("rocketmq.home.dir", "src/test/resources");
+        System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_global_white_addrs.yml");
+
+        String targetFileName = "src/test/resources/conf/plain_acl_global_white_addrs.yml";
+        Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+        // Update global white remote addr value list in the acl access yaml config file
+
+        List<String> globalWhiteAddrsList = new ArrayList<String>();
+        globalWhiteAddrsList.add("10.10.154.1");
+        globalWhiteAddrsList.add("10.10.154.2");
+        globalWhiteAddrsList.add("10.10.154.3");
+        plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
+
+        Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+        List<String> globalWhiteAddrList =  (List<String>)readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
+        Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.1"));
+        Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.2"));
+        Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.3"));
+
+        // Verify that the dataVersion element holds the expected counter value
+        List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+        Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+
+        // Restore the backup file and flush to yaml file
+        AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+    }
+
 }
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
similarity index 75%
rename from acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
rename to acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
index 575c901..d5ffb0c 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
@@ -26,32 +26,31 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.acl.common.Permission;
-import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-public class PlainPermissionLoaderTest {
+public class PlainPermissionManagerTest {
 
-    PlainPermissionLoader plainPermissionLoader;
+    PlainPermissionManager plainPermissionManager;
     PlainAccessResource PUBPlainAccessResource;
     PlainAccessResource SUBPlainAccessResource;
     PlainAccessResource ANYPlainAccessResource;
     PlainAccessResource DENYPlainAccessResource;
     PlainAccessResource plainAccessResource = new PlainAccessResource();
     PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
-    PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
     Set<Integer> adminCode = new HashSet<>();
 
     @Before
     public void init() throws NoSuchFieldException, SecurityException, IOException {
-        //  UPDATE_AND_CREATE_TOPIC
+        // UPDATE_AND_CREATE_TOPIC
         adminCode.add(17);
-        //  UPDATE_BROKER_CONFIG
+        // UPDATE_BROKER_CONFIG
         adminCode.add(25);
-        //  DELETE_TOPIC_IN_BROKER
+        // DELETE_TOPIC_IN_BROKER
         adminCode.add(215);
         // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
         adminCode.add(200);
@@ -65,7 +64,8 @@ public class PlainPermissionLoaderTest {
 
         System.setProperty("rocketmq.home.dir", "src/test/resources");
         System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
-        plainPermissionLoader = new PlainPermissionLoader();
+        
+        plainPermissionManager = new PlainPermissionManager();
 
     }
 
@@ -95,16 +95,16 @@ public class PlainPermissionLoaderTest {
 
         plainAccess.setAccessKey("RocketMQ");
         plainAccess.setSecretKey("12345678");
-        plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+        plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
         Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
         Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
 
         plainAccess.setWhiteRemoteAddress("127.0.0.1");
-        plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+        plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
         Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
 
         plainAccess.setAdmin(true);
-        plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+        plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
         Assert.assertEquals(plainAccessResource.isAdmin(), true);
 
         List<String> groups = new ArrayList<String>();
@@ -112,7 +112,7 @@ public class PlainPermissionLoaderTest {
         groups.add("groupB=PUB|SUB");
         groups.add("groupC=PUB");
         plainAccess.setGroupPerms(groups);
-        plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+        plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
         Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
         Assert.assertEquals(resourcePermMap.size(), 3);
 
@@ -125,7 +125,7 @@ public class PlainPermissionLoaderTest {
         topics.add("topicB=PUB|SUB");
         topics.add("topicC=PUB");
         plainAccess.setTopicPerms(topics);
-        plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+        plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
         resourcePermMap = plainAccessResource.getResourcePermMap();
         Assert.assertEquals(resourcePermMap.size(), 6);
 
@@ -138,7 +138,7 @@ public class PlainPermissionLoaderTest {
     public void checkPermAdmin() {
         PlainAccessResource plainAccessResource = new PlainAccessResource();
         plainAccessResource.setRequestCode(17);
-        plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
     }
 
     @Test
@@ -146,15 +146,15 @@ public class PlainPermissionLoaderTest {
 
         PlainAccessResource plainAccessResource = new PlainAccessResource();
         plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
-        plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
         plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
-        plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
 
         plainAccessResource = new PlainAccessResource();
         plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
-        plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
         plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
-        plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
 
     }
     @Test(expected = AclException.class)
@@ -162,55 +162,58 @@ public class PlainPermissionLoaderTest {
 
         plainAccessResource = new PlainAccessResource();
         plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
-        plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
+        plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
     }
     @Test(expected = AclException.class)
     public void accountNullTest() {
         plainAccessConfig.setAccessKey(null);
-        plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+        plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
     }
 
     @Test(expected = AclException.class)
     public void accountThanTest() {
         plainAccessConfig.setAccessKey("123");
-        plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+        plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
     }
 
     @Test(expected = AclException.class)
     public void passWordtNullTest() {
         plainAccessConfig.setAccessKey(null);
-        plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+        plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
     }
 
     @Test(expected = AclException.class)
     public void passWordThanTest() {
         plainAccessConfig.setAccessKey("123");
-        plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+        plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
     }
 
     @Test(expected = AclException.class)
     public void testPlainAclPlugEngineInit() {
         System.setProperty("rocketmq.home.dir", "");
-        new PlainPermissionLoader().load();
+        new PlainPermissionManager().load();
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void cleanAuthenticationInfoTest() throws IllegalAccessException {
-        //plainPermissionLoader.addPlainAccessResource(plainAccessResource);
-        Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+        // PlainPermissionManager.addPlainAccessResource(plainAccessResource);
+        Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
         Assert.assertFalse(plainAccessResourceMap.isEmpty());
 
-        plainPermissionLoader.clearPermissionInfo();
-        plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+        plainPermissionManager.clearPermissionInfo();
+        plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
         Assert.assertTrue(plainAccessResourceMap.isEmpty());
+        // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
     }
 
     @Test
     public void isWatchStartTest() {
 
-        PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
-        Assert.assertTrue(plainPermissionLoader.isWatchStart());
+        PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
+        Assert.assertTrue(plainPermissionManager.isWatchStart());
+        // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
+
     }
 
 
@@ -231,11 +234,12 @@ public class PlainPermissionLoaderTest {
         writer.flush();
         writer.close();
 
-        PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
-        Assert.assertTrue(plainPermissionLoader.isWatchStart());
+
+        PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
+        Assert.assertTrue(plainPermissionManager.isWatchStart());
 
         {
-            Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+            Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
             PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq");
             Assert.assertNotNull(accessResource);
             Assert.assertEquals(accessResource.getSecretKey(), "12345678");
@@ -243,17 +247,19 @@ public class PlainPermissionLoaderTest {
 
         }
 
-        writer = new FileWriter(new File(fileName), true);
-        writer.write("- accessKey: watchrocketmq1\r\n");
-        writer.write("  secretKey: 88888888\r\n");
-        writer.write("  whiteRemoteAddress: 127.0.0.1\r\n");
-        writer.write("  admin: false\r\n");
-        writer.flush();
-        writer.close();
+        Map<String, Object> updatedMap = AclUtils.getYamlDataObject(fileName, Map.class);
+        List<Map<String, Object>> accounts = (List<Map<String, Object>>) updatedMap.get("accounts");
+        accounts.get(0).remove("accessKey");
+        accounts.get(0).remove("secretKey");
+        accounts.get(0).put("accessKey", "watchrocketmq1");
+        accounts.get(0).put("secretKey", "88888888");
+        accounts.get(0).put("admin", "false");
+        // Update file and flush to yaml file
+        AclUtils.writeDataObject(fileName, updatedMap);
 
         Thread.sleep(1000);
         {
-            Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+            Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
             PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
             Assert.assertNotNull(accessResource);
             Assert.assertEquals(accessResource.getSecretKey(), "88888888");
@@ -268,8 +274,7 @@ public class PlainPermissionLoaderTest {
     @Test(expected = AclException.class)
     public void initializeTest() {
         System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
-        new PlainPermissionLoader();
+        new PlainPermissionManager();
 
     }
-
 }
diff --git a/acl/src/test/resources/conf/plain_acl_correct.yml b/acl/src/test/resources/conf/plain_acl_correct.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_correct.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.0.*
+  admin: false
+  defaultTopicPerm: DENY
+  defaultGroupPerm: SUB
+  topicPerms:
+  - topicA=DENY
+  - topicB=PUB|SUB
+  - topicC=SUB
+  groupPerms:
+  - groupA=DENY
+  - groupB=SUB
+  - groupC=SUB
+- accessKey: rocketmq2
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.1.*
+  admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_delete.yml b/acl/src/test/resources/conf/plain_acl_delete.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_delete.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.0.*
+  admin: false
+  defaultTopicPerm: DENY
+  defaultGroupPerm: SUB
+  topicPerms:
+  - topicA=DENY
+  - topicB=PUB|SUB
+  - topicC=SUB
+  groupPerms:
+  - groupA=DENY
+  - groupB=SUB
+  - groupC=SUB
+- accessKey: rocketmq2
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.1.*
+  admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.0.*
+  admin: false
+  defaultTopicPerm: DENY
+  defaultGroupPerm: SUB
+  topicPerms:
+  - topicA=DENY
+  - topicB=PUB|SUB
+  - topicC=SUB
+  groupPerms:
+  - groupA=DENY
+  - groupB=SUB
+  - groupC=SUB
+- accessKey: rocketmq2
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.1.*
+  admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_update_create.yml b/acl/src/test/resources/conf/plain_acl_update_create.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_update_create.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.0.*
+  admin: false
+  defaultTopicPerm: DENY
+  defaultGroupPerm: SUB
+  topicPerms:
+  - topicA=DENY
+  - topicB=PUB|SUB
+  - topicC=SUB
+  groupPerms:
+  - groupA=DENY
+  - groupB=SUB
+  - groupC=SUB
+- accessKey: rocketmq2
+  secretKey: 12345678
+  whiteRemoteAddress: 192.168.1.*
+  admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml
new file mode 100644
index 0000000..08274b1
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml
@@ -0,0 +1,3 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
\ No newline at end of file
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 427f861..56e3fe4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -162,7 +163,7 @@ public class BrokerController {
     private TransactionalMessageService transactionalMessageService;
     private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
     private Future<?> slaveSyncFuture;
-
+    private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
 
     public BrokerController(
         final BrokerConfig brokerConfig,
@@ -502,6 +503,7 @@ public class BrokerController {
 
         for (AccessValidator accessValidator: accessValidators) {
             final AccessValidator validator = accessValidator;
+            accessValidatorMap.put(validator.getClass(),validator);
             this.registerServerRPCHook(new RPCHook() {
 
                 @Override
@@ -1101,7 +1103,9 @@ public class BrokerController {
 
     }
 
-
+    /**
+     * Returns the access validators held by this controller, keyed by the validator's runtime class.
+     *
+     * @return the {@code accessValidatorMap} field itself (not a copy)
+     */
+    public Map<Class, AccessValidator> getAccessValidatorMap() {
+        return accessValidatorMap;
+    }
 
     private void handleSlaveSynchronize(BrokerRole role) {
         if (role == BrokerRole.SLAVE) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 73fe439..f23cca6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -37,6 +39,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -44,6 +47,10 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -201,6 +208,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return fetchAllConsumeStatsInBroker(ctx, request);
             case RequestCode.QUERY_CONSUME_QUEUE:
                 return queryConsumeQueue(ctx, request);
+            case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG:
+                return updateAndCreateAccessConfig(ctx, request);
+            case RequestCode.DELETE_ACL_CONFIG:
+                return deleteAccessConfig(ctx, request);
+            case RequestCode.GET_BROKER_CLUSTER_ACL_INFO:
+                return getBrokerAclConfigVersion(ctx, request);
+            case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
+                return updateGlobalWhiteAddrsConfig(ctx, request);
             default:
                 break;
         }
@@ -269,6 +284,140 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    /**
+     * Handles an {@code UPDATE_AND_CREATE_ACL_CONFIG} request: copies the fields of the decoded
+     * {@link CreateAccessConfigRequestHeader} into a {@link PlainAccessConfig} and passes it to the
+     * {@link AccessValidator} registered under {@link PlainAccessValidator}.
+     *
+     * @param ctx     channel context; the success response is written to it directly
+     * @param request incoming command carrying the access config fields in its custom header
+     * @return {@code null} once the success response has been written to {@code ctx}; otherwise the
+     *         response carrying {@code UPDATE_AND_CREATE_ACL_CONFIG_FAILED} and a remark
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        final CreateAccessConfigRequestHeader requestHeader =
+            (CreateAccessConfigRequestHeader) request.decodeCommandCustomHeader(CreateAccessConfigRequestHeader.class);
+
+        PlainAccessConfig accessConfig = new PlainAccessConfig();
+        accessConfig.setAccessKey(requestHeader.getAccessKey());
+        accessConfig.setSecretKey(requestHeader.getSecretKey());
+        accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress());
+        accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm());
+        accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm());
+        // The header transports the permission lists as comma-separated strings.
+        accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(), ","));
+        accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(), ","));
+        accessConfig.setAdmin(requestHeader.isAdmin());
+        try {
+
+            AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+            if (accessValidator == null) {
+                // Without this check the call below throws an NPE whose message (often null) becomes the remark.
+                String errorMsg = "No PlainAccessValidator is registered on this broker, the accessConfig of accesskey["
+                    + requestHeader.getAccessKey() + "] can not be updated.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+            if (accessValidator.updateAccessConfig(accessConfig)) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                response.markResponseType();
+                response.setRemark(null);
+                ctx.writeAndFlush(response);
+            } else {
+                String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+        } catch (Exception e) {
+            log.error("Failed to generate a proper update accessvalidator response", e);
+            response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+
+        // The success response was already written to the channel above.
+        return null;
+    }
+
+    /**
+     * Handles a {@code DELETE_ACL_CONFIG} request: asks the {@link AccessValidator} registered under
+     * {@link PlainAccessValidator} to delete the account named by the request's access key.
+     *
+     * @param ctx     channel context; the success response is written to it directly
+     * @param request incoming command carrying a {@link DeleteAccessConfigRequestHeader}
+     * @return {@code null} once the success response has been written to {@code ctx}; otherwise the
+     *         response carrying {@code DELETE_ACL_CONFIG_FAILED} and a remark
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private synchronized RemotingCommand deleteAccessConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        final DeleteAccessConfigRequestHeader requestHeader =
+            (DeleteAccessConfigRequestHeader) request.decodeCommandCustomHeader(DeleteAccessConfigRequestHeader.class);
+        log.info("DeleteAccessConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        try {
+            String accessKey = requestHeader.getAccessKey();
+            AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+            if (accessValidator == null) {
+                // Without this check the call below throws an NPE whose message (often null) becomes the remark.
+                String errorMsg = "No PlainAccessValidator is registered on this broker, the accessConfig of accesskey["
+                    + accessKey + "] can not be deleted.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+            if (accessValidator.deleteAccessConfig(accessKey)) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                response.markResponseType();
+                response.setRemark(null);
+                ctx.writeAndFlush(response);
+            } else {
+                String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+
+        } catch (Exception e) {
+            log.error("Failed to generate a proper delete accessvalidator response", e);
+            response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+
+        // The success response was already written to the channel above.
+        return null;
+    }
+
+    /**
+     * Handles an {@code UPDATE_GLOBAL_WHITE_ADDRS_CONFIG} request: splits the comma-separated address
+     * string from the header into a list and passes it to the {@link AccessValidator} registered under
+     * {@link PlainAccessValidator}.
+     *
+     * @param ctx     channel context; the success response is written to it directly
+     * @param request incoming command carrying an {@link UpdateGlobalWhiteAddrsConfigRequestHeader}
+     * @return {@code null} once the success response has been written to {@code ctx}; otherwise the
+     *         response carrying {@code UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED} and a remark
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private synchronized RemotingCommand updateGlobalWhiteAddrsConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        final UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader =
+            (UpdateGlobalWhiteAddrsConfigRequestHeader) request.decodeCommandCustomHeader(UpdateGlobalWhiteAddrsConfigRequestHeader.class);
+
+        try {
+            AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+            if (accessValidator == null) {
+                // Without this check the call below throws an NPE whose message (often null) becomes the remark.
+                String errorMsg = "No PlainAccessValidator is registered on this broker, the globalWhiteAddresses["
+                    + requestHeader.getGlobalWhiteAddrs() + "] can not be updated.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+            if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(), ","))) {
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+                response.markResponseType();
+                response.setRemark(null);
+                ctx.writeAndFlush(response);
+            } else {
+                String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed.";
+                log.warn(errorMsg);
+                response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
+                response.setRemark(errorMsg);
+                return response;
+            }
+        } catch (Exception e) {
+            log.error("Failed to generate a proper update globalWhiteAddresses response", e);
+            response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+
+        // The success response was already written to the channel above.
+        return null;
+    }
+
+    private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, RemotingCommand request) {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class);
+
+        final GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader)response.readCustomHeader();
+
+        try {
+            AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+
+            responseHeader.setVersion(accessValidator.getAclConfigVersion());
+            responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr());
+            responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+            responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName());
+            
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        } catch (Exception e) {
+            log.error("Failed to generate a proper getBrokerAclConfigVersion response", e);
+        }
+
+        return null;
+    }
+
     private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
         // final GetAllTopicConfigResponseHeader responseHeader =
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 9048ab8..c3382ca 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -44,8 +44,10 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -86,10 +89,13 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
@@ -123,6 +129,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
@@ -284,6 +291,104 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
+    /**
+     * Sends an {@code UPDATE_AND_CREATE_ACL_CONFIG} request built from the given access config to a
+     * broker and waits for its reply.
+     *
+     * @param addr              broker address; passed through {@code MixAll.brokerVIPChannel} before use
+     * @param plainAccessConfig account settings copied into the request header; the topic and group
+     *                          permission lists are joined with {@code ","}
+     * @param timeoutMillis     timeout handed to the synchronous invocation
+     * @throws MQClientException if the broker answers with any code other than {@code SUCCESS}; carries
+     *                           the response code and remark
+     */
+    public void createPlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig,
+        final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        final CreateAccessConfigRequestHeader header = new CreateAccessConfigRequestHeader();
+        header.setAccessKey(plainAccessConfig.getAccessKey());
+        header.setSecretKey(plainAccessConfig.getSecretKey());
+        header.setAdmin(plainAccessConfig.isAdmin());
+        header.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
+        header.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
+        header.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
+        header.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
+        header.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
+
+        final RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, header);
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+        assert response != null;
+
+        // Anything but SUCCESS is surfaced to the caller with the broker's code and remark.
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQClientException(response.getCode(), response.getRemark());
+        }
+    }
+
+    /**
+     * Sends a {@code DELETE_ACL_CONFIG} request for the given access key to a broker and waits for its
+     * reply.
+     *
+     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} before use
+     * @param accessKey     access key placed in the request header
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @throws MQClientException if the broker answers with any code other than {@code SUCCESS}; carries
+     *                           the response code and remark
+     */
+    public void deleteAccessConfig(final String addr, final String accessKey, final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        final DeleteAccessConfigRequestHeader header = new DeleteAccessConfigRequestHeader();
+        header.setAccessKey(accessKey);
+
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_ACL_CONFIG, header);
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+        assert response != null;
+
+        // Anything but SUCCESS is surfaced to the caller with the broker's code and remark.
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQClientException(response.getCode(), response.getRemark());
+        }
+    }
+
+    /**
+     * Sends an {@code UPDATE_GLOBAL_WHITE_ADDRS_CONFIG} request carrying the given address string to a
+     * broker and waits for its reply.
+     *
+     * @param addr             broker address; passed through {@code MixAll.brokerVIPChannel} before use
+     * @param globalWhiteAddrs value placed unchanged in the request header
+     * @param timeoutMillis    timeout handed to the synchronous invocation
+     * @throws MQClientException if the broker answers with any code other than {@code SUCCESS}; carries
+     *                           the response code and remark
+     */
+    public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+        final UpdateGlobalWhiteAddrsConfigRequestHeader header = new UpdateGlobalWhiteAddrsConfigRequestHeader();
+        header.setGlobalWhiteAddrs(globalWhiteAddrs);
+
+        final RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG, header);
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+        assert response != null;
+
+        // Anything but SUCCESS is surfaced to the caller with the broker's code and remark.
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQClientException(response.getCode(), response.getRemark());
+        }
+    }
+
+    /**
+     * Sends a {@code GET_BROKER_CLUSTER_ACL_INFO} request to a broker and converts the reply header into
+     * a {@link ClusterAclVersionInfo}.
+     *
+     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} before use
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return cluster name, broker name, broker address and the ACL config {@link DataVersion} parsed
+     *         from the JSON version string in the response header
+     * @throws MQBrokerException if the broker answers with any code other than {@code SUCCESS}; carries
+     *                           the response code and remark
+     */
+    public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
+        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+        assert response != null;
+
+        // Guard clause: only a SUCCESS reply carries a header worth decoding.
+        if (response.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(response.getCode(), response.getRemark());
+        }
+
+        final GetBrokerAclConfigResponseHeader aclHeader =
+            (GetBrokerAclConfigResponseHeader) response.decodeCommandCustomHeader(GetBrokerAclConfigResponseHeader.class);
+
+        final ClusterAclVersionInfo versionInfo = new ClusterAclVersionInfo();
+        versionInfo.setClusterName(aclHeader.getClusterName());
+        versionInfo.setBrokerName(aclHeader.getBrokerName());
+        versionInfo.setBrokerAddr(aclHeader.getBrokerAddr());
+        versionInfo.setAclConfigDataVersion(DataVersion.fromJson(aclHeader.getVersion(), DataVersion.class));
+        return versionInfo;
+    }
+
     public SendResult sendMessage(
         final String addr,
         final String brokerName,
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c13e75c..e3a25b9 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -19,17 +19,24 @@ package org.apache.rocketmq.client.impl;
 import java.lang.reflect.Field;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -210,6 +217,79 @@ public class MQClientAPIImplTest {
         }
     }
 
+    /**
+     * Verifies that {@code createPlainAccessConfig} completes normally when the broker replies with
+     * {@code SUCCESS}. The {@link MQClientException} is declared rather than caught so that an
+     * unexpected failure fails the test instead of being swallowed.
+     */
+    @Test
+    public void testCreatePlainAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createSuccessResponse4UpdateAclConfig(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        PlainAccessConfig config = createUpdateAclConfig();
+
+        mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
+    }
+
+    /**
+     * Verifies that a failure reply from the broker is surfaced by {@code createPlainAccessConfig} as an
+     * {@link MQClientException} carrying the broker's response code and remark.
+     */
+    @Test
+    public void testCreatePlainAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createErrorResponse4UpdateAclConfig(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        PlainAccessConfig config = createUpdateAclConfig();
+        try {
+            mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
+            // Reaching this line means no exception was raised; AssertionError is not caught below.
+            throw new AssertionError("createPlainAccessConfig should have thrown MQClientException");
+        } catch (MQClientException ex) {
+            assertThat(ex.getResponseCode()).isEqualTo(209);
+            assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been updated failed");
+        }
+    }
+
+    /**
+     * Verifies that {@code deleteAccessConfig} completes normally when the broker replies with
+     * {@code SUCCESS}. The method under test is actually invoked, and {@link MQClientException} is
+     * declared so that an unexpected failure fails the test.
+     */
+    @Test
+    public void testDeleteAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createSuccessResponse4DeleteAclConfig(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        mqClientAPI.deleteAccessConfig(brokerAddr, "11111", 3 * 1000);
+    }
+
+    /**
+     * Verifies that a failure reply from the broker is surfaced by {@code deleteAccessConfig} as an
+     * {@link MQClientException} carrying the broker's response code and remark.
+     */
+    @Test
+    public void testDeleteAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createErrorResponse4DeleteAclConfig(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        try {
+            mqClientAPI.deleteAccessConfig(brokerAddr, "11111", 3 * 1000);
+            // Reaching this line means no exception was raised; AssertionError is not caught below.
+            throw new AssertionError("deleteAccessConfig should have thrown MQClientException");
+        } catch (MQClientException ex) {
+            assertThat(ex.getResponseCode()).isEqualTo(210);
+            assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
+        }
+    }
+
     private RemotingCommand createSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
         response.setCode(ResponseCode.SUCCESS);
@@ -228,6 +308,58 @@ public class MQClientAPIImplTest {
         return response;
     }
 
+    private RemotingCommand createSuccessResponse4UpdateAclConfig(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        response.setRemark(null);
+
+        return response;
+    }
+
+    private RemotingCommand createSuccessResponse4DeleteAclConfig(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        response.setRemark(null);
+        
+        return response;
+    }
+
+    private RemotingCommand createErrorResponse4UpdateAclConfig(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        response.setRemark("corresponding to accessConfig has been updated failed");
+
+        return response;
+    }
+
+    private RemotingCommand createErrorResponse4DeleteAclConfig(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+        response.setOpaque(request.getOpaque());
+        response.markResponseType();
+        response.setRemark("corresponding to accessConfig has been deleted failed");
+
+        return response;
+    }
+
+    private PlainAccessConfig createUpdateAclConfig() {
+
+        PlainAccessConfig config = new PlainAccessConfig();
+        config.setAccessKey("Rocketmq111");
+        config.setSecretKey("123456789");
+        config.setAdmin(true);
+        config.setWhiteRemoteAddress("127.0.0.1");
+        config.setDefaultTopicPerm("DENY");
+        config.setDefaultGroupPerm("SUB");
+        return config;
+    }
+
     private SendMessageRequestHeader createSendMessageRequestHeader() {
         SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
         requestHeader.setBornTimestamp(System.currentTimeMillis());
diff --git a/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java
new file mode 100644
index 0000000..b193f43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.List;
+
+/**
+ * Plain data holder for one ACL account entry: access/secret key, a white-listed remote
+ * address, an admin flag, default topic/group permissions and per-topic / per-group
+ * permission lists. The class contains no logic beyond getters and setters, and every
+ * reference field stays {@code null} until its setter is called.
+ */
+public class PlainAccessConfig {
+
+    // Access key of the account.
+    private String accessKey;
+
+    // Secret key paired with the access key.
+    private String secretKey;
+
+    // Remote address white-listed for this account.
+    private String whiteRemoteAddress;
+
+    // Whether the account is an administrator; defaults to false.
+    private boolean admin;
+
+    // Permission applied to topics without an explicit entry, e.g. "DENY" -- TODO confirm value set.
+    private String defaultTopicPerm;
+
+    // Permission applied to groups without an explicit entry, e.g. "SUB" -- TODO confirm value set.
+    private String defaultGroupPerm;
+
+    // Per-topic permissions; presumably entries of the form "topicA=DENY" -- confirm against callers.
+    private List<String> topicPerms;
+
+    // Per-group permissions; presumably entries of the form "groupB=SUB" -- confirm against callers.
+    private List<String> groupPerms;
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getWhiteRemoteAddress() {
+        return whiteRemoteAddress;
+    }
+
+    public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+        this.whiteRemoteAddress = whiteRemoteAddress;
+    }
+
+    public boolean isAdmin() {
+        return admin;
+    }
+
+    public void setAdmin(boolean admin) {
+        this.admin = admin;
+    }
+
+    public String getDefaultTopicPerm() {
+        return defaultTopicPerm;
+    }
+
+    public void setDefaultTopicPerm(String defaultTopicPerm) {
+        this.defaultTopicPerm = defaultTopicPerm;
+    }
+
+    public String getDefaultGroupPerm() {
+        return defaultGroupPerm;
+    }
+
+    public void setDefaultGroupPerm(String defaultGroupPerm) {
+        this.defaultGroupPerm = defaultGroupPerm;
+    }
+
+    // Returns the stored list itself (no defensive copy); may be null.
+    public List<String> getTopicPerms() {
+        return topicPerms;
+    }
+
+    public void setTopicPerms(List<String> topicPerms) {
+        this.topicPerms = topicPerms;
+    }
+
+    // Returns the stored list itself (no defensive copy); may be null.
+    public List<String> getGroupPerms() {
+        return groupPerms;
+    }
+
+    public void setGroupPerms(List<String> groupPerms) {
+        this.groupPerms = groupPerms;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index dee6ca2..33674dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -28,15 +28,17 @@ import java.net.NetworkInterface;
 import java.text.NumberFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.zip.CRC32;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -529,4 +531,28 @@ public class UtilAll {
             file.delete();
         }
     }
+
+    /**
+     * Joins the elements of {@code list} into a single string, inserting {@code splitor}
+     * between consecutive elements (never after the last one).
+     *
+     * @param list the values to join; may be {@code null}
+     * @param splitor the separator placed between elements
+     * @return the joined string, or {@code null} when {@code list} is {@code null} or empty
+     */
+    public static String List2String(List<String> list, String splitor) {
+        if (list == null || list.isEmpty()) {
+            return null;
+        }
+        // The buffer never escapes this method, so the unsynchronized StringBuilder is sufficient.
+        StringBuilder joined = new StringBuilder();
+        boolean first = true;
+        // Iterate rather than call get(i): positional access is linear per call on linked lists.
+        for (String item : list) {
+            if (!first) {
+                joined.append(splitor);
+            }
+            joined.append(item);
+            first = false;
+        }
+        return joined.toString();
+    }
+
+    /**
+     * Splits {@code str} on every occurrence of the literal separator {@code splitor}; the
+     * counterpart of {@code List2String}.
+     *
+     * @param str the text to split; may be {@code null}
+     * @param splitor the separator, matched literally (not as a regular expression)
+     * @return a fixed-size list view of the parts, or {@code null} when {@code str} is
+     *         {@code null} or empty
+     */
+    public static List<String> String2List(String str, String splitor) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+
+        // String.split interprets its argument as a regex; quote it so that separators such as
+        // "|" or "." split on the character itself instead of on every position.
+        String[] parts = str.split(java.util.regex.Pattern.quote(splitor));
+        return Arrays.asList(parts);
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 8cf2d46..b771b77 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -70,6 +70,14 @@ public class RequestCode {
 
     public static final int CHECK_CLIENT_CONFIG = 46;
 
+    public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
+
+    public static final int DELETE_ACL_CONFIG = 51;
+
+    public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
+
+    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
+
     public static final int PUT_KV_CONFIG = 100;
 
     public static final int GET_KV_CONFIG = 101;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f62c4ea..dc74444 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -73,4 +73,11 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int CONSUME_MSG_TIMEOUT = 207;
 
     public static final int NO_MESSAGE = 208;
+
+    public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
+
+    public static final int DELETE_ACL_CONFIG_FAILED = 210;
+
+    public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java
new file mode 100644
index 0000000..aeae9d5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+/**
+ * Serializable body describing one broker's ACL config version: the broker's name and address,
+ * the cluster name, and the {@link DataVersion} of its ACL configuration.
+ * Contains no logic beyond getters and setters.
+ */
+public class ClusterAclVersionInfo extends RemotingSerializable {
+
+    // Name of the broker the version belongs to.
+    private String brokerName;
+
+    // Address of that broker.
+    private String brokerAddr;
+
+    // Version stamp of the broker's ACL configuration.
+    private DataVersion aclConfigDataVersion;
+
+    // Name of the cluster the broker is part of.
+    private String clusterName;
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public DataVersion getAclConfigDataVersion() {
+        return aclConfigDataVersion;
+    }
+
+    public void setAclConfigDataVersion(DataVersion aclConfigDataVersion) {
+        this.aclConfigDataVersion = aclConfigDataVersion;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java
new file mode 100644
index 0000000..36990fc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Custom request header carrying the fields of one access config entry. The per-topic and
+ * per-group permissions travel as single comma-separated strings rather than lists.
+ * Only {@code accessKey} is annotated as required; {@link #checkFields()} performs no checks.
+ */
+public class CreateAccessConfigRequestHeader implements CommandCustomHeader {
+
+    @CFNotNull
+    private String accessKey;
+
+    private String secretKey;
+
+    private String whiteRemoteAddress;
+
+    private boolean admin;
+
+    private String defaultTopicPerm;
+
+    private String defaultGroupPerm;
+
+    // list string,eg: topicA=DENY,topicD=SUB
+    private String topicPerms;
+
+    // list string,eg: groupD=DENY,groupB=SUB
+    private String groupPerms;
+    
+
+    // Intentionally empty: no validation is done in this method itself.
+    @Override public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getWhiteRemoteAddress() {
+        return whiteRemoteAddress;
+    }
+
+    public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+        this.whiteRemoteAddress = whiteRemoteAddress;
+    }
+
+    public boolean isAdmin() {
+        return admin;
+    }
+
+    public void setAdmin(boolean admin) {
+        this.admin = admin;
+    }
+
+    public String getDefaultTopicPerm() {
+        return defaultTopicPerm;
+    }
+
+    public void setDefaultTopicPerm(String defaultTopicPerm) {
+        this.defaultTopicPerm = defaultTopicPerm;
+    }
+
+    public String getDefaultGroupPerm() {
+        return defaultGroupPerm;
+    }
+
+    public void setDefaultGroupPerm(String defaultGroupPerm) {
+        this.defaultGroupPerm = defaultGroupPerm;
+    }
+
+    public String getTopicPerms() {
+        return topicPerms;
+    }
+
+    public void setTopicPerms(String topicPerms) {
+        this.topicPerms = topicPerms;
+    }
+
+    public String getGroupPerms() {
+        return groupPerms;
+    }
+
+    public void setGroupPerms(String groupPerms) {
+        this.groupPerms = groupPerms;
+    }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
similarity index 57%
copy from acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
index c915cf3..293480c 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
@@ -15,24 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.acl;
-
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public interface AccessValidator {
-    /**
-     * Parse to get the AccessResource(user, resource, needed permission)
-     *
-     * @param request
-     * @param remoteAddr
-     * @return Plain access resource result,include access key,signature and some other access attributes.
-     */
-    AccessResource parse(RemotingCommand request, String remoteAddr);
-
-    /**
-     * Validate the access resource.
-     *
-     * @param accessResource
-     */
-    void validate(AccessResource accessResource);
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Custom request header for deleting an access config entry; it carries only the access key
+ * of the entry. {@link #checkFields()} performs no checks.
+ */
+public class DeleteAccessConfigRequestHeader implements CommandCustomHeader {
+
+    // Access key of the entry to delete; annotated as required.
+    @CFNotNull
+    private String accessKey;
+
+    // Intentionally empty: no validation is done in this method itself.
+    @Override public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java
new file mode 100644
index 0000000..43fbe47
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Custom response header reporting a broker's ACL config version together with the broker's
+ * name, address and cluster name. All four fields are annotated as required;
+ * {@link #checkFields()} performs no checks.
+ */
+public class GetBrokerAclConfigResponseHeader implements CommandCustomHeader {
+
+    // ACL config version in string form -- NOTE(review): exact encoding is not visible here.
+    @CFNotNull
+    private String version;
+
+    @CFNotNull
+    private String brokerName;
+
+    @CFNotNull
+    private String brokerAddr;
+
+    @CFNotNull
+    private String clusterName;
+
+    // Intentionally empty: no validation is done in this method itself.
+    @Override public void checkFields() throws RemotingCommandException {
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
similarity index 54%
copy from acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
index c915cf3..2d42c75 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
@@ -14,25 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.rocketmq.common.protocol.header;
 
-package org.apache.rocketmq.acl;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+public class UpdateGlobalWhiteAddrsConfigRequestHeader implements CommandCustomHeader {
 
-public interface AccessValidator {
-    /**
-     * Parse to get the AccessResource(user, resource, needed permission)
-     *
-     * @param request
-     * @param remoteAddr
-     * @return Plain access resource result,include access key,signature and some other access attributes.
-     */
-    AccessResource parse(RemotingCommand request, String remoteAddr);
+    @CFNotNull
+    private String globalWhiteAddrs;
 
-    /**
-     * Validate the access resource.
-     *
-     * @param accessResource
-     */
-    void validate(AccessResource accessResource);
+    @Override public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getGlobalWhiteAddrs() {
+        return globalWhiteAddrs;
+    }
+
+    public void setGlobalWhiteAddrs(String globalWhiteAddrs) {
+        this.globalWhiteAddrs = globalWhiteAddrs;
+    }
 }
diff --git a/docs/cn/acl/user_guide.md b/docs/cn/acl/user_guide.md
index 1fea9ef..01c37dc 100644
--- a/docs/cn/acl/user_guide.md
+++ b/docs/cn/acl/user_guide.md
@@ -82,5 +82,76 @@ RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可
 (2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组
 内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。
 
-**特别注意**在[4.5.0]版本中即使使用上面所述的白名单也无法解决开启ACL的问题,解决该问题的[PR链接](https://github.com/apache/rocketmq/pull/1149)
+## 7. ACL mqadmin配置管理命令
 
+### 7.1 更新ACL配置文件中“account”的属性值
+
+该命令的示例如下:
+
+sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 1234567809123 
+-t topicA=DENY,topicD=SUB -g groupD=DENY,groupB=SUB
+
+说明:如果不存在则会在ACL Config YAML配置文件中创建;若存在,则会更新对应的“accounts”的属性值;
+如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| a | eg:RocketMQ | Access Key值(必填) |
+| s | eg:1234567809123 | Secret Key值(可选) |
+| m | eg:true | 是否管理员账户(可选) |
+| w | eg:192.168.0.* | whiteRemoteAddress,用户IP白名单(可选) |
+| i | eg:DENY;PUB;SUB;PUB\|SUB | defaultTopicPerm,默认Topic权限(可选) |
+| u | eg:DENY;PUB;SUB;PUB\|SUB | defaultGroupPerm,默认ConsumerGroup权限(可选) |
+| t | eg:topicA=DENY,topicD=SUB | topicPerms,各个Topic的权限(可选) |
+| g | eg:groupD=DENY,groupB=SUB | groupPerms,各个ConsumerGroup的权限(可选) |
+
+### 7.2 删除ACL配置文件里面的对应“account”
+该命令的示例如下:
+
+sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+其中,参数"a"为Access Key的值,用以标识唯一账户id,因此该命令的参数中指定账户id即可。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| a | eg:RocketMQ | Access Key的值(必填) |
+
+
+### 7.3 更新ACL配置文件中的全局白名单
+该命令的示例如下:
+
+sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+其中,参数"g"为全局IP白名单的值,用以更新ACL配置文件中的“globalWhiteRemoteAddresses”字段的属性值。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| g | eg:10.10.154.1,10.10.154.2 | 全局IP白名单(必填) |
+
+### 7.4 查询集群/Broker的ACL配置文件版本信息
+该命令的示例如下:
+
+sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+
+
+**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
+在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dc829c1..f00dcef 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
@@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -161,6 +163,27 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
+    /**
+     * Creates or updates a plain access config on the broker at {@code addr} by delegating to
+     * {@code defaultMQAdminExtImpl}.
+     *
+     * @param addr broker address the request is sent to
+     * @param config the access config to create or update
+     */
     @Override
+    public void createAndUpdatePlainAccessConfig(String addr,
+        PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.createAndUpdatePlainAccessConfig(addr, config);
+    }
+
+    /**
+     * Deletes the access config identified by {@code accessKey} on the broker at {@code addr}
+     * by delegating to {@code defaultMQAdminExtImpl}.
+     *
+     * @param addr broker address the request is sent to
+     * @param accessKey access key of the entry to delete
+     */
+    @Override public void deletePlainAccessConfig(String addr,
+        String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.deletePlainAccessConfig(addr, accessKey);
+    }
+
+    /**
+     * Updates the global white address list on the broker at {@code addr} by delegating to
+     * {@code defaultMQAdminExtImpl}.
+     *
+     * @param addr broker address the request is sent to
+     * @param globalWhiteAddrs the new white addresses as one string
+     *        (comma-separated in the documented CLI example -- confirm the expected format)
+     */
+    @Override public void updateGlobalWhiteAddrConfig(String addr,
+        String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
+    }
+
+    /**
+     * Fetches the ACL config version information of the broker at {@code addr} by delegating
+     * to {@code defaultMQAdminExtImpl}.
+     *
+     * @param addr broker address to query
+     * @return whatever the delegate returns for that broker
+     */
+    @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr);
+    }
+
+    @Override
     public void createAndUpdateSubscriptionGroupConfig(String addr,
         SubscriptionGroupConfig config) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
@@ -305,6 +328,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
         defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
     }
 
+    @Override
     public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
         boolean force)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2a7815b..502e9da 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -47,6 +48,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -87,6 +89,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
 
 public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
+
     private final InternalLogger log = ClientLogger.getLog();
     private final DefaultMQAdminExt defaultMQAdminExt;
     private ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -178,6 +181,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
     }
 
+    /**
+     * Forwards the access config to {@code MQClientAPIImpl.createPlainAccessConfig}, using this
+     * instance's {@code timeoutMillis} as the call timeout.
+     *
+     * @param addr broker address the request is sent to
+     * @param config the access config to create or update
+     */
+    @Override public void createAndUpdatePlainAccessConfig(String addr,
+        PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().createPlainAccessConfig(addr, config, timeoutMillis);
+    }
+
+    /**
+     * Forwards the delete request to {@code MQClientAPIImpl.deleteAccessConfig}, using this
+     * instance's {@code timeoutMillis} as the call timeout.
+     *
+     * @param addr broker address the request is sent to
+     * @param accessKey access key of the entry to delete
+     */
+    @Override public void deletePlainAccessConfig(String addr,
+        String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().deleteAccessConfig(addr, accessKey, timeoutMillis);
+    }
+
+    /**
+     * Forwards the new global white addresses to
+     * {@code MQClientAPIImpl.updateGlobalWhiteAddrsConfig}, using this instance's
+     * {@code timeoutMillis} as the call timeout.
+     *
+     * @param addr broker address the request is sent to
+     * @param globalWhiteAddrs the new white addresses as one string
+     */
+    @Override public void updateGlobalWhiteAddrConfig(String addr,
+        String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis);
+    }
+
+    /**
+     * Returns the result of {@code MQClientAPIImpl.getBrokerClusterAclInfo} for the broker at
+     * {@code addr}, using this instance's {@code timeoutMillis} as the call timeout.
+     *
+     * @param addr broker address to query
+     * @return the ACL version information reported for that broker
+     */
+    @Override
+    public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+        String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
+    }
+
     @Override
     public void createAndUpdateSubscriptionGroupConfig(String addr,
         SubscriptionGroupConfig config) throws RemotingException,
@@ -548,6 +572,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
         return Collections.EMPTY_MAP;
     }
 
+    @Override
     public void createOrUpdateOrderConf(String key, String value,
         boolean isCluster) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 16b4427..930785e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.RollbackStats;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -68,6 +70,18 @@ public interface MQAdminExt extends MQAdmin {
         final TopicConfig config) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    /**
+     * Creates or updates a plain access config on a broker.
+     *
+     * @param addr address of the target broker
+     * @param plainAccessConfig the access config to create or update
+     */
+    void createAndUpdatePlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
+    /**
+     * Deletes the plain access config identified by an access key on a broker.
+     *
+     * @param addr address of the target broker
+     * @param accessKey access key of the entry to delete
+     */
+    void deletePlainAccessConfig(final String addr, final String accessKey) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
+    /**
+     * Updates the global white address list on a broker.
+     *
+     * @param addr address of the target broker
+     * @param globalWhiteAddrs the new white addresses as one string
+     *        (comma-separated in the documented CLI example -- confirm the expected format)
+     */
+    void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs)throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
+    /**
+     * Queries a broker for its ACL config version information.
+     *
+     * @param addr address of the broker to query
+     * @return the broker's ACL version information
+     */
+    ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
     void createAndUpdateSubscriptionGroupConfig(final String addr,
         final SubscriptionGroupConfig config) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f2531de..614fed8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -30,6 +30,10 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand;
+import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand;
+import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand;
+import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand;
 import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
 import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
 import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
@@ -199,6 +203,12 @@ public class MQAdminStartup {
         initCommand(new QueryConsumeQueueCommand());
         initCommand(new SendMessageCommand());
         initCommand(new ConsumeMessageCommand());
+
+        //for acl command
+        initCommand(new UpdateAccessConfigSubCommand());
+        initCommand(new DeleteAccessConfigSubCommand());
+        initCommand(new ClusterAclConfigVersionListSubCommand());
+        initCommand(new UpdateGlobalWhiteAddrSubCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
new file mode 100644
index 0000000..c1e86fb
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command {@code clusterAclConfigVersion}: prints the acl config data version
+ * (counter and timestamp) reported by one broker or by each master address of a cluster.
+ */
+public class ClusterAclConfigVersionListSubCommand implements SubCommand {
+
+    @Override public String commandName() {
+        return "clusterAclConfigVersion";
+    }
+
+    @Override public String commandDesc() {
+        return "List all of acl config version information in cluster";
+    }
+
+    /**
+     * Adds the required option group: either {@code -b} (broker address) or {@code -c} (cluster name).
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance with the group added
+     */
+    @Override public Options buildCommandlineOptions(Options options) {
+        OptionGroup optionGroup = new OptionGroup();
+
+        Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker");
+        optionGroup.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "query acl config version for specified cluster");
+        optionGroup.addOption(opt);
+
+        optionGroup.setRequired(true);
+        options.addOptionGroup(optionGroup);
+
+        return options;
+    }
+
+    /**
+     * Prints the acl config version of the broker given by {@code -b}, or of every address returned by
+     * {@code CommandUtil.fetchMasterAddrByClusterName} for the cluster given by {@code -c}. When neither
+     * option is present the command line help is printed instead.
+     *
+     * @throws SubCommandException wrapping any exception raised while starting the admin client or querying
+     */
+    @Override public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+                defaultMQAdminExt.start();
+                printClusterBaseInfo(defaultMQAdminExt, addr);
+
+                // The address argument was previously passed without a matching conversion and so never printed.
+                System.out.printf("get broker's plain access config version success. Address:%s%n", addr);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                // Column header; widths match the row format used in printClusterBaseInfo.
+                System.out.printf("%-16s  %-22s  %-22s  %-20s  %-22s%n",
+                    "#Cluster Name",
+                    "#Broker Name",
+                    "#Broker Addr",
+                    "#AclConfigVersionNum",
+                    "#AclLastUpdateTime"
+                );
+                for (String addr : masterSet) {
+                    printClusterBaseInfo(defaultMQAdminExt, addr);
+                }
+                System.out.printf("get cluster's plain access config version success.%n");
+
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            // Always release the admin client, including on the help and failure paths.
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Queries {@code addr} for its acl version info and prints one formatted row: cluster name,
+     * broker name, broker address, version counter and the version timestamp rendered with
+     * {@code UtilAll.YYYY_MM_DD_HH_MM_SS}.
+     *
+     * @param defaultMQAdminExt started admin client used for the query
+     * @param addr address to query
+     */
+    private void printClusterBaseInfo(
+        final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws
+        InterruptedException, MQBrokerException, RemotingException, MQClientException {
+
+        ClusterAclVersionInfo clusterAclVersionInfo = defaultMQAdminExt.examineBrokerClusterAclVersionInfo(addr);
+        DataVersion aclDataVersion = clusterAclVersionInfo.getAclConfigDataVersion();
+        String versionNum = String.valueOf(aclDataVersion.getCounter());
+
+        DateFormat sdf = new SimpleDateFormat(UtilAll.YYYY_MM_DD_HH_MM_SS);
+        String timeStampStr = sdf.format(new Timestamp(aclDataVersion.getTimestamp()));
+
+        System.out.printf("%-16s  %-22s  %-22s  %-20s  %-22s%n",
+            clusterAclVersionInfo.getClusterName(),
+            clusterAclVersionInfo.getBrokerName(),
+            clusterAclVersionInfo.getBrokerAddr(),
+            versionNum,
+            timeStampStr
+        );
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
new file mode 100644
index 0000000..8570b2f
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command {@code deleteAccessConfig}: deletes the acl account identified by an access key
+ * on one broker or on each master address of a cluster.
+ */
+public class DeleteAccessConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "deleteAccessConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Delete Acl Config Account in broker";
+    }
+
+    /**
+     * Adds the required target group ({@code -b} broker address or {@code -c} cluster name) and the
+     * required {@code -a} access key option.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        OptionGroup optionGroup = new OptionGroup();
+
+        Option opt = new Option("b", "brokerAddr", true, "delete acl config account to which broker");
+        optionGroup.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "delete acl config account to which cluster");
+        optionGroup.addOption(opt);
+
+        optionGroup.setRequired(true);
+        options.addOptionGroup(optionGroup);
+
+        opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Deletes the account named by {@code -a} on the broker given by {@code -b}, or on every address
+     * returned by {@code CommandUtil.fetchMasterAddrByClusterName} for the cluster given by {@code -c}.
+     * When neither target option is present the command line help is printed instead.
+     *
+     * @throws SubCommandException wrapping any exception raised while starting the admin client or deleting
+     */
+    @Override public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+
+            String accessKey = commandLine.getOptionValue('a').trim();
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+
+                System.out.printf("delete plain access config account to %s success.%n", addr);
+                // Terminate the last line so the shell prompt does not run into the output.
+                System.out.printf("account's accesskey is:%s%n", accessKey);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+
+                Set<String> masterSet =
+                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+                    System.out.printf("delete plain access config account to %s success.%n", addr);
+                }
+
+                System.out.printf("account's accesskey is:%s%n", accessKey);
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            // Always release the admin client, including on the help and failure paths.
+            defaultMQAdminExt.shutdown();
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
new file mode 100644
index 0000000..10241bf
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command {@code updateAclConfig}: builds a {@link PlainAccessConfig} from the command
+ * line and sends it to one broker or to each master address of a cluster.
+ */
+public class UpdateAccessConfigSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateAclConfig";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Update acl config yaml file in broker";
+    }
+
+    /**
+     * Adds the required target group ({@code -b} broker address or {@code -c} cluster name), the
+     * required {@code -a} access key and the optional account attributes.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        OptionGroup optionGroup = new OptionGroup();
+
+        Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker");
+        optionGroup.addOption(opt);
+
+        opt = new Option("c", "clusterName", true, "update acl config file to which cluster");
+        optionGroup.addOption(opt);
+
+        optionGroup.setRequired(true);
+        options.addOptionGroup(optionGroup);
+
+        opt = new Option("a", "accessKey", true, "set accessKey in acl config file");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupA=DENY,groupD=SUB");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "admin", true, "set admin flag in acl config file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Populates a {@link PlainAccessConfig} from the supplied options and applies it to the broker given
+     * by {@code -b}, or to every address returned by {@code CommandUtil.fetchMasterAddrByClusterName}
+     * for the cluster given by {@code -c}. Only attributes whose option is present are set.
+     * When neither target option is present the command line help is printed instead.
+     *
+     * @throws SubCommandException wrapping any exception raised while starting the admin client or updating
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options,
+        RPCHook rpcHook) throws SubCommandException {
+
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            PlainAccessConfig accessConfig = new PlainAccessConfig();
+            accessConfig.setAccessKey(commandLine.getOptionValue('a').trim());
+            // Secretkey
+            if (commandLine.hasOption('s')) {
+                accessConfig.setSecretKey(commandLine.getOptionValue('s').trim());
+            }
+
+            // Admin
+            if (commandLine.hasOption('m')) {
+                accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim()));
+            }
+
+            // DefaultTopicPerm
+            if (commandLine.hasOption('i')) {
+                accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim());
+            }
+
+            // DefaultGroupPerm
+            if (commandLine.hasOption('u')) {
+                accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim());
+            }
+
+            // WhiteRemoteAddress
+            if (commandLine.hasOption('w')) {
+                accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim());
+            }
+
+            // TopicPerms list value
+            if (commandLine.hasOption('t')) {
+                accessConfig.setTopicPerms(parsePermList(commandLine.getOptionValue('t')));
+            }
+
+            // GroupPerms list value
+            if (commandLine.hasOption('g')) {
+                accessConfig.setGroupPerms(parsePermList(commandLine.getOptionValue('g')));
+            }
+
+            if (commandLine.hasOption('b')) {
+                String addr = commandLine.getOptionValue('b').trim();
+
+                defaultMQAdminExt.start();
+                defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+
+                System.out.printf("create or update plain access config to %s success.%n", addr);
+                // Terminate the last line so the shell prompt does not run into the output.
+                System.out.printf("%s%n", accessConfig);
+                return;
+
+            } else if (commandLine.hasOption('c')) {
+                String clusterName = commandLine.getOptionValue('c').trim();
+
+                defaultMQAdminExt.start();
+                Set<String> masterSet =
+                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+                    System.out.printf("create or update plain access config to %s success.%n", addr);
+                }
+
+                System.out.printf("%s%n", accessConfig);
+                return;
+            }
+
+            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            // Always release the admin client, including on the help and failure paths.
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Splits a comma-separated permission list such as {@code topicA=DENY,topicD=SUB} into its entries.
+     * Each entry is trimmed and empty entries are dropped, so stray spaces or commas on the command
+     * line do not end up in the config.
+     *
+     * @param rawPerms raw option value, must not be null
+     * @return mutable list of the non-empty entries in their original order
+     */
+    private static List<String> parsePermList(final String rawPerms) {
+        List<String> permList = new ArrayList<String>();
+        for (String perm : rawPerms.split(",")) {
+            String trimmedPerm = perm.trim();
+            if (!trimmedPerm.isEmpty()) {
+                permList.add(trimmedPerm);
+            }
+        }
+        return permList;
+    }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
new file mode 100644
index 0000000..ef9d940
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * mqadmin sub-command {@code updateGlobalWhiteAddr}: sends a global white remote address list to
+ * one broker ({@code -b}) or to each master address resolved for a cluster ({@code -c}).
+ */
+public class UpdateGlobalWhiteAddrSubCommand implements SubCommand {
+
+    @Override
+    public String commandName() {
+        return "updateGlobalWhiteAddr";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Update global white address for acl Config File in broker";
+    }
+
+    /**
+     * Registers the required target group ({@code -b} or {@code -c}) and the required {@code -g} option.
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        OptionGroup targetGroup = new OptionGroup();
+        targetGroup.addOption(new Option("b", "brokerAddr", true, "update global white address to which broker"));
+        targetGroup.addOption(new Option("c", "clusterName", true, "update global white address to which cluster"));
+        targetGroup.setRequired(true);
+        options.addOptionGroup(targetGroup);
+
+        Option whiteAddrOption = new Option("g", "globalWhiteRemoteAddresses", true,
+            "set globalWhiteRemoteAddress list,eg: 10.10.103.*,192.168.0.*");
+        whiteAddrOption.setRequired(true);
+        options.addOption(whiteAddrOption);
+
+        return options;
+    }
+
+    /**
+     * Applies the {@code -g} value to the selected target, or prints the command line help when
+     * neither {@code -b} nor {@code -c} is present. The admin client is shut down in every case.
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        final DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
+        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            final String whiteAddrs = commandLine.getOptionValue('g').trim();
+
+            if (commandLine.hasOption('b')) {
+                final String brokerAddr = commandLine.getOptionValue('b').trim();
+                adminExt.start();
+                pushWhiteAddrs(adminExt, brokerAddr, whiteAddrs);
+            } else if (commandLine.hasOption('c')) {
+                final String cluster = commandLine.getOptionValue('c').trim();
+                adminExt.start();
+                final Set<String> masters = CommandUtil.fetchMasterAddrByClusterName(adminExt, cluster);
+                for (String masterAddr : masters) {
+                    pushWhiteAddrs(adminExt, masterAddr, whiteAddrs);
+                }
+            } else {
+                ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+            }
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            adminExt.shutdown();
+        }
+    }
+
+    /** Sends the white address list to a single address and reports success on stdout. */
+    private static void pushWhiteAddrs(DefaultMQAdminExt adminExt, String addr,
+        String whiteAddrs) throws Exception {
+        adminExt.updateGlobalWhiteAddrConfig(addr, whiteAddrs);
+        System.out.printf("update global white remote addresses to %s success.%n", addr);
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
similarity index 57%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
index 5ea03d6..ba8baa3 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -24,31 +24,15 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class UpdateTopicSubCommandTest {
+public class ClusterAclConfigVersionListSubCommandTest {
+
     @Test
     public void testExecute() {
-        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+        ClusterAclConfigVersionListSubCommand cmd = new ClusterAclConfigVersionListSubCommand();
         Options options = ServerUtil.buildCommandlineOptions(new Options());
-        String[] subargs = new String[] {
-            "-b 127.0.0.1:10911",
-            "-c default-cluster",
-            "-t unit-test",
-            "-r 8",
-            "-w 8",
-            "-p 6",
-            "-o false",
-            "-u false",
-            "-s false"};
+        String[] subargs = new String[] {"-c default-cluster"};
         final CommandLine commandLine =
             ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
-        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
         assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
-        assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
-        assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
-        assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
     }
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
similarity index 58%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
index 5ea03d6..74092f4 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -24,31 +24,16 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class UpdateTopicSubCommandTest {
+public class DeleteAccessConfigSubCommandTest {
+
     @Test
     public void testExecute() {
-        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+        DeleteAccessConfigSubCommand cmd = new DeleteAccessConfigSubCommand();
         Options options = ServerUtil.buildCommandlineOptions(new Options());
-        String[] subargs = new String[] {
-            "-b 127.0.0.1:10911",
-            "-c default-cluster",
-            "-t unit-test",
-            "-r 8",
-            "-w 8",
-            "-p 6",
-            "-o false",
-            "-u false",
-            "-s false"};
+        String[] subargs = new String[] {"-a unit-test", "-c default-cluster"};
         final CommandLine commandLine =
             ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
-        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+        assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("unit-test");
         assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
-        assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
-        assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
-        assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
     }
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java
new file mode 100644
index 0000000..2c133a2
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that the options declared by {@link UpdateAccessConfigSubCommand} parse as expected and that
+ * the permission lists split on the comma separator the command itself uses.
+ */
+public class UpdateAccessConfigSubCommandTest {
+
+    @Test
+    public void testExecute() {
+        UpdateAccessConfigSubCommand cmd = new UpdateAccessConfigSubCommand();
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        // List values use ',' because that is the separator UpdateAccessConfigSubCommand splits on.
+        String[] subargs = new String[] {
+            "-b 127.0.0.1:10911",
+            "-a RocketMQ",
+            "-s 12345678",
+            "-w 192.168.0.*",
+            "-i DENY",
+            "-u SUB",
+            "-t topicA=DENY,topicB=PUB|SUB",
+            "-g groupA=DENY,groupB=SUB",
+            "-m true"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+        assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("RocketMQ");
+        assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("12345678");
+        assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("192.168.0.*");
+        assertThat(commandLine.getOptionValue('i').trim()).isEqualTo("DENY");
+        assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("SUB");
+        assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("topicA=DENY,topicB=PUB|SUB");
+        assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("groupA=DENY,groupB=SUB");
+        assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("true");
+
+        PlainAccessConfig accessConfig = new PlainAccessConfig();
+
+        // topicPerms list value
+        if (commandLine.hasOption('t')) {
+            List<String> topicPermList = new ArrayList<String>();
+            for (String topicPerm : commandLine.getOptionValue('t').trim().split(",")) {
+                topicPermList.add(topicPerm);
+            }
+            accessConfig.setTopicPerms(topicPermList);
+        }
+
+        // groupPerms list value
+        if (commandLine.hasOption('g')) {
+            List<String> groupPermList = new ArrayList<String>();
+            for (String groupPerm : commandLine.getOptionValue('g').trim().split(",")) {
+                groupPermList.add(groupPerm);
+            }
+            accessConfig.setGroupPerms(groupPermList);
+        }
+
+        Assert.assertTrue(accessConfig.getTopicPerms().contains("topicB=PUB|SUB"));
+        Assert.assertTrue(accessConfig.getGroupPerms().contains("groupB=SUB"));
+
+    }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
similarity index 57%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
index 5ea03d6..66d609d 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -24,31 +24,16 @@ import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class UpdateTopicSubCommandTest {
+public class UpdateGlobalWhiteAddrSubCommandTest {
+
     @Test
     public void testExecute() {
-        UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+        UpdateGlobalWhiteAddrSubCommand cmd = new UpdateGlobalWhiteAddrSubCommand();
         Options options = ServerUtil.buildCommandlineOptions(new Options());
-        String[] subargs = new String[] {
-            "-b 127.0.0.1:10911",
-            "-c default-cluster",
-            "-t unit-test",
-            "-r 8",
-            "-w 8",
-            "-p 6",
-            "-o false",
-            "-u false",
-            "-s false"};
+        String[] subargs = new String[] {"-g 10.10.103.*,192.168.0.*", "-c default-cluster"};
         final CommandLine commandLine =
             ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
-        assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+        assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("10.10.103.*,192.168.0.*");
         assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
-        assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
-        assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
-        assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
-        assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
-        assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
     }
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
index 5ea03d6..7e7863f 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
@@ -31,7 +31,6 @@ public class UpdateTopicSubCommandTest {
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         String[] subargs = new String[] {
             "-b 127.0.0.1:10911",
-            "-c default-cluster",
             "-t unit-test",
             "-r 8",
             "-w 8",
@@ -42,7 +41,6 @@ public class UpdateTopicSubCommandTest {
         final CommandLine commandLine =
             ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
         assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
-        assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
         assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
         assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
         assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");


Mime
View raw message