sentry-commits mailing list archives

From sra...@apache.org
Subject git commit: SENTRY-432: Synchronization of HDFS permissions with Sentry permissions: More restart tests (Arun Suresh via Sravya Tirukkovalur)
Date Fri, 17 Oct 2014 23:11:14 GMT
Repository: incubator-sentry
Updated Branches:
  refs/heads/sentry-hdfs-plugin c059d3d76 -> 0152e3a3d


SENTRY-432: Synchronization of HDFS permissions with Sentry permissions: More restart tests (Arun Suresh via Sravya Tirukkovalur)


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/0152e3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/0152e3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/0152e3a3

Branch: refs/heads/sentry-hdfs-plugin
Commit: 0152e3a3daf34ce04de0ad39ffc3a6f857370205
Parents: c059d3d
Author: Sravya Tirukkovalur <sravya@clouera.com>
Authored: Fri Oct 17 16:10:05 2014 -0700
Committer: Sravya Tirukkovalur <sravya@clouera.com>
Committed: Fri Oct 17 16:10:39 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 sentry-dist/src/main/assembly/sentry-hdfs.xml   |  14 +
 .../sentry/hdfs/SentryAuthorizationInfo.java    |  10 +-
 .../hdfs/SentryAuthorizationProvider.java       |   8 +-
 .../org/apache/sentry/hdfs/SentryUpdater.java   |   1 +
 .../provider/db/SimpleDBProviderBackend.java    |  26 +-
 sentry-tests/sentry-tests-hive/pom.xml          |   2 +
 .../tests/e2e/hdfs/TestHDFSIntegration.java     | 497 +++++++++++++------
 8 files changed, 409 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 952d702..d901718 100644
--- a/pom.xml
+++ b/pom.xml
@@ -432,9 +432,9 @@ limitations under the License.
     <module>sentry-provider</module>
     <module>sentry-policy</module>
     <module>sentry-tests</module>
-    <module>sentry-dist</module>
     <module>sentry-service-client</module>
     <module>sentry-hdfs</module>
+    <module>sentry-dist</module>
   </modules>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-dist/src/main/assembly/sentry-hdfs.xml
----------------------------------------------------------------------
diff --git a/sentry-dist/src/main/assembly/sentry-hdfs.xml b/sentry-dist/src/main/assembly/sentry-hdfs.xml
index 8d85d8f..22ced14 100644
--- a/sentry-dist/src/main/assembly/sentry-hdfs.xml
+++ b/sentry-dist/src/main/assembly/sentry-hdfs.xml
@@ -30,6 +30,20 @@
 
   <baseDirectory>sentry-hdfs-${project.version}</baseDirectory>
 
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <unpack>false</unpack>
+      <useProjectArtifact>false</useProjectArtifact>
+      <useStrictFiltering>true</useStrictFiltering>
+      <useTransitiveFiltering>false</useTransitiveFiltering>
+      <includes>
+        <include>org.apache.thrift:libthrift</include>
+        <include>org.apache.thrift:libfb303</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+
   <fileSets>
     <fileSet>
       <directory>${project.parent.basedir}/sentry-hdfs/sentry-hdfs-dist/target</directory>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
index 23e06dd..3081ae1 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java
@@ -112,9 +112,13 @@ public class SentryAuthorizationInfo implements Runnable {
       lock.writeLock().lock();
       try {
         authzPaths = newAuthzPaths;
-        LOG.warn("##### FULL Updated paths seq Num [" + authzPaths.getLastUpdatedSeqNum() + "]");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("FULL Updated paths seq Num [" + authzPaths.getLastUpdatedSeqNum() + "]");
+        }
         authzPermissions = newAuthzPerms;
-        LOG.warn("##### FULL Updated perms seq Num [" + authzPermissions.getLastUpdatedSeqNum() + "]");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("FULL Updated perms seq Num [" + authzPermissions.getLastUpdatedSeqNum() + "]");
+        }
       } finally {
         lock.writeLock().unlock();
       }
@@ -192,7 +196,7 @@ public class SentryAuthorizationInfo implements Runnable {
   public boolean isStale() {
     long now = System.currentTimeMillis();
     boolean stale = now - lastUpdate > staleThresholdMillisec;
-    if (stale && now - lastStaleReport > 30 * 1000) {
+    if (stale && now - lastStaleReport > retryWaitMillisec) {
       LOG.warn("Authorization information has been stale for [{}]s", 
           (now - lastUpdate) / 1000);
       lastStaleReport = now;
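
The isStale() change above replaces a hard-coded 30-second throttle on stale-state warnings with the configured retry wait. The restart test added further down leans on exactly this window; the following is a small illustrative sketch, assuming the values that test configures ("sentry.authorization-provider.cache-refresh-retry-wait.ms" = 5000, "sentry.authorization-provider.cache-stale-threshold.ms" = 3000), not code from the commit:

    // Illustrative only: replays the isStale() check above with the values the new test
    // configures (retry wait 5000 ms, stale threshold 3000 ms).
    public class StaleTimelineSketch {
      static final long STALE_THRESHOLD_MS = 3000; // sentry.authorization-provider.cache-stale-threshold.ms
      static final long RETRY_WAIT_MS = 5000;      // sentry.authorization-provider.cache-refresh-retry-wait.ms

      public static void main(String[] args) {
        long lastUpdate = 0; // the Sentry server is stopped right after a successful update
        for (long now : new long[] {500, 3500}) {
          boolean stale = now - lastUpdate > STALE_THRESHOLD_MS;
          // At 500 ms the cached state is still fresh, so Sentry-derived ACLs stay enforced;
          // at 3500 ms it is stale and the provider falls back to plain HDFS permissions,
          // warning about staleness at most once every RETRY_WAIT_MS.
          System.out.println("t=" + now + " ms, stale=" + stale); // false, then true
        }
      }
    }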

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
index 3edd5fa..cfd5862 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java
@@ -309,7 +309,7 @@ public class SentryAuthorizationProvider
         String user = defaultAuthzProvider.getUser(node, snapshotId);
         String group = defaultAuthzProvider.getGroup(node, snapshotId);
         INodeAuthorizationInfo pNode = node.getParent();
-        while  (group == null || pNode != null) {
+        while  (group == null && pNode != null) {
           group = defaultAuthzProvider.getGroup(pNode, snapshotId);
           pNode = pNode.getParent();
         }
@@ -334,9 +334,9 @@ public class SentryAuthorizationProvider
     if (LOG.isDebugEnabled()) {
       LOG.debug("### getAclEntry [" + (p == null ? "null" : p) + "] : ["
           + "isManaged=" + isManaged
-          + ",isStale=" + isStale
-          + ",hasAuthzObj=" + hasAuthzObj
-          + ",origAtuhzAsAcl=" + originalAuthzAsAcl + "]"
+          + ", isStale=" + isStale
+          + ", hasAuthzObj=" + hasAuthzObj
+          + ", origAuthzAsAcl=" + originalAuthzAsAcl + "]"
           + "[" + (f == null ? "null" : f.getEntries()) + "]");
     }
     return f;
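
The parent-walk fix above (from '||' to '&&') makes the group lookup stop as soon as a group is found or the root has been passed; with '||' the loop could keep iterating once pNode was null, dereferencing it on the next getGroup call, and it kept overwriting a group that had already been found. Below is a minimal, self-contained sketch of the corrected walk; GroupWalkSketch, Node and GROUPS are hypothetical stand-ins, not Sentry or HDFS APIs:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: a path inherits its group from the nearest ancestor that has one.
    class GroupWalkSketch {
      static class Node {
        final String name;
        final Node parent;
        Node(String name, Node parent) { this.name = name; this.parent = parent; }
        Node getParent() { return parent; }
      }

      static final Map<String, String> GROUPS = new HashMap<String, String>();

      static String resolveGroup(Node node) {
        String group = GROUPS.get(node.name);
        Node parent = node.getParent();
        // '&&': keep walking only while no group has been found AND a parent remains;
        // with '||' the walk would continue past the root and dereference a null parent.
        while (group == null && parent != null) {
          group = GROUPS.get(parent.name);
          parent = parent.getParent();
        }
        return group; // may still be null if no ancestor carries a group
      }

      public static void main(String[] args) {
        Node root = new Node("/", null);
        Node warehouse = new Node("/user/hive/warehouse", root);
        Node table = new Node("/user/hive/warehouse/p1", warehouse);
        GROUPS.put("/user/hive/warehouse", "hive");
        System.out.println(resolveGroup(table)); // prints "hive", inherited from the parent dir
      }
    }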

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
index 905553e..9540397 100644
--- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
+++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java
@@ -42,6 +42,7 @@ public class SentryUpdater {
       } catch (Exception e) {
         LOG.error("Error connecting to Sentry ['{}'] !!",
             e.getMessage());
+        sentryClient = null;
         return null;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
----------------------------------------------------------------------
diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
index b66037a..ff549fe 100644
--- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
+++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java
@@ -39,9 +39,10 @@ public class SimpleDBProviderBackend implements ProviderBackend {
   private static final Logger LOGGER = LoggerFactory
       .getLogger(SimpleDBProviderBackend.class);
 
-  private final SentryPolicyServiceClient policyServiceClient;
+  private SentryPolicyServiceClient policyServiceClient;
 
   private volatile boolean initialized;
+  private Configuration conf; 
 
   public SimpleDBProviderBackend(Configuration conf, String resourcePath) throws IOException {
     // DB Provider doesn't use policy file path
@@ -50,6 +51,8 @@ public class SimpleDBProviderBackend implements ProviderBackend {
 
   public SimpleDBProviderBackend(Configuration conf) throws IOException {
     this(new SentryPolicyServiceClient(conf));
+    this.initialized = false;
+    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -78,10 +81,16 @@ public class SimpleDBProviderBackend implements ProviderBackend {
       throw new IllegalStateException("Backend has not been properly initialized");
     }
     try {
-      return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
+      return ImmutableSet.copyOf(getSentryClient().listPrivilegesForProvider(groups, roleSet, authorizableHierarchy));
     } catch (SentryUserException e) {
       String msg = "Unable to obtain privileges from server: " + e.getMessage();
       LOGGER.error(msg, e);
+      try {
+        policyServiceClient.close();
+      } catch (Exception ex) {
+        // Ignore
+      }
+      policyServiceClient = null;
     }
     return ImmutableSet.of();
   }
@@ -101,6 +110,19 @@ public class SimpleDBProviderBackend implements ProviderBackend {
     }
   }
 
+  private SentryPolicyServiceClient getSentryClient() {
+    if (policyServiceClient == null) {
+      try {
+        policyServiceClient = new SentryPolicyServiceClient(conf);
+      } catch (Exception e) {
+        LOGGER.error("Error connecting to Sentry ['{}'] !!",
+            e.getMessage());
+        policyServiceClient = null;
+        return null;
+      }
+    }
+    return policyServiceClient;
+  }
   /**
    * SimpleDBProviderBackend does not implement validatePolicy()
    */
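
Taken together, the SimpleDBProviderBackend changes above turn the Sentry client into a lazily re-created connection: getSentryClient() builds a client on demand, and a failed listPrivilegesForProvider() call closes and nulls it so the next call reconnects instead of reusing a dead Thrift connection. A generic sketch of that reconnect-on-failure pattern follows; Client and ClientFactory are hypothetical stand-ins for the Sentry classes, not real APIs:

    import java.util.Collections;
    import java.util.Set;

    // Sketch only: the pattern mirrors getSentryClient() plus the catch block in
    // getPrivileges() above -- connect lazily, drop the connection on failure.
    interface Client extends AutoCloseable {
      Set<String> listPrivileges() throws Exception;
    }

    interface ClientFactory {
      Client newClient() throws Exception;
    }

    class ReconnectingBackend {
      private final ClientFactory factory;
      private Client client; // null means "connect on next use"

      ReconnectingBackend(ClientFactory factory) {
        this.factory = factory;
      }

      private Client getClient() throws Exception {
        if (client == null) {
          client = factory.newClient(); // lazy (re)connect
        }
        return client;
      }

      Set<String> getPrivileges() {
        try {
          return getClient().listPrivileges();
        } catch (Exception e) {
          // Close and forget the (possibly broken) connection; the next call reconnects.
          try {
            if (client != null) {
              client.close();
            }
          } catch (Exception ignore) {
            // best effort
          }
          client = null;
          return Collections.emptySet();
        }
      }
    }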

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-tests/sentry-tests-hive/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml
index fde850f..a8a33eb 100644
--- a/sentry-tests/sentry-tests-hive/pom.xml
+++ b/sentry-tests/sentry-tests-hive/pom.xml
@@ -249,12 +249,14 @@ limitations under the License.
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minicluster</artifactId>
       <scope>test</scope>
+<!--
       <exclusions>
           <exclusion>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
           </exclusion>
       </exclusions>
+-->
     </dependency>
       <dependency>
           <groupId>org.hamcrest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
index 41f8af8..26cf978 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java
@@ -16,28 +16,37 @@
  */
 package org.apache.sentry.tests.e2e.hdfs;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.concurrent.TimeoutException;
 
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -45,6 +54,21 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl;
@@ -94,14 +118,48 @@ public class TestHDFSIntegration {
     }
   }
 
+  public static class WordCountMapper extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, LongWritable> {
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      StringTokenizer st = new StringTokenizer(value.toString());
+      while (st.hasMoreTokens()) {
+        output.collect(new Text(st.nextToken()), new LongWritable(1L));
+      }
+    }
+
+  }
+
+  public static class SumReducer extends MapReduceBase implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(key, new LongWritable(sum));
+    }
+
+  }
+
   private MiniDFSCluster miniDFS;
+  private MiniMRClientCluster miniMR;
   private InternalHiveServer hiveServer2;
   private InternalMetastoreServer metastore;
+  private SentryService sentryService;
   private String fsURI;
   private int hmsPort;
-  private int sentryPort;
+  private int sentryPort = -1;
   private File baseDir;
-  private UserGroupInformation admin;
+  private File policyFileLocation;
+  private UserGroupInformation adminUgi;
+  private UserGroupInformation hiveUgi;
 
   protected static File assertCreateDir(File dir) {
     if(!dir.isDirectory()) {
@@ -117,10 +175,10 @@ public class TestHDFSIntegration {
     return port;
   }
 
-  private static void startSentryService(SentryService sentryServer) throws Exception {
-    sentryServer.start();
+  private void waitOnSentryService() throws Exception {
+    sentryService.start();
     final long start = System.currentTimeMillis();
-    while (!sentryServer.isRunning()) {
+    while (!sentryService.isRunning()) {
       Thread.sleep(1000);
       if (System.currentTimeMillis() - start > 60000L) {
         throw new TimeoutException("Server did not start after 60 seconds");
@@ -132,112 +190,29 @@ public class TestHDFSIntegration {
   public void setup() throws Exception {
     Class.forName("org.apache.hive.jdbc.HiveDriver");
     baseDir = Files.createTempDir();
-    final File policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME);
+    policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME);
     PolicyFile policyFile = PolicyFile.setAdminOnServer1("hive")
         .setUserGroupMapping(StaticUserGroup.getStaticMapping());
     policyFile.write(policyFileLocation);
 
-    admin = UserGroupInformation.createUserForTesting(
+    adminUgi = UserGroupInformation.createUserForTesting(
         System.getProperty("user.name"), new String[] { "supergroup" });
 
-    UserGroupInformation hiveUgi = UserGroupInformation.createUserForTesting(
+    hiveUgi = UserGroupInformation.createUserForTesting(
         "hive", new String[] { "hive" });
 
-    hiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        Configuration sentryConf = new Configuration(false);
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND,
-            SimpleDBProviderBackend.class.getName());
-        properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname,
-            SentryHiveAuthorizationTaskFactoryImpl.class.getName());
-        properties
-            .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2");
-        properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort);
-        properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
-        properties.put("sentry.hive.testing.mode", "true");
-        properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin");
-        properties.put(ServerConfig.RPC_ADDRESS, "localhost");
-        properties.put(ServerConfig.RPC_PORT, String.valueOf(0));
-        properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+    // Start Sentry
+    startSentry();
 
-        properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
-        properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
-        properties.put(ServerConfig.SENTRY_STORE_JDBC_URL,
-            "jdbc:derby:;databaseName=" + baseDir.getPath()
-                + "/sentrystore_db;create=true");
-        properties.put("sentry.service.processor.factories",
-            "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
-        properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
-        properties.put(ServerConfig.RPC_MIN_THREADS, "3");
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-          sentryConf.set(entry.getKey(), entry.getValue());
-        }
-        SentryService sentryServer = new SentryServiceFactory().create(sentryConf);
-        properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress()
-            .getHostName());
-        sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress()
-            .getHostName());
-        properties.put(ClientConfig.SERVER_RPC_PORT,
-            String.valueOf(sentryServer.getAddress().getPort()));
-        sentryConf.set(ClientConfig.SERVER_RPC_PORT,
-            String.valueOf(sentryServer.getAddress().getPort()));
-        startSentryService(sentryServer);
-        sentryPort = sentryServer.getAddress().getPort();
-        System.out.println("\n\n Sentry port : " + sentryPort + "\n\n");
-        return null;
-      }
-    });
-
-    admin.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
-        Configuration conf = new HdfsConfiguration();
-        conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
-            SentryAuthorizationProvider.class.getName());
-        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
-        
-        File dfsDir = assertCreateDir(new File(baseDir, "dfs"));
-        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
-        conf.set("hadoop.security.group.mapping",
-            MiniDFS.PseudoGroupMappingService.class.getName());
-        Configuration.addDefaultResource("test.xml");
+    // Start HDFS and MR
+    startDFSandYARN();
 
-        conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse");
-        conf.set("sentry.hdfs.service.security.mode", "none");
-        conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
-        conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
-        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
-        miniDFS = new MiniDFSCluster.Builder(conf).build();
-        Path tmpPath = new Path("/tmp");
-        Path hivePath = new Path("/user/hive");
-        Path warehousePath = new Path(hivePath, "warehouse");
-        miniDFS.getFileSystem().mkdirs(warehousePath);
-        boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath);
-        System.out.println("\n\n Is dir :" + directory + "\n\n");
-        System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n");
-        fsURI = miniDFS.getFileSystem().getUri().toString();
-        miniDFS.getFileSystem().mkdirs(tmpPath);
-        miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx"));
-        miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive");
-        miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive");
-        System.out.println("\n\n Owner :"
-            + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner()
-            + ", "
-            + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup()
-            + "\n\n");
-        System.out.println("\n\n Owner tmp :"
-            + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", "
-            + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", "
-            + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", "
-            + "\n\n");
-        return null;
-      }
-    });
+    // Start HiveServer2 and Metastore
+    startHiveAndMetastore();
 
+  }
 
+  private void startHiveAndMetastore() throws IOException, InterruptedException {
     hiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
@@ -286,7 +261,6 @@ public class TestHDFSIntegration {
         authzConf.writeXml(out);
         out.close();
 
-//        hiveConf.set("hive.sentry.conf.url", "file://" + accessSite.getCanonicalPath());
         hiveConf.set("hive.sentry.conf.url", accessSite.getPath());
         System.out.println("Sentry client file : " + accessSite.getPath());
 
@@ -312,7 +286,114 @@ public class TestHDFSIntegration {
         return null;
       }
     });
+  }
+
+  private void startDFSandYARN() throws IOException,
+      InterruptedException {
+    adminUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data");
+        Configuration conf = new HdfsConfiguration();
+        conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY,
+            SentryAuthorizationProvider.class.getName());
+        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+        conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+        File dfsDir = assertCreateDir(new File(baseDir, "dfs"));
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath());
+        conf.set("hadoop.security.group.mapping",
+            MiniDFS.PseudoGroupMappingService.class.getName());
+        Configuration.addDefaultResource("test.xml");
+
+        conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse");
+        conf.set("sentry.authorization-provider.cache-refresh-retry-wait.ms", "5000");
+        conf.set("sentry.authorization-provider.cache-stale-threshold.ms", "3000");
+
+        conf.set("sentry.hdfs.service.security.mode", "none");
+        conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost");
+        conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort));
+        EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+        miniDFS = new MiniDFSCluster.Builder(conf).build();
+        Path tmpPath = new Path("/tmp");
+        Path hivePath = new Path("/user/hive");
+        Path warehousePath = new Path(hivePath, "warehouse");
+        miniDFS.getFileSystem().mkdirs(warehousePath);
+        boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath);
+        System.out.println("\n\n Is dir :" + directory + "\n\n");
+        System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n");
+        fsURI = miniDFS.getFileSystem().getUri().toString();
+        conf.set("fs.defaultFS", fsURI);
+
+        // Create Yarn cluster
+        // miniMR = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
+
+        miniDFS.getFileSystem().mkdirs(tmpPath);
+        miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx"));
+        miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive");
+        miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive");
+        System.out.println("\n\n Owner :"
+            + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner()
+            + ", "
+            + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup()
+            + "\n\n");
+        System.out.println("\n\n Owner tmp :"
+            + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", "
+            + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", "
+            + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", "
+            + "\n\n");
+        return null;
+      }
+    });
+  }
 
+  private void startSentry() throws IOException,
+      InterruptedException {
+    hiveUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        Configuration sentryConf = new Configuration(false);
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND,
+            SimpleDBProviderBackend.class.getName());
+        properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname,
+            SentryHiveAuthorizationTaskFactoryImpl.class.getName());
+        properties
+            .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2");
+        properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort);
+        properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE);
+        properties.put("sentry.hive.testing.mode", "true");
+        properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin");
+        properties.put(ServerConfig.RPC_ADDRESS, "localhost");
+        properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort < 0 ? 0 : sentryPort));
+        properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false");
+
+        properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING);
+        properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
+        properties.put(ServerConfig.SENTRY_STORE_JDBC_URL,
+            "jdbc:derby:;databaseName=" + baseDir.getPath()
+                + "/sentrystore_db;create=true");
+        properties.put("sentry.service.processor.factories",
+            "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory");
+        properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin");
+        properties.put(ServerConfig.RPC_MIN_THREADS, "3");
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+          sentryConf.set(entry.getKey(), entry.getValue());
+        }
+        sentryService = new SentryServiceFactory().create(sentryConf);
+        properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress()
+            .getHostName());
+        sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress()
+            .getHostName());
+        properties.put(ClientConfig.SERVER_RPC_PORT,
+            String.valueOf(sentryService.getAddress().getPort()));
+        sentryConf.set(ClientConfig.SERVER_RPC_PORT,
+            String.valueOf(sentryService.getAddress().getPort()));
+        waitOnSentryService();
+        sentryPort = sentryService.getAddress().getPort();
+        System.out.println("\n\n Sentry port : " + sentryPort + "\n\n");
+        return null;
+      }
+    });
   }
 
   @After
@@ -334,25 +415,8 @@ public class TestHDFSIntegration {
     }
   }
 
-//  public Connection createConnection(String username) throws Exception {
-//    String password = username;
-//    Connection connection =  hiveServer2.createConnection(username, password);
-//    assertNotNull("Connection is null", connection);
-//    assertFalse("Connection should not be closed", connection.isClosed());
-//    Statement statement  = connection.createStatement();
-//    statement.close();
-//    return connection;
-//  }
-//
-//  public Statement createStatement(Connection connection)
-//  throws Exception {
-//    Statement statement  = connection.createStatement();
-//    assertNotNull("Statement is null", statement);
-//    return statement;
-//  }
-
   @Test
-  public void testSimple() throws Exception {
+  public void testEnd2End() throws Exception {
     Connection conn = hiveServer2.createConnection("hive", "hive");
     Statement stmt = conn.createStatement();
     stmt.execute("create role admin_role");
@@ -363,38 +427,189 @@ public class TestHDFSIntegration {
     stmt.execute("alter table p1 add partition (month=1, day=2)");
     stmt.execute("alter table p1 add partition (month=2, day=1)");
     stmt.execute("alter table p1 add partition (month=2, day=2)");
-    AclStatus aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1"));
-    Set<String> groups = new HashSet<String>(); 
-    for (AclEntry ent : aclStatus.getEntries()) {
-      if (ent.getType().equals(AclEntryType.GROUP)) {
-        groups.add(ent.getName());
-      }
-    }
-    System.out.println("Final acls [" + aclStatus + "]");
-    Assert.assertEquals(false, groups.contains("hbase"));
 
     stmt.execute("create role p1_admin");
     stmt.execute("grant role p1_admin to group hbase");
+
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false);
+
+    loadData(stmt);
+
+    verifyHDFSandMR(stmt);
+
+    stmt.execute("revoke select on table p1 from role p1_admin");
+    Thread.sleep(1000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false);
+
+    stmt.execute("grant all on table p1 to role p1_admin");
+    Thread.sleep(1000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.ALL, "hbase", true);
+
+    stmt.execute("revoke select on table p1 from role p1_admin");
+    Thread.sleep(1000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true);
+
+    sentryService.stop();
+    // Verify that Sentry permissions are still enforced for the "stale" period
+    Thread.sleep(500);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true);
+
+    // Verify that Sentry permissions are NOT enforced AFTER the "stale" period
+    Thread.sleep(3000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false);
+
+    startSentry();
+    // Verify that after the Sentry restart, permissions are re-enforced
+    Thread.sleep(5000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true);
+
+    // Create new table and verify everything is fine after restart...
+    stmt.execute("create table p2 (s string) partitioned by (month int, day int)");
+    try {
+      stmt.execute("alter table p2 add partition (month=1, day=1)");
+    } catch (Exception e) {
+      // Metastore throws an exception the first time after the Sentry restart
+      stmt.execute("alter table p2 add partition (month=1, day=1)");
+    }
+    stmt.execute("alter table p2 add partition (month=1, day=2)");
+    stmt.execute("alter table p2 add partition (month=2, day=1)");
+    stmt.execute("alter table p2 add partition (month=2, day=2)");
+
+    Thread.sleep(1000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p2", null, "hbase", false);
+
+    stmt.execute("grant select on table p2 to role p1_admin");
+    Thread.sleep(1000);
+    verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true);
+
+    stmt.close();
+    conn.close();
+  }
+
+  private void loadData(Statement stmt) throws IOException, SQLException {
+    FSDataOutputStream f1 = miniDFS.getFileSystem().create(new Path("/tmp/f1.txt"));
+    f1.writeChars("m1d1_t1\n");
+    f1.writeChars("m1d1_t2\n");
+    f1.writeChars("m1d1_t3\n");
+    f1.flush();
+    f1.close();
+    stmt.execute("load data inpath \'/tmp/f1.txt\' overwrite into table p1 partition (month=1,
day=1)");
+    FSDataOutputStream f2 = miniDFS.getFileSystem().create(new Path("/tmp/f2.txt"));
+    f2.writeChars("m2d2_t4\n");
+    f2.writeChars("m2d2_t5\n");
+    f2.writeChars("m2d2_t6\n");
+    f2.flush();
+    f2.close();
+    stmt.execute("load data inpath \'/tmp/f2.txt\' overwrite into table p1 partition (month=2,
day=2)");
+    ResultSet rs = stmt.executeQuery("select * from p1");
+    List<String> vals = new ArrayList<String>(); 
+    while (rs.next()) {
+      vals.add(rs.getString(1));
+    }
+    Assert.assertEquals(6, vals.size());
+    rs.close();
+  }
+
+  private void verifyHDFSandMR(Statement stmt) throws IOException,
+      InterruptedException, SQLException, Exception {
+    // hbase user should not be allowed to read...
+    UserGroupInformation hbaseUgi = UserGroupInformation.createUserForTesting("hbase", new String[] {"hbase"});
+    hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        try {
+          miniDFS.getFileSystem().open(new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt"));
+          Assert.fail("Should not be allowed !!");
+        } catch (Exception e) {
+          Assert.assertEquals("Wrong Error : " + e.getMessage(), true, e.getMessage().contains("Permission
denied: user=hbase"));
+        }
+        return null;
+      }
+    });
+
+    // WordCount should fail..
+    // runWordCount(new JobConf(miniMR.getConfig()), "/user/hive/warehouse/p1/month=1/day=1", "/tmp/wc_out");
+
     stmt.execute("grant select on table p1 to role p1_admin");
+
     Thread.sleep(1000);
-    aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1"));
-    groups = new HashSet<String>();
-    for (AclEntry ent : aclStatus.getEntries()) {
-      if (ent.getType().equals(AclEntryType.GROUP)) {
-        groups.add(ent.getName());
+    verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true);
+    // hbase user should now be allowed to read...
+    hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        Path p = new Path("/user/hive/warehouse/p1/month=2/day=2/f2.txt");
+        BufferedReader in = new BufferedReader(new InputStreamReader(miniDFS.getFileSystem().open(p)));
+        String line = null;
+        List<String> lines = new ArrayList<String>();
+        do {
+          line = in.readLine();
+          if (line != null) lines.add(line);
+        } while (line != null);
+        Assert.assertEquals(3, lines.size());
+        in.close();
+        return null;
       }
+    });
+
+  }
+
+  private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Exception {
+    verifyOnAllSubDirs(new Path(path), fsAction, group, groupShouldExist);
+  }
+
+  private void verifyOnAllSubDirs(Path p, FsAction fsAction, String group, boolean groupShouldExist) throws Exception {
+    FileStatus fStatus = miniDFS.getFileSystem().getFileStatus(p);
+    if (groupShouldExist) {
+      Assert.assertEquals(fsAction, getAcls(p).get(group));
+    } else {
+      Assert.assertFalse(getAcls(p).containsKey(group));
     }
-    Assert.assertEquals(true, groups.contains("hbase"));
+    if (fStatus.isDirectory()) {
+      FileStatus[] children = miniDFS.getFileSystem().listStatus(p);
+      for (FileStatus fs : children) {
+        verifyOnAllSubDirs(fs.getPath(), fsAction, group, groupShouldExist);
+      }
+    }
+  }
 
-    stmt.execute("revoke select on table p1 from role p1_admin");
-    Thread.sleep(1000);
-    aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1"));
-    groups = new HashSet<String>();
+  private Map<String, FsAction> getAcls(Path path) throws Exception {
+    AclStatus aclStatus = miniDFS.getFileSystem().getAclStatus(path);
+    Map<String, FsAction> acls = new HashMap<String, FsAction>();
     for (AclEntry ent : aclStatus.getEntries()) {
       if (ent.getType().equals(AclEntryType.GROUP)) {
-        groups.add(ent.getName());
+        acls.put(ent.getName(), ent.getPermission());
       }
     }
-    Assert.assertEquals(false, groups.contains("hbase"));
+    return acls;
+  }
+
+  private void runWordCount(JobConf job, String inPath, String outPath) throws Exception {
+    Path in = new Path(inPath);
+    Path out = new Path(outPath);
+    miniDFS.getFileSystem().delete(out, true);
+    job.setJobName("TestWC");
+    JobClient jobClient = new JobClient(job);
+    RunningJob submittedJob = null;
+    FileInputFormat.setInputPaths(job, in);
+    FileOutputFormat.setOutputPath(job, out);
+    job.set("mapreduce.output.textoutputformat.separator", " ");
+    job.setInputFormat(TextInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setMapperClass(WordCountMapper.class);
+    job.setReducerClass(SumReducer.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setNumReduceTasks(1);
+    job.setInt("mapreduce.map.maxattempts", 1);
+    job.setInt("mapreduce.reduce.maxattempts", 1);
+
+    submittedJob = jobClient.submitJob(job);
+    if (!jobClient.monitorAndPrintJob(job, submittedJob)) {
+      throw new IOException("job Failed !!");
+    }
+    
   }
 }
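
For a concrete reading of the new verifyOnAllSubDirs() assertions in testEnd2End(): "grant all" on the table shows up as an rwx (FsAction.ALL) group ACL entry for the role's group, and the subsequent "revoke select" strips only the read bit, which is why the test then expects FsAction.WRITE_EXECUTE. A small sketch using Hadoop's FsAction enum; the SELECT-maps-to-read-access assumption is inferred from the test, not quoted from the Sentry code:

    import org.apache.hadoop.fs.permission.FsAction;

    public class FsActionSketch {
      public static void main(String[] args) {
        FsAction afterGrantAll = FsAction.ALL;                               // rwx, asserted after "grant all"
        // Revoking SELECT removes read access but leaves write/execute in place.
        FsAction afterRevokeSelect = afterGrantAll.and(FsAction.READ.not());
        System.out.println(afterRevokeSelect);                               // WRITE_EXECUTE, asserted after the revoke
      }
    }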

