usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [02/16] incubator-usergrid git commit: Initial pass at moving queues to core
Date Tue, 21 Apr 2015 15:55:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java
new file mode 100644
index 0000000..8bda3b0
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/GuiceAdapterBeanFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence;
+
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.support.DefaultListableBeanFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import com.google.inject.Injector;
+
+
+/**
+ * This allows us to search guice for beans that are not in spring
+ */
+@Component
+public class GuiceAdapterBeanFactory extends DefaultListableBeanFactory {
+
+    /**
+     * Wire our injector into this so we can use it go get beans
+     */
+    @Autowired
+    private Injector injector;
+
+
+    public <T> T getBean( Class<T> requiredType ) throws BeansException {
+        final T bean = super.getBean( requiredType );
+
+        // Comes from spring, return it
+        if ( bean != null ) {
+            return bean;
+        }
+
+        final T guiceBean = injector.getInstance( requiredType );
+
+        if(guiceBean == null){
+            throw new NoGuiceBean( "Could not find bean for class" + requiredType );
+        }
+
+
+        return guiceBean;
+    }
+
+
+    @Override
+    public Object getBean( final String name ) throws BeansException {
+        final Object springBean = super.getBean( name );
+
+        return validateBean( springBean, name );
+    }
+
+
+    @Override
+    public <T> T getBean( final String name, final Class<T> requiredType ) throws BeansException {
+        final T springBean = super.getBean( name, requiredType );
+
+        return validateBean( springBean, name );
+    }
+
+
+    @Override
+    public <T> T getBean( final String name, final Class<T> requiredType, final Object... args ) throws BeansException {
+        final T springBean = super.getBean( name, requiredType, args );
+
+       return validateBean( springBean, name );
+    }
+
+
+    /**
+     * If we can't find the spring bean, we should blow up
+     * @param springBean
+     * @param <T>
+     * @return
+     */
+    private <T> T validateBean( T springBean, final String name ) {
+        if ( springBean == null ) {
+            throw new NoGuiceBean( String.format("Guice beans by name is unsupoported, and could not find a spring bean with the name '%s'", name) );
+        }
+
+        return springBean;
+    }
+
+
+    /**
+     * Exception class to throw when we can't find a bean in spring, and can't find it in guice
+     */
+    public static final class NoGuiceBean extends BeansException{
+
+        public NoGuiceBean( final String msg ) {
+            super( msg );
+        }
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java
new file mode 100644
index 0000000..6e58676
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PersistenceModule.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence;
+
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.springframework.context.ApplicationContext;
+
+import org.apache.usergrid.locking.cassandra.HectorLockManagerImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.CharSource;
+import com.google.common.io.Resources;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+import com.google.inject.name.Names;
+import com.google.inject.spring.SpringIntegration;
+
+import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.ThriftCluster;
+import me.prettyprint.hector.api.Cluster;
+
+
+/**
+ * Replacement for configuration of our spring modules with guice
+ */
+public class PersistenceModule extends AbstractModule {
+
+
+
+    private final ApplicationContext applicationContext;
+
+    public PersistenceModule( final ApplicationContext applicationContext ) {
+
+        this.applicationContext = applicationContext;
+    }
+
+
+
+
+    @Override
+    protected void configure() {
+        SpringIntegration.bindAll( binder(), applicationContext );
+    }
+
+
+
+//    <bean id="cassandraCluster" class="me.prettyprint.cassandra.service.ThriftCluster">
+//   		<constructor-arg value="${cassandra.cluster}" />
+//   		<constructor-arg ref="cassandraHostConfigurator" />
+//   	</bean>
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public Cluster configureThrift( @Named( "cassandra.cluster" ) final String cassCluster,
+//                                          @Named( "cassandra.connections" ) final int cassandraConnections ){
+//
+//        final int setSize = cassandraConnections == 0 ? 50: cassandraConnections;
+//
+//        CassandraHostConfigurator hostConfigurator = new CassandraHostConfigurator( cassCluster );
+//
+//        hostConfigurator.setMaxActive( setSize );
+//        hostConfigurator.setLoadBalancingPolicy( new RoundRobinBalancingPolicy() );
+//
+//
+//        ThriftCluster thriftCluster = new ThriftCluster(cassCluster, hostConfigurator);
+//
+//        return thriftCluster;
+//
+//    }
+//
+//
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public Properties configureProps(final PropertiesProvider propertiesProvider ){
+//
+//        final Properties props = new Properties(  );
+//
+//        for(final String propFile: propertiesProvider.getPropertiesFiles()){
+//
+//            final URL url = Resources.getResource( propFile );
+//
+//            Preconditions.checkNotNull( url, "Could not find properties file '" + propFile + "' on the classpath" );
+//
+//
+//            final CharSource propsInput = Resources.asCharSource( url, Charset.defaultCharset() );
+//            try {
+//                props.load( propsInput.openStream() );
+//            }
+//            catch ( IOException e ) {
+//                throw new RuntimeException( "Unable to load properties file '" + propFile + "'", e );
+//            }
+//        }
+//
+//        //bind these properties
+//        Names.bindProperties( binder(), props );
+//
+//        return props;
+//    }
+//
+//    @Provides
+//    @Singleton
+//    @Inject
+//    public void configureLocks(final Cluster hectorCluster, @Named("cassandra.lock.keyspace") final String lockKeyspace, @Named("cassandra.lock.keyspace") final String writeCl, final String readCl ){
+//
+//
+//        final HectorLockManagerImpl hectorLockManager = new HectorLockManagerImpl();
+//
+//
+////
+////        <bean name="consistencyLevelPolicy" class="me.prettyprint.cassandra.model.ConfigurableConsistencyLevel">
+////               <property name="defaultReadConsistencyLevel" value="${cassandra.readcl}"/>
+////               <property name="defaultWriteConsistencyLevel" value="${cassandra.writecl}"/>
+////           </bean>
+//
+////        <bean name="lockManager" class="org.apache.usergrid.locking.cassandra.HectorLockManagerImpl" >
+////       		<property name="cluster" ref="cassandraCluster"/>
+////       		<property name="keyspaceName" value="${cassandra.lock.keyspace}"/>
+////       		<property name="consistencyLevelPolicy" ref="consistencyLevelPolicy"/>
+////       	</bean>
+//
+//    }
+//
+//
+//    /**
+//     * Interface to allow users to provide and inject properties
+//     */
+//    public interface PropertiesProvider{
+//        /**
+//         * Get the properties files to load
+//         * @return
+//         */
+//        String[] getPropertiesFiles();
+//    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
new file mode 100644
index 0000000..e5254d1
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.persistence.core.guice.TestModule;
+
+
+public class TestIndexModule extends TestModule {
+
+    @Override
+    protected void configure() {
+
+        //this will break, we need to untagle this and move to guice in core completely
+        install( new CoreModule( ));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..a76a589
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Rule
+    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
+
+    @Inject
+    public QueueManagerFactory queueManagerFactory;
+
+    @Inject
+    public QueryFig queryFig;
+
+    @Inject
+    public MapManagerFactory mapManagerFactory;
+
+    @Inject
+    public MetricsFactory metricsFactory;
+
+
+    private BufferQueueSQSImpl bufferQueueSQS;
+
+    @Before
+    public void setup(){
+        bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory );
+    }
+
+
+
+
+    @Test
+    public void testMessageIndexing(){
+
+        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"));
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+        assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null );
+        assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
+
+        final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
+        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testDoc1",request1Data );
+
+
+        final Map<String, Object> request2Data  = new HashMap<String, Object>() {{put("test", "testval2");}};
+        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", "testDoc2",request2Data );
+
+
+        //de-index request
+        final DeIndexRequest
+            deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId3"),"name3",
+
+
+                SearchEdge.NodeType.SOURCE ),  new SimpleId("id3"), UUID.randomUUID() );
+
+        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, applicationScope,  new SearchEdgeImpl(new SimpleId("testId4"),"name4",
+                SearchEdge.NodeType.SOURCE ),  new SimpleId("id4"), UUID.randomUUID()  );
+
+
+
+
+        IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+        indexOperationMessage.addIndexRequest( indexRequest1);
+        indexOperationMessage.addIndexRequest( indexRequest2);
+
+        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+        bufferQueueSQS.offer( indexOperationMessage );
+
+        //wait for it to send to SQS
+        indexOperationMessage.getFuture().get();
+
+        //now get it back
+
+        final List<IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS );
+
+        assertTrue(ops.size() > 0);
+
+        final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+         //get the operations out
+
+        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+        assertTrue(indexRequestSet.contains(indexRequest1));
+        assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+
+        //now ack the message
+
+        bufferQueueSQS.ack( ops );
+
+    }
+
+    private List<IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        List<IndexOperationMessage> ops;
+
+        do{
+            ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+        }while((ops == null || ops.size() == 0 ) &&  System.currentTimeMillis() < endTime);
+
+        return ops;
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
new file mode 100644
index 0000000..1b0f538
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/aws/NoAWSCredsRule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.aws;
+
+
+import org.junit.Assume;
+import org.junit.internal.runners.model.MultipleFailureException;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import com.amazonaws.AmazonClientException;
+
+
+/**
+ * Created in an attempt to mark no aws cred tests as ignored.  Blocked by this issue
+ * https://github.com/junit-team/junit/issues/116
+ *
+ * Until then, simply marks as passed, which is a bit dangerous
+ */
+public class NoAWSCredsRule implements TestRule {
+
+    public Statement apply( final Statement base, final Description description ) {
+        return new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+
+                try {
+                    base.evaluate();
+                }
+                catch ( Throwable t ) {
+
+                    if ( !isMissingCredsException( t ) ) {
+                        throw t;
+                    }
+
+                    //do this so our test gets marked as ignored.  Not pretty, but it works
+                    Assume.assumeTrue( false );
+
+
+                }
+            }
+        };
+    }
+
+
+    private boolean isMissingCredsException( final Throwable t ) {
+
+        if ( t instanceof AmazonClientException ) {
+
+            final AmazonClientException ace = ( AmazonClientException ) t;
+
+            if ( ace.getMessage().contains( "could not get aws access key" ) || ace.getMessage().contains(
+                "could not get aws secret key from system properties" ) ) {
+                //swallow
+                return true;
+            }
+        }
+
+        /**
+         * Handle the multiple failure junit trace
+         */
+        if( t instanceof MultipleFailureException ){
+            for(final Throwable failure : ((MultipleFailureException)t).getFailures()){
+                final boolean isMissingCreds = isMissingCredsException( failure );
+
+                if(isMissingCreds){
+                    return true;
+                }
+            }
+        }
+        final Throwable cause = t.getCause();
+
+        if ( cause == null ) {
+            return false;
+        }
+
+
+        return isMissingCredsException( cause );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 2206953..fa73991 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -28,6 +28,29 @@ import rx.Observable;
  * Get all edges from source
  */
 public interface EdgesObservable {
+
+    /**
+     * Return an observable of all edges from a source
+     * @param gm
+     * @param sourceNode
+     * @return
+     */
     Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode);
+
+    /**
+     * Get all edges from the source node with the target type
+     * @param gm
+     * @param sourceNode
+     * @param targetType
+     * @return
+     */
+    Observable<Edge> getEdgesFromSource(final GraphManager gm, final Id sourceNode, final String targetType );
+
+    /**
+     * Return an observable of all edges to a target
+     * @param gm
+     * @param targetNode
+     * @return
+     */
     Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 371cf1d..2264cbd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -19,62 +19,88 @@
  */
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.model.entity.Id;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import rx.Observable;
 import rx.functions.Func1;
 
+
 /**
  * Emits the edges that are edges from the specified source node
  */
 public class EdgesObservableImpl implements EdgesObservable {
 
-    private static final Logger logger = LoggerFactory.getLogger(EdgesObservableImpl.class);
-    public EdgesObservableImpl(){
+    private static final Logger logger = LoggerFactory.getLogger( EdgesObservableImpl.class );
+
+
+    public EdgesObservableImpl() {
 
     }
 
+
     /**
      * Get all edges from the source
      */
     @Override
-    public  Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
+    public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode ) {
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
+        return edgeTypes.flatMap(  edgeType -> {
 
                 logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
 
-                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) );
-            }
+                return gm.loadEdgesFromSource(
+                    new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        null ) );
         } );
     }
+
+
+    @Override
+    public Observable<Edge> getEdgesFromSource( final GraphManager gm, final Id sourceNode, final String targetType ) {
+
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
+
+
+        return edgeTypes.flatMap( edgeType -> {
+
+            logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
+
+            return gm.loadEdgesFromSourceByType(
+                new SimpleSearchByIdType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    targetType, null ) );
+        } );
+    }
+
+
     /**
      * Get all edges from the source
      */
     @Override
-    public  Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode) {
-        Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
+    public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) {
+        final Observable<String> edgeTypes =
+            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
 
-        return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
-            @Override
-            public Observable<Edge> call( final String edgeType ) {
+        return edgeTypes.flatMap( edgeType -> {
 
-                logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
+            logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode );
 
-                return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
-                    SearchByEdgeType.Order.DESCENDING, null ) );
-            }
+            return gm.loadEdgesToTarget(
+                new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    null ) );
         } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index da05d39..39c686c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index;/*
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -65,7 +66,7 @@ public interface EntityIndexBatch {
      * Execute the batch
      * @return future to guarantee execution
      */
-    BetterFuture execute();
+    BetterFuture<IndexOperationMessage> execute();
 
     /**
      * Get the number of operations in the batch

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 8b9a03e..d13d055 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -30,84 +30,52 @@ import org.apache.usergrid.persistence.index.impl.EsProvider;
 @FigSingleton
 public interface IndexFig extends GuicyFig {
 
-    public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
+    String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
 
-    public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
+    String ELASTICSEARCH_PORT = "elasticsearch.port";
 
-    public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster_name";
+    String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster_name";
 
-    public static final String ELASTICSEARCH_NODENAME = "elasticsearch.node_name";
+    String ELASTICSEARCH_NODENAME = "elasticsearch.node_name";
 
-    public static final String ELASTICSEARCH_INDEX_PREFIX = "elasticsearch.index_prefix";
+    String ELASTICSEARCH_INDEX_PREFIX = "elasticsearch.index_prefix";
 
-    public static final String ELASTICSEARCH_ALIAS_POSTFIX = "elasticsearch.alias_postfix";
+    String ELASTICSEARCH_ALIAS_POSTFIX = "elasticsearch.alias_postfix";
 
-    public static final String ELASTICSEARCH_STARTUP = "elasticsearch.startup";
+    String ELASTICSEARCH_STARTUP = "elasticsearch.startup";
 
-    public static final String ELASTICSEARCH_NUMBER_OF_SHARDS = "elasticsearch.number_shards";
+    String ELASTICSEARCH_NUMBER_OF_SHARDS = "elasticsearch.number_shards";
 
-    public static final String ELASTICSEARCH_NUMBER_OF_REPLICAS = "elasticsearch.number_replicas";
+    String ELASTICSEARCH_NUMBER_OF_REPLICAS = "elasticsearch.number_replicas";
 
-    public static final String QUERY_CURSOR_TIMEOUT_MINUTES = "elasticsearch.cursor_timeout.minutes";
+    String QUERY_CURSOR_TIMEOUT_MINUTES = "elasticsearch.cursor_timeout.minutes";
 
-    public static final String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
+    String ELASTICSEARCH_FORCE_REFRESH = "elasticsearch.force_refresh";
 
-    public static final String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
+    String INDEX_BUFFER_SIZE = "elasticsearch.buffer_size";
 
-    public static final String INDEX_QUEUE_SIZE = "elasticsearch.queue_size";
+    String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
-    public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
+    String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
-    /**
-     * Amount of time to wait when reading from the queue
-     */
-    public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
-
-    /**
-     * Amount of time to wait when reading from the queue in milliseconds
-     */
-    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
-
-    public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
-
-    public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
+    String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
 
     /**
      * the number of times we can fail before we refresh the client
      */
-    public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
-     * backpressure
-     */
-    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
+    String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
-    /**
-     * The number of worker threads to consume from the queue
-     */
-    public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
-    /**
-     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
-     */
-    public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
 
 
-    /**
-     * The queue implementation to use.  Values come from <class>QueueProvider.Implementations</class>
-     */
-    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout";
-
-
-    public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
+    String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
 
 
     /**
      * The client type to use.  Valid values are NODE or TRANSPORT
      */
-    public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
+    String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type";
 
 
     @Default( "127.0.0.1" )
@@ -145,20 +113,20 @@ public interface IndexFig extends GuicyFig {
 
     @Default( "false" )
     @Key( ELASTICSEARCH_FORCE_REFRESH )
-    public boolean isForcedRefresh();
+    boolean isForcedRefresh();
 
     /** Identify the client node with a unique name. */
     @Default( "default" )
     @Key( ELASTICSEARCH_NODENAME )
-    public String getNodeName();
+    String getNodeName();
 
     @Default( "6" )
     @Key( ELASTICSEARCH_NUMBER_OF_SHARDS )
-    public int getNumberOfShards();
+    int getNumberOfShards();
 
     @Default( "1" )
     @Key( ELASTICSEARCH_NUMBER_OF_REPLICAS )
-    public int getNumberOfReplicas();
+    int getNumberOfReplicas();
 
     @Default( "20" )
     @Key( ELASTICSEARCH_FAIL_REFRESH )
@@ -181,12 +149,7 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
 
-    /**
-     * size of the buffer to build up before you send results
-     */
-    @Default( "1000" )
-    @Key( INDEX_QUEUE_SIZE )
-    int getIndexQueueSize();
+
 
     /**
      * Request batch size for ES
@@ -199,27 +162,6 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
     String getWriteConsistencyLevel();
 
-    @Default( "1000" )
-    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
-    long getFailureRetryTime();
-
-    //give us 60 seconds to process the message
-    @Default( "60" )
-    @Key( INDEX_QUEUE_READ_TIMEOUT )
-    int getIndexQueueTimeout();
-
-    @Default( "2" )
-    @Key( ELASTICSEARCH_WORKER_COUNT )
-    int getWorkerCount();
-
-    @Default( "LOCAL" )
-    @Key( ELASTICSEARCH_QUEUE_IMPL )
-    String getQueueImplementation();
-
-    @Default( "1000" )
-    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
-    long getQueueOfferTimeout();
-
     /**
      * Return the type of client.  Valid values or NODE or TRANSPORT
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 10cbde0..b6a1f09 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -52,15 +52,12 @@ public abstract class IndexModule extends AbstractModule {
         bind(EntityIndex.class).to(EsEntityIndexImpl.class).asEagerSingleton();
         bind(IndexCache.class).to(EsIndexCacheImpl.class);
         bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
-        bind(FailureMonitorImpl.IndexIdentifier.class).to(IndexIdentifierImpl.class);
+        bind(IndexIdentifier.class).to(IndexIdentifierImpl.class);
 
 
-        bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
         bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
 
 
-        bind( BufferQueue.class).toProvider( QueueProvider.class );
-
         //wire up the edg migration. A no-op ATM, but retained for future development
         Multibinder<DataMigration<ApplicationScope>> dataMigrationMultibinder =
                 Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration<ApplicationScope>>() {}, IndexMigration.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java
deleted file mode 100644
index ea3e046..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/QueueProvider.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.guice;
-
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
-import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-
-
-/**
- * A provider to allow users to configure their queue impl via properties
- */
-@Singleton
-public class QueueProvider implements Provider<BufferQueue> {
-
-    private final IndexFig indexFig;
-
-    private final QueueManagerFactory queueManagerFactory;
-    private final MapManagerFactory mapManagerFactory;
-    private final MetricsFactory metricsFactory;
-
-    private BufferQueue bufferQueue;
-
-
-    @Inject
-    public QueueProvider( final IndexFig indexFig, final QueueManagerFactory queueManagerFactory,
-                          final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
-        this.indexFig = indexFig;
-
-
-        this.queueManagerFactory = queueManagerFactory;
-        this.mapManagerFactory = mapManagerFactory;
-        this.metricsFactory = metricsFactory;
-    }
-
-
-    @Override
-    @Singleton
-    public BufferQueue get() {
-        if ( bufferQueue == null ) {
-            bufferQueue = getQueue();
-        }
-
-
-        return bufferQueue;
-    }
-
-
-    private BufferQueue getQueue() {
-        final String value = indexFig.getQueueImplementation();
-
-        final Implementations impl = Implementations.valueOf( value );
-
-        switch ( impl ) {
-            case LOCAL:
-                return new BufferQueueInMemoryImpl( indexFig );
-            case SQS:
-                return new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory );
-            default:
-                throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" );
-        }
-    }
-
-
-    private String getErrorValues() {
-        String values = "";
-
-        for ( final Implementations impl : Implementations.values() ) {
-            values += impl + ", ";
-        }
-
-        values = values.substring( 0, values.length() - 2 );
-
-        return values;
-    }
-
-
-    /**
-     * Different implementations
-     */
-    public static enum Implementations {
-        LOCAL,
-        SQS;
-
-
-        public String asString() {
-            return toString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
deleted file mode 100644
index cb89f64..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * A temporary interface of our buffer Q to decouple of producer and consumer;
- */
-public interface BufferQueue {
-
-    /**
-     * Offer the indexoperation message.  Some queues may support not returning the future until ack or fail.
-     * Other queues may return the future after ack on the offer.  See the implementation documentation for details.
-     * @param operation
-     */
-    public void offer(final IndexIdentifierImpl.IndexOperationMessage operation);
-
-
-    /**
-     * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed.
-     * May return less than the take size, but will never return null
-     *
-     * @param takeSize
-     * @param timeout
-     * @param timeUnit
-     * @return A null safe lid
-     */
-    public List<IndexIdentifierImpl.IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
-
-
-    /**
-     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented.
-     * This will set the future as done in in memory operations
-     *
-     * @param messages
-     */
-    public void ack(final List<IndexIdentifierImpl.IndexOperationMessage> messages);
-
-    /**
-     * Mark these message as failed.  Set the exception in the future on local operation
-     *
-     * @param messages
-     */
-    public void fail(final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
deleted file mode 100644
index bfaed3d..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.IndexFig;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class BufferQueueInMemoryImpl implements BufferQueue {
-
-
-    private final IndexFig fig;
-    private final ArrayBlockingQueue<IndexIdentifierImpl.IndexOperationMessage> messages;
-
-
-    @Inject
-    public BufferQueueInMemoryImpl( final IndexFig fig ) {
-        this.fig = fig;
-        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
-    }
-
-
-    @Override
-    public void offer( final IndexIdentifierImpl.IndexOperationMessage operation ) {
-        try {
-            messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS );
-        }
-        catch ( InterruptedException e ) {
-            throw new RuntimeException("Unable to offer message to queue", e);
-        }
-    }
-
-
-    @Override
-    public List<IndexIdentifierImpl.IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
-        final List<IndexIdentifierImpl.IndexOperationMessage> response = new ArrayList<>( takeSize );
-        try {
-
-
-            messages.drainTo( response, takeSize );
-
-            //we got something, go process it
-            if ( response.size() > 0 ) {
-                return response;
-            }
-
-
-            final IndexIdentifierImpl.IndexOperationMessage polled = messages.poll( timeout, timeUnit );
-
-            if ( polled != null ) {
-                response.add( polled );
-
-                //try to add more
-                messages.drainTo( response, takeSize - 1 );
-            }
-        }
-        catch ( InterruptedException e ) {
-            //swallow
-        }
-
-
-        return response;
-    }
-
-
-    @Override
-    public void ack( final List<IndexIdentifierImpl.IndexOperationMessage> messages ) {
-        //if we have a future ack it
-        for ( final IndexIdentifierImpl.IndexOperationMessage op : messages ) {
-            op.done();
-        }
-    }
-
-
-    @Override
-    public void fail( final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t ) {
-
-
-        for ( final IndexIdentifierImpl.IndexOperationMessage op : messages ) {
-            final BetterFuture<IndexIdentifierImpl.IndexOperationMessage> future = op.getFuture();
-
-            if ( future != null ) {
-                future.setError( t );
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
deleted file mode 100644
index fee828f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
- * performing
- */
-@Singleton
-public class BufferQueueSQSImpl implements BufferQueue {
-
-    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
-
-    /** Hacky, copied from CPEntityManager b/c we can't access it here */
-    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
-
-
-    /**
-     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
-     */
-    public static final int TTL = 60 * 60 * 24 * 30;
-
-    /**
-     * The name to put in the map
-     */
-    public static final String MAP_NAME = "esqueuedata";
-
-
-    private static final String QUEUE_NAME = "es_queue";
-
-    private static SmileFactory SMILE_FACTORY = new SmileFactory();
-
-
-    static {
-        SMILE_FACTORY.delegateToTextual( true );
-    }
-
-
-    private final QueueManager queue;
-    private final MapManager mapManager;
-    private final IndexFig indexFig;
-    private final ObjectMapper mapper;
-    private final Meter readMeter;
-    private final Timer readTimer;
-    private final Meter writeMeter;
-    private final Timer writeTimer;
-
-
-    @Inject
-    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
-                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
-        final QueueScope queueScope =
-            new QueueScopeImpl( QUEUE_NAME );
-
-        this.queue = queueManagerFactory.getQueueManager( queueScope );
-        this.indexFig = indexFig;
-
-        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
-
-        this.mapManager = mapManagerFactory.createMapManager( scope );
-
-
-        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
-        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
-
-        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
-        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
-
-        this.mapper = new ObjectMapper( SMILE_FACTORY );
-        //pretty print, disabling for speed
-        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
-
-    }
-
-
-    @Override
-    public void offer( final IndexIdentifierImpl.IndexOperationMessage operation ) {
-
-        //no op
-        if(operation.isEmpty()){
-            operation.getFuture().done();
-            return;
-        }
-
-        final Timer.Context timer = this.writeTimer.time();
-        this.writeMeter.mark();
-
-        final UUID identifier = UUIDGenerator.newTimeUUID();
-
-        try {
-
-            final String payLoad = toString( operation );
-
-            //write to cassandra
-            this.mapManager.putString( identifier.toString(), payLoad, TTL );
-
-            //signal to SQS
-            this.queue.sendMessage( identifier );
-            operation.done();
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    @Override
-    public List<IndexIdentifierImpl.IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
-        //SQS doesn't support more than 10
-
-        final int actualTake = Math.min( 10, takeSize );
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-
-            List<QueueMessage> messages = queue
-                .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                    String.class );
-
-
-
-            final List<IndexIdentifierImpl.IndexOperationMessage> response = new ArrayList<>( messages.size() );
-
-            final List<String> mapEntries = new ArrayList<>( messages.size() );
-
-
-            if(messages.size() == 0){
-                return response;
-            }
-
-            //add all our keys  for a single round trip
-            for ( final QueueMessage message : messages ) {
-                mapEntries.add( message.getBody().toString() );
-            }
-
-            //look up the values
-            final Map<String, String> storedCommands = mapManager.getStrings( mapEntries );
-
-
-            //load them into our response
-            for ( final QueueMessage message : messages ) {
-
-                final String key = getMessageKey( message );
-
-                //now see if the key was there
-                final String payload = storedCommands.get( key );
-
-                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
-                // a DLQ
-
-                if ( payload == null ) {
-                    continue;
-                }
-
-                final IndexIdentifierImpl.IndexOperationMessage messageBody;
-
-                try {
-                    messageBody = fromString( payload );
-                }
-                catch ( IOException e ) {
-                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
-                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
-                }
-
-                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
-
-                response.add( operation );
-            }
-
-            readMeter.mark( response.size() );
-            return response;
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-    @Override
-    public void ack( final List<IndexIdentifierImpl.IndexOperationMessage> messages ) {
-
-        //nothing to do
-        if ( messages.size() == 0 ) {
-            return;
-        }
-
-        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
-
-        for ( IndexIdentifierImpl.IndexOperationMessage ioe : messages ) {
-
-
-            final SqsIndexOperationMessage sqsIndexOperationMessage =   ( SqsIndexOperationMessage ) ioe;
-
-            final String key = getMessageKey( sqsIndexOperationMessage.getMessage() );
-
-            //remove it from the map
-            mapManager.delete( key  );
-
-            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
-        }
-
-        queue.commitMessages( toAck );
-    }
-
-
-    @Override
-    public void fail( final List<IndexIdentifierImpl.IndexOperationMessage> messages, final Throwable t ) {
-        //no op, just let it retry after the queue timeout
-    }
-
-
-    /** Read the object from Base64 string. */
-    private IndexIdentifierImpl.IndexOperationMessage fromString( String s ) throws IOException {
-        IndexIdentifierImpl.IndexOperationMessage o = mapper.readValue( s, IndexIdentifierImpl.IndexOperationMessage.class );
-        return o;
-    }
-
-
-    /** Write the object to a Base64 string. */
-    private String toString( IndexIdentifierImpl.IndexOperationMessage o ) throws IOException {
-        return mapper.writeValueAsString( o );
-    }
-
-    private String getMessageKey(final QueueMessage message){
-        return message.getBody().toString();
-    }
-
-    /**
-     * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
-     */
-    public class SqsIndexOperationMessage extends IndexIdentifierImpl.IndexOperationMessage {
-
-        private final QueueMessage message;
-
-
-        public SqsIndexOperationMessage( final QueueMessage message, final IndexIdentifierImpl.IndexOperationMessage source ) {
-            this.message = message;
-            this.addAllDeIndexRequest( source.getDeIndexRequests() );
-            this.addAllIndexRequest( source.getIndexRequests() );
-        }
-
-
-        /**
-         * Get the message from our queue
-         */
-        public QueueMessage getMessage() {
-            return message;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index b509ad9..05c84b9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -23,7 +23,6 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.elasticsearch.action.ActionListener;
@@ -61,14 +60,11 @@ import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
 
@@ -85,12 +81,12 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
 
     private final ApplicationScope applicationScope;
-    private final FailureMonitorImpl.IndexIdentifier indexIdentifier;
+    private final IndexIdentifier indexIdentifier;
     private final Timer searchTimer;
     private final Timer cursorTimer;
     private final MapManager mapManager;
     private final AliasedEntityIndex entityIndex;
-    private final IndexBufferProducer indexBatchBufferProducer;
+    private final IndexBufferConsumer indexBatchBufferProducer;
     private final IndexFig indexFig;
     private final EsProvider esProvider;
     private final IndexAlias alias;
@@ -103,11 +99,11 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
     @Inject
     public EsApplicationEntityIndexImpl(  ApplicationScope appScope, final AliasedEntityIndex entityIndex,
-                                         final IndexFig config, final IndexBufferProducer indexBatchBufferProducer,
+                                         final IndexFig config, final IndexBufferConsumer indexBatchBufferProducer,
                                          final EsProvider provider,
                                          final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory,
                                          final IndexFig indexFig,
-                                         final FailureMonitorImpl.IndexIdentifier indexIdentifier ) {
+                                         final IndexIdentifier indexIdentifier ) {
         this.entityIndex = entityIndex;
         this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.indexFig = indexFig;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d22f000..32a0f02 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -44,18 +44,18 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final ApplicationScope applicationScope;
 
     private final IndexAlias alias;
-    private final FailureMonitorImpl.IndexIdentifier indexIdentifier;
+    private final IndexIdentifier indexIdentifier;
 
-    private final IndexBufferProducer indexBatchBufferProducer;
+    private final IndexBufferConsumer indexBatchBufferProducer;
 
     private final AliasedEntityIndex entityIndex;
-    private IndexIdentifierImpl.IndexOperationMessage container;
+    private IndexOperationMessage container;
 
 
     public EsEntityIndexBatchImpl( final ApplicationScope applicationScope,
-                                   final IndexBufferProducer indexBatchBufferProducer,
+                                   final IndexBufferConsumer indexBatchBufferProducer,
                                    final AliasedEntityIndex entityIndex,
-                                   FailureMonitorImpl.IndexIdentifier indexIdentifier ) {
+                                   IndexIdentifier indexIdentifier ) {
 
         this.applicationScope = applicationScope;
         this.indexBatchBufferProducer = indexBatchBufferProducer;
@@ -63,7 +63,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         this.indexIdentifier = indexIdentifier;
         this.alias = indexIdentifier.getAlias();
         //constrained
-        this.container = new IndexIdentifierImpl.IndexOperationMessage();
+        this.container = new IndexOperationMessage();
     }
 
 
@@ -126,8 +126,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public BetterFuture execute() {
-        IndexIdentifierImpl.IndexOperationMessage tempContainer = container;
-        container = new IndexIdentifierImpl.IndexOperationMessage();
+        IndexOperationMessage tempContainer = container;
+        container = new IndexOperationMessage();
 
         /**
          * No-op, just disregard it

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 6145069..e41bcf8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -42,12 +42,12 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
 
     private final IndexFig config;
     private final EsProvider provider;
-    private final IndexBufferProducer indexBatchBufferProducer;
+    private final IndexBufferConsumer indexBatchBufferProducer;
     private final MetricsFactory metricsFactory;
     private final MapManagerFactory mapManagerFactory;
     private final IndexFig indexFig;
     private final AliasedEntityIndex entityIndex;
-    private final FailureMonitorImpl.IndexIdentifier indexIdentifier;
+    private final IndexIdentifier indexIdentifier;
 
     private LoadingCache<ApplicationScope, ApplicationEntityIndex> eiCache =
         CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, ApplicationEntityIndex>() {
@@ -60,9 +60,9 @@ public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
 
     @Inject
     public EsEntityIndexFactoryImpl( final IndexFig config, final EsProvider provider,
-                                     final IndexBufferProducer indexBatchBufferProducer,
+                                     final IndexBufferConsumer indexBatchBufferProducer,
                                      final MetricsFactory metricsFactory, final MapManagerFactory mapManagerFactory,
-                                     final IndexFig indexFig, final AliasedEntityIndex entityIndex, final FailureMonitorImpl.IndexIdentifier indexIdentifier ){
+                                     final IndexFig indexFig, final AliasedEntityIndex entityIndex, final IndexIdentifier indexIdentifier ){
         this.config = config;
         this.provider = provider;
         this.indexBatchBufferProducer = indexBatchBufferProducer;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 456bd15..961ddd2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -25,57 +25,33 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Resources;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.sun.org.apache.xpath.internal.operations.Bool;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.migration.data.VersionedData;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
-import org.apache.usergrid.persistence.index.query.ParsedQuery;
-import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.elasticsearch.action.ActionFuture;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.index.*;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
-import org.elasticsearch.indices.IndexMissingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.*;
+
 import rx.Observable;
-import rx.functions.Action1;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 
 /**
@@ -87,8 +63,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
     private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
 
     private final IndexAlias alias;
-    private final IndexBufferProducer indexBatchBufferProducer;
-    private final MetricsFactory metricsFactory;
+    private final IndexBufferConsumer producer;
     private final IndexFig indexFig;
     private final Timer addTimer;
     private final Timer updateAliasTimer;
@@ -97,7 +72,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
      * We purposefully make this per instance. Some indexes may work, while others may fail
      */
     private final EsProvider esProvider;
-    private final IndexBufferProducer producer;
     private final IndexRefreshCommand indexRefreshCommand;
 
     //number of times to wait for the index to refresh properly.
@@ -110,8 +84,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
     private static final ImmutableMap<String, Object> DEFAULT_PAYLOAD =
             ImmutableMap.<String, Object>builder().put(IndexingUtils.ENTITY_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build();
 
-    private static final MatchAllQueryBuilder MATCH_ALL_QUERY_BUILDER = QueryBuilders.matchAllQuery();
-    private final FailureMonitorImpl.IndexIdentifier indexIdentifier;
+
+    private final IndexIdentifier indexIdentifier;
 
     private IndexCache aliasCache;
     private Timer mappingTimer;
@@ -121,12 +95,12 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
 
 
     @Inject
-    public EsEntityIndexImpl(final IndexBufferProducer indexBatchBufferProducer, final EsProvider provider,
-                              final IndexCache indexCache, final MetricsFactory metricsFactory,
-                              final IndexFig indexFig, final FailureMonitorImpl.IndexIdentifier indexIdentifier,
-                             final IndexBufferProducer producer, IndexRefreshCommand indexRefreshCommand) {
-        this.indexBatchBufferProducer = indexBatchBufferProducer;
-        this.metricsFactory = metricsFactory;
+    public EsEntityIndexImpl( final EsProvider provider,
+                              final IndexCache indexCache,
+                              final IndexFig indexFig, final IndexIdentifier indexIdentifier,
+                             final IndexBufferConsumer producer, final IndexRefreshCommand indexRefreshCommand,
+                              final MetricsFactory metricsFactory) {
+
         this.indexFig = indexFig;
         this.indexIdentifier = indexIdentifier;
 
@@ -349,7 +323,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
     public Observable<Boolean> refreshAsync() {
 
         refreshIndexMeter.mark();
-        BetterFuture future = indexBatchBufferProducer.put(new IndexIdentifierImpl.IndexOperationMessage());
+        BetterFuture future = producer.put(new IndexOperationMessage());
         future.get();
         return indexRefreshCommand.execute();
     }


Mime
View raw message