sentry-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From co...@apache.org
Subject [04/13] sentry git commit: SENTRY-999: Refactor the sentry to integrate with external components quickly (Colin Ma, reviewed by Dapeng Sun)
Date Fri, 22 Apr 2016 06:28:24 GMT
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/pom.xml b/sentry-policy/sentry-policy-kafka/pom.xml
deleted file mode 100644
index 44739fa..0000000
--- a/sentry-policy/sentry-policy-kafka/pom.xml
+++ /dev/null
@@ -1,80 +0,0 @@
-<?xml version="1.0"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.sentry</groupId>
-    <artifactId>sentry-policy</artifactId>
-    <version>1.8.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>sentry-policy-kafka</artifactId>
-  <name>Sentry Policy for Kafka</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.shiro</groupId>
-      <artifactId>shiro-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.sentry</groupId>
-      <artifactId>sentry-core-model-kafka</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.sentry</groupId>
-      <artifactId>sentry-provider-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.sentry</groupId>
-      <artifactId>sentry-provider-file</artifactId>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
deleted file mode 100644
index 7be4241..0000000
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import org.apache.sentry.core.model.kafka.Cluster;
-import org.apache.sentry.core.model.kafka.ConsumerGroup;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.core.model.kafka.Topic;
-import org.apache.sentry.policy.common.KeyValue;
-import org.apache.shiro.config.ConfigurationException;
-
-public class KafkaModelAuthorizables {
-  public static KafkaAuthorizable from(KeyValue keyValue) throws ConfigurationException {
-    String prefix = keyValue.getKey().toLowerCase();
-    String name = keyValue.getValue();
-    for (AuthorizableType type : AuthorizableType.values()) {
-      if (prefix.equalsIgnoreCase(type.name())) {
-        return from(type, name);
-      }
-    }
-    return null;
-  }
-
-  public static KafkaAuthorizable from(String keyValue) throws ConfigurationException {
-    return from(new KeyValue(keyValue));
-  }
-
-  public static KafkaAuthorizable from(AuthorizableType type, String name) throws ConfigurationException {
-    switch (type) {
-      case HOST:
-        return new Host(name);
-      case CLUSTER: {
-        if (!name.equals(Cluster.NAME)) {
-          throw new ConfigurationException("Kafka's cluster resource can only have name " + Cluster.NAME);
-        }
-        return new Cluster();
-      }
-      case TOPIC:
-        return new Topic(name);
-      case CONSUMERGROUP:
-        return new ConsumerGroup(name);
-      default:
-        return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
deleted file mode 100644
index 7383e50..0000000
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaPrivilegeValidator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import static org.apache.sentry.policy.common.PolicyConstants.AUTHORIZABLE_SPLITTER;
-import static org.apache.sentry.policy.common.PolicyConstants.PRIVILEGE_PREFIX;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.sentry.core.model.kafka.KafkaActionFactory;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.policy.common.PrivilegeValidator;
-import org.apache.sentry.policy.common.PrivilegeValidatorContext;
-import org.apache.shiro.config.ConfigurationException;
-
-import com.google.common.collect.Lists;
-
-/**
- * Validator for Kafka privileges.
- * Below are the requirements for a kafka privilege to be valid.
- * 1. Privilege must start with Host resource.
- * 2. Privilege must have at most one non Host resource, Cluster or Topic or ConsumerGroup, followed
- *    by Host resource.
- * 3. Privilege must end with exactly one action.
- */
-public class KafkaPrivilegeValidator implements PrivilegeValidator {
-
-  public static final String KafkaPrivilegeHelpMsg =
-      "Invalid Kafka privilege." +
-      " Kafka privilege must be of the form host=<HOST>-><RESOURCE>=<RESOURCE_NAME>->action=<ACTION>," +
-      " where <HOST> can be '*' or any valid host name," +
-      " <RESOURCE> can be one of " + Arrays.toString(getKafkaAuthorizablesExceptHost()) +
-      " <RESOURCE_NAME> is name of the resource," +
-      " <ACTION> can be one of " + Arrays.toString(KafkaActionFactory.KafkaActionType.values()) +
-      ".";
-
-  private static KafkaAuthorizable.AuthorizableType[] getKafkaAuthorizablesExceptHost() {
-    final KafkaAuthorizable.AuthorizableType[] authorizableTypes = KafkaAuthorizable.AuthorizableType.values();
-    List<KafkaAuthorizable.AuthorizableType> authorizableTypesWithoutHost = new ArrayList<>(authorizableTypes.length - 1);
-    for (KafkaAuthorizable.AuthorizableType authorizableType: authorizableTypes) {
-      if (!authorizableType.equals(KafkaAuthorizable.AuthorizableType.HOST)) {
-        authorizableTypesWithoutHost.add(authorizableType);
-      }
-    }
-    return authorizableTypesWithoutHost.toArray(new KafkaAuthorizable.AuthorizableType[authorizableTypesWithoutHost.size()]);
-  }
-
-  public KafkaPrivilegeValidator() {
-  }
-
-  @Override
-  public void validate(PrivilegeValidatorContext context) throws ConfigurationException {
-    List<String> splits = Lists.newArrayList();
-    for (String section : AUTHORIZABLE_SPLITTER.split(context.getPrivilege())) {
-      splits.add(section);
-    }
-
-    // Check privilege splits length is 2 or 3
-    if (splits.size() < 2 || splits.size() > 3) {
-      throw new ConfigurationException(KafkaPrivilegeHelpMsg);
-    }
-
-    // Check privilege starts with Host resource
-    if (isAction(splits.get(0))) {
-      throw new ConfigurationException("Kafka privilege can not start with an action.\n" + KafkaPrivilegeHelpMsg);
-    }
-    KafkaAuthorizable hostAuthorizable = KafkaModelAuthorizables.from(splits.get(0));
-    if (hostAuthorizable == null) {
-      throw new ConfigurationException("No Kafka authorizable found for " + splits.get(0) + "\n." + KafkaPrivilegeHelpMsg);
-    }
-    if (!(hostAuthorizable instanceof Host)) {
-      throw new ConfigurationException("Kafka privilege must begin with host authorizable.\n" + KafkaPrivilegeHelpMsg);
-    }
-
-    // Check privilege has at most one non Host resource following Host resource
-    if (splits.size() == 3) {
-      if (isAction(splits.get(1))) {
-        throw new ConfigurationException("Kafka privilege can have action only at the end of privilege.\n" + KafkaPrivilegeHelpMsg);
-      }
-      KafkaAuthorizable authorizable = KafkaModelAuthorizables.from(splits.get(1));
-      if (authorizable == null) {
-        throw new ConfigurationException("No Kafka authorizable found for " + splits.get(1) + "\n." + KafkaPrivilegeHelpMsg);
-      }
-      if (authorizable instanceof Host) {
-        throw new ConfigurationException("Host authorizable can be specified just once in a Kafka privilege.\n" + KafkaPrivilegeHelpMsg);
-      }
-    }
-
-    // Check privilege ends with exactly one valid action
-    if (!isAction(splits.get(splits.size() - 1))) {
-      throw new ConfigurationException("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeHelpMsg);
-    }
-  }
-
-  private boolean isAction(String privilegePart) {
-    final String privilege = privilegePart.toLowerCase();
-    final String action = privilege.replace(PRIVILEGE_PREFIX, "").toLowerCase();
-    return privilege.startsWith(PRIVILEGE_PREFIX) &&
-        KafkaActionFactory.getInstance().getActionByName(action) != null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
deleted file mode 100644
index 6803a46..0000000
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaWildcardPrivilege.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import static org.apache.sentry.policy.common.PolicyConstants.AUTHORIZABLE_SPLITTER;
-
-import java.util.List;
-
-import org.apache.sentry.core.model.kafka.KafkaActionConstant;
-import org.apache.sentry.core.model.kafka.KafkaAuthorizable;
-import org.apache.sentry.policy.common.Privilege;
-import org.apache.sentry.policy.common.PrivilegeFactory;
-import org.apache.sentry.policy.common.KeyValue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class KafkaWildcardPrivilege implements Privilege {
-
-  private static String ALL_HOSTS = "*";
-
-  public static class Factory implements PrivilegeFactory {
-    @Override
-    public Privilege createPrivilege(String permission) {
-      return new KafkaWildcardPrivilege(permission);
-    }
-  }
-
-  private final ImmutableList<KeyValue> parts;
-
-  public KafkaWildcardPrivilege(String permission) {
-    if (Strings.isNullOrEmpty(permission)) {
-      throw new IllegalArgumentException("Permission string cannot be null or empty.");
-    }
-    List<KeyValue>parts = Lists.newArrayList();
-    for (String authorizable : AUTHORIZABLE_SPLITTER.trimResults().split(permission.trim())) {
-      if (authorizable.isEmpty()) {
-        throw new IllegalArgumentException("Privilege '" + permission + "' has an empty section");
-      }
-      parts.add(new KeyValue(authorizable));
-    }
-    if (parts.isEmpty()) {
-      throw new AssertionError("Privilege,  " + permission + ", did not consist of any valid authorizable.");
-    }
-    this.parts = ImmutableList.copyOf(parts);
-  }
-
-  @Override
-  public boolean implies(Privilege p) {
-    if (!(p instanceof KafkaWildcardPrivilege)) {
-      return false;
-    }
-    KafkaWildcardPrivilege wp = (KafkaWildcardPrivilege)p;
-    List<KeyValue> otherParts = wp.parts;
-    if(equals(wp)) {
-      return true;
-    }
-    int index = 0;
-    for (KeyValue otherPart : otherParts) {
-      // If this privilege has less parts than the other privilege, everything
-      // after the number of parts contained
-      // in this privilege is automatically implied, so return true
-      if (parts.size() - 1 < index) {
-        return true;
-      } else {
-        KeyValue part = parts.get(index);
-        // Support for action inheritance from parent to child
-        if (part.getKey().equalsIgnoreCase(KafkaActionConstant.actionName)
-            && !(otherPart.getKey().equalsIgnoreCase(KafkaActionConstant.actionName))) {
-          continue;
-        }
-        // are the keys even equal
-        if(!part.getKey().equalsIgnoreCase(otherPart.getKey())) {
-          return false;
-        }
-        if (!impliesKeyValue(part, otherPart)) {
-          return false;
-        }
-        index++;
-      }
-    }
-    // If this privilege has more parts than
-    // the other parts, only imply it if
-    // all of the other parts are "*" or "ALL"
-    for (; index < parts.size(); index++) {
-      KeyValue part = parts.get(index);
-      if (!part.getValue().equals(KafkaActionConstant.ALL)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean impliesKeyValue(KeyValue policyPart, KeyValue requestPart) {
-    Preconditions.checkState(policyPart.getKey().equalsIgnoreCase(requestPart.getKey()),
-        "Please report, this method should not be called with two different keys");
-
-    // Host is a special resource, not declared as resource in Kafka. Each Kafka resource can be
-    // authorized based on the host request originated from and to handle this, Sentry uses host as
-    // a resource. Kafka allows using '*' as wildcard for all hosts. '*' however is not a valid
-    // Kafka action.
-    if (hasHostWidCard(policyPart)) {
-      return true;
-    }
-
-    if (KafkaActionConstant.actionName.equalsIgnoreCase(policyPart.getKey())) { // is action
-      return policyPart.getValue().equalsIgnoreCase(KafkaActionConstant.ALL) ||
-          policyPart.getValue().equalsIgnoreCase(requestPart.getValue());
-    } else {
-      return policyPart.getValue().equals(requestPart.getValue());
-    }
-  }
-
-  private boolean hasHostWidCard(KeyValue policyPart) {
-    if (policyPart.getKey().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.toString()) &&
-        policyPart.getValue().equalsIgnoreCase(ALL_HOSTS)) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    for(KeyValue kv: this.parts) {
-      sb.append(kv.getKey() + "=" + kv.getValue() + "->");
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
deleted file mode 100644
index 4d90544..0000000
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/SimpleKafkaPolicyEngine.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import java.util.Set;
-
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.SentryConfigurationException;
-import org.apache.sentry.policy.common.PolicyEngine;
-import org.apache.sentry.policy.common.PrivilegeFactory;
-import org.apache.sentry.policy.common.PrivilegeValidator;
-import org.apache.sentry.provider.common.ProviderBackend;
-import org.apache.sentry.provider.common.ProviderBackendContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-public class SimpleKafkaPolicyEngine implements PolicyEngine {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaPolicyEngine.class);
-  private final ProviderBackend providerBackend;
-
-  public SimpleKafkaPolicyEngine(ProviderBackend providerBackend) {
-    this.providerBackend = providerBackend;
-    ProviderBackendContext context = new ProviderBackendContext();
-    context.setAllowPerDatabase(false);
-    context.setValidators(ImmutableList.<PrivilegeValidator>of(new KafkaPrivilegeValidator()));
-    this.providerBackend.initialize(context);
-  }
-
-  @Override
-  public PrivilegeFactory getPrivilegeFactory() {
-    return new KafkaWildcardPrivilege.Factory();
-  }
-
-  @Override
-  public ImmutableSet<String> getAllPrivileges(Set<String> groups, ActiveRoleSet roleSet)
-      throws SentryConfigurationException {
-    return getPrivileges(groups, roleSet);
-  }
-
-  @Override
-  public ImmutableSet<String> getPrivileges(Set<String> groups, ActiveRoleSet roleSet,
-                                            Authorizable... authorizableHierarchy)
-      throws SentryConfigurationException {
-    if(LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Getting permissions for {}", groups);
-    }
-    ImmutableSet<String> result = providerBackend.getPrivileges(groups, roleSet);
-    if(LOGGER.isDebugEnabled()) {
-      LOGGER.debug("result = " + result);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    if (providerBackend != null) {
-      providerBackend.close();
-    }
-  }
-
-  @Override
-  public void validatePolicy(boolean strictValidation)
-      throws SentryConfigurationException {
-    if (providerBackend != null) {
-      providerBackend.validatePolicy(strictValidation);
-    }
-  }
-
-  @Override
-  public ImmutableSet<String> getAllPrivileges(Set<String> groups, Set<String> users,
-      ActiveRoleSet roleSet) throws SentryConfigurationException {
-    return getPrivileges(groups, users, roleSet);
-  }
-
-  @Override
-  public ImmutableSet<String> getPrivileges(Set<String> groups, Set<String> users,
-      ActiveRoleSet roleSet, Authorizable... authorizableHierarchy)
-      throws SentryConfigurationException {
-    if(LOGGER.isDebugEnabled()) {
-      LOGGER.debug("Getting permissions for groups: {}, users: {}", groups, users);
-    }
-    ImmutableSet<String> result = providerBackend.getPrivileges(groups, users, roleSet);
-    if(LOGGER.isDebugEnabled()) {
-      LOGGER.debug("result = " + result);
-    }
-    return result;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
deleted file mode 100644
index c4a2f7b..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyFileProviderBackend.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.provider.file.SimpleFileProviderBackend;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaPolicyFileProviderBackend extends SimpleKafkaPolicyEngine {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPolicyFileProviderBackend.class);
-  public KafkaPolicyFileProviderBackend(String resource) throws IOException {
-    super(new SimpleFileProviderBackend(new Configuration(), resource));
-    LOGGER.warn("The DB provider backend is the preferred option over file provider backend as the kafka policy engine");
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
deleted file mode 100644
index 572c74d..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import java.util.Set;
-
-import org.apache.sentry.provider.common.GroupMappingService;
-
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-public class MockGroupMappingServiceProvider implements GroupMappingService {
-  private final Multimap<String, String> userToGroupMap;
-
-  public MockGroupMappingServiceProvider(Multimap<String, String> userToGroupMap) {
-    this.userToGroupMap = userToGroupMap;
-  }
-  @Override
-  public Set<String> getGroups(String user) {
-    return Sets.newHashSet(userToGroupMap.get(user));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
deleted file mode 100644
index 421466e..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.fail;
-
-import org.apache.sentry.core.model.kafka.Cluster;
-import org.apache.sentry.core.model.kafka.ConsumerGroup;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.core.model.kafka.Topic;
-import org.apache.shiro.config.ConfigurationException;
-import org.junit.Test;
-
-public class TestKafkaModelAuthorizables {
-
-  @Test
-  public void testHost() throws Exception {
-    Host host1 = (Host)KafkaModelAuthorizables.from("HOST=host1");
-    assertEquals("host1", host1.getName());
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testNoKV() throws Exception {
-    System.out.println(KafkaModelAuthorizables.from("nonsense"));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyKey() throws Exception {
-    System.out.println(KafkaModelAuthorizables.from("=host1"));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyValue() throws Exception {
-    System.out.println(KafkaModelAuthorizables.from("HOST="));
-  }
-
-  @Test
-  public void testNotAuthorizable() throws Exception {
-    assertNull(KafkaModelAuthorizables.from("k=v"));
-  }
-
-  @Test
-  public void testResourceNameIsCaseSensitive() throws Exception {
-    Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1");
-    assertEquals("Host1", host1.getName());
-
-    Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=kafka-cluster");
-    assertEquals("kafka-cluster", cluster1.getName());
-
-    Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1");
-    assertEquals("topiC1", topic1.getName());
-
-    ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
-    assertEquals("CG1", consumergroup1.getName());
-  }
-
-  @Test
-  public void testClusterResourceNameIsRestricted() throws Exception {
-    try {
-      KafkaModelAuthorizables.from("Cluster=cluster1");
-      fail("Cluster with name other than " + Cluster.NAME + " must not have been created.");
-    } catch (ConfigurationException cex) {
-      assertEquals("Exception message is not as expected.", "Kafka's cluster resource can only have name " + Cluster.NAME, cex.getMessage());
-    } catch (Exception ex) {
-      fail("Configuration exception was expected for invalid Cluster name.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
deleted file mode 100644
index 7caa3a9..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka;
-
-import junit.framework.Assert;
-
-import org.apache.sentry.policy.common.PrivilegeValidatorContext;
-import org.apache.shiro.config.ConfigurationException;
-import org.junit.Test;
-
-public class TestKafkaPrivilegeValidator {
-  @Test
-  public void testOnlyHostResource() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1"));
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testWithoutHostResource() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    testHostResourceIsChecked(kafkaPrivilegeValidator, "cluster=kafka-cluster->action=read");
-    testHostResourceIsChecked(kafkaPrivilegeValidator, "topic=t1->action=read");
-    testHostResourceIsChecked(kafkaPrivilegeValidator, "consumergroup=g1->action=read");
-  }
-
-  private void testHostResourceIsChecked(KafkaPrivilegeValidator kafkaPrivilegeValidator, String privilege) {
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext(privilege));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals("Kafka privilege must begin with host authorizable.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testValidPrivileges() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->cluster=kafka-cluster->action=read"));
-    } catch (ConfigurationException ex) {
-      Assert.fail("Not expected ConfigurationException");
-    }
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->action=read"));
-    } catch (ConfigurationException ex) {
-      Assert.fail("Not expected ConfigurationException");
-    }
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->consumergroup=g1->action=read"));
-    } catch (ConfigurationException ex) {
-      Assert.fail("Not expected ConfigurationException");
-    }
-  }
-
-  @Test
-  public void testInvalidHostResource() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("hhost=host1->cluster=kafka-cluster->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-  }
-
-  @Test
-  public void testInvalidClusterResource() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->clluster=kafka-cluster->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-  }
-
-  @Test
-  public void testInvalidTopicResource() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->ttopic=t1->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-  }
-
-  @Test
-  public void testInvalidConsumerGroupResource() throws Exception {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->coonsumergroup=g1->action=read"));
-      Assert.fail("Expected ConfigurationException");
-    } catch (ConfigurationException ex) {
-    }
-  }
-
-  @Test
-  public void testPrivilegeMustHaveExcatlyOneHost() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->host=host2->action=read"));
-      Assert.fail("Multiple Host resources are not allowed within a Kafka privilege.");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals("Host authorizable can be specified just once in a Kafka privilege.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testPrivilegeCanNotStartWithAction() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("action=write->host=host1->topic=t1"));
-      Assert.fail("Kafka privilege can not start with an action.");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals("Kafka privilege can not start with an action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testPrivilegeWithMoreParts() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->consumergroup=cg1->action=read"));
-      Assert.fail("Kafka privilege can have one Host authorizable, at most one non Host authorizable and one action.");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals(KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testPrivilegeNotEndingWithAction() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->consumergroup=cg1"));
-      Assert.fail("Kafka privilege must end with a valid action.");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-
-  @Test
-  public void testPrivilegeNotEndingWithValidAction() {
-    KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->topic=t1->action=bla"));
-      Assert.fail("Kafka privilege must end with a valid action.");
-    } catch (ConfigurationException ex) {
-      Assert.assertEquals("Kafka privilege must end with a valid action.\n" + KafkaPrivilegeValidator.KafkaPrivilegeHelpMsg, ex.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
deleted file mode 100644
index bdef91c..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaWildcardPrivilege.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-
-import org.apache.sentry.core.model.kafka.KafkaActionConstant;
-import org.apache.sentry.policy.common.PolicyConstants;
-import org.apache.sentry.policy.common.Privilege;
-import org.apache.sentry.policy.common.KeyValue;
-import org.junit.Test;
-
-public class TestKafkaWildcardPrivilege {
-  private static final Privilege KAFKA_HOST1_ALL =
-      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.ALL));
-  private static final Privilege KAFKA_HOST1_READ =
-      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.READ));
-  private static final Privilege KAFKA_HOST1_WRITE =
-      create(new KeyValue("HOST", "host1"), new KeyValue("action", KafkaActionConstant.WRITE));
-
-  private static final Privilege KAFKA_HOST1_TOPIC1_ALL =
-      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.ALL));
-  private static final Privilege KAFKA_HOST1_TOPIC1_READ =
-      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.READ));
-  private static final Privilege KAFKA_HOST1_TOPIC1_WRITE =
-      create(new KeyValue("HOST", "host1"), new KeyValue("TOPIC", "topic1"), new KeyValue("action", KafkaActionConstant.WRITE));
-
-  private static final Privilege KAFKA_HOST1_CLUSTER1_ALL =
-      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.ALL));
-  private static final Privilege KAFKA_HOST1_CLUSTER1_READ =
-      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.READ));
-  private static final Privilege KAFKA_HOST1_CLUSTER1_WRITE =
-      create(new KeyValue("HOST", "host1"), new KeyValue("CLUSTER", "cluster1"), new KeyValue("action", KafkaActionConstant.WRITE));
-
-  private static final Privilege KAFKA_HOST1_GROUP1_ALL =
-      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.ALL));
-  private static final Privilege KAFKA_HOST1_GROUP1_READ =
-      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.READ));
-  private static final Privilege KAFKA_HOST1_GROUP1_WRITE =
-      create(new KeyValue("HOST", "host1"), new KeyValue("GROUP", "cgroup1"), new KeyValue("action", KafkaActionConstant.WRITE));
-
-  @Test
-  public void testSimpleAction() throws Exception {
-    //host
-    assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ));
-    assertFalse(KAFKA_HOST1_READ.implies(KAFKA_HOST1_WRITE));
-    //consumer group
-    assertFalse(KAFKA_HOST1_GROUP1_WRITE.implies(KAFKA_HOST1_GROUP1_READ));
-    assertFalse(KAFKA_HOST1_GROUP1_READ.implies(KAFKA_HOST1_GROUP1_WRITE));
-    //topic
-    assertFalse(KAFKA_HOST1_TOPIC1_READ.implies(KAFKA_HOST1_TOPIC1_WRITE));
-    assertFalse(KAFKA_HOST1_TOPIC1_WRITE.implies(KAFKA_HOST1_TOPIC1_READ));
-    //cluster
-    assertFalse(KAFKA_HOST1_CLUSTER1_READ.implies(KAFKA_HOST1_CLUSTER1_WRITE));
-    assertFalse(KAFKA_HOST1_CLUSTER1_WRITE.implies(KAFKA_HOST1_CLUSTER1_READ));
-  }
-
-  @Test
-  public void testShorterThanRequest() throws Exception {
-    //topic
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_ALL));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_READ));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE));
-
-    assertFalse(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_READ));
-    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_TOPIC1_READ));
-    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_TOPIC1_WRITE));
-
-    //cluster
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_ALL));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE));
-
-    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_CLUSTER1_READ));
-    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_CLUSTER1_WRITE));
-
-    //consumer group
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_ALL));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_READ));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE));
-
-    assertTrue(KAFKA_HOST1_READ.implies(KAFKA_HOST1_GROUP1_READ));
-    assertTrue(KAFKA_HOST1_WRITE.implies(KAFKA_HOST1_GROUP1_WRITE));
-  }
-
-  @Test
-  public void testActionAll() throws Exception {
-    //host
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_READ));
-    assertTrue(KAFKA_HOST1_ALL.implies(KAFKA_HOST1_WRITE));
-
-    //topic
-    assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_READ));
-    assertTrue(KAFKA_HOST1_TOPIC1_ALL.implies(KAFKA_HOST1_TOPIC1_WRITE));
-
-    //cluster
-    assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_READ));
-    assertTrue(KAFKA_HOST1_CLUSTER1_ALL.implies(KAFKA_HOST1_CLUSTER1_WRITE));
-
-    //consumer group
-    assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_READ));
-    assertTrue(KAFKA_HOST1_GROUP1_ALL.implies(KAFKA_HOST1_GROUP1_WRITE));
-  }
-
-  @Test
-  public void testUnexpected() throws Exception {
-    Privilege p = new Privilege() {
-      @Override
-      public boolean implies(Privilege p) {
-        return false;
-      }
-    };
-    Privilege topic1 = create(new KeyValue("HOST", "host"), new KeyValue("TOPIC", "topic1"));
-    assertFalse(topic1.implies(null));
-    assertFalse(topic1.implies(p));
-    assertFalse(topic1.equals(null));
-    assertFalse(topic1.equals(p));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testNullString() throws Exception {
-    System.out.println(create((String)null));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyString() throws Exception {
-    System.out.println(create(""));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyKey() throws Exception {
-    System.out.println(create(PolicyConstants.KV_JOINER.join("", "host1")));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyValue() throws Exception {
-    System.out.println(create(PolicyConstants.KV_JOINER.join("HOST", "")));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testEmptyPart() throws Exception {
-    System.out.println(create(PolicyConstants.AUTHORIZABLE_JOINER.
-        join(PolicyConstants.KV_JOINER.join("HOST", "host1"), "")));
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void testOnlySeperators() throws Exception {
-    System.out.println(create(PolicyConstants.AUTHORIZABLE_JOINER.
-        join(PolicyConstants.KV_SEPARATOR, PolicyConstants.KV_SEPARATOR, PolicyConstants.KV_SEPARATOR)));
-  }
-
-  static KafkaWildcardPrivilege create(KeyValue... keyValues) {
-    return create(PolicyConstants.AUTHORIZABLE_JOINER.join(keyValues));
-
-  }
-  static KafkaWildcardPrivilege create(String s) {
-    return new KafkaWildcardPrivilege(s);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
deleted file mode 100644
index 810c05e..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/AbstractTestKafkaPolicyEngine.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.sentry.policy.kafka.engine;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
-import java.util.TreeSet;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.policy.common.PolicyEngine;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-public abstract class AbstractTestKafkaPolicyEngine {
-
-  private static final String ADMIN = "host=*->action=all";
-  private static final String ADMIN_HOST1 = "host=host1->action=all";
-  private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read";
-  private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read";
-  private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read";
-  private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write";
-  private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write";
-  private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write";
-  private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all";
-
-  private PolicyEngine policy;
-  private static File baseDir;
-
-  @BeforeClass
-  public static void setupClazz() throws IOException {
-    baseDir = Files.createTempDir();
-  }
-
-  @AfterClass
-  public static void teardownClazz() throws IOException {
-    if (baseDir != null) {
-      FileUtils.deleteQuietly(baseDir);
-    }
-  }
-
-  protected void setPolicy(PolicyEngine policy) {
-    this.policy = policy;
-  }
-
-  protected static File getBaseDir() {
-    return baseDir;
-  }
-
-  @Before
-  public void setup() throws IOException {
-    afterSetup();
-  }
-
-  @After
-  public void teardown() throws IOException {
-    beforeTeardown();
-  }
-
-  protected void afterSetup() throws IOException {}
-
-  protected void beforeTeardown() throws IOException {}
-
-
-  @Test
-  public void testConsumer0() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testConsumer1() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testConsumer2() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testProducer0() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testProducer1() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-
-  @Test
-  public void testProducer2() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testConsumerProducer0() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testSubAdmin() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1));
-    Assert.assertEquals(expected.toString(),
-        new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL))
-            .toString());
-  }
-
-  @Test
-  public void testAdmin() throws Exception {
-    Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN));
-    Assert
-        .assertEquals(expected.toString(),
-            new TreeSet<String>(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL))
-                .toString());
-  }
-
-  private static Set<String> set(String... values) {
-    return Sets.newHashSet(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
deleted file mode 100644
index f2bd3c8..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineDFS.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka.engine;
-
-import java.io.File;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
-import org.apache.sentry.provider.file.PolicyFiles;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine {
-  private static MiniDFSCluster dfsCluster;
-  private static FileSystem fileSystem;
-  private static Path root;
-  private static Path etc;
-
-  @BeforeClass
-  public static void setupLocalClazz() throws IOException {
-    File baseDir = getBaseDir();
-    Assert.assertNotNull(baseDir);
-    File dfsDir = new File(baseDir, "dfs");
-    Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs());
-    Configuration conf = new Configuration();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
-    dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    fileSystem = dfsCluster.getFileSystem();
-    root = new Path(fileSystem.getUri().toString());
-    etc = new Path(root, "/etc");
-    fileSystem.mkdirs(etc);
-  }
-
-  @AfterClass
-  public static void teardownLocalClazz() {
-    if(dfsCluster != null) {
-      dfsCluster.shutdown();
-    }
-  }
-
-  @Override
-  protected void afterSetup() throws IOException {
-    fileSystem.delete(etc, true);
-    fileSystem.mkdirs(etc);
-    PolicyFiles.copyToDir(fileSystem, etc, "test-authz-provider.ini");
-    setPolicy(new KafkaPolicyFileProviderBackend(new Path(etc,
-        "test-authz-provider.ini").toString()));
-  }
-
-  @Override
-  protected void beforeTeardown() throws IOException {
-    fileSystem.delete(etc, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
deleted file mode 100644
index 4bc061d..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/engine/TestKafkaPolicyEngineLocalFS.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka.engine;
-
-import java.io.File;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
-import org.apache.sentry.provider.file.PolicyFiles;
-
-public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine {
-
-  @Override
-  protected void  afterSetup() throws IOException {
-    File baseDir = getBaseDir();
-    Assert.assertNotNull(baseDir);
-    Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
-    PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini");
-    setPolicy(new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath()));
-  }
-
-  @Override
-  protected void beforeTeardown() throws IOException {
-    File baseDir = getBaseDir();
-    Assert.assertNotNull(baseDir);
-    FileUtils.deleteQuietly(baseDir);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
deleted file mode 100644
index 386d2d5..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka.provider;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.sentry.core.common.Action;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.Subject;
-import org.apache.sentry.core.model.kafka.Cluster;
-import org.apache.sentry.core.model.kafka.ConsumerGroup;
-import org.apache.sentry.core.model.kafka.KafkaActionConstant;
-import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.core.model.kafka.Topic;
-import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
-import org.apache.sentry.policy.kafka.MockGroupMappingServiceProvider;
-import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider;
-import org.apache.sentry.provider.common.ResourceAuthorizationProvider;
-import org.apache.sentry.provider.file.PolicyFiles;
-import org.junit.After;
-import org.junit.Test;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-public class TestKafkaAuthorizationProviderGeneralCases {
-  private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create();
-
-  private static final Host HOST_1 = new Host("host1");
-  private static final Host HOST_2 = new Host("host2");
-  private static final Cluster cluster1 = new Cluster();
-  private static final Topic topic1 = new Topic("t1");
-  private static final Topic topic2 = new Topic("t2");
-  private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");
-  private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2");
-
-  private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL);
-  private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ);
-  private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE);
-  private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE);
-  private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE);
-  private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER);
-  private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE);
-  private static final KafkaAction CLUSTER_ACTION = new KafkaAction(
-      KafkaActionConstant.CLUSTER_ACTION);
-
-  private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION);
-
-  private static final Subject ADMIN = new Subject("admin1");
-  private static final Subject SUB_ADMIN = new Subject("subadmin1");
-  private static final Subject CONSUMER0 = new Subject("consumer0");
-  private static final Subject CONSUMER1 = new Subject("consumer1");
-  private static final Subject CONSUMER2 = new Subject("consumer2");
-  private static final Subject PRODUCER0 = new Subject("producer0");
-  private static final Subject PRODUCER1 = new Subject("producer1");
-  private static final Subject PRODUCER2 = new Subject("producer2");
-  private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0");
-
-  private static final String ADMIN_GROUP  = "admin_group";
-  private static final String SUBADMIN_GROUP  = "subadmin_group";
-  private static final String CONSUMER_GROUP0 = "consumer_group0";
-  private static final String CONSUMER_GROUP1 = "consumer_group1";
-  private static final String CONSUMER_GROUP2 = "consumer_group2";
-  private static final String PRODUCER_GROUP0 = "producer_group0";
-  private static final String PRODUCER_GROUP1 = "producer_group1";
-  private static final String PRODUCER_GROUP2 = "producer_group2";
-  private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0";
-
-  static {
-    USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP));
-    USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(),  Arrays.asList(SUBADMIN_GROUP ));
-    USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(),  Arrays.asList(CONSUMER_GROUP0));
-    USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(),  Arrays.asList(CONSUMER_GROUP1));
-    USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(),  Arrays.asList(CONSUMER_GROUP2));
-    USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(),  Arrays.asList(PRODUCER_GROUP0));
-    USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(),  Arrays.asList(PRODUCER_GROUP1));
-    USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(),  Arrays.asList(PRODUCER_GROUP2));
-    USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(),  Arrays.asList(CONSUMER_PRODUCER_GROUP0));
-  }
-
-  private final ResourceAuthorizationProvider authzProvider;
-  private File baseDir;
-
-  public TestKafkaAuthorizationProviderGeneralCases() throws IOException {
-    baseDir = Files.createTempDir();
-    PolicyFiles.copyToDir(baseDir, "test-authz-provider.ini");
-    authzProvider = new HadoopGroupResourceAuthorizationProvider(
-        new KafkaPolicyFileProviderBackend(new File(baseDir, "test-authz-provider.ini").getPath()),
-        new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP));
-  }
-
-  @After
-  public void teardown() {
-    if(baseDir != null) {
-      FileUtils.deleteQuietly(baseDir);
-    }
-  }
-
-  private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy,
-      Set<? extends Action> actions, boolean expected) throws Exception {
-    Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters");
-    helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions);
-    Assert.assertEquals(helper.toString(), expected,
-        authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL));
-  }
-
-  @Test
-  public void testAdmin() throws Exception {
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true);
-
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false);
-    doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false);
-
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true);
-
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true);
-    doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true);
-  }
-
-  @Test
-  public void testConsumer() throws Exception {
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1),
-            Sets.newHashSet(action), READ.equals(action));
-      }
-    }
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1),
-            Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action));
-      }
-    }
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2),
-            Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action));
-      }
-    }
-  }
-
-  @Test
-  public void testProducer() throws Exception {
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1),
-            Sets.newHashSet(action), WRITE.equals(action));
-      }
-    }
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1),
-            Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action));
-      }
-    }
-    for (KafkaAction action : allActions) {
-      for (Host host : Sets.newHashSet(HOST_1, HOST_2)) {
-        doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2),
-            Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action));
-      }
-    }
-  }
-
-  @Test
-  public void testConsumerProducer() throws Exception {
-    for (KafkaAction action : allActions) {
-      doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1),
-          Sets.newHashSet(action), true);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
deleted file mode 100644
index 0a453ce..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderSpecialCases.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sentry.policy.kafka.provider;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.sentry.core.common.Action;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.core.common.Authorizable;
-import org.apache.sentry.core.common.Subject;
-import org.apache.sentry.core.model.kafka.KafkaActionConstant;
-import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction;
-import org.apache.sentry.core.model.kafka.Host;
-import org.apache.sentry.core.model.kafka.Topic;
-import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
-import org.apache.sentry.provider.common.AuthorizationProvider;
-import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider;
-import org.apache.sentry.provider.file.PolicyFile;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-public class TestKafkaAuthorizationProviderSpecialCases {
-  private AuthorizationProvider authzProvider;
-  private PolicyFile policyFile;
-  private File baseDir;
-  private File iniFile;
-  private String initResource;
-  @Before
-  public void setup() throws IOException {
-    baseDir = Files.createTempDir();
-    iniFile = new File(baseDir, "policy.ini");
-    initResource = "file://" + iniFile.getPath();
-    policyFile = new PolicyFile();
-  }
-
-  @After
-  public void teardown() throws IOException {
-    if(baseDir != null) {
-      FileUtils.deleteQuietly(baseDir);
-    }
-  }
-
-  @Test
-  public void testDuplicateEntries() throws Exception {
-    Subject user1 = new Subject("user1");
-    Host host1 = new Host("host1");
-    Topic topic1 = new Topic("t1");
-    Set<? extends Action> actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ));
-    policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1")
-      .addRolesToGroup("group1",  true, "role1", "role1")
-      .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read",
-          "host=host1->topic=t1->action=read");
-    policyFile.write(iniFile);
-    KafkaPolicyFileProviderBackend policy = new KafkaPolicyFileProviderBackend(initResource);
-    authzProvider = new LocalGroupResourceAuthorizationProvider(initResource, policy);
-    List<? extends Authorizable> authorizableHierarchy = ImmutableList.of(host1, topic1);
-    Assert.assertTrue(authorizableHierarchy.toString(),
-        authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
deleted file mode 100644
index 1cb694a..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaPolicyNegative.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sentry.policy.kafka.provider;
-
-import java.io.File;
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.sentry.core.common.ActiveRoleSet;
-import org.apache.sentry.policy.common.PolicyEngine;
-import org.apache.sentry.policy.kafka.KafkaPolicyFileProviderBackend;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
-
-public class TestKafkaPolicyNegative {
-  private File baseDir;
-  private File globalPolicyFile;
-
-  @Before
-  public void setup() {
-    baseDir = Files.createTempDir();
-    globalPolicyFile = new File(baseDir, "global.ini");
-  }
-
-  @After
-  public void teardown() {
-    if(baseDir != null) {
-      FileUtils.deleteQuietly(baseDir);
-    }
-  }
-
-  private void append(String from, File to) throws IOException {
-    Files.append(from + "\n", to, Charsets.UTF_8);
-  }
-
-  @Test
-  public void testauthorizedKafkaInPolicyFile() throws Exception {
-    append("[groups]", globalPolicyFile);
-    append("other_group = other_role", globalPolicyFile);
-    append("[roles]", globalPolicyFile);
-    append("other_role = host=host1->topic=t1->action=read, host=host1->consumergroup=l1->action=read", globalPolicyFile);
-    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
-    //malicious_group has no privilege
-    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("malicious_group"), ActiveRoleSet.ALL);
-    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
-    //other_group has two privileges
-    permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL);
-    Assert.assertTrue(permissions.toString(), permissions.size() == 2);
-  }
-
-  @Test
-  public void testNoHostNameConfig() throws Exception {
-    append("[groups]", globalPolicyFile);
-    append("other_group = malicious_role", globalPolicyFile);
-    append("[roles]", globalPolicyFile);
-    append("malicious_role = topic=t1->action=read", globalPolicyFile);
-    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
-    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("other_group"), ActiveRoleSet.ALL);
-    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
-  }
-
-  @Test
-  public void testHostAllName() throws Exception {
-    append("[groups]", globalPolicyFile);
-    append("group = malicious_role", globalPolicyFile);
-    append("[roles]", globalPolicyFile);
-    append("malicious_role = host=*->action=read", globalPolicyFile);
-    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
-    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL);
-    Assert.assertTrue(permissions.toString(), permissions.size() == 1);
-  }
-
-  @Test
-  public void testAll() throws Exception {
-    append("[groups]", globalPolicyFile);
-    append("group = malicious_role", globalPolicyFile);
-    append("[roles]", globalPolicyFile);
-    append("malicious_role = *", globalPolicyFile);
-    PolicyEngine policy = new KafkaPolicyFileProviderBackend(globalPolicyFile.getPath());
-    ImmutableSet<String> permissions = policy.getAllPrivileges(Sets.newHashSet("group"), ActiveRoleSet.ALL);
-    Assert.assertTrue(permissions.toString(), permissions.isEmpty());
-  }
-}

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties b/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
deleted file mode 100644
index 7703069..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Define some default values that can be overridden by system properties.
-#
-# For testing, it may also be convenient to specify
-
-log4j.rootLogger=DEBUG,console
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.hadoop.conf.Configuration=INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini b/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
deleted file mode 100644
index 1951aba..0000000
--- a/sentry-policy/sentry-policy-kafka/src/test/resources/test-authz-provider.ini
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-[groups]
-admin_group = admin_all
-subadmin_group = admin_host1
-consumer_group0 = consumer_t1_all
-consumer_group1 = consumer_t1_host1
-consumer_group2 = consumer_t2_host2
-producer_group0 = producer_t1_all
-producer_group1 = producer_t1_host1
-producer_group2 = producer_t2_host2
-consumer_producer_group0 = consumer_producer_t1
-
-[roles]
-admin_all = host=*->action=all
-admin_host1 = host=host1->action=all
-consumer_t1_all = host=*->topic=t1->action=read
-consumer_t1_host1 = host=host1->topic=t1->action=read
-consumer_t2_host2 = host=host2->topic=t2->action=read
-producer_t1_all = host=*->topic=t1->action=write
-producer_t1_host1 = host=host1->topic=t1->action=write
-producer_t2_host2 = host=host2->topic=t2->action=write
-consumer_producer_t1 = host=host1->topic=t1->action=all


Mime
View raw message