sentry-commits mailing list archives

From pras...@apache.org
Subject [24/25] SENTRY-432: Synchronization of HDFS permissions with Sentry permissions. Initial patch (Arun Suresh via Prasad Mujumdar)
Date Fri, 10 Oct 2014 03:48:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
new file mode 100644
index 0000000..17f15f0
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class MetastorePlugin extends SentryMetastoreListenerPlugin {
+  
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class);
+  
+  private final Configuration conf;
+  private SentryHDFSServiceClient sentryClient;
+
+  // Initialized to some value > 1 so that the first update notification
+  // will trigger a full image fetch
+  private final AtomicInteger seqNum = new AtomicInteger(5);
+
+  public MetastorePlugin(Configuration conf) {
+    this.conf = conf;
+    try {
+      sentryClient = new SentryHDFSServiceClient(conf);
+    } catch (IOException e) {
+      sentryClient = null;
+      LOGGER.error("Could not connect to Sentry HDFS Service !!", e);
+    }
+  }
+
+  @Override
+  public void addPath(String authzObj, String path) {
+    PathsUpdate update = createHMSUpdate();
+    update.newPathChange(authzObj).addToAddPaths(PathsUpdate.cleanPath(path));
+    try {
+      notifySentry(update);
+    } catch (MetaException e) {
+      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+    }
+  }
+
+  @Override
+  public void removeAllPaths(String authzObj) {
+    PathsUpdate update = createHMSUpdate();
+    update.newPathChange(authzObj).addToDelPaths(Lists.newArrayList(PathsUpdate.ALL_PATHS));
+    try {
+      notifySentry(update);
+    } catch (MetaException e) {
+      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+    }
+  }
+
+  @Override
+  public void removePath(String authzObj, String path) {
+    PathsUpdate update = createHMSUpdate();
+    update.newPathChange(authzObj).addToDelPaths(PathsUpdate.cleanPath(path));
+    try {
+      notifySentry(update);
+    } catch (MetaException e) {
+      LOGGER.error("Could not send update to Sentry HDFS Service !!", e);
+    }
+  }
+
+  private SentryHDFSServiceClient getClient() {
+    if (sentryClient == null) {
+      try {
+        sentryClient = new SentryHDFSServiceClient(conf);
+      } catch (IOException e) {
+        sentryClient = null;
+        LOGGER.error("Could not connect to Sentry HDFS Service", e);
+      }
+    }
+    return sentryClient;
+  }
+
+  private PathsUpdate createHMSUpdate() {
+    PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false);
+    return update;
+  }
+
+  private void notifySentry(PathsUpdate update) throws MetaException {
+    SentryHDFSServiceClient client = getClient();
+    if (client == null) {
+      throw new MetaException("Sentry HDFS Service client is not connected");
+    }
+    try {
+      client.notifyHMSUpdate(update);
+    } catch (IOException e) {
+      throw new MetaException("Error sending update to Sentry [" + e.getMessage() + "]");
+    }
+  }
+
+}
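
For context, a minimal sketch of how the metastore side drives this plugin (the config key and paths are illustrative assumptions; in the real wiring, Sentry's metastore listener invokes these callbacks on DDL events):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sentry.hdfs.MetastorePlugin;

    public class MetastorePluginSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // hypothetical value; the actual key is defined in ServiceConstants.ClientConfig
        conf.set("sentry.hdfs.service.client.server.rpc-address", "sentry-host");
        MetastorePlugin plugin = new MetastorePlugin(conf);
        // CREATE TABLE: register the table location against its authz object
        plugin.addPath("db1.tbl1", "hdfs://nn:8020/user/hive/warehouse/db1.db/tbl1");
        // ALTER TABLE ... SET LOCATION: drop the old path, add the new one
        plugin.removePath("db1.tbl1", "hdfs://nn:8020/user/hive/warehouse/db1.db/tbl1");
        plugin.addPath("db1.tbl1", "hdfs://nn:8020/data/tbl1");
        // DROP TABLE: remove every path registered for the object
        plugin.removeAllPaths("db1.tbl1");
      }
    }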

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
new file mode 100644
index 0000000..2b1b554
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceClient.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Client;
+import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
+import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SentryHDFSServiceClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceClient.class);
+  
+  public static class SentryAuthzUpdate {
+
+    private final List<PermissionsUpdate> permUpdates;
+    private final List<PathsUpdate> pathUpdates;
+
+    public SentryAuthzUpdate(List<PermissionsUpdate> permUpdates, List<PathsUpdate> pathUpdates) {
+      this.permUpdates = permUpdates;
+      this.pathUpdates = pathUpdates;
+    }
+
+    public List<PermissionsUpdate> getPermUpdates() {
+      return permUpdates;
+    }
+
+    public List<PathsUpdate> getPathUpdates() {
+      return pathUpdates;
+    }
+  }
+  
+  /**
+   * This transport wraps the Sasl transports to set up the right UGI context for open().
+   */
+  public static class UgiSaslClientTransport extends TSaslClientTransport {
+    protected UserGroupInformation ugi = null;
+
+    public UgiSaslClientTransport(String mechanism, String authorizationId,
+        String protocol, String serverName, Map<String, String> props,
+        CallbackHandler cbh, TTransport transport, boolean wrapUgi)
+        throws IOException {
+      super(mechanism, authorizationId, protocol, serverName, props, cbh,
+          transport);
+      if (wrapUgi) {
+        ugi = UserGroupInformation.getLoginUser();
+      }
+    }
+
+    // Open the SASL transport using the current UserGroupInformation.
+    // This is needed so the connection is made within the current login context.
+    @Override
+    public void open() throws TTransportException {
+      if (ugi == null) {
+        baseOpen();
+      } else {
+        try {
+          ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            public Void run() throws TTransportException {
+              baseOpen();
+              return null;
+            }
+          });
+        } catch (IOException e) {
+          throw new TTransportException("Failed to open SASL transport", e);
+        } catch (InterruptedException e) {
+          throw new TTransportException(
+              "Interrupted while opening underlying transport", e);
+        }
+      }
+    }
+
+    private void baseOpen() throws TTransportException {
+      super.open();
+    }
+  }
+
+  private final Configuration conf;
+  private final InetSocketAddress serverAddress;
+  private final int connectionTimeout;
+  private boolean kerberos;
+  private TTransport transport;
+
+  private String[] serverPrincipalParts;
+  private Client client;
+  
+  public SentryHDFSServiceClient(Configuration conf) throws IOException {
+    this.conf = conf;
+    Preconditions.checkNotNull(this.conf, "Configuration object cannot be null");
+    this.serverAddress = NetUtils.createSocketAddr(Preconditions.checkNotNull(
+                           conf.get(ClientConfig.SERVER_RPC_ADDRESS), "Config key "
+                           + ClientConfig.SERVER_RPC_ADDRESS + " is required"), conf.getInt(
+                           ClientConfig.SERVER_RPC_PORT, ClientConfig.SERVER_RPC_PORT_DEFAULT));
+    this.connectionTimeout = conf.getInt(ClientConfig.SERVER_RPC_CONN_TIMEOUT,
+                                         ClientConfig.SERVER_RPC_CONN_TIMEOUT_DEFAULT);
+    kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
+        conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
+    transport = new TSocket(serverAddress.getHostName(),
+        serverAddress.getPort(), connectionTimeout);
+    if (kerberos) {
+      String serverPrincipal = Preconditions.checkNotNull(
+          conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required");
+
+      // Resolve the server host in the same way as the server side does
+      serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress());
+      LOGGER.info("Using server Kerberos principal: " + serverPrincipal);
+
+      serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal);
+      Preconditions.checkArgument(serverPrincipalParts.length == 3,
+           "Kerberos principal should have 3 parts: " + serverPrincipal);
+      boolean wrapUgi = "true".equalsIgnoreCase(conf
+          .get(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "true"));
+      transport = new UgiSaslClientTransport(AuthMethod.KERBEROS.getMechanismName(),
+          null, serverPrincipalParts[0], serverPrincipalParts[1],
+          ClientConfig.SASL_PROPERTIES, null, transport, wrapUgi);
+    } else {
+      serverPrincipalParts = null;
+    }
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      throw new IOException("Transport exception while opening transport: " + e.getMessage(), e);
+    }
+    LOGGER.info("Successfully opened transport: " + transport + " to " + serverAddress);
+    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
+      new TCompactProtocol(transport),
+      SentryHDFSServiceProcessor.SENTRY_HDFS_SERVICE_NAME);
+    client = new SentryHDFSService.Client(protocol);
+    LOGGER.info("Successfully created client");
+  }
+
+  public synchronized void notifyHMSUpdate(PathsUpdate update)
+      throws IOException {
+    try {
+      client.handle_hms_notification(update.getThriftObject());
+    } catch (Exception e) {
+      throw new IOException("Thrift Exception occurred !!", e);
+    }
+  }
+
+  public synchronized SentryAuthzUpdate getAllUpdatesFrom(long permSeqNum, long pathSeqNum)
+      throws IOException {
+    SentryAuthzUpdate retVal = new SentryAuthzUpdate(new LinkedList<PermissionsUpdate>(), new LinkedList<PathsUpdate>());
+    try {
+      TAuthzUpdateResponse sentryUpdates = client.get_all_authz_updates_from(permSeqNum, pathSeqNum);
+      if (sentryUpdates.getAuthzPathUpdate() != null) {
+        for (TPathsUpdate pathsUpdate : sentryUpdates.getAuthzPathUpdate()) {
+          retVal.getPathUpdates().add(new PathsUpdate(pathsUpdate));
+        }
+      }
+      if (sentryUpdates.getAuthzPermUpdate() != null) {
+        for (TPermissionsUpdate permsUpdate : sentryUpdates.getAuthzPermUpdate()) {
+          retVal.getPermUpdates().add(new PermissionsUpdate(permsUpdate));
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException("Thrift Exception occurred !!", e);
+    }
+    return retVal;
+  }
+
+  public void close() {
+    if (transport != null) {
+      transport.close();
+    }
+  }
+}
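
A downstream consumer (for example the NameNode authorization plugin) would poll this client for deltas roughly as follows; the starting sequence numbers and the bookkeeping are illustrative assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sentry.hdfs.PathsUpdate;
    import org.apache.sentry.hdfs.PermissionsUpdate;
    import org.apache.sentry.hdfs.SentryHDFSServiceClient;
    import org.apache.sentry.hdfs.SentryHDFSServiceClient.SentryAuthzUpdate;

    public class PollingSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // must carry the service RPC address
        SentryHDFSServiceClient client = new SentryHDFSServiceClient(conf);
        long permSeq = 1, pathSeq = 1;
        try {
          // Ask for all updates from the last seen sequence numbers (inclusive).
          SentryAuthzUpdate updates = client.getAllUpdatesFrom(permSeq, pathSeq);
          for (PathsUpdate u : updates.getPathUpdates()) {
            // apply u to the local paths cache; a full image replaces it wholesale
            pathSeq = Math.max(pathSeq, u.getSeqNum() + 1);
          }
          for (PermissionsUpdate u : updates.getPermUpdates()) {
            permSeq = Math.max(permSeq, u.getSeqNum() + 1);
          }
        } finally {
          client.close();
        }
      }
    }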

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
new file mode 100644
index 0000000..1198619
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse;
+import org.apache.sentry.hdfs.service.thrift.TPathsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessor.class);
+
+  public static final String SENTRY_HDFS_SERVICE_NAME = "SentryHDFSService";
+
+  @Override
+  public TAuthzUpdateResponse get_all_authz_updates_from(long permSeqNum, long pathSeqNum)
+      throws TException {
+    TAuthzUpdateResponse retVal = new TAuthzUpdateResponse();
+    retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>());
+    retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>());
+    if (SentryPlugin.instance != null) {
+      List<PermissionsUpdate> permUpdates = SentryPlugin.instance.getAllPermsUpdatesFrom(permSeqNum);
+      List<PathsUpdate> pathUpdates = SentryPlugin.instance.getAllPathsUpdatesFrom(pathSeqNum);
+      try {
+        for (PathsUpdate update : pathUpdates) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("### Sending PATH preUpdate seq [" + update.getSeqNum() + "] ###");
+            LOGGER.debug("### Sending PATH preUpdate [" + update.getThriftObject() + "] ###");
+          }
+          retVal.getAuthzPathUpdate().add(update.getThriftObject());
+        }
+        for (PermissionsUpdate update : permUpdates) {
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("### Sending PERM preUpdate seq [" + update.getSeqNum() + "] ###");
+            LOGGER.debug("### Sending PERM preUpdate [" + update.getThriftObject() + "] ###");
+          }
+          retVal.getAuthzPermUpdate().add(update.getThriftObject());
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error Sending updates to downstream Cache", e);
+        throw new TException(e);
+      }
+    } else {
+      LOGGER.error("SentryPlugin not initialized yet !!");
+    }
+    
+    return retVal;
+  }
+
+  @Override
+  public void handle_hms_notification(TPathsUpdate update) throws TException {
+    try {
+      PathsUpdate hmsUpdate = new PathsUpdate(update);
+      if (SentryPlugin.instance != null) {
+        SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate);
+        LOGGER.info("Authz Paths update [" + hmsUpdate.getSeqNum() + "]..");
+      } else {
+        LOGGER.error("SentryPlugin not initialized yet !!");
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error handling notification from HMS", e);
+      throw new TException(e);
+    }
+  }
+
+  /**
+   * Not implemented for the time being.
+   */
+  @Override
+  public Map<String, List<String>> get_all_related_paths(String arg0,
+      boolean arg1) throws TException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
new file mode 100644
index 0000000..bf64bbc
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessorFactory.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService;
+import org.apache.sentry.hdfs.service.thrift.SentryHDFSService.Iface;
+import org.apache.sentry.provider.db.log.util.CommandUtil;
+import org.apache.sentry.service.thrift.ProcessorFactory;
+import org.apache.thrift.TException;
+import org.apache.thrift.TMultiplexedProcessor;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSaslClientTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SentryHDFSServiceProcessorFactory extends ProcessorFactory {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessorFactory.class);
+
+  static class ProcessorWrapper extends SentryHDFSService.Processor<SentryHDFSService.Iface> {
+
+    public ProcessorWrapper(Iface iface) {
+      super(iface);
+    }
+    @Override
+    public boolean process(TProtocol in, TProtocol out) throws TException {
+      setIpAddress(in);
+      setImpersonator(in);
+      return super.process(in, out);
+    }
+
+    private void setImpersonator(final TProtocol in) {
+      TTransport transport = in.getTransport();
+      if (transport instanceof TSaslServerTransport) {
+        String impersonator = ((TSaslServerTransport) transport).getSaslServer().getAuthorizationID();
+        CommandUtil.setImpersonator(impersonator);
+      }
+    }
+
+    private void setIpAddress(final TProtocol in) {
+      TTransport transport = in.getTransport();
+      TSocket tSocket = getUnderlyingSocketFromTransport(transport);
+      if (tSocket != null) {
+        setIpAddress(tSocket.getSocket());
+      } else {
+        LOGGER.warn("Unknown Transport, cannot determine ipAddress");
+      }
+    }
+
+    private void setIpAddress(Socket socket) {
+      CommandUtil.setIpAddress(socket.getInetAddress().toString());
+    }
+
+    private TSocket getUnderlyingSocketFromTransport(TTransport transport) {
+      if (transport != null) {
+        if (transport instanceof TSaslServerTransport) {
+          transport = ((TSaslServerTransport) transport).getUnderlyingTransport();
+        } else if (transport instanceof TSaslClientTransport) {
+          transport = ((TSaslClientTransport) transport).getUnderlyingTransport();
+        } else if (transport instanceof TSocket) {
+          return (TSocket) transport;
+        }
+      }
+      return null;
+    }
+  }
+
+  public SentryHDFSServiceProcessorFactory(Configuration conf) {
+    super(conf);
+  }
+
+
+  public boolean register(TMultiplexedProcessor multiplexedProcessor) throws Exception {
+    SentryHDFSServiceProcessor sentryServiceHandler =
+        new SentryHDFSServiceProcessor();
+    TProcessor processor = new ProcessorWrapper(sentryServiceHandler);
+    multiplexedProcessor.registerProcessor(
+        SentryHDFSServiceProcessor.SENTRY_HDFS_SERVICE_NAME, processor);
+    return true;
+  }
+}
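
The name used at registration must match the one the client hands to TMultiplexedProtocol (SENTRY_HDFS_SERVICE_NAME in both places). A minimal sketch of the server-side wiring; the surrounding Thrift server setup is an assumption, since Sentry's service framework drives this through ProcessorFactory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory;
    import org.apache.thrift.TMultiplexedProcessor;

    public class RegisterSketch {
      public static void main(String[] args) throws Exception {
        TMultiplexedProcessor multiplexed = new TMultiplexedProcessor();
        // registers a ProcessorWrapper under SENTRY_HDFS_SERVICE_NAME
        new SentryHDFSServiceProcessorFactory(new Configuration()).register(multiplexed);
        // multiplexed would then be handed to the Thrift TServer
      }
    }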

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
new file mode 100644
index 0000000..262e893
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+import org.apache.sentry.hdfs.service.thrift.TPathChanges;
+import org.apache.sentry.hdfs.service.thrift.TPermissionsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
+import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleRevokePrivilegeRequest;
+import org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
+import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
+import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class SentryPlugin implements SentryPolicyStorePlugin {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class);
+
+  public static volatile SentryPlugin instance;
+
+  static class PermImageRetriever implements ExternalImageRetriever<PermissionsUpdate> {
+
+    private final SentryStore sentryStore;
+
+    public PermImageRetriever(SentryStore sentryStore) {
+      this.sentryStore = sentryStore;
+    }
+
+    @Override
+    public PermissionsUpdate retrieveFullImage(long currSeqNum) {
+      Map<String, HashMap<String, String>> privilegeImage = sentryStore.retrieveFullPrivilegeImage();
+      Map<String, LinkedList<String>> roleImage = sentryStore.retrieveFullRoleImage();
+      
+      TPermissionsUpdate tPermUpdate = new TPermissionsUpdate(true, currSeqNum,
+          new HashMap<String, TPrivilegeChanges>(),
+          new HashMap<String, TRoleChanges>());
+      for (Map.Entry<String, HashMap<String, String>> privEnt : privilegeImage.entrySet()) {
+        String authzObj = privEnt.getKey();
+        HashMap<String,String> privs = privEnt.getValue();
+        tPermUpdate.putToPrivilegeChanges(authzObj, new TPrivilegeChanges(
+            authzObj, privs, new HashMap<String, String>()));
+      }
+      for (Map.Entry<String, LinkedList<String>> privEnt : roleImage.entrySet()) {
+        String role = privEnt.getKey();
+        LinkedList<String> groups = privEnt.getValue();
+        tPermUpdate.putToRoleChanges(role, new TRoleChanges(role, groups, new LinkedList<String>()));
+      }
+      PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate);
+      permissionsUpdate.setSeqNum(currSeqNum);
+      return permissionsUpdate;
+    }
+    
+  }
+
+  private UpdateForwarder<PathsUpdate> pathsUpdater;
+  private UpdateForwarder<PermissionsUpdate> permsUpdater;
+  private final AtomicLong permSeqNum = new AtomicLong(5);
+
+  @Override
+  public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException {
+    HiveConf hiveConf = new HiveConf(conf, Configuration.class);
+    final MetastoreClient hmsClient = new ExtendedMetastoreClient(hiveConf);
+    final String[] pathPrefixes = conf
+        .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES, new String[]{"/"});
+    pathsUpdater = new UpdateForwarder<PathsUpdate>(new UpdateableAuthzPaths(
+        pathPrefixes), createHMSImageRetriever(pathPrefixes, hmsClient), 100);
+    PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore);
+    permsUpdater = new UpdateForwarder<PermissionsUpdate>(
+        new UpdateablePermissions(permImageRetriever), permImageRetriever, 100);
+    instance = this;
+  }
+
+  public List<PathsUpdate> getAllPathsUpdatesFrom(long pathSeqNum) {
+    return pathsUpdater.getAllUpdatesFrom(pathSeqNum);
+  }
+
+  public List<PermissionsUpdate> getAllPermsUpdatesFrom(long permSeqNum) {
+    return permsUpdater.getAllUpdatesFrom(permSeqNum);
+  }
+
+  public void handlePathUpdateNotification(PathsUpdate update) {
+    pathsUpdater.handleUpdateNotification(update);
+    LOGGER.info("Recieved Authz Path update [" + update.getSeqNum() + "]..");
+  }
+
+  private ExternalImageRetriever<PathsUpdate> createHMSImageRetriever(
+      final String[] pathPrefixes, final MetastoreClient hmsClient) {
+    return new ExternalImageRetriever<PathsUpdate>() {
+      @Override
+      public PathsUpdate retrieveFullImage(long currSeqNum) {
+        PathsUpdate tempUpdate = new PathsUpdate(currSeqNum, false);
+        List<Database> allDatabases = hmsClient.getAllDatabases();
+        for (Database db : allDatabases) {
+          tempUpdate.newPathChange(db.getName()).addToAddPaths(
+              PathsUpdate.cleanPath(db.getLocationUri()));
+          List<Table> allTables = hmsClient.getAllTablesOfDatabase(db);
+          for (Table tbl : allTables) {
+            TPathChanges tblPathChange = tempUpdate.newPathChange(tbl
+                .getDbName() + "." + tbl.getTableName());
+            List<Partition> tblParts = hmsClient.listAllPartitions(db, tbl);
+            tblPathChange.addToAddPaths(PathsUpdate.cleanPath(tbl.getSd()
+                    .getLocation() == null ? db.getLocationUri() : tbl
+                    .getSd().getLocation()));
+            for (Partition part : tblParts) {
+              tblPathChange.addToAddPaths(PathsUpdate.cleanPath(part.getSd()
+                  .getLocation()));
+            }
+          }
+        }
+        UpdateableAuthzPaths tmpAuthzPaths = new UpdateableAuthzPaths(
+            pathPrefixes);
+        tmpAuthzPaths.updatePartial(Lists.newArrayList(tempUpdate),
+            new ReentrantReadWriteLock());
+        PathsUpdate retUpdate = new PathsUpdate(currSeqNum, true);
+        retUpdate.getThriftObject().setPathsDump(
+            tmpAuthzPaths.getPathsDump().createPathsDump());
+        return retUpdate;
+      }
+    };
+  }
+
+  @Override
+  public void onAlterSentryRoleAddGroups(
+      TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
+    for (TSentryGroup group : request.getGroups()) {
+      rUpdate.addToAddGroups(group.getGroupName());
+    }
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleDeleteGroups(
+      TAlterSentryRoleDeleteGroupsRequest request)
+      throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName());
+    for (TSentryGroup group : request.getGroups()) {
+      rUpdate.addToDelGroups(group.getGroupName());
+    }
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+  }
+
+  @Override
+  public void onAlterSentryRoleGrantPrivilege(
+      TAlterSentryRoleGrantPrivilegeRequest request)
+      throws SentryPluginException {
+    String authzObj = getAuthzObj(request.getPrivilege());
+    if (authzObj != null) {
+      PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+      update.addPrivilegeUpdate(authzObj).putToAddPrivileges(
+          request.getRoleName(), request.getPrivilege().getAction().toUpperCase());
+      permsUpdater.handleUpdateNotification(update);
+      LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + "]..");
+    }
+  }
+
+  @Override
+  public void onAlterSentryRoleRevokePrivilege(
+      TAlterSentryRoleRevokePrivilegeRequest request)
+      throws SentryPluginException {
+    String authzObj = getAuthzObj(request.getPrivilege());
+    if (authzObj != null) {
+      PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+      update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
+          request.getRoleName(), request.getPrivilege().getAction().toUpperCase());
+      permsUpdater.handleUpdateNotification(update);
+      LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "]..");
+    }
+  }
+
+  @Override
+  public void onDropSentryRole(TDropSentryRoleRequest request)
+      throws SentryPluginException {
+    PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false);
+    update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges(
+        request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
+    update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
+    permsUpdater.handleUpdateNotification(update);
+    LOGGER.info("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "]..");
+  }
+
+  private String getAuthzObj(TSentryPrivilege privilege) {
+    String authzObj = null;
+    if (!SentryStore.isNULL(privilege.getDbName())) {
+      String dbName = privilege.getDbName();
+      String tblName = privilege.getTableName();
+      if (tblName == null || SentryStore.isNULL(tblName)) {
+        authzObj = dbName;
+      } else {
+        authzObj = dbName + "." + tblName;
+      }
+    }
+    return authzObj;
+  }
+
+}
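
The getAuthzObj mapping, traced on hypothetical inputs (the method is private, so this illustrates its logic rather than calling it):

    // dbName "db1", tableName "tbl1"      -> authzObj "db1.tbl1"
    // dbName "db1", tableName unset/NULL  -> authzObj "db1"
    // dbName NULL (server/URI privilege)  -> authzObj null; the grant/revoke
    //                                        handlers then skip sending an update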

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
new file mode 100644
index 0000000..b0fc5ed
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.sentry.hdfs.Updateable;
+
+import com.google.common.collect.Lists;
+
+public class UpdateForwarder<K extends Updateable.Update> implements
+    Updateable<K> {
+
+  public interface ExternalImageRetriever<K> {
+
+    K retrieveFullImage(long currSeqNum);
+
+  }
+
+  private final AtomicLong lastSeenSeqNum = new AtomicLong(0);
+  private final AtomicLong lastCommittedSeqNum = new AtomicLong(0);
+  // Updates should be handled in order
+  private final Executor updateHandler = Executors.newSingleThreadExecutor();
+
+  // The update log is used to propagate updates to a downstream cache.
+  // It stores all commits that were applied to this cache.
+  // When the update log is filled to capacity (updateLogSize), all
+  // entries are cleared and a compact image of the state of the cache is
+  // appended to the log.
+  // The first entry in an update log (and consequently the first update a
+  // downstream cache sees) will be a full image; all subsequent entries
+  // are partial edits.
+  private final LinkedList<K> updateLog = new LinkedList<K>();
+  // The update log is disabled when updateLogSize = 0.
+  private final int updateLogSize;
+
+  private final ExternalImageRetriever<K> imageRetreiver;
+
+  private volatile Updateable<K> updateable;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private static final long INIT_SEQ_NUM = -2;
+
+  public UpdateForwarder(Updateable<K> updateable,
+      ExternalImageRetriever<K> imageRetreiver, int updateLogSize) {
+    this.updateLogSize = updateLogSize;
+    this.imageRetreiver = imageRetreiver;
+    K fullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM);
+    appendToUpdateLog(fullImage);
+    this.updateable = updateable.updateFull(fullImage);
+  }
+
+  /**
+   * Handle notifications from HMS plug-in or upstream Cache
+   * @param update
+   */
+  public void handleUpdateNotification(final K update) {
+    // Correct the seqNums on the first update
+    if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) {
+      K firstUpdate = updateLog.peek();
+      long firstSeqNum = update.getSeqNum() - 1;
+      firstUpdate.setSeqNum(firstSeqNum);
+      lastCommittedSeqNum.set(firstSeqNum);
+      lastSeenSeqNum.set(firstSeqNum);
+    }
+    final boolean editNotMissed = 
+        lastSeenSeqNum.incrementAndGet() == update.getSeqNum();
+    if (!editNotMissed) {
+      lastSeenSeqNum.set(update.getSeqNum());
+    }
+    Runnable task = new Runnable() {
+      @Override
+      public void run() {
+        K toUpdate = update;
+        if (update.hasFullImage()) {
+          updateable = updateable.updateFull(update);
+        } else {
+          if (editNotMissed) {
+            // apply partial preUpdate
+            updateable.updatePartial(Lists.newArrayList(update), lock);
+          } else {
+            // Retrieve a full image from the external source instead
+            toUpdate = imageRetreiver
+                .retrieveFullImage(update.getSeqNum());
+            updateable = updateable.updateFull(toUpdate);
+          }
+        }
+        appendToUpdateLog(toUpdate);
+      }
+    };
+    updateHandler.execute(task);
+  }
+
+  private void appendToUpdateLog(K update) {
+    synchronized (updateLog) {
+      if (updateLogSize > 0) {
+        if (update.hasFullImage() || (updateLog.size() == updateLogSize)) {
+          // Essentially a log compaction
+          updateLog.clear();
+          updateLog.add(update.hasFullImage() ? update
+              : createFullImageUpdate(update.getSeqNum()));
+        } else {
+          updateLog.add(update);
+        }
+      }
+      lastCommittedSeqNum.set(update.getSeqNum());
+    }
+  }
+
+  /**
+   * Return all updates from the requested seqNum (inclusive).
+   * @param seqNum the first sequence number to include
+   * @return all updates from seqNum up to the last committed one
+   */
+  public List<K> getAllUpdatesFrom(long seqNum) {
+    List<K> retVal = new LinkedList<K>();
+    synchronized (updateLog) {
+      long currSeqNum = lastCommittedSeqNum.get();
+      if (updateLogSize == 0) {
+        // no update log configured
+        return retVal;
+      }
+      K head = updateLog.peek();
+      if (seqNum > currSeqNum + 1) {
+        // This process has probably restarted since the downstream
+        // cache received its last update
+        retVal.addAll(updateLog);
+        return retVal;
+      }
+      if (head.getSeqNum() > seqNum) {
+        // Caller has diverged greatly..
+        if (head.hasFullImage()) {
+          // head is a refresh(full) image
+          // Send full image along with partial updates
+          for (K u : updateLog) {
+            retVal.add(u);
+          }
+        } else {
+          // Create a full image
+          // clear updateLog
+          // add fullImage to head of Log
+          // NOTE : This should ideally never happen
+          K fullImage = createFullImageUpdate(currSeqNum);
+          updateLog.clear();
+          updateLog.add(fullImage);
+          retVal.add(fullImage);
+        }
+      } else {
+        // increment iterator to requested seqNum
+        Iterator<K> iter = updateLog.iterator();
+        K u = null;
+        while (iter.hasNext()) {
+          u = iter.next();
+          if (u.getSeqNum() == seqNum) {
+            break;
+          }
+        }
+        // add all updates from requestedSeq
+        // to committedSeqNum
+        for (long seq = seqNum; seq <= currSeqNum; seq++) {
+          retVal.add(u);
+          if (iter.hasNext()) {
+            u = iter.next();
+          } else {
+            break;
+          }
+        }
+      }
+    }
+    return retVal;
+  }
+ 
+  public boolean areAllUpdatesCommited() {
+    return lastCommittedSeqNum.get() == lastSeenSeqNum.get();
+  }
+
+  public long getLastCommitted() {
+    return lastCommittedSeqNum.get();
+  }
+
+  public long getLastSeen() {
+    return lastSeenSeqNum.get();
+  }
+
+  @Override
+  public Updateable<K> updateFull(K update) {
+    return updateable.updateFull(update);
+  }
+
+  @Override
+  public void updatePartial(Iterable<K> updates, ReadWriteLock lock) {
+    updateable.updatePartial(updates, lock);
+  }
+  
+  @Override
+  public long getLastUpdatedSeqNum() {
+    return updateable.getLastUpdatedSeqNum();
+  }
+
+  @Override
+  public K createFullImageUpdate(long currSeqNum) {
+    return updateable.createFullImageUpdate(currSeqNum);
+  }
+
+}
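
A concrete trace of the update-log behavior, assuming updateLogSize = 5 (this mirrors the testUpdateLogCompression case in the test class below):

    // Construction fetches a full image; handleUpdateNotification later corrects
    // its seqNum to (first update's seqNum - 1).
    // after seq 5 arrives:        updateLog = [full@4, 5]
    // after seqs 6, 7, 8:         updateLog = [full@4, 5, 6, 7, 8]  (at capacity)
    // seq 9 triggers compaction:  updateLog = [full@9]
    // after seqs 10, 11:          updateLog = [full@9, 10, 11]
    // getAllUpdatesFrom(0) then returns the full image at 9 plus partials 10 and 11.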

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
new file mode 100644
index 0000000..6b3e2e2
--- /dev/null
+++ b/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+
+public class UpdateablePermissions implements Updateable<PermissionsUpdate>{
+
+  private AtomicLong seqNum = new AtomicLong();
+  private final ExternalImageRetriever<PermissionsUpdate> imageRetreiver;
+
+  public UpdateablePermissions(
+      ExternalImageRetriever<PermissionsUpdate> imageRetreiver) {
+    this.imageRetreiver = imageRetreiver;
+  }
+
+  @Override
+  public PermissionsUpdate createFullImageUpdate(long currSeqNum) {
+    return imageRetreiver.retrieveFullImage(currSeqNum);
+  }
+
+  @Override
+  public long getLastUpdatedSeqNum() {
+    return seqNum.get();
+  }
+
+  @Override
+  public void updatePartial(Iterable<PermissionsUpdate> update,
+      ReadWriteLock lock) {
+    for (PermissionsUpdate permsUpdate : update) {
+      seqNum.set(permsUpdate.getSeqNum());
+    }
+  }
+
+  @Override
+  public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) {
+    UpdateablePermissions other = new UpdateablePermissions(imageRetreiver);
+    other.seqNum.set(update.getSeqNum());
+    return other;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
----------------------------------------------------------------------
diff --git a/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
new file mode 100644
index 0000000..d571df8
--- /dev/null
+++ b/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sentry.hdfs;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import junit.framework.Assert;
+
+import org.apache.sentry.hdfs.UpdateForwarder;
+import org.apache.sentry.hdfs.Updateable;
+import org.apache.sentry.hdfs.UpdateForwarder.ExternalImageRetriever;
+import org.apache.sentry.hdfs.Updateable.Update;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class TestUpdateForwarder {
+  
+  static class DummyUpdate implements Update {
+    private long seqNum = 0;
+    private boolean hasFullUpdate = false;
+    private String stuff;
+    public DummyUpdate(long seqNum, boolean hasFullUpdate) {
+      this.seqNum = seqNum;
+      this.hasFullUpdate = hasFullUpdate;
+    }
+    public String getStuff() {
+      return stuff;
+    }
+    public DummyUpdate setStuff(String stuff) {
+      this.stuff = stuff;
+      return this;
+    }
+    @Override
+    public boolean hasFullImage() {
+      return hasFullUpdate;
+    }
+    @Override
+    public long getSeqNum() {
+      return seqNum;
+    }
+    @Override
+    public void setSeqNum(long seqNum) {
+      this.seqNum = seqNum;
+    }
+  }
+
+  static class DummyUpdatable implements Updateable<DummyUpdate> {
+    
+    private List<String> state = new LinkedList<String>();
+    private long lastUpdatedSeqNum = 0;
+
+    @Override
+    public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) {
+      for (DummyUpdate u : update) {
+        state.add(u.getStuff());
+        lastUpdatedSeqNum = u.seqNum;
+      }
+    }
+
+    @Override
+    public Updateable<DummyUpdate> updateFull(DummyUpdate update) {
+      DummyUpdatable retVal = new DummyUpdatable();
+      retVal.lastUpdatedSeqNum = update.seqNum;
+      retVal.state = Lists.newArrayList(update.stuff.split(","));
+      return retVal;
+    }
+
+    @Override
+    public long getLastUpdatedSeqNum() {
+      return lastUpdatedSeqNum;
+    }
+
+    @Override
+    public DummyUpdate createFullImageUpdate(long currSeqNum) {
+      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
+      retVal.stuff = Joiner.on(",").join(state);
+      return retVal;
+    }
+
+    public String getState() {
+      return Joiner.on(",").join(state);
+    }
+  }
+
+  static class DummyImageRetreiver implements ExternalImageRetriever<DummyUpdate> {
+
+    private String state;
+    public void setState(String state) {
+      this.state = state;
+    }
+    @Override
+    public DummyUpdate retrieveFullImage(long currSeqNum) {
+      DummyUpdate retVal = new DummyUpdate(currSeqNum, true);
+      retVal.stuff = state;
+      return retVal;
+    }
+  }
+
+  @Test
+  public void testInit() {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 10);
+    Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertTrue(allUpdates.size() == 1);
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+
+    // If the current process has restarted the input seqNum will be > currSeq
+    allUpdates = updateForwarder.getAllUpdatesFrom(100);
+    Assert.assertTrue(allUpdates.size() == 1);
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals(-2, allUpdates.get(0).getSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(-1);
+    Assert.assertEquals(0, allUpdates.size());
+  }
+
+  @Test
+  public void testUpdateReceive() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals("d", allUpdates.get(1).getStuff());
+  }
+
+  @Test
+  public void testGetUpdates() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(4, allUpdates.size());
+    Assert.assertEquals("a,b,c", allUpdates.get(0).getStuff());
+    Assert.assertEquals(4, allUpdates.get(0).getSeqNum());
+    Assert.assertEquals("d", allUpdates.get(1).getStuff());
+    Assert.assertEquals(5, allUpdates.get(1).getSeqNum());
+    Assert.assertEquals("e", allUpdates.get(2).getStuff());
+    Assert.assertEquals(6, allUpdates.get(2).getSeqNum());
+    Assert.assertEquals("f", allUpdates.get(3).getStuff());
+    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(8);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("g", allUpdates.get(0).getStuff());
+  }
+
+  @Test
+  public void testGetUpdatesAfterExternalEntityReset() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(4, allUpdates.size());
+    Assert.assertEquals("f", allUpdates.get(3).getStuff());
+    Assert.assertEquals(7, allUpdates.get(3).getSeqNum());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(8);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("g", allUpdates.get(0).getStuff());
+
+    imageRetreiver.setState("a,b,c,d,e,f,g,h");
+
+    // New update comes with SeqNum = 1
+    updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setStuff("h"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    // NN plugin asks for next update
+    allUpdates = updateForwarder.getAllUpdatesFrom(9);
+    Assert.assertEquals(1, allUpdates.size());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+    Assert.assertEquals(1, allUpdates.get(0).getSeqNum());
+  }
+
+  @Test
+  public void testUpdateLogCompression() throws Exception {
+    DummyImageRetreiver imageRetreiver = new DummyImageRetreiver();
+    imageRetreiver.setState("a,b,c");
+    UpdateForwarder<DummyUpdate> updateForwarder = new UpdateForwarder<DummyUpdate>(
+        new DummyUpdatable(), imageRetreiver, 5);
+    updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setStuff("d"));
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum());
+    List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(2, allUpdates.size());
+
+    updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setStuff("e"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setStuff("f"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setStuff("g"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setStuff("h"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setStuff("i"));
+    updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setStuff("j"));
+
+    while(!updateForwarder.areAllUpdatesCommited()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum());
+    allUpdates = updateForwarder.getAllUpdatesFrom(0);
+    Assert.assertEquals(3, allUpdates.size());
+    Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getStuff());
+    Assert.assertEquals(9, allUpdates.get(0).getSeqNum());
+    Assert.assertEquals("i", allUpdates.get(1).getStuff());
+    Assert.assertEquals(10, allUpdates.get(1).getSeqNum());
+    Assert.assertEquals("j", allUpdates.get(2).getStuff());
+    Assert.assertEquals(11, allUpdates.get(2).getSeqNum());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/b86a53d1/sentry-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-hdfs/pom.xml b/sentry-hdfs/pom.xml
new file mode 100644
index 0000000..94c554f
--- /dev/null
+++ b/sentry-hdfs/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sentry</groupId>
+    <artifactId>sentry</artifactId>
+    <version>1.5.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>sentry-hdfs</artifactId>
+  <name>Sentry HDFS Integration</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/gen/thrift/gen-javabean</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>thriftif</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>generate-thrift-sources</id>
+                <phase>generate-sources</phase>
+                <configuration>
+                  <target>
+                    <taskdef name="for" classname="net.sf.antcontrib.logic.ForTask"
+                      classpathref="maven.plugin.classpath" />
+                    <property name="thrift.args" value="-I ${thrift.home} --gen java:beans,hashcode"/>
+                    <property name="thrift.gen.dir" value="${basedir}/src/gen/thrift"/>
+                    <delete dir="${thrift.gen.dir}"/>
+                    <mkdir dir="${thrift.gen.dir}"/>
+                    <for param="thrift.file">
+                      <path>
+                        <fileset dir="${basedir}/src/main/resources/" includes="**/*.thrift" />
+                      </path>
+                      <sequential>
+                        <echo message="Generating Thrift code for @{thrift.file}"/>
+                        <exec executable="${thrift.home}/bin/thrift"  failonerror="true" dir=".">
+                          <arg line="${thrift.args} -I ${basedir}/src/main/resources/ -o ${thrift.gen.dir} @{thrift.file} " />
+                        </exec>
+                      </sequential>
+                    </for>
+                  </target>
+                </configuration>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>enforce-property</id>
+                <goals>
+                  <goal>enforce</goal>
+                </goals>
+                <configuration>
+                  <rules>
+                    <requireProperty>
+                      <property>thrift.home</property>
+                    </requireProperty>
+                  </rules>
+                  <fail>true</fail>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+</project>
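
The thriftif profile regenerates the sources under src/gen/thrift from the .thrift files in src/main/resources; the enforcer execution makes thrift.home mandatory, so a typical invocation (installation path is a placeholder) is:

    mvn generate-sources -Pthriftif -Dthrift.home=/usr/local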

