This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch enchanced_acl
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/enchanced_acl by this push:
new a2165d8 [ISSUE #1156]new mqadmin API for ACL configuration (#1217)
a2165d8 is described below
commit a2165d80517d46a3f8be62a1a4e3d3a2290afab8
Author: Hu Zongtang <huzongtang@cmss.chinamobile.com>
AuthorDate: Fri Jun 14 12:22:10 2019 +0800
[ISSUE #1156]new mqadmin API for ACL configuration (#1217)
* [issue#1164]return the codes to original reput method part.
* [issue#1164]fix issue that Consumer Instance can't consume message from slave when cluster is in the high level tps and master has been killed.
* [issue#1164]if the broker is a master node,then modify reputFromOffset correctly.
* [issue#1164]add some coding comments.
* [ISSUE#1156]new mqadmin API for ACL configuration.Add dataVersion and write data to acl yaml config file in the acl module.
* [ISSUE#1156]add unit test cases for new acl mqadmin API command which adding dataversion part.
* [ISSUE#1156]implement update,delete and query Acl config version in new acl mqadmin API command.
* [ISSUE#1156]polish and optimize the implementation for new acl mqadmin API command.
* [ISSUE#1156]fix the issues that topicPerms and groupPerms can't be updated and there is NPE when querying acl config version.
* [ISSUE#1156]fix small issue that specify wrong plainAccessConfig attribute,the correct one is AccessKey.
* [ISSUE#1156]add updateGlobalWhiteAddr subcommand codes for mqadmin acl command.
* [ISSUE#1156]adjust some codes for cluster acl config version list in the mqadmin acl commands.
* [ISSUE#1156]add acl mqadmin command part in the acl user_guide docs.
* [ISSUE#1156]polish and optimize some part of acl mqadmin command codes.
* [ISSUE#1156]fix code comments issue that the first letter needs to be capitalized and adjust the contents of acl user_guide.
---
.../org/apache/rocketmq/acl/AccessValidator.java | 31 +++
.../apache/rocketmq/acl/common/AclConstants.java | 50 ++++
.../org/apache/rocketmq/acl/common/AclUtils.java | 34 ++-
.../rocketmq/acl/plain/PlainAccessResource.java | 2 +-
.../rocketmq/acl/plain/PlainAccessValidator.java | 28 +-
...sionLoader.java => PlainPermissionManager.java} | 282 ++++++++++++++-------
.../apache/rocketmq/acl/common/AclUtilsTest.java | 73 +++++-
.../acl/plain/PlainAccessValidatorTest.java | 264 +++++++++++++++++++
...erTest.java => PlainPermissionManagerTest.java} | 93 +++----
acl/src/test/resources/conf/plain_acl_correct.yml | 22 ++
acl/src/test/resources/conf/plain_acl_delete.yml | 22 ++
.../conf/plain_acl_global_white_addrs.yml | 22 ++
.../resources/conf/plain_acl_update_create.yml | 22 ++
.../resources/conf/plain_acl_with_no_accouts.yml | 3 +
.../apache/rocketmq/broker/BrokerController.java | 8 +-
.../broker/processor/AdminBrokerProcessor.java | 149 +++++++++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 105 ++++++++
.../rocketmq/client/impl/MQClientAPIImplTest.java | 132 ++++++++++
.../apache/rocketmq/common/PlainAccessConfig.java | 102 ++++++++
.../java/org/apache/rocketmq/common/UtilAll.java | 28 +-
.../rocketmq/common/protocol/RequestCode.java | 8 +
.../rocketmq/common/protocol/ResponseCode.java | 7 +
.../protocol/body/ClusterAclVersionInfo.java | 64 +++++
.../header/CreateAccessConfigRequestHeader.java | 113 +++++++++
.../header/DeleteAccessConfigRequestHeader.java | 42 +--
.../header/GetBrokerAclConfigResponseHeader.java | 71 ++++++
.../UpdateGlobalWhiteAddrsConfigRequestHeader.java | 35 +--
docs/cn/acl/user_guide.md | 73 +++++-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 24 ++
.../tools/admin/DefaultMQAdminExtImpl.java | 25 ++
.../apache/rocketmq/tools/admin/MQAdminExt.java | 14 +
.../rocketmq/tools/command/MQAdminStartup.java | 10 +
.../acl/ClusterAclConfigVersionListSubCommand.java | 131 ++++++++++
.../command/acl/DeleteAccessConfigSubCommand.java | 106 ++++++++
.../command/acl/UpdateAccessConfigSubCommand.java | 185 ++++++++++++++
.../acl/UpdateGlobalWhiteAddrSubCommand.java | 101 ++++++++
...ClusterAclConfigVersionListSubCommandTest.java} | 28 +-
.../DeleteAccessConfigSubCommandTest.java} | 29 +--
.../acl/UpdateAccessConfigSubCommandTest.java | 89 +++++++
.../UpdateGlobalWhiteAddrSubCommandTest.java} | 29 +--
.../command/topic/UpdateTopicSubCommandTest.java | 2 -
41 files changed, 2397 insertions(+), 261 deletions(-)
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
index c915cf3..b87cc2f 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
@@ -17,9 +17,12 @@
package org.apache.rocketmq.acl;
+import java.util.List;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface AccessValidator {
+
/**
* Parse to get the AccessResource(user, resource, needed permission)
*
@@ -35,4 +38,32 @@ public interface AccessValidator {
* @param accessResource
*/
void validate(AccessResource accessResource);
+
+ /**
+ * Update the access resource config
+ *
+ * @param plainAccessConfig
+ * @return
+ */
+ boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);
+
+ /**
+ * Delete the access resource config
+ *
+ * @return
+ */
+ boolean deleteAccessConfig(String accesskey);
+
+ /**
+ * Get the access resource config version information
+ *
+ * @return
+ */
+ String getAclConfigVersion();
+
+ /**
+ * Update globalWhiteRemoteAddresses in acl yaml config file
+ * @return
+ */
+ boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java
new file mode 100644
index 0000000..bfe96f5
--- /dev/null
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.acl.common;
+
+public class AclConstants {
+
+ public static final String CONFIG_GLOBAL_WHITE_ADDRS = "globalWhiteRemoteAddresses";
+
+ public static final String CONFIG_ACCOUNTS = "accounts";
+
+ public static final String CONFIG_ACCESS_KEY = "accessKey";
+
+ public static final String CONFIG_SECRET_KEY = "secretKey";
+
+ public static final String CONFIG_WHITE_ADDR = "whiteRemoteAddress";
+
+ public static final String CONFIG_ADMIN_ROLE = "admin";
+
+ public static final String CONFIG_DEFAULT_TOPIC_PERM = "defaultTopicPerm";
+
+ public static final String CONFIG_DEFAULT_GROUP_PERM = "defaultGroupPerm";
+
+ public static final String CONFIG_TOPIC_PERMS = "topicPerms";
+
+ public static final String CONFIG_GROUP_PERMS = "groupPerms";
+
+ public static final String CONFIG_DATA_VERSION = "dataVersion";
+
+ public static final String CONFIG_COUNTER = "counter";
+
+ public static final String CONFIG_TIME_STAMP = "timestamp";
+
+ public static final int ACCESS_KEY_MIN_LENGTH = 6;
+
+ public static final int SECRET_KEY_MIN_LENGTH = 6;
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
index 39f75a3..20e1cfa 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
@@ -20,7 +20,9 @@ import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileWriter;
import java.io.IOException;
+import java.io.PrintWriter;
import java.util.Map;
import java.util.SortedMap;
import org.apache.commons.lang3.StringUtils;
@@ -48,7 +50,7 @@ public class AclUtils {
return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
} catch (Exception e) {
- throw new RuntimeException("incompatible exception.", e);
+ throw new RuntimeException("Incompatible exception.", e);
}
}
@@ -69,7 +71,7 @@ public class AclUtils {
public static void verify(String netaddress, int index) {
if (!AclUtils.isScope(netaddress, index)) {
- throw new AclException(String.format("netaddress examine scope Exception netaddress is %s", netaddress));
+ throw new AclException(String.format("Netaddress examine scope Exception netaddress is %s", netaddress));
}
}
@@ -127,11 +129,11 @@ public class AclUtils {
}
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
- Yaml ymal = new Yaml();
+ Yaml yaml = new Yaml();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(path));
- return ymal.loadAs(fis, clazz);
+ return yaml.loadAs(fis, clazz);
} catch (FileNotFoundException ignore) {
return null;
} catch (Exception e) {
@@ -146,13 +148,31 @@ public class AclUtils {
}
}
+ public static boolean writeDataObject(String path, Map<String,Object> dataMap) {
+ Yaml yaml = new Yaml();
+ PrintWriter pw = null;
+ try {
+ pw = new PrintWriter(new FileWriter(path));
+ String dumpAsMap = yaml.dumpAsMap(dataMap);
+ pw.print(dumpAsMap);
+ pw.flush();
+ } catch (Exception e) {
+ throw new AclException(e.getMessage());
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
+ }
+ return true;
+ }
+
public static RPCHook getAclRPCHook(String fileName) {
JSONObject yamlDataObject = null;
try {
yamlDataObject = AclUtils.getYamlDataObject(fileName,
JSONObject.class);
} catch (Exception e) {
- log.error("convert yaml file to data object error, ",e);
+ log.error("Convert yaml file to data object error, ",e);
return null;
}
@@ -161,8 +181,8 @@ public class AclUtils {
return null;
}
- String accessKey = yamlDataObject.getString("accessKey");
- String secretKey = yamlDataObject.getString("secretKey");
+ String accessKey = yamlDataObject.getString(AclConstants.CONFIG_ACCESS_KEY);
+ String secretKey = yamlDataObject.getString(AclConstants.CONFIG_SECRET_KEY);
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
log.warn("AccessKey or secretKey is blank, the acl is not enabled.");
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
index 00072e8..a0cceed 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessResource.java
@@ -43,7 +43,7 @@ public class PlainAccessResource implements AccessResource {
private int requestCode;
- //the content to calculate the content
+ // The content to calculate the content
private byte[] content;
private String signature;
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
index 11b2a43..c8ce239 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.acl.plain;
+import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -25,6 +26,7 @@ import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
@@ -38,10 +40,10 @@ import static org.apache.rocketmq.acl.plain.PlainAccessResource.getRetryTopic;
public class PlainAccessValidator implements AccessValidator {
- private PlainPermissionLoader aclPlugEngine;
+ private PlainPermissionManager aclPlugEngine;
public PlainAccessValidator() {
- aclPlugEngine = new PlainPermissionLoader();
+ aclPlugEngine = new PlainPermissionManager();
}
@Override
@@ -56,8 +58,8 @@ public class PlainAccessValidator implements AccessValidator {
accessResource.setRequestCode(request.getCode());
if (request.getExtFields() == null) {
- //If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
- //The following logic codes depend on the request's extFields not to be null.
+ // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern)
+ // The following logic codes depend on the request's extFields not to be null.
return accessResource;
}
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
@@ -135,4 +137,22 @@ public class PlainAccessValidator implements AccessValidator {
aclPlugEngine.validate((PlainAccessResource) accessResource);
}
+ @Override
+ public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
+ return aclPlugEngine.updateAccessConfig(plainAccessConfig);
+ }
+
+ @Override
+ public boolean deleteAccessConfig(String accesskey) {
+ return aclPlugEngine.deleteAccessConfig(accesskey);
+ }
+
+ @Override public String getAclConfigVersion() {
+ return aclPlugEngine.getAclConfigDataVersion();
+ }
+
+ @Override public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
+ return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
+ }
+
}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
similarity index 53%
rename from acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
rename to acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
index e97825d..fc7f0f3 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
@@ -21,27 +21,29 @@ import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.srvutil.FileWatchService;
-public class PlainPermissionLoader {
+public class PlainPermissionManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml";
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
@@ -55,7 +57,9 @@ public class PlainPermissionLoader {
private boolean isWatchStart;
- public PlainPermissionLoader() {
+ private final DataVersion dataVersion = new DataVersion();
+
+ public PlainPermissionManager() {
load();
watch();
}
@@ -67,7 +71,6 @@ public class PlainPermissionLoader {
JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
JSONObject.class);
-
if (plainAclConfData == null || plainAclConfData.isEmpty()) {
throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName));
}
@@ -80,7 +83,7 @@ public class PlainPermissionLoader {
}
}
- JSONArray accounts = plainAclConfData.getJSONArray("accounts");
+ JSONArray accounts = plainAclConfData.getJSONArray(AclConstants.CONFIG_ACCOUNTS);
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
@@ -89,10 +92,184 @@ public class PlainPermissionLoader {
}
}
+ // For loading dataversion part just
+ JSONArray tempDataVersion = plainAclConfData.getJSONArray(AclConstants.CONFIG_DATA_VERSION);
+ if (tempDataVersion != null && !tempDataVersion.isEmpty()) {
+ List<DataVersion> dataVersion = tempDataVersion.toJavaList(DataVersion.class);
+ DataVersion firstElement = dataVersion.get(0);
+ this.dataVersion.assignNewOne(firstElement);
+ }
+
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
}
+ public String getAclConfigDataVersion() {
+ return this.dataVersion.toJson();
+ }
+
+ private Map<String, Object> updateAclConfigFileVersion(Map<String, Object> updateAclConfigMap) {
+
+ dataVersion.nextVersion();
+ List<Map<String, Object>> versionElement = new ArrayList<Map<String, Object>>();
+ Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
+ {
+ put(AclConstants.CONFIG_COUNTER, dataVersion.getCounter().longValue());
+ put(AclConstants.CONFIG_TIME_STAMP, dataVersion.getTimestamp());
+ }
+ };
+ versionElement.add(accountsMap);
+ updateAclConfigMap.put(AclConstants.CONFIG_DATA_VERSION, versionElement);
+ return updateAclConfigMap;
+ }
+
+ public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) {
+
+ if (plainAccessConfig == null) {
+ log.error("Parameter value plainAccessConfig is null,Please check your parameter");
+ return false;
+ }
+
+ Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+ Map.class);
+
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get(AclConstants.CONFIG_ACCOUNTS);
+ Map<String, Object> updateAccountMap = null;
+ if (accounts != null) {
+ for (Map<String, Object> account : accounts) {
+ if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+ // Update acl access config elements
+ accounts.remove(account);
+ updateAccountMap = createAclAccessConfigMap(account, plainAccessConfig);
+ accounts.add(updateAccountMap);
+ aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+
+ if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+ return true;
+ }
+ return false;
+ }
+ }
+ // Create acl access config elements
+ accounts.add(createAclAccessConfigMap(null, plainAccessConfig));
+ aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+ if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+ return true;
+ }
+ return false;
+ }
+
+ log.error("Users must ensure that the acl yaml config file has accounts node element");
+ return false;
+ }
+
+ private Map<String, Object> createAclAccessConfigMap(Map<String, Object> existedAccoutMap, PlainAccessConfig plainAccessConfig) {
+
+
+ Map<String, Object> newAccountsMap = null;
+ if (existedAccoutMap == null) {
+ newAccountsMap = new LinkedHashMap<String, Object>();
+ } else {
+ newAccountsMap = existedAccoutMap;
+ }
+
+ if (StringUtils.isEmpty(plainAccessConfig.getAccessKey()) ||
+ plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH) {
+ throw new AclException(String.format(
+ "The accessKey=%s cannot be null and length should longer than 6",
+ plainAccessConfig.getAccessKey()));
+ }
+ newAccountsMap.put(AclConstants.CONFIG_ACCESS_KEY, plainAccessConfig.getAccessKey());
+
+ if (!StringUtils.isEmpty(plainAccessConfig.getSecretKey())) {
+ if (plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
+ throw new AclException(String.format(
+ "The secretKey=%s value length should longer than 6",
+ plainAccessConfig.getSecretKey()));
+ }
+ newAccountsMap.put(AclConstants.CONFIG_SECRET_KEY, (String) plainAccessConfig.getSecretKey());
+ }
+ if (!StringUtils.isEmpty(plainAccessConfig.getWhiteRemoteAddress())) {
+ newAccountsMap.put(AclConstants.CONFIG_WHITE_ADDR, plainAccessConfig.getWhiteRemoteAddress());
+ }
+ if (!StringUtils.isEmpty(String.valueOf(plainAccessConfig.isAdmin()))) {
+ newAccountsMap.put(AclConstants.CONFIG_ADMIN_ROLE, plainAccessConfig.isAdmin());
+ }
+ if (!StringUtils.isEmpty(plainAccessConfig.getDefaultTopicPerm())) {
+ newAccountsMap.put(AclConstants.CONFIG_DEFAULT_TOPIC_PERM, plainAccessConfig.getDefaultTopicPerm());
+ }
+ if (!StringUtils.isEmpty(plainAccessConfig.getDefaultGroupPerm())) {
+ newAccountsMap.put(AclConstants.CONFIG_DEFAULT_GROUP_PERM, plainAccessConfig.getDefaultGroupPerm());
+ }
+ if (plainAccessConfig.getTopicPerms() != null && !plainAccessConfig.getTopicPerms().isEmpty()) {
+ newAccountsMap.put(AclConstants.CONFIG_TOPIC_PERMS, plainAccessConfig.getTopicPerms());
+ }
+ if (plainAccessConfig.getGroupPerms() != null && !plainAccessConfig.getGroupPerms().isEmpty()) {
+ newAccountsMap.put(AclConstants.CONFIG_GROUP_PERMS, plainAccessConfig.getGroupPerms());
+ }
+
+ return newAccountsMap;
+ }
+
+ public boolean deleteAccessConfig(String accesskey) {
+ if (StringUtils.isEmpty(accesskey)) {
+ log.error("Parameter value accesskey is null or empty String,Please check your parameter");
+ return false;
+ }
+
+ Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+ Map.class);
+
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) aclAccessConfigMap.get("accounts");
+ if (accounts != null) {
+ Iterator<Map<String, Object>> itemIterator = accounts.iterator();
+ while (itemIterator.hasNext()) {
+
+ if (itemIterator.next().get(AclConstants.CONFIG_ACCESS_KEY).equals(accesskey)) {
+ // Delete the related acl config element
+ itemIterator.remove();
+ aclAccessConfigMap.put(AclConstants.CONFIG_ACCOUNTS, accounts);
+
+ if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+ return true;
+ }
+ return false;
+ }
+ }
+ }
+ log.error("Users must ensure that the acl yaml config file has related acl config elements");
+
+ return false;
+ }
+
+ public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) {
+
+ if (globalWhiteAddrsList == null) {
+ log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter");
+ return false;
+ }
+
+ Map<String, Object> aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
+ Map.class);
+
+ List<String> globalWhiteRemoteAddrList = (List<String>) aclAccessConfigMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
+
+ if (globalWhiteRemoteAddrList != null) {
+ globalWhiteRemoteAddrList.clear();
+ globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
+
+ // Update globalWhiteRemoteAddr element in memeory map firstly
+ aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS,globalWhiteRemoteAddrList);
+ if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
+ return true;
+ }
+ return false;
+ }
+
+ log.error("Users must ensure that the acl yaml config file has globalWhiteRemoteAddresses flag firstly");
+ return false;
+ }
+
private void watch() {
try {
String watchFilePath = fileHome + fileName;
@@ -156,8 +333,8 @@ public class PlainPermissionLoader {
public PlainAccessResource buildPlainAccessResource(PlainAccessConfig plainAccessConfig) throws AclException {
if (plainAccessConfig.getAccessKey() == null
|| plainAccessConfig.getSecretKey() == null
- || plainAccessConfig.getAccessKey().length() <= 6
- || plainAccessConfig.getSecretKey().length() <= 6) {
+ || plainAccessConfig.getAccessKey().length() <= AclConstants.ACCESS_KEY_MIN_LENGTH
+ || plainAccessConfig.getSecretKey().length() <= AclConstants.SECRET_KEY_MIN_LENGTH) {
throw new AclException(String.format(
"The accessKey=%s and secretKey=%s cannot be null and length should longer than 6",
plainAccessConfig.getAccessKey(), plainAccessConfig.getSecretKey()));
@@ -217,89 +394,4 @@ public class PlainPermissionLoader {
public boolean isWatchStart() {
return isWatchStart;
}
-
- static class PlainAccessConfig {
-
- private String accessKey;
-
- private String secretKey;
-
- private String whiteRemoteAddress;
-
- private boolean admin;
-
- private String defaultTopicPerm;
-
- private String defaultGroupPerm;
-
- private List<String> topicPerms;
-
- private List<String> groupPerms;
-
- public String getAccessKey() {
- return accessKey;
- }
-
- public void setAccessKey(String accessKey) {
- this.accessKey = accessKey;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
- }
-
- public String getWhiteRemoteAddress() {
- return whiteRemoteAddress;
- }
-
- public void setWhiteRemoteAddress(String whiteRemoteAddress) {
- this.whiteRemoteAddress = whiteRemoteAddress;
- }
-
- public boolean isAdmin() {
- return admin;
- }
-
- public void setAdmin(boolean admin) {
- this.admin = admin;
- }
-
- public String getDefaultTopicPerm() {
- return defaultTopicPerm;
- }
-
- public void setDefaultTopicPerm(String defaultTopicPerm) {
- this.defaultTopicPerm = defaultTopicPerm;
- }
-
- public String getDefaultGroupPerm() {
- return defaultGroupPerm;
- }
-
- public void setDefaultGroupPerm(String defaultGroupPerm) {
- this.defaultGroupPerm = defaultGroupPerm;
- }
-
- public List<String> getTopicPerms() {
- return topicPerms;
- }
-
- public void setTopicPerms(List<String> topicPerms) {
- this.topicPerms = topicPerms;
- }
-
- public List<String> getGroupPerms() {
- return groupPerms;
- }
-
- public void setGroupPerms(List<String> groupPerms) {
- this.groupPerms = groupPerms;
- }
-
- }
-
}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
index 4883afa..5b2627d 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
@@ -17,7 +17,11 @@
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONObject;
+import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -131,11 +135,78 @@ public class AclUtilsTest {
@Test
public void getYamlDataObjectTest() {
- Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl.yml", Map.class);
+ Map<String, Object> map = AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_correct.yml", Map.class);
Assert.assertFalse(map.isEmpty());
}
@Test
+ public void writeDataObject2YamlFileTest() throws IOException{
+
+ String targetFileName = "src/test/resources/conf/plain_write_acl.yml";
+ File transport = new File(targetFileName);
+ transport.delete();
+ transport.createNewFile();
+
+ Map<String, Object> aclYamlMap = new HashMap<String, Object>();
+
+ // For globalWhiteRemoteAddrs element in acl yaml config file
+ List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
+ globalWhiteRemoteAddrs.add("10.10.103.*");
+ globalWhiteRemoteAddrs.add("192.168.0.*");
+ aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
+
+ // For accounts element in acl yaml config file
+ List<Map<String, Object>> accounts = new ArrayList<Map<String, Object>>();
+ Map<String, Object> accountsMap = new LinkedHashMap<String, Object>() {
+ {
+ put("accessKey", "RocketMQ");
+ put("secretKey", "12345678");
+ put("whiteRemoteAddress", "whiteRemoteAddress");
+ put("admin", "true");
+ }
+ };
+ accounts.add(accountsMap);
+ aclYamlMap.put("accounts",accounts);
+ Assert.assertTrue(AclUtils.writeDataObject(targetFileName, aclYamlMap));
+
+ transport.delete();
+ }
+
+ @Test
+ public void updateExistedYamlFileTest() throws IOException{
+
+ String targetFileName = "src/test/resources/conf/plain_update_acl.yml";
+ File transport = new File(targetFileName);
+ transport.delete();
+ transport.createNewFile();
+
+ Map<String, Object> aclYamlMap = new HashMap<String, Object>();
+
+ // For globalWhiteRemoteAddrs element in acl yaml config file
+ List<String> globalWhiteRemoteAddrs = new ArrayList<String>();
+ globalWhiteRemoteAddrs.add("10.10.103.*");
+ globalWhiteRemoteAddrs.add("192.168.0.*");
+ aclYamlMap.put("globalWhiteRemoteAddrs",globalWhiteRemoteAddrs);
+
+ // Write file to yaml file
+ AclUtils.writeDataObject(targetFileName, aclYamlMap);
+
+ Map<String, Object> updatedMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<String> globalWhiteRemoteAddrList = (List<String>) updatedMap.get("globalWhiteRemoteAddrs");
+ globalWhiteRemoteAddrList.clear();
+ globalWhiteRemoteAddrList.add("192.168.1.2");
+
+ // Update file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, updatedMap);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<String> updatedGlobalWhiteRemoteAddrs = (List<String>) readableMap.get("globalWhiteRemoteAddrs");
+ Assert.assertEquals("192.168.1.2",updatedGlobalWhiteRemoteAddrs.get(0));
+
+ transport.delete();
+ }
+
+ @Test
public void getYamlDataIgnoreFileNotFoundExceptionTest() {
JSONObject yamlDataObject = AclUtils.getYamlDataObject("plain_acl.yml", JSONObject.class);
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
index b7cdb69..bca9075 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
@@ -16,14 +16,20 @@
*/
package org.apache.rocketmq.acl.plain;
+
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclConstants;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.*;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
@@ -297,4 +303,262 @@ public class PlainAccessValidatorTest {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
plainAccessValidator.validate(accessResource);
}
+
+ @Test
+ public void updateAccessAclYamlConfigNormalTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey("RocketMQ");
+ plainAccessConfig.setSecretKey("1234567890");
+ plainAccessConfig.setDefaultGroupPerm("PUB");
+ plainAccessConfig.setDefaultTopicPerm("SUB");
+ List<String> topicPerms = new ArrayList<String>();
+ topicPerms.add("topicC=PUB|SUB");
+ topicPerms.add("topicB=PUB");
+ plainAccessConfig.setTopicPerms(topicPerms);
+ List<String> groupPerms = new ArrayList<String>();
+ groupPerms.add("groupB=PUB|SUB");
+ groupPerms.add("groupC=DENY");
+ plainAccessConfig.setGroupPerms(groupPerms);
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Update acl access yaml config file
+ plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get("accounts");
+ Map<String, Object> verifyMap = null;
+ for (Map<String, Object> account : accounts) {
+ if (account.get("accessKey").equals(plainAccessConfig.getAccessKey())) {
+ verifyMap = account;
+ break;
+ }
+ }
+
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"1234567890");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"SUB");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE),false);
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR),"192.168.0.*");
+ Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
+ Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
+
+ // Verify the dateversion element is correct or not
+ List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get("dataVersion");
+ Assert.assertEquals(1,dataVersions.get(0).get("counter"));
+
+ // Restore the backup file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+ }
+
+ @Test
+ public void updateAccessAclYamlConfigTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey("RocketMQ");
+ plainAccessConfig.setSecretKey("123456789111");
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Update element in the acl access yaml config file
+ plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ Map<String, Object> verifyMap = null;
+ for (Map<String, Object> account : accounts) {
+ if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+ verifyMap = account;
+ break;
+ }
+ }
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
+
+ // Restore the backup file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+ }
+
+
+ @Test
+ public void createAndUpdateAccessAclYamlConfigNormalTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_update_create.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey("RocketMQ33");
+ plainAccessConfig.setSecretKey("123456789111");
+ plainAccessConfig.setDefaultGroupPerm("PUB");
+ plainAccessConfig.setDefaultTopicPerm("DENY");
+ List<String> topicPerms = new ArrayList<String>();
+ topicPerms.add("topicC=PUB|SUB");
+ topicPerms.add("topicB=PUB");
+ plainAccessConfig.setTopicPerms(topicPerms);
+ List<String> groupPerms = new ArrayList<String>();
+ groupPerms.add("groupB=PUB|SUB");
+ groupPerms.add("groupC=DENY");
+ plainAccessConfig.setGroupPerms(groupPerms);
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Create element in the acl access yaml config file
+ plainAccessValidator.updateAccessConfig(plainAccessConfig);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ Map<String, Object> verifyMap = null;
+ for (Map<String, Object> account : accounts) {
+ if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig.getAccessKey())) {
+ verifyMap = account;
+ break;
+ }
+ }
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY),"123456789111");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM),"DENY");
+ Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM),"PUB");
+ Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(),2);
+ Assert.assertEquals(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(),2);
+ Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicC=PUB|SUB"));
+ Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).contains("topicB=PUB"));
+ Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupB=PUB|SUB"));
+ Assert.assertTrue(((List)verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).contains("groupC=DENY"));
+
+ // Verify the dateversion element is correct or not
+ List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+ Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+
+ // Update element in the acl config yaml file
+ PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
+ plainAccessConfig2.setAccessKey("rocketmq2");
+ plainAccessConfig2.setSecretKey("1234567890123");
+
+ // Update acl access yaml config file secondly
+ plainAccessValidator.updateAccessConfig(plainAccessConfig2);
+
+ Map<String, Object> readableMap2 = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<Map<String, Object>> accounts2 = (List<Map<String, Object>>)readableMap2.get(AclConstants.CONFIG_ACCOUNTS);
+ Map<String, Object> verifyMap2 = null;
+ for (Map<String, Object> account : accounts2) {
+ if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(plainAccessConfig2.getAccessKey())) {
+ verifyMap2 = account;
+ break;
+ }
+ }
+
+ // Verify the dateversion element after updating is correct or not
+ List<Map<String, Object>> dataVersions2 = (List<Map<String, Object>>) readableMap2.get(AclConstants.CONFIG_DATA_VERSION);
+ Assert.assertEquals(2,dataVersions2.get(0).get(AclConstants.CONFIG_COUNTER));
+ Assert.assertEquals(verifyMap2.get(AclConstants.CONFIG_SECRET_KEY),"1234567890123");
+
+
+ // Restore the backup file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+ }
+
+ @Test(expected = AclException.class)
+ public void updateAccessAclYamlConfigExceptionTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_update_create.yml");
+
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey("RocketMQ");
+ plainAccessConfig.setSecretKey("12345");
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Update acl access yaml config file
+ plainAccessValidator.updateAccessConfig(plainAccessConfig);
+ }
+
+ @Test
+ public void deleteAccessAclYamlConfigNormalTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_delete.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_delete.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+
+ String accessKey = "rocketmq2";
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ plainAccessValidator.deleteAccessConfig(accessKey);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>)readableMap.get(AclConstants.CONFIG_ACCOUNTS);
+ Map<String, Object> verifyMap = null;
+ for (Map<String, Object> account : accounts) {
+ if (account.get(AclConstants.CONFIG_ACCESS_KEY).equals(accessKey)) {
+ verifyMap = account;
+ break;
+ }
+ }
+
+ // Verify the specified element is removed or not
+ Assert.assertEquals(verifyMap,null);
+ // Verify the dateversion element is correct or not
+ List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+ Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+
+ // Restore the backup file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+ }
+
+ @Test
+ public void updateAccessAclYamlConfigWithNoAccoutsExceptionTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_with_no_accouts.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_with_no_accouts.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
+ plainAccessConfig.setAccessKey("RocketMQ");
+ plainAccessConfig.setSecretKey("1234567890");
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Update acl access yaml config file and verify the return value is true
+ Assert.assertEquals(plainAccessValidator.updateAccessConfig(plainAccessConfig), false);
+ }
+
+ @Test
+ public void updateGlobalWhiteAddrsNormalTest() {
+ System.setProperty("rocketmq.home.dir", "src/test/resources");
+ System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_global_white_addrs.yml");
+
+ String targetFileName = "src/test/resources/conf/plain_acl_global_white_addrs.yml";
+ Map<String, Object> backUpAclConfigMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
+ // Update global white remote addr value list in the acl access yaml config file
+
+ List<String> globalWhiteAddrsList = new ArrayList<String>();
+ globalWhiteAddrsList.add("10.10.154.1");
+ globalWhiteAddrsList.add("10.10.154.2");
+ globalWhiteAddrsList.add("10.10.154.3");
+ plainAccessValidator.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList);
+
+ Map<String, Object> readableMap = AclUtils.getYamlDataObject(targetFileName, Map.class);
+
+ List<String> globalWhiteAddrList = (List<String>)readableMap.get(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS);
+ Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.1"));
+ Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.2"));
+ Assert.assertTrue(globalWhiteAddrList.contains("10.10.154.3"));
+
+ // Verify the dateversion element is correct or not
+ List<Map<String, Object>> dataVersions = (List<Map<String, Object>>) readableMap.get(AclConstants.CONFIG_DATA_VERSION);
+ Assert.assertEquals(1,dataVersions.get(0).get(AclConstants.CONFIG_COUNTER));
+
+ // Restore the backup file and flush to yaml file
+ AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
+ }
+
}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
similarity index 75%
rename from acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
rename to acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
index 575c901..d5ffb0c 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionLoaderTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainPermissionManagerTest.java
@@ -26,32 +26,31 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.acl.common.Permission;
-import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-public class PlainPermissionLoaderTest {
+public class PlainPermissionManagerTest {
- PlainPermissionLoader plainPermissionLoader;
+ PlainPermissionManager plainPermissionManager;
PlainAccessResource PUBPlainAccessResource;
PlainAccessResource SUBPlainAccessResource;
PlainAccessResource ANYPlainAccessResource;
PlainAccessResource DENYPlainAccessResource;
PlainAccessResource plainAccessResource = new PlainAccessResource();
PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
- PlainAccessResource plainAccessResourceTwo = new PlainAccessResource();
Set<Integer> adminCode = new HashSet<>();
@Before
public void init() throws NoSuchFieldException, SecurityException, IOException {
- // UPDATE_AND_CREATE_TOPIC
+ // UPDATE_AND_CREATE_TOPIC
adminCode.add(17);
- // UPDATE_BROKER_CONFIG
+ // UPDATE_BROKER_CONFIG
adminCode.add(25);
- // DELETE_TOPIC_IN_BROKER
+ // DELETE_TOPIC_IN_BROKER
adminCode.add(215);
// UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
adminCode.add(200);
@@ -65,7 +64,8 @@ public class PlainPermissionLoaderTest {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
- plainPermissionLoader = new PlainPermissionLoader();
+
+ plainPermissionManager = new PlainPermissionManager();
}
@@ -95,16 +95,16 @@ public class PlainPermissionLoaderTest {
plainAccess.setAccessKey("RocketMQ");
plainAccess.setSecretKey("12345678");
- plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+ plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getAccessKey(), "RocketMQ");
Assert.assertEquals(plainAccessResource.getSecretKey(), "12345678");
plainAccess.setWhiteRemoteAddress("127.0.0.1");
- plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+ plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.getWhiteRemoteAddress(), "127.0.0.1");
plainAccess.setAdmin(true);
- plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+ plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Assert.assertEquals(plainAccessResource.isAdmin(), true);
List<String> groups = new ArrayList<String>();
@@ -112,7 +112,7 @@ public class PlainPermissionLoaderTest {
groups.add("groupB=PUB|SUB");
groups.add("groupC=PUB");
plainAccess.setGroupPerms(groups);
- plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+ plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
Map<String, Byte> resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 3);
@@ -125,7 +125,7 @@ public class PlainPermissionLoaderTest {
topics.add("topicB=PUB|SUB");
topics.add("topicC=PUB");
plainAccess.setTopicPerms(topics);
- plainAccessResource = plainPermissionLoader.buildPlainAccessResource(plainAccess);
+ plainAccessResource = plainPermissionManager.buildPlainAccessResource(plainAccess);
resourcePermMap = plainAccessResource.getResourcePermMap();
Assert.assertEquals(resourcePermMap.size(), 6);
@@ -138,7 +138,7 @@ public class PlainPermissionLoaderTest {
public void checkPermAdmin() {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.setRequestCode(17);
- plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
}
@Test
@@ -146,15 +146,15 @@ public class PlainPermissionLoaderTest {
PlainAccessResource plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
- plainPermissionLoader.checkPerm(plainAccessResource, PUBPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, PUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
- plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicB", Permission.SUB);
- plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
plainAccessResource.addResourceAndPerm("topicA", Permission.PUB);
- plainPermissionLoader.checkPerm(plainAccessResource, ANYPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, ANYPlainAccessResource);
}
@Test(expected = AclException.class)
@@ -162,55 +162,58 @@ public class PlainPermissionLoaderTest {
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
- plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
+ plainPermissionManager.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
public void accountNullTest() {
plainAccessConfig.setAccessKey(null);
- plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+ plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void accountThanTest() {
plainAccessConfig.setAccessKey("123");
- plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+ plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordtNullTest() {
plainAccessConfig.setAccessKey(null);
- plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+ plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void passWordThanTest() {
plainAccessConfig.setAccessKey("123");
- plainPermissionLoader.buildPlainAccessResource(plainAccessConfig);
+ plainPermissionManager.buildPlainAccessResource(plainAccessConfig);
}
@Test(expected = AclException.class)
public void testPlainAclPlugEngineInit() {
System.setProperty("rocketmq.home.dir", "");
- new PlainPermissionLoader().load();
+ new PlainPermissionManager().load();
}
@SuppressWarnings("unchecked")
@Test
public void cleanAuthenticationInfoTest() throws IllegalAccessException {
- //plainPermissionLoader.addPlainAccessResource(plainAccessResource);
- Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ // PlainPermissionManager.addPlainAccessResource(plainAccessResource);
+ Map<String, List<PlainAccessResource>> plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
Assert.assertFalse(plainAccessResourceMap.isEmpty());
- plainPermissionLoader.clearPermissionInfo();
- plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ plainPermissionManager.clearPermissionInfo();
+ plainAccessResourceMap = (Map<String, List<PlainAccessResource>>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
Assert.assertTrue(plainAccessResourceMap.isEmpty());
+ // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
}
@Test
public void isWatchStartTest() {
- PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
- Assert.assertTrue(plainPermissionLoader.isWatchStart());
+ PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
+ Assert.assertTrue(plainPermissionManager.isWatchStart());
+ // RemoveDataVersionFromYamlFile("src/test/resources/conf/plain_acl.yml");
+
}
@@ -231,11 +234,12 @@ public class PlainPermissionLoaderTest {
writer.flush();
writer.close();
- PlainPermissionLoader plainPermissionLoader = new PlainPermissionLoader();
- Assert.assertTrue(plainPermissionLoader.isWatchStart());
+
+ PlainPermissionManager plainPermissionManager = new PlainPermissionManager();
+ Assert.assertTrue(plainPermissionManager.isWatchStart());
{
- Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "12345678");
@@ -243,17 +247,19 @@ public class PlainPermissionLoaderTest {
}
- writer = new FileWriter(new File(fileName), true);
- writer.write("- accessKey: watchrocketmq1\r\n");
- writer.write(" secretKey: 88888888\r\n");
- writer.write(" whiteRemoteAddress: 127.0.0.1\r\n");
- writer.write(" admin: false\r\n");
- writer.flush();
- writer.close();
+ Map<String, Object> updatedMap = AclUtils.getYamlDataObject(fileName, Map.class);
+ List<Map<String, Object>> accounts = (List<Map<String, Object>>) updatedMap.get("accounts");
+ accounts.get(0).remove("accessKey");
+ accounts.get(0).remove("secretKey");
+ accounts.get(0).put("accessKey", "watchrocketmq1");
+ accounts.get(0).put("secretKey", "88888888");
+ accounts.get(0).put("admin", "false");
+ // Update file and flush to yaml file
+ AclUtils.writeDataObject(fileName, updatedMap);
Thread.sleep(1000);
{
- Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionLoader, "plainAccessResourceMap", true);
+ Map<String, PlainAccessResource> plainAccessResourceMap = (Map<String, PlainAccessResource>) FieldUtils.readDeclaredField(plainPermissionManager, "plainAccessResourceMap", true);
PlainAccessResource accessResource = plainAccessResourceMap.get("watchrocketmq1");
Assert.assertNotNull(accessResource);
Assert.assertEquals(accessResource.getSecretKey(), "88888888");
@@ -268,8 +274,7 @@ public class PlainPermissionLoaderTest {
@Test(expected = AclException.class)
public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
- new PlainPermissionLoader();
+ new PlainPermissionManager();
}
-
}
diff --git a/acl/src/test/resources/conf/plain_acl_correct.yml b/acl/src/test/resources/conf/plain_acl_correct.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_correct.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ - groupA=DENY
+ - groupB=SUB
+ - groupC=SUB
+- accessKey: rocketmq2
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_delete.yml b/acl/src/test/resources/conf/plain_acl_delete.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_delete.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ - groupA=DENY
+ - groupB=SUB
+ - groupC=SUB
+- accessKey: rocketmq2
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_global_white_addrs.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ - groupA=DENY
+ - groupB=SUB
+ - groupC=SUB
+- accessKey: rocketmq2
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_update_create.yml b/acl/src/test/resources/conf/plain_acl_update_create.yml
new file mode 100644
index 0000000..40d66d9
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_update_create.yml
@@ -0,0 +1,22 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+ defaultTopicPerm: DENY
+ defaultGroupPerm: SUB
+ topicPerms:
+ - topicA=DENY
+ - topicB=PUB|SUB
+ - topicC=SUB
+ groupPerms:
+ - groupA=DENY
+ - groupB=SUB
+ - groupC=SUB
+- accessKey: rocketmq2
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.1.*
+ admin: true
diff --git a/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml
new file mode 100644
index 0000000..08274b1
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_with_no_accouts.yml
@@ -0,0 +1,3 @@
+globalWhiteRemoteAddresses:
+- 10.10.103.*
+- 192.168.0.*
\ No newline at end of file
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 427f861..56e3fe4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -162,7 +163,7 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
-
+ private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
public BrokerController(
final BrokerConfig brokerConfig,
@@ -502,6 +503,7 @@ public class BrokerController {
for (AccessValidator accessValidator: accessValidators) {
final AccessValidator validator = accessValidator;
+ accessValidatorMap.put(validator.getClass(),validator);
this.registerServerRPCHook(new RPCHook() {
@Override
@@ -1101,7 +1103,9 @@ public class BrokerController {
}
-
+ public Map<Class, AccessValidator> getAccessValidatorMap() {
+ return accessValidatorMap;
+ }
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 73fe439..f23cca6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -37,6 +39,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -44,6 +47,10 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -201,6 +208,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return fetchAllConsumeStatsInBroker(ctx, request);
case RequestCode.QUERY_CONSUME_QUEUE:
return queryConsumeQueue(ctx, request);
+ case RequestCode.UPDATE_AND_CREATE_ACL_CONFIG:
+ return updateAndCreateAccessConfig(ctx, request);
+ case RequestCode.DELETE_ACL_CONFIG:
+ return deleteAccessConfig(ctx, request);
+ case RequestCode.GET_BROKER_CLUSTER_ACL_INFO:
+ return getBrokerAclConfigVersion(ctx, request);
+ case RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG:
+ return updateGlobalWhiteAddrsConfig(ctx, request);
default:
break;
}
@@ -269,6 +284,140 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
+ private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ final CreateAccessConfigRequestHeader requestHeader =
+ (CreateAccessConfigRequestHeader) request.decodeCommandCustomHeader(CreateAccessConfigRequestHeader.class);
+
+ PlainAccessConfig accessConfig = new PlainAccessConfig();
+ accessConfig.setAccessKey(requestHeader.getAccessKey());
+ accessConfig.setSecretKey(requestHeader.getSecretKey());
+ accessConfig.setWhiteRemoteAddress(requestHeader.getWhiteRemoteAddress());
+ accessConfig.setDefaultTopicPerm(requestHeader.getDefaultTopicPerm());
+ accessConfig.setDefaultGroupPerm(requestHeader.getDefaultGroupPerm());
+ accessConfig.setTopicPerms(UtilAll.String2List(requestHeader.getTopicPerms(),","));
+ accessConfig.setGroupPerms(UtilAll.String2List(requestHeader.getGroupPerms(),","));
+ accessConfig.setAdmin(requestHeader.isAdmin());
+ try {
+
+ AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+ if (accessValidator.updateAccessConfig(accessConfig)) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark(null);
+ ctx.writeAndFlush(response);
+ } else {
+ String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been updated failed.";
+ log.warn(errorMsg);
+ response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+ response.setRemark(errorMsg);
+ return response;
+ }
+ } catch (Exception e) {
+ log.error("Failed to generate a proper update accessvalidator response", e);
+ response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+
+ return null;
+ }
+
+ private synchronized RemotingCommand deleteAccessConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ final DeleteAccessConfigRequestHeader requestHeader =
+ (DeleteAccessConfigRequestHeader) request.decodeCommandCustomHeader(DeleteAccessConfigRequestHeader.class);
+ log.info("DeleteAccessConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+ try {
+ String accessKey = requestHeader.getAccessKey();
+ AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+ if (accessValidator.deleteAccessConfig(accessKey)) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark(null);
+ ctx.writeAndFlush(response);
+ } else {
+ String errorMsg = "The accesskey[" + requestHeader.getAccessKey() + "] corresponding to accessConfig has been deleted failed.";
+ log.warn(errorMsg);
+ response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+ response.setRemark(errorMsg);
+ return response;
+ }
+
+ } catch (Exception e) {
+ log.error("Failed to generate a proper delete accessvalidator response", e);
+ response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+
+ return null;
+ }
+
+ private synchronized RemotingCommand updateGlobalWhiteAddrsConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ final UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader =
+ (UpdateGlobalWhiteAddrsConfigRequestHeader) request.decodeCommandCustomHeader(UpdateGlobalWhiteAddrsConfigRequestHeader.class);
+
+ try {
+ AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+ if (accessValidator.updateGlobalWhiteAddrsConfig(UtilAll.String2List(requestHeader.getGlobalWhiteAddrs(),","))) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark(null);
+ ctx.writeAndFlush(response);
+ } else {
+ String errorMsg = "The globalWhiteAddresses[" + requestHeader.getGlobalWhiteAddrs() + "] has been updated failed.";
+ log.warn(errorMsg);
+ response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
+ response.setRemark(errorMsg);
+ return response;
+ }
+ } catch (Exception e) {
+ log.error("Failed to generate a proper update globalWhiteAddresses response", e);
+ response.setCode(ResponseCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+
+ return null;
+ }
+
+ private RemotingCommand getBrokerAclConfigVersion(ChannelHandlerContext ctx, RemotingCommand request) {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerAclConfigResponseHeader.class);
+
+ final GetBrokerAclConfigResponseHeader responseHeader = (GetBrokerAclConfigResponseHeader)response.readCustomHeader();
+
+ try {
+ AccessValidator accessValidator = this.brokerController.getAccessValidatorMap().get(PlainAccessValidator.class);
+
+ responseHeader.setVersion(accessValidator.getAclConfigVersion());
+ responseHeader.setBrokerAddr(this.brokerController.getBrokerAddr());
+ responseHeader.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+ responseHeader.setClusterName(this.brokerController.getBrokerConfig().getBrokerClusterName());
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ } catch (Exception e) {
+ log.error("Failed to generate a proper getBrokerAclConfigVersion response", e);
+ }
+
+ return null;
+ }
+
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);
// final GetAllTopicConfigResponseHeader responseHeader =
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 9048ab8..c3382ca 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -44,8 +44,10 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
@@ -63,6 +65,7 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -86,10 +89,13 @@ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteAccessConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
@@ -123,6 +129,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
@@ -284,6 +291,104 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
+ public void createPlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig,
+ final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ CreateAccessConfigRequestHeader requestHeader = new CreateAccessConfigRequestHeader();
+ requestHeader.setAccessKey(plainAccessConfig.getAccessKey());
+ requestHeader.setSecretKey(plainAccessConfig.getSecretKey());
+ requestHeader.setAdmin(plainAccessConfig.isAdmin());
+ requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
+ requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
+ requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
+ requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
+ requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return;
+ }
+ default:
+ break;
+ }
+
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
+ public void deleteAccessConfig(final String addr, final String accessKey, final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ DeleteAccessConfigRequestHeader requestHeader = new DeleteAccessConfigRequestHeader();
+ requestHeader.setAccessKey(accessKey);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_ACL_CONFIG, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return;
+ }
+ default:
+ break;
+ }
+
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
+ public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+ UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
+ requestHeader.setGlobalWhiteAddrs(globalWhiteAddrs);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_GLOBAL_WHITE_ADDRS_CONFIG, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return;
+ }
+ default:
+ break;
+ }
+
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
+ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ GetBrokerAclConfigResponseHeader responseHeader =
+ (GetBrokerAclConfigResponseHeader) response.decodeCommandCustomHeader(GetBrokerAclConfigResponseHeader.class);
+
+ ClusterAclVersionInfo clusterAclVersionInfo = new ClusterAclVersionInfo();
+ clusterAclVersionInfo.setClusterName(responseHeader.getClusterName());
+ clusterAclVersionInfo.setBrokerName(responseHeader.getBrokerName());
+ clusterAclVersionInfo.setBrokerAddr(responseHeader.getBrokerAddr());
+ clusterAclVersionInfo.setAclConfigDataVersion(DataVersion.fromJson(responseHeader.getVersion(), DataVersion.class));
+ return clusterAclVersionInfo;
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+
+ }
+
public SendResult sendMessage(
final String addr,
final String brokerName,
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c13e75c..e3a25b9 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -19,17 +19,24 @@ package org.apache.rocketmq.client.impl;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.header.CreateAccessConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -210,6 +217,79 @@ public class MQClientAPIImplTest {
}
}
+ @Test
+ public void testCreatePlainAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException {
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createSuccessResponse4UpdateAclConfig(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ PlainAccessConfig config = createUpdateAclConfig();
+
+ try {
+ mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
+ } catch (MQClientException ex) {
+
+ }
+ }
+
+ @Test
+ public void testCreatePlainAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createErrorResponse4UpdateAclConfig(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ PlainAccessConfig config = createUpdateAclConfig();
+ try {
+ mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
+ } catch (MQClientException ex) {
+ assertThat(ex.getResponseCode()).isEqualTo(209);
+ assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been updated failed");
+ }
+ }
+
+ @Test
+ public void testDeleteAccessConfig_Success() throws InterruptedException, RemotingException, MQBrokerException {
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createSuccessResponse4DeleteAclConfig(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+
+ }
+
+ @Test
+ public void testDeleteAccessConfig_Exception() throws InterruptedException, RemotingException, MQBrokerException {
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createErrorResponse4DeleteAclConfig(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ try {
+ mqClientAPI.deleteAccessConfig(brokerAddr, "11111", 3 * 1000);
+ } catch (MQClientException ex) {
+ assertThat(ex.getResponseCode()).isEqualTo(210);
+ assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
+ }
+ }
+
private RemotingCommand createSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
@@ -228,6 +308,58 @@ public class MQClientAPIImplTest {
return response;
}
+ private RemotingCommand createSuccessResponse4UpdateAclConfig(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark(null);
+
+ return response;
+ }
+
+ private RemotingCommand createSuccessResponse4DeleteAclConfig(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark(null);
+
+ return response;
+ }
+
+ private RemotingCommand createErrorResponse4UpdateAclConfig(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.UPDATE_AND_CREATE_ACL_CONFIG_FAILED);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark("corresponding to accessConfig has been updated failed");
+
+ return response;
+ }
+
+ private RemotingCommand createErrorResponse4DeleteAclConfig(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.DELETE_ACL_CONFIG_FAILED);
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ response.setRemark("corresponding to accessConfig has been deleted failed");
+
+ return response;
+ }
+
+ private PlainAccessConfig createUpdateAclConfig() {
+
+ PlainAccessConfig config = new PlainAccessConfig();
+ config.setAccessKey("Rocketmq111");
+ config.setSecretKey("123456789");
+ config.setAdmin(true);
+ config.setWhiteRemoteAddress("127.0.0.1");
+ config.setDefaultTopicPerm("DENY");
+ config.setDefaultGroupPerm("SUB");
+ return config;
+ }
+
private SendMessageRequestHeader createSendMessageRequestHeader() {
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setBornTimestamp(System.currentTimeMillis());
diff --git a/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java
new file mode 100644
index 0000000..b193f43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/PlainAccessConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.List;
+
+public class PlainAccessConfig {
+
+ private String accessKey;
+
+ private String secretKey;
+
+ private String whiteRemoteAddress;
+
+ private boolean admin;
+
+ private String defaultTopicPerm;
+
+ private String defaultGroupPerm;
+
+ private List<String> topicPerms;
+
+ private List<String> groupPerms;
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getWhiteRemoteAddress() {
+ return whiteRemoteAddress;
+ }
+
+ public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+ this.whiteRemoteAddress = whiteRemoteAddress;
+ }
+
+ public boolean isAdmin() {
+ return admin;
+ }
+
+ public void setAdmin(boolean admin) {
+ this.admin = admin;
+ }
+
+ public String getDefaultTopicPerm() {
+ return defaultTopicPerm;
+ }
+
+ public void setDefaultTopicPerm(String defaultTopicPerm) {
+ this.defaultTopicPerm = defaultTopicPerm;
+ }
+
+ public String getDefaultGroupPerm() {
+ return defaultGroupPerm;
+ }
+
+ public void setDefaultGroupPerm(String defaultGroupPerm) {
+ this.defaultGroupPerm = defaultGroupPerm;
+ }
+
+ public List<String> getTopicPerms() {
+ return topicPerms;
+ }
+
+ public void setTopicPerms(List<String> topicPerms) {
+ this.topicPerms = topicPerms;
+ }
+
+ public List<String> getGroupPerms() {
+ return groupPerms;
+ }
+
+ public void setGroupPerms(List<String> groupPerms) {
+ this.groupPerms = groupPerms;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index dee6ca2..33674dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -28,15 +28,17 @@ import java.net.NetworkInterface;
import java.text.NumberFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -529,4 +531,28 @@ public class UtilAll {
file.delete();
}
}
+
+ public static String List2String(List<String> list,String splitor) {
+ if (list == null || list.size() == 0) {
+ return null;
+ }
+ StringBuffer str = new StringBuffer();
+ for (int i = 0;i < list.size();i++) {
+ str.append(list.get(i));
+ if (i == list.size() - 1) {
+ continue;
+ }
+ str.append(splitor);
+ }
+ return str.toString();
+ }
+
+ public static List<String> String2List(String str,String splitor) {
+ if (StringUtils.isEmpty(str)) {
+ return null;
+ }
+
+ String[] addrArray = str.split(splitor);
+ return Arrays.asList(addrArray);
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 8cf2d46..b771b77 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -70,6 +70,14 @@ public class RequestCode {
public static final int CHECK_CLIENT_CONFIG = 46;
+ public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
+
+ public static final int DELETE_ACL_CONFIG = 51;
+
+ public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
+
+ public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
+
public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f62c4ea..dc74444 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -73,4 +73,11 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONSUME_MSG_TIMEOUT = 207;
public static final int NO_MESSAGE = 208;
+
+ public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
+
+ public static final int DELETE_ACL_CONFIG_FAILED = 210;
+
+ public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java
new file mode 100644
index 0000000..aeae9d5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterAclVersionInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+public class ClusterAclVersionInfo extends RemotingSerializable {
+
+ private String brokerName;
+
+ private String brokerAddr;
+
+ private DataVersion aclConfigDataVersion;
+
+ private String clusterName;
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public DataVersion getAclConfigDataVersion() {
+ return aclConfigDataVersion;
+ }
+
+ public void setAclConfigDataVersion(DataVersion aclConfigDataVersion) {
+ this.aclConfigDataVersion = aclConfigDataVersion;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java
new file mode 100644
index 0000000..36990fc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateAccessConfigRequestHeader.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class CreateAccessConfigRequestHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ private String accessKey;
+
+ private String secretKey;
+
+ private String whiteRemoteAddress;
+
+ private boolean admin;
+
+ private String defaultTopicPerm;
+
+ private String defaultGroupPerm;
+
+ // list string,eg: topicA=DENY,topicD=SUB
+ private String topicPerms;
+
+ // list string,eg: groupD=DENY,groupD=SUB
+ private String groupPerms;
+
+
+ @Override public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getWhiteRemoteAddress() {
+ return whiteRemoteAddress;
+ }
+
+ public void setWhiteRemoteAddress(String whiteRemoteAddress) {
+ this.whiteRemoteAddress = whiteRemoteAddress;
+ }
+
+ public boolean isAdmin() {
+ return admin;
+ }
+
+ public void setAdmin(boolean admin) {
+ this.admin = admin;
+ }
+
+ public String getDefaultTopicPerm() {
+ return defaultTopicPerm;
+ }
+
+ public void setDefaultTopicPerm(String defaultTopicPerm) {
+ this.defaultTopicPerm = defaultTopicPerm;
+ }
+
+ public String getDefaultGroupPerm() {
+ return defaultGroupPerm;
+ }
+
+ public void setDefaultGroupPerm(String defaultGroupPerm) {
+ this.defaultGroupPerm = defaultGroupPerm;
+ }
+
+ public String getTopicPerms() {
+ return topicPerms;
+ }
+
+ public void setTopicPerms(String topicPerms) {
+ this.topicPerms = topicPerms;
+ }
+
+ public String getGroupPerms() {
+ return groupPerms;
+ }
+
+ public void setGroupPerms(String groupPerms) {
+ this.groupPerms = groupPerms;
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
similarity index 57%
copy from acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
index c915cf3..293480c 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteAccessConfigRequestHeader.java
@@ -15,24 +15,26 @@
* limitations under the License.
*/
-package org.apache.rocketmq.acl;
-
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public interface AccessValidator {
- /**
- * Parse to get the AccessResource(user, resource, needed permission)
- *
- * @param request
- * @param remoteAddr
- * @return Plain access resource result,include access key,signature and some other access attributes.
- */
- AccessResource parse(RemotingCommand request, String remoteAddr);
-
- /**
- * Validate the access resource.
- *
- * @param accessResource
- */
- void validate(AccessResource accessResource);
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class DeleteAccessConfigRequestHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ private String accessKey;
+
+ @Override public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java
new file mode 100644
index 0000000..43fbe47
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerAclConfigResponseHeader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetBrokerAclConfigResponseHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ private String version;
+
+ @CFNotNull
+ private String brokerName;
+
+ @CFNotNull
+ private String brokerAddr;
+
+ @CFNotNull
+ private String clusterName;
+
+ @Override public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
similarity index 54%
copy from acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
index c915cf3..2d42c75 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateGlobalWhiteAddrsConfigRequestHeader.java
@@ -14,25 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.rocketmq.common.protocol.header;
-package org.apache.rocketmq.acl;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+public class UpdateGlobalWhiteAddrsConfigRequestHeader implements CommandCustomHeader {
-public interface AccessValidator {
- /**
- * Parse to get the AccessResource(user, resource, needed permission)
- *
- * @param request
- * @param remoteAddr
- * @return Plain access resource result,include access key,signature and some other access attributes.
- */
- AccessResource parse(RemotingCommand request, String remoteAddr);
+ @CFNotNull
+ private String globalWhiteAddrs;
- /**
- * Validate the access resource.
- *
- * @param accessResource
- */
- void validate(AccessResource accessResource);
+ @Override public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getGlobalWhiteAddrs() {
+ return globalWhiteAddrs;
+ }
+
+ public void setGlobalWhiteAddrs(String globalWhiteAddrs) {
+ this.globalWhiteAddrs = globalWhiteAddrs;
+ }
}
diff --git a/docs/cn/acl/user_guide.md b/docs/cn/acl/user_guide.md
index 1fea9ef..01c37dc 100644
--- a/docs/cn/acl/user_guide.md
+++ b/docs/cn/acl/user_guide.md
@@ -82,5 +82,76 @@ RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可
(2)如果ACL与高可用部署(多副本Dledger架构)同时启用,由于出现节点宕机时,Dledger Group组内会自动选主,那么就需要将Dledger Group组
内所有Broker节点的plain_acl.yml配置文件的白名单设置所有Broker节点的ip地址。
-**特别注意**在[4.5.0]版本中即使使用上面所述的白名单也无法解决开启ACL的问题,解决该问题的[PR链接](https://github.com/apache/rocketmq/pull/1149)
+## 7. ACL mqadmin配置管理命令
+### 7.1 更新ACL配置文件中“account”的属性值
+
+该命令的示例如下:
+
+sh mqadmin updateAclConfig -n 192.168.1.2:9876 -b 192.168.12.134:10911 -a RocketMQ -s 1234567809123
+-t topicA=DENY,topicD=SUB -g groupD=DENY,groupB=SUB
+
+说明:如果不存在则会在ACL Config YAML配置文件中创建;若存在,则会更新对应的“accounts”的属性值;
+如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| a | eg:RocketMQ | Access Key值(必填) |
+| s | eg:1234567809123 | Secret Key值(可选) |
+| m | eg:true | 是否管理员账户(可选) |
+| w | eg:192.168.0.* | whiteRemoteAddress,用户IP白名单(可选) |
+| i | eg:DENY;PUB;SUB;PUB\|SUB | defaultTopicPerm,默认Topic权限(可选) |
+| u | eg:DENY;PUB;SUB;PUB\|SUB | defaultGroupPerm,默认ConsumerGroup权限(可选) |
+| t | eg:topicA=DENY,topicD=SUB | topicPerms,各个Topic的权限(可选) |
+| g | eg:groupD=DENY,groupB=SUB | groupPerms,各个ConsumerGroup的权限(可选) |
+
+### 7.2 删除ACL配置文件里面的对应“account”
+该命令的示例如下:
+
+sh mqadmin deleteAccessConfig -n 192.168.1.2:9876 -c DefaultCluster -a RocketMQ
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+其中,参数"a"为Access Key的值,用以标识唯一账户id,因此该命令的参数中指定账户id即可。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| a | eg:RocketMQ | Access Key的值(必填) |
+
+
+### 7.3 更新ACL配置文件里面中的全局白名单
+该命令的示例如下:
+
+sh mqadmin updateGlobalWhiteAddr -n 192.168.1.2:9876 -b 192.168.12.134:10911 -g 10.10.154.1,10.10.154.2
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+其中,参数"g"为全局IP白名的值,用以更新ACL配置文件中的“globalWhiteRemoteAddresses”字段的属性值。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+| g | eg:10.10.154.1,10.10.154.2 | 全局IP白名单(必填) |
+
+### 7.4 查询集群/Broker的ACL配置文件版本信息
+该命令的示例如下:
+
+sh mqadmin clusterAclConfigVersion -n 192.168.1.2:9876 -c DefaultCluster
+
+说明:如果指定的是集群名称,则会在集群中各个broker节点执行该命令;否则会在单个broker节点执行该命令。
+
+| 参数 | 取值 | 含义 |
+| --- | --- | --- |
+| n | eg:192.168.1.2:9876 | namesrv地址(必填) |
+| c | eg:DefaultCluster | 指定集群名称(与broker地址二选一) |
+| b | eg:192.168.12.134:10911 | 指定broker地址(与集群名称二选一) |
+
+
+**特别注意**开启Acl鉴权认证后导致Master/Slave和Dledger模式下Broker同步数据异常的问题,
+在社区[4.5.1]版本中已经修复,具体的PR链接为:https://github.com/apache/rocketmq/pull/1149;
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dc829c1..f00dcef 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
@@ -33,6 +34,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -161,6 +163,27 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
+ public void createAndUpdatePlainAccessConfig(String addr,
+ PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExtImpl.createAndUpdatePlainAccessConfig(addr, config);
+ }
+
+ @Override public void deletePlainAccessConfig(String addr,
+ String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExtImpl.deletePlainAccessConfig(addr, accessKey);
+ }
+
+ @Override public void updateGlobalWhiteAddrConfig(String addr,
+ String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ defaultMQAdminExtImpl.updateGlobalWhiteAddrConfig(addr, globalWhiteAddrs);
+ }
+
+ @Override public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+ String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExtImpl.examineBrokerClusterAclVersionInfo(addr);
+ }
+
+ @Override
public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
@@ -305,6 +328,7 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
}
+ @Override
public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
boolean force)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 2a7815b..502e9da 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -47,6 +48,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
@@ -87,6 +89,7 @@ import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType;
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
+
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQAdminExt defaultMQAdminExt;
private ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -178,6 +181,27 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
+ @Override public void createAndUpdatePlainAccessConfig(String addr,
+ PlainAccessConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().createPlainAccessConfig(addr, config, timeoutMillis);
+ }
+
+ @Override public void deletePlainAccessConfig(String addr,
+ String accessKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().deleteAccessConfig(addr, accessKey, timeoutMillis);
+ }
+
+ @Override public void updateGlobalWhiteAddrConfig(String addr,
+ String globalWhiteAddrs) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ this.mqClientInstance.getMQClientAPIImpl().updateGlobalWhiteAddrsConfig(addr, globalWhiteAddrs, timeoutMillis);
+ }
+
+ @Override
+ public ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(
+ String addr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterAclInfo(addr, timeoutMillis);
+ }
+
@Override
public void createAndUpdateSubscriptionGroupConfig(String addr,
SubscriptionGroupConfig config) throws RemotingException,
@@ -548,6 +572,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return Collections.EMPTY_MAP;
}
+ @Override
public void createOrUpdateOrderConf(String key, String value,
boolean isCluster) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 16b4427..930785e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -68,6 +70,18 @@ public interface MQAdminExt extends MQAdmin {
final TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
+ void createAndUpdatePlainAccessConfig(final String addr, final PlainAccessConfig plainAccessConfig) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void deletePlainAccessConfig(final String addr, final String accessKey) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ void updateGlobalWhiteAddrConfig(final String addr, final String globalWhiteAddrs)throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+ ClusterAclVersionInfo examineBrokerClusterAclVersionInfo(final String addr) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
void createAndUpdateSubscriptionGroupConfig(final String addr,
final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index f2531de..614fed8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -30,6 +30,10 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.acl.ClusterAclConfigVersionListSubCommand;
+import org.apache.rocketmq.tools.command.acl.DeleteAccessConfigSubCommand;
+import org.apache.rocketmq.tools.command.acl.UpdateAccessConfigSubCommand;
+import org.apache.rocketmq.tools.command.acl.UpdateGlobalWhiteAddrSubCommand;
import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
@@ -199,6 +203,12 @@ public class MQAdminStartup {
initCommand(new QueryConsumeQueueCommand());
initCommand(new SendMessageCommand());
initCommand(new ConsumeMessageCommand());
+
+ //for acl command
+ initCommand(new UpdateAccessConfigSubCommand());
+ initCommand(new DeleteAccessConfigSubCommand());
+ initCommand(new ClusterAclConfigVersionListSubCommand());
+ initCommand(new UpdateGlobalWhiteAddrSubCommand());
}
private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
new file mode 100644
index 0000000..c1e86fb
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommand.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class ClusterAclConfigVersionListSubCommand implements SubCommand {
+
+ @Override public String commandName() {
+ return "clusterAclConfigVersion";
+ }
+
+ @Override public String commandDesc() {
+ return "List all of acl config version information in cluster";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "query acl config version for which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "query acl config version for specified cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+ defaultMQAdminExt.start();
+ printClusterBaseInfo(defaultMQAdminExt, addr);
+
+ System.out.printf("get broker's plain access config version success.%n", addr);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+ defaultMQAdminExt.start();
+
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ System.out.printf("%-16s %-22s %-22s %-20s %-22s%n",
+ "#Cluster Name",
+ "#Broker Name",
+ "#Broker Addr",
+ "#AclConfigVersionNum",
+ "#AclLastUpdateTime"
+ );
+ for (String addr : masterSet) {
+ printClusterBaseInfo(defaultMQAdminExt, addr);
+ }
+ System.out.printf("get cluster's plain access config version success.%n");
+
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+ private void printClusterBaseInfo(
+ final DefaultMQAdminExt defaultMQAdminExt, final String addr) throws
+ InterruptedException, MQBrokerException, RemotingException, MQClientException {
+
+
+ ClusterAclVersionInfo clusterAclVersionInfo = defaultMQAdminExt.examineBrokerClusterAclVersionInfo(addr);
+ DataVersion aclDataVersion = clusterAclVersionInfo.getAclConfigDataVersion();
+ String versionNum = String.valueOf(aclDataVersion.getCounter());
+
+ DateFormat sdf = new SimpleDateFormat(UtilAll.YYYY_MM_DD_HH_MM_SS);
+ String timeStampStr = sdf.format(new Timestamp(aclDataVersion.getTimestamp()));
+
+ System.out.printf("%-16s %-22s %-22s %-20s %-22s%n",
+ clusterAclVersionInfo.getClusterName(),
+ clusterAclVersionInfo.getBrokerName(),
+ clusterAclVersionInfo.getBrokerAddr(),
+ versionNum,
+ timeStampStr
+ );
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
new file mode 100644
index 0000000..8570b2f
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommand.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class DeleteAccessConfigSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "deleteAccessConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Delete Acl Config Account in broker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "delete acl config account to which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "delete cl config account to which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("a", "accessKey", true, "set accessKey in acl config file for deleting which account");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+
+ String accessKey = commandLine.getOptionValue('a').trim();
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+ defaultMQAdminExt.start();
+ defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+
+ System.out.printf("delete plain access config account to %s success.%n", addr);
+ System.out.printf("account's accesskey is:%s", accessKey);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+ defaultMQAdminExt.start();
+
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.deletePlainAccessConfig(addr, accessKey);
+ System.out.printf("delete plain access config account to %s success.%n", addr);
+ }
+
+ System.out.printf("account's accesskey is:%s", accessKey);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
new file mode 100644
index 0000000..10241bf
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class UpdateAccessConfigSubCommand implements SubCommand {
+
+ @Override
+ public String commandName() {
+ return "updateAclConfig";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Update acl config yaml file in broker";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "update acl config file to which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "update cl config file to which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("a", "accessKey", true, "set accessKey in acl config file");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("s", "secretKey", true, "set secretKey in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("w", "whiteRemoteAddress", true, "set white ip Address for account in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("i", "defaultTopicPerm", true, "set default topicPerm in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("u", "defaultGroupPerm", true, "set default GroupPerm in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topicPerms", true, "set topicPerms list,eg: topicA=DENY,topicD=SUB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("g", "groupPerms", true, "set groupPerms list,eg: groupD=DENY,groupD=SUB");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "admin", true, "set admin flag in acl config file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ PlainAccessConfig accessConfig = new PlainAccessConfig();
+ accessConfig.setAccessKey(commandLine.getOptionValue('a').trim());
+ // Secretkey
+ if (commandLine.hasOption('s')) {
+ accessConfig.setSecretKey(commandLine.getOptionValue('s').trim());
+ }
+
+ // Admin
+ if (commandLine.hasOption('m')) {
+ accessConfig.setAdmin(Boolean.parseBoolean(commandLine.getOptionValue('m').trim()));
+ }
+
+ // DefaultTopicPerm
+ if (commandLine.hasOption('i')) {
+ accessConfig.setDefaultTopicPerm(commandLine.getOptionValue('i').trim());
+ }
+
+ // DefaultGroupPerm
+ if (commandLine.hasOption('u')) {
+ accessConfig.setDefaultGroupPerm(commandLine.getOptionValue('u').trim());
+ }
+
+ // WhiteRemoteAddress
+ if (commandLine.hasOption('w')) {
+ accessConfig.setWhiteRemoteAddress(commandLine.getOptionValue('w').trim());
+ }
+
+ // TopicPerms list value
+ if (commandLine.hasOption('t')) {
+ String[] topicPerms = commandLine.getOptionValue('t').trim().split(",");
+ List<String> topicPermList = new ArrayList<String>();
+ if (topicPerms != null) {
+ for (String topicPerm : topicPerms) {
+ topicPermList.add(topicPerm);
+ }
+ }
+ accessConfig.setTopicPerms(topicPermList);
+ }
+
+ // GroupPerms list value
+ if (commandLine.hasOption('g')) {
+ String[] groupPerms = commandLine.getOptionValue('g').trim().split(",");
+ List<String> groupPermList = new ArrayList<String>();
+ if (groupPerms != null) {
+ for (String groupPerm : groupPerms) {
+ groupPermList.add(groupPerm);
+ }
+ }
+ accessConfig.setGroupPerms(groupPermList);
+ }
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+ defaultMQAdminExt.start();
+ defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+
+ System.out.printf("create or update plain access config to %s success.%n", addr);
+ System.out.printf("%s", accessConfig);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+ defaultMQAdminExt.start();
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig);
+ System.out.printf("create or update plain access config to %s success.%n", addr);
+ }
+
+ System.out.printf("%s", accessConfig);
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
new file mode 100644
index 0000000..ef9d940
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommand.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class UpdateGlobalWhiteAddrSubCommand implements SubCommand {
+
+ @Override public String commandName() {
+ return "updateGlobalWhiteAddr";
+ }
+
+ @Override public String commandDesc() {
+ return "Update global white address for acl Config File in broker";
+ }
+
+ @Override public Options buildCommandlineOptions(Options options) {
+
+ OptionGroup optionGroup = new OptionGroup();
+
+ Option opt = new Option("b", "brokerAddr", true, "update global white address to which broker");
+ optionGroup.addOption(opt);
+
+ opt = new Option("c", "clusterName", true, "update global white address to which cluster");
+ optionGroup.addOption(opt);
+
+ optionGroup.setRequired(true);
+ options.addOptionGroup(optionGroup);
+
+ opt = new Option("g", "globalWhiteRemoteAddresses", true, "set globalWhiteRemoteAddress list,eg: 10.10.103.*,192.168.0.*");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override public void execute(CommandLine commandLine, Options options,
+ RPCHook rpcHook) throws SubCommandException {
+
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ // GlobalWhiteRemoteAddresses list value
+ String globalWhiteRemoteAddresses = commandLine.getOptionValue('g').trim();
+
+
+ if (commandLine.hasOption('b')) {
+ String addr = commandLine.getOptionValue('b').trim();
+
+ defaultMQAdminExt.start();
+ defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses);
+
+ System.out.printf("update global white remote addresses to %s success.%n", addr);
+ return;
+
+ } else if (commandLine.hasOption('c')) {
+ String clusterName = commandLine.getOptionValue('c').trim();
+
+ defaultMQAdminExt.start();
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.updateGlobalWhiteAddrConfig(addr, globalWhiteRemoteAddresses);
+ System.out.printf("update global white remote addresses to %s success.%n", addr);
+ }
+ return;
+ }
+
+ ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
similarity index 57%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
index 5ea03d6..ba8baa3 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/ClusterAclConfigVersionListSubCommandTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -24,31 +24,15 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
-public class UpdateTopicSubCommandTest {
+public class ClusterAclConfigVersionListSubCommandTest {
+
@Test
public void testExecute() {
- UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+ ClusterAclConfigVersionListSubCommand cmd = new ClusterAclConfigVersionListSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {
- "-b 127.0.0.1:10911",
- "-c default-cluster",
- "-t unit-test",
- "-r 8",
- "-w 8",
- "-p 6",
- "-o false",
- "-u false",
- "-s false"};
+ String[] subargs = new String[] {"-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
- assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
- assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
- assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
- assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
}
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
similarity index 58%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
index 5ea03d6..74092f4 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/DeleteAccessConfigSubCommandTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -24,31 +24,16 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
-public class UpdateTopicSubCommandTest {
+public class DeleteAccessConfigSubCommandTest {
+
@Test
public void testExecute() {
- UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+ DeleteAccessConfigSubCommand cmd = new DeleteAccessConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {
- "-b 127.0.0.1:10911",
- "-c default-cluster",
- "-t unit-test",
- "-r 8",
- "-w 8",
- "-p 6",
- "-o false",
- "-u false",
- "-s false"};
+ String[] subargs = new String[] {"-a unit-test", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
- assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+ assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("unit-test");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
- assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
- assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
- assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
}
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java
new file mode 100644
index 0000000..2c133a2
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommandTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.acl;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class UpdateAccessConfigSubCommandTest {
+
+ @Test
+ public void testExecute() {
+ UpdateAccessConfigSubCommand cmd = new UpdateAccessConfigSubCommand();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {
+ "-b 127.0.0.1:10911",
+ "-a RocketMQ",
+ "-s 12345678",
+ "-w 192.168.0.*",
+ "-i DENY",
+ "-u SUB",
+ "-t topicA=DENY;topicB=PUB|SUB",
+ "-g groupA=DENY;groupB=SUB",
+ "-m true"};
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+ assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+ assertThat(commandLine.getOptionValue('a').trim()).isEqualTo("RocketMQ");
+ assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("12345678");
+ assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("192.168.0.*");
+ assertThat(commandLine.getOptionValue('i').trim()).isEqualTo("DENY");
+ assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("SUB");
+ assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("topicA=DENY;topicB=PUB|SUB");
+ assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("groupA=DENY;groupB=SUB");
+ assertThat(commandLine.getOptionValue('m').trim()).isEqualTo("true");
+
+ PlainAccessConfig accessConfig = new PlainAccessConfig();
+
+ // topicPerms list value
+ if (commandLine.hasOption('t')) {
+ String[] topicPerms = commandLine.getOptionValue('t').trim().split(";");
+ List<String> topicPermList = new ArrayList<String>();
+ if (topicPerms != null) {
+ for (String topicPerm : topicPerms) {
+ topicPermList.add(topicPerm);
+ }
+ }
+ accessConfig.setTopicPerms(topicPermList);
+ }
+
+ // groupPerms list value
+ if (commandLine.hasOption('g')) {
+ String[] groupPerms = commandLine.getOptionValue('g').trim().split(";");
+ List<String> groupPermList = new ArrayList<String>();
+ if (groupPerms != null) {
+ for (String groupPerm : groupPerms) {
+ groupPermList.add(groupPerm);
+ }
+ }
+ accessConfig.setGroupPerms(groupPermList);
+ }
+
+ Assert.assertTrue(accessConfig.getTopicPerms().contains("topicB=PUB|SUB"));
+ Assert.assertTrue(accessConfig.getGroupPerms().contains("groupB=SUB"));
+
+ }
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
similarity index 57%
copy from tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
copy to tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
index 5ea03d6..66d609d 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/acl/UpdateGlobalWhiteAddrSubCommandTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.tools.command.topic;
+package org.apache.rocketmq.tools.command.acl;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -24,31 +24,16 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
-public class UpdateTopicSubCommandTest {
+public class UpdateGlobalWhiteAddrSubCommandTest {
+
@Test
public void testExecute() {
- UpdateTopicSubCommand cmd = new UpdateTopicSubCommand();
+ UpdateGlobalWhiteAddrSubCommand cmd = new UpdateGlobalWhiteAddrSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {
- "-b 127.0.0.1:10911",
- "-c default-cluster",
- "-t unit-test",
- "-r 8",
- "-w 8",
- "-p 6",
- "-o false",
- "-u false",
- "-s false"};
+ String[] subargs = new String[] {"-g 10.10.103.*,192.168.0.*", "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
- assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
+ assertThat(commandLine.getOptionValue('g').trim()).isEqualTo("10.10.103.*,192.168.0.*");
assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
- assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
- assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
- assertThat(commandLine.getOptionValue('p').trim()).isEqualTo("6");
- assertThat(commandLine.getOptionValue('o').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('u').trim()).isEqualTo("false");
- assertThat(commandLine.getOptionValue('s').trim()).isEqualTo("false");
}
-}
\ No newline at end of file
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
index 5ea03d6..7e7863f 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommandTest.java
@@ -31,7 +31,6 @@ public class UpdateTopicSubCommandTest {
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {
"-b 127.0.0.1:10911",
- "-c default-cluster",
"-t unit-test",
"-r 8",
"-w 8",
@@ -42,7 +41,6 @@ public class UpdateTopicSubCommandTest {
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
assertThat(commandLine.getOptionValue('b').trim()).isEqualTo("127.0.0.1:10911");
- assertThat(commandLine.getOptionValue('c').trim()).isEqualTo("default-cluster");
assertThat(commandLine.getOptionValue('r').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('w').trim()).isEqualTo("8");
assertThat(commandLine.getOptionValue('t').trim()).isEqualTo("unit-test");
|