usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [01/16] incubator-usergrid git commit: Initial pass at moving queues to core
Date Tue, 21 Apr 2015 15:55:10 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-578 f130b7cea -> a0c2651e3


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index e55fcc2..6bcc405 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,19 +32,19 @@ import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 
 import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
 import rx.Subscriber;
-import rx.Subscription;
 import rx.schedulers.Schedulers;
 
 
@@ -54,179 +53,110 @@ import rx.schedulers.Schedulers;
  */
 @Singleton
 public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
-    private static final Logger log = LoggerFactory.getLogger(EsIndexBufferConsumerImpl.class);
+    private static final Logger log = LoggerFactory.getLogger( EsIndexBufferConsumerImpl.class );
 
     private final IndexFig config;
     private final FailureMonitorImpl failureMonitor;
     private final Client client;
 
     private final Timer flushTimer;
-    private final Counter indexSizeCounter;
     private final Counter indexErrorCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
-    private final BufferQueue bufferQueue;
     private final IndexFig indexFig;
-    private final AtomicLong counter = new AtomicLong(  );
+    private final AtomicLong counter = new AtomicLong();
+
+
+    private final Counter indexSizeCounter;
 
-    //the actively running subscription
-    private List<Subscription> subscriptions;
+    private final Timer offerTimer;
 
-    private Object mutex = new Object();
 
+    private final BufferProducer bufferProducer;
+
+
+    private AtomicLong inFlight = new AtomicLong();
 
-    private AtomicLong inFlight = new AtomicLong(  );
 
     @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider, final MetricsFactory
-        metricsFactory, final BufferQueue bufferQueue, final IndexFig indexFig ){
+    public EsIndexBufferConsumerImpl( final IndexFig config, final EsProvider provider,
+                                      final MetricsFactory metricsFactory, final IndexFig indexFig ) {
 
-        this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "buffer.flush");
-        this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "buffer.meter");
-        this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "buffer.size");
-        this.indexErrorCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "error.count");
+        this.flushTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "buffer.flush" );
+        this.flushMeter = metricsFactory.getMeter( EsIndexBufferConsumerImpl.class, "buffer.meter" );
+        this.indexSizeCounter = metricsFactory.getCounter( EsIndexBufferConsumerImpl.class, "buffer.size" );
+        this.indexErrorCounter = metricsFactory.getCounter( EsIndexBufferConsumerImpl.class, "error.count" );
+        this.offerTimer = metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index.buffer.producer.timer" );
 
         //wire up the gauge of inflight messages
-        metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return inFlight.longValue();
-            }
-        } );
-
+        metricsFactory.addGauge( EsIndexBufferConsumerImpl.class, "inflight.meter", () -> inFlight.longValue() );
 
 
         this.config = config;
-        this.failureMonitor = new FailureMonitorImpl(config,provider);
+        this.failureMonitor = new FailureMonitorImpl( config, provider );
         this.client = provider.getClient();
-        this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
-        this.bufferQueue = bufferQueue;
+        this.produceTimer =
+            metricsFactory.getTimer( EsIndexBufferConsumerImpl.class, "index.buffer.consumer.messageFetch" );
         this.indexFig = indexFig;
 
-        subscriptions = new ArrayList<>( indexFig.getWorkerCount() );
+        this.bufferProducer = new BufferProducer();
 
         //batch up sets of some size and send them in batch
-          start();
-    }
 
+        startSubscription();
+    }
 
-    /**
-     * Loop throught and start the workers
-     */
-    public void start() {
-        final int count = indexFig.getWorkerCount();
 
-        for(int i = 0; i < count; i ++){
-            startWorker();
-        }
+    public BetterFuture put( IndexOperationMessage message ) {
+        Preconditions.checkNotNull( message, "Message cannot be null" );
+        indexSizeCounter.inc( message.getDeIndexRequests().size() );
+        indexSizeCounter.inc( message.getIndexRequests().size() );
+        Timer.Context time = offerTimer.time();
+        bufferProducer.send( message );
+        time.stop();
+        return message.getFuture();
     }
 
 
     /**
-     * Stop the workers
+     * Start the subscription
      */
-    public void stop() {
-        synchronized ( mutex ) {
-            //stop consuming
-
-            for(final Subscription subscription: subscriptions){
-                subscription.unsubscribe();
-            }
-        }
-    }
-
-
-    private void startWorker(){
-        synchronized ( mutex) {
-
-            Observable<List<IndexIdentifierImpl.IndexOperationMessage>> consumer = Observable.create(
-                new Observable.OnSubscribe<List<IndexIdentifierImpl.IndexOperationMessage>>() {
-                    @Override
-                    public void call( final Subscriber<? super List<IndexIdentifierImpl.IndexOperationMessage>> subscriber ) {
-
-                        //name our thread so it's easy to see
-                        Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
-
-
-                        List<IndexIdentifierImpl.IndexOperationMessage> drainList = null;
-
-                        do {
-
-                            Timer.Context timer = produceTimer.time();
+    private void startSubscription() {
 
 
-                            try {
+        Observable.create( bufferProducer )
+                  .buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+                      Schedulers.newThread() )
 
+            .doOnNext( containerList -> {
+                if ( containerList.size() == 0 ) {
+                    return;
+                }
 
-                                drainList = bufferQueue
-                                    .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
-                                        TimeUnit.MILLISECONDS );
+                flushMeter.mark( containerList.size() );
+                Timer.Context time = flushTimer.time();
 
 
-                                subscriber.onNext( drainList );
+                execute( containerList );
 
-                                //take since  we're in flight
-                                inFlight.addAndGet( drainList.size() );
-
-
-                                timer.stop();
-                            }
-                            //DO NOT add any doOnError* functions to this subscription.  We want the producer
-                            //to receive these exceptions and sleep before a retry
-                            catch ( Throwable t ) {
-                                final long sleepTime = config.getFailureRetryTime();
-
-                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
-
-                                if ( drainList != null ) {
-                                    inFlight.addAndGet( -1 * drainList.size() );
-                                }
-
-                                try {
-                                    Thread.sleep( sleepTime );
-                                }
-                                catch ( InterruptedException ie ) {
-                                    //swallow
-                                }
-
-                                indexErrorCounter.inc();
-                            }
-                        }
-                        while ( true );
-                    }
-                } ).doOnNext( containerList -> {
-                    if ( containerList.size() == 0 ) {
-                        return;
-                    }
-
-                    flushMeter.mark(containerList.size());
-                    Timer.Context time = flushTimer.time();
-
-
-                    execute(containerList);
-
-                    time.stop();
-                } )
+                time.stop();
+            } )
                 //ack after we process
-                .doOnNext( indexOperationMessages -> {
-                    bufferQueue.ack( indexOperationMessages );
-                    //release  so we know we've done processing
-                    inFlight.addAndGet( -1 * indexOperationMessages.size() );
-                } ).subscribeOn( Schedulers.newThread() );
+            .doOnNext( indexOperationMessages -> {
 
-            //start in the background
+                //release so we know we've finished processing
+                inFlight.addAndGet( -1 * indexOperationMessages.size() );
+            } ).subscribe();
 
-           final Subscription subscription = consumer.subscribe();
+        //start in the background
 
-            subscriptions.add(subscription );
-        }
     }
 
 
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute( final List<IndexIdentifierImpl.IndexOperationMessage> operationMessages ) {
+    private void execute( final List<IndexOperationMessage> operationMessages ) {
 
         if ( operationMessages == null || operationMessages.size() == 0 ) {
             return;
@@ -250,28 +180,28 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).toBlocking().lastOrDefault( null );
 
         //call back all futures
-        Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() ).toBlocking().lastOrDefault( null );
+        Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() )
+                  .toBlocking().lastOrDefault( null );
     }
 
 
     /**
      * initialize request
-     * @return
      */
     private BulkRequestBuilder initRequest() {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
-        bulkRequest.setConsistencyLevel(WriteConsistencyLevel.fromString(config.getWriteConsistencyLevel()));
-        bulkRequest.setRefresh(config.isForcedRefresh());
+        bulkRequest.setConsistencyLevel( WriteConsistencyLevel.fromString( config.getWriteConsistencyLevel() ) );
+        bulkRequest.setRefresh( config.isForcedRefresh() );
         return bulkRequest;
     }
 
+
     /**
      * send bulk request
-     * @param bulkRequest
      */
-    private void sendRequest(BulkRequestBuilder bulkRequest) {
+    private void sendRequest( BulkRequestBuilder bulkRequest ) {
         //nothing to do, we haven't added anything to the index
-        if (bulkRequest.numberOfActions() == 0) {
+        if ( bulkRequest.numberOfActions() == 0 ) {
             return;
         }
 
@@ -280,9 +210,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         try {
             responses = bulkRequest.execute().actionGet();
-        } catch (Throwable t) {
-            log.error("Unable to communicate with elasticsearch");
-            failureMonitor.fail("Unable to execute batch", t);
+        }
+        catch ( Throwable t ) {
+            log.error( "Unable to communicate with elasticsearch" );
+            failureMonitor.fail( "Unable to execute batch", t );
             throw t;
         }
 
@@ -290,23 +221,43 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         boolean error = false;
 
-        for (BulkItemResponse response : responses) {
+        for ( BulkItemResponse response : responses ) {
 
-            if (response.isFailed()) {
+            if ( response.isFailed() ) {
                 // log error and continue processing
-                log.error("Unable to index id={}, type={}, index={}, failureMessage={} ",
-                    response.getId(),
-                    response.getType(),
-                    response.getIndex(),
-                    response.getFailureMessage()
-                );
+                log.error( "Unable to index id={}, type={}, index={}, failureMessage={} ", response.getId(),
+                    response.getType(), response.getIndex(), response.getFailureMessage() );
 
                 error = true;
             }
         }
 
         if ( error ) {
-            throw new RuntimeException("Error during processing of bulk index operations one of the responses failed.  Check previous log entries");
+            throw new RuntimeException(
+                "Error during processing of bulk index operations one of the responses failed.  Check previous log "
+                    + "entries" );
+        }
+    }
+
+
+    public static class BufferProducer implements Observable.OnSubscribe<IndexOperationMessage> {
+
+        private Subscriber<? super IndexOperationMessage> subscriber;
+
+
+        /**
+         * Send the data through the buffer
+         */
+        public void send( final IndexOperationMessage indexOp ) {
+
+            subscriber.onNext( indexOp );
+        }
+
+
+        @Override
+        public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+            //just assigns for later use, doesn't do anything else
+            this.subscriber = subscriber;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
deleted file mode 100644
index 9ba92d0..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-
-
-/**
- * Producer for index operation messages
- */
-@Singleton
-public class EsIndexBufferProducerImpl implements IndexBufferProducer {
-
-    private final Counter indexSizeCounter;
-
-    private final Timer timer;
-    private final BufferQueue bufferQueue;
-
-    @Inject
-    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
-        this.bufferQueue = bufferQueue;
-        this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
-        this.timer =  metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");
-    }
-
-    public BetterFuture put(IndexIdentifierImpl.IndexOperationMessage message){
-        Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc(message.getDeIndexRequests().size());
-        indexSizeCounter.inc(message.getIndexRequests().size());
-        Timer.Context time = timer.time();
-        bufferQueue.offer( message );
-        time.stop();
-        return message.getFuture();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
index fe31d35..fed9a50 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FailureMonitorImpl.java
@@ -98,30 +98,4 @@ public class FailureMonitorImpl implements FailureMonitor {
     public void success() {
         failCounter.set( 0 );
     }
-
-
-    /**
-     * Identifier for where an index is in underlying server
-     */
-    public static interface IndexIdentifier {
-
-        /**
-         * get the alias name
-         * @return
-         */
-        IndexAlias getAlias();
-
-        /**
-         * get index name from suffix
-         * @param suffix
-         * @return
-         */
-        String getIndex( String suffix );
-
-        /**
-         * return unique string
-         * @return
-         */
-        String toString();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java
new file mode 100644
index 0000000..4b84db7
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/FlushBufferQueue.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+public class FlushBufferQueue {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index b8e733d..3258444 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -19,6 +19,10 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+
+
 /**
  *  Buffer index requests
  */
@@ -26,12 +30,9 @@ public interface IndexBufferConsumer {
 
 
     /**
-     * Start the consumer
-     */
-    void start();
-
-    /**
-     * Stop the consumers
+     * Put this operation into our collapsing buffer
+     * @param message
+     * @return
      */
-    void stop();
+    BetterFuture put(IndexOperationMessage message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java
deleted file mode 100644
index 36cb180..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferProducer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-
-
-/**
- * Produce to index buffer consumer
- */
-public interface IndexBufferProducer {
-
-    BetterFuture put(IndexIdentifierImpl.IndexOperationMessage message);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java
new file mode 100644
index 0000000..332c7b7
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifier.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+/**
+ * Identifier for where an index is in underlying server
+ */
+public interface IndexIdentifier {
+
+    /**
+     * get the alias name
+     * @return
+     */
+    IndexAlias getAlias();
+
+    /**
+     * get index name from suffix
+     * @param suffix
+     * @return
+     */
+    String getIndex( String suffix );
+
+    /**
+     * return unique string
+     * @return
+     */
+    String toString();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java
index c47b2c8..782625b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierImpl.java
@@ -20,22 +20,15 @@
 
 package org.apache.usergrid.persistence.index.impl;
 
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Callable;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.inject.Inject;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.index.IndexFig;
 
 
 /**
  * Class is used to generate an index name and alias name
  */
-public class IndexIdentifierImpl implements FailureMonitorImpl.IndexIdentifier {
+public class IndexIdentifierImpl implements IndexIdentifier {
     private final IndexFig config;
 
     @Inject
@@ -71,113 +64,4 @@ public class IndexIdentifierImpl implements FailureMonitorImpl.IndexIdentifier {
     public String toString() {
         return "index id"+config.getIndexPrefix();
     }
-
-
-    /**
-     * Container for index operations.
-     */
-    public static class IndexOperationMessage implements Serializable {
-        private final Set<IndexRequest> indexRequests;
-        private final Set<DeIndexRequest> deIndexRequests;
-
-
-
-        private final BetterFuture<IndexOperationMessage> containerFuture;
-
-
-        public IndexOperationMessage() {
-            final IndexOperationMessage parent = this;
-            this.indexRequests = new HashSet<>();
-            this.deIndexRequests = new HashSet<>();
-            this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
-                @Override
-                public IndexOperationMessage call() throws Exception {
-                    return parent;
-                }
-            } );
-        }
-
-
-        public void addIndexRequest( final IndexRequest indexRequest ) {
-            indexRequests.add( indexRequest );
-        }
-
-
-        public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
-            this.indexRequests.addAll( indexRequests );
-        }
-
-
-        public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
-            this.deIndexRequests.add( deIndexRequest );
-        }
-
-
-        public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
-            this.deIndexRequests.addAll( deIndexRequests );
-        }
-
-
-        public Set<IndexRequest> getIndexRequests() {
-            return indexRequests;
-        }
-
-
-        public Set<DeIndexRequest> getDeIndexRequests() {
-            return deIndexRequests;
-        }
-
-
-        @JsonIgnore
-        public boolean isEmpty(){
-            return indexRequests.isEmpty() && deIndexRequests.isEmpty();
-        }
-
-        /**
-         * return the promise
-         */
-        @JsonIgnore
-        public BetterFuture<IndexOperationMessage> getFuture() {
-            return containerFuture;
-        }
-
-
-        @Override
-        public boolean equals( final Object o ) {
-            if ( this == o ) {
-                return true;
-            }
-            if ( o == null || getClass() != o.getClass() ) {
-                return false;
-            }
-
-            final IndexOperationMessage that = ( IndexOperationMessage ) o;
-
-            if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
-                return false;
-            }
-            if ( !indexRequests.equals( that.indexRequests ) ) {
-                return false;
-            }
-
-            return true;
-        }
-
-
-        @Override
-        public int hashCode() {
-            int result = indexRequests.hashCode();
-            result = 31 * result + deIndexRequests.hashCode();
-            return result;
-        }
-
-        public void done() {
-            //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
-            final BetterFuture<IndexOperationMessage> future = getFuture();
-
-            if(future != null ){
-                future.done();
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
new file mode 100644
index 0000000..1a60026
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+
+/**
+ * Container for index operations.
+ */
+public class IndexOperationMessage implements Serializable {
+    private final Set<IndexRequest> indexRequests;
+    private final Set<DeIndexRequest> deIndexRequests;
+
+
+
+    private final BetterFuture<IndexOperationMessage> containerFuture;
+
+
+    public IndexOperationMessage() {
+        final IndexOperationMessage parent = this;
+        this.indexRequests = new HashSet<>();
+        this.deIndexRequests = new HashSet<>();
+        this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
+            @Override
+            public IndexOperationMessage call() throws Exception {
+                return parent;
+            }
+        } );
+    }
+
+
+    public void addIndexRequest( final IndexRequest indexRequest ) {
+        indexRequests.add( indexRequest );
+    }
+
+
+    public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
+        this.indexRequests.addAll( indexRequests );
+    }
+
+
+    public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
+        this.deIndexRequests.add( deIndexRequest );
+    }
+
+
+    public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
+        this.deIndexRequests.addAll( deIndexRequests );
+    }
+
+
+    public Set<IndexRequest> getIndexRequests() {
+        return indexRequests;
+    }
+
+
+    public Set<DeIndexRequest> getDeIndexRequests() {
+        return deIndexRequests;
+    }
+
+
+    @JsonIgnore
+    public boolean isEmpty(){
+        return indexRequests.isEmpty() && deIndexRequests.isEmpty();
+    }
+
+    /**
+     * return the promise
+     */
+    @JsonIgnore
+    public BetterFuture<IndexOperationMessage> getFuture() {
+        return containerFuture;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final IndexOperationMessage that = ( IndexOperationMessage ) o;
+
+        if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
+            return false;
+        }
+        if ( !indexRequests.equals( that.indexRequests ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = indexRequests.hashCode();
+        result = 31 * result + deIndexRequests.hashCode();
+        return result;
+    }
+
+    public void done() {
+        //if this has been serialized, it could be null. don't NPE if it is, there's nothing to ack
+        final BetterFuture<IndexOperationMessage> future = getFuture();
+
+        if(future != null ){
+            future.done();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index f3663cd..5052ddf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -61,14 +61,14 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
     private final IndexAlias alias;
     private final IndexCache indexCache;
     private final EsProvider esProvider;
-    private final IndexBufferProducer producer;
+    private final IndexBufferConsumer producer;
     private final IndexFig indexFig;
     private final Timer timer;
 
 
     @Inject
-    public IndexRefreshCommandImpl( FailureMonitorImpl.IndexIdentifier indexIdentifier, EsProvider esProvider,
-                                    IndexBufferProducer producer, IndexFig indexFig, MetricsFactory metricsFactory,
+    public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider,
+                                    IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory,
                                     final IndexCache indexCache ) {
 
 
@@ -105,7 +105,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
         IndexRequest indexRequest = new IndexRequest( alias.getWriteAlias(), docId, entityData );
 
         //save the item
-        IndexIdentifierImpl.IndexOperationMessage message = new IndexIdentifierImpl.IndexOperationMessage();
+        IndexOperationMessage message = new IndexOperationMessage();
         message.addIndexRequest( indexRequest );
         producer.put( message );
 
@@ -153,8 +153,8 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
                 new DeIndexRequest( aliases, appScope, edge, entity.getId(), entity.getVersion() );
 
             //delete the item
-            IndexIdentifierImpl.IndexOperationMessage indexOperationMessage =
-                new IndexIdentifierImpl.IndexOperationMessage();
+            IndexOperationMessage indexOperationMessage =
+                new IndexOperationMessage();
             indexOperationMessage.addDeIndexRequest( deIndexRequest );
             producer.put( indexOperationMessage );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java
index c60de42..c93fd86 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/LegacyIndexIdentifier.java
@@ -20,15 +20,15 @@
 package org.apache.usergrid.persistence.index.migration;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.impl.FailureMonitorImpl;
 import org.apache.usergrid.persistence.index.impl.IndexAlias;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.impl.IndexIdentifier;
 import org.apache.usergrid.persistence.index.impl.IndexingUtils;
 
 /**
  * Class is used to generate an index name and alias name the old way via app name
  */
-public class LegacyIndexIdentifier implements FailureMonitorImpl.IndexIdentifier {
+public class LegacyIndexIdentifier implements IndexIdentifier {
     private final IndexFig config;
     private final ApplicationScope applicationScope;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 20daf82..410f0e3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -19,27 +19,17 @@
 package org.apache.usergrid.persistence.index.guice;
 
 
-import com.google.inject.Inject;
-import com.google.inject.TypeLiteral;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.impl.IndexBufferProducer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
-import org.apache.usergrid.persistence.index.impl.IndexCache;
-import org.apache.usergrid.persistence.index.impl.EsProvider;
-import org.apache.usergrid.persistence.index.migration.LegacyIndexIdentifier;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
-import rx.Observable;
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
-import java.util.UUID;
+import com.google.inject.Inject;
+import com.google.inject.TypeLiteral;
+
+import rx.Observable;
 
 
 public class TestIndexModule extends TestModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
deleted file mode 100644
index 43e581a..0000000
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchType;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.guice.TestIndexModule;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.NoAWSCredsRule;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
-
-import com.google.inject.Inject;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-
-@RunWith(EsRunner.class)
-@UseModules({ TestIndexModule.class })
-@NotThreadSafe
-public class BufferQueueSQSImplTest {
-
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Rule
-    public NoAWSCredsRule noAwsCredsRule = new NoAWSCredsRule();
-
-    @Inject
-    public QueueManagerFactory queueManagerFactory;
-
-    @Inject
-    public IndexFig indexFig;
-
-    @Inject
-    public MapManagerFactory mapManagerFactory;
-
-    @Inject
-    public MetricsFactory metricsFactory;
-
-
-    private BufferQueueSQSImpl bufferQueueSQS;
-
-    @Before
-    public void setup(){
-        bufferQueueSQS = new BufferQueueSQSImpl( queueManagerFactory, indexFig, mapManagerFactory, metricsFactory );
-    }
-
-
-
-
-    @Test
-    public void testMessageIndexing(){
-
-        ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"));
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        assumeTrue( ugProvider.getCredentials().getAWSAccessKeyId() != null );
-        assumeTrue( ugProvider.getCredentials().getAWSSecretKey() != null );
-
-        final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
-        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testDoc1",request1Data );
-
-
-        final Map<String, Object> request2Data  = new HashMap<String, Object>() {{put("test", "testval2");}};
-        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", "testDoc2",request2Data );
-
-
-        //de-index request
-        final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, applicationScope, new SearchEdgeImpl(new SimpleId("testId3"),"name3",
-
-
-                SearchEdge.NodeType.SOURCE ),  new SimpleId("id3"), UUID.randomUUID() );
-
-        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, applicationScope,  new SearchEdgeImpl(new SimpleId("testId4"),"name4",
-                SearchEdge.NodeType.SOURCE ),  new SimpleId("id4"), UUID.randomUUID()  );
-
-
-
-
-        IndexIdentifierImpl.IndexOperationMessage indexOperationMessage = new IndexIdentifierImpl.IndexOperationMessage();
-        indexOperationMessage.addIndexRequest( indexRequest1);
-        indexOperationMessage.addIndexRequest( indexRequest2);
-
-        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
-        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
-
-        bufferQueueSQS.offer( indexOperationMessage );
-
-        //wait for it to send to SQS
-        indexOperationMessage.getFuture().get();
-
-        //now get it back
-
-        final List<IndexIdentifierImpl.IndexOperationMessage> ops = getResults( 20, TimeUnit.SECONDS );
-
-        assertTrue(ops.size() > 0);
-
-        final IndexIdentifierImpl.IndexOperationMessage returnedOperation = ops.get( 0 );
-
-         //get the operations out
-
-        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
-
-        assertTrue(indexRequestSet.contains(indexRequest1));
-        assertTrue(indexRequestSet.contains(indexRequest2));
-
-
-        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
-
-        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
-        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
-
-
-
-        //now ack the message
-
-        bufferQueueSQS.ack( ops );
-
-    }
-
-    private List<IndexIdentifierImpl.IndexOperationMessage> getResults(final long timeout, final TimeUnit timeUnit){
-        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
-
-        List<IndexIdentifierImpl.IndexOperationMessage> ops;
-
-        do{
-            ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
-        }while((ops == null || ops.size() == 0 ) &&  System.currentTimeMillis() < endTime);
-
-        return ops;
-    }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
deleted file mode 100644
index ba0dc1f..0000000
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/NoAWSCredsRule.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.queue;
-
-
-import org.junit.Assume;
-import org.junit.internal.runners.model.MultipleFailureException;
-import org.junit.rules.TestRule;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-import com.amazonaws.AmazonClientException;
-
-
-/**
- * Created in an attempt to mark no aws cred tests as ignored.  Blocked by this issue
- * https://github.com/junit-team/junit/issues/116
- *
- * Until then, simply marks as passed, which is a bit dangerous
- */
-public class NoAWSCredsRule implements TestRule {
-
-    public Statement apply( final Statement base, final Description description ) {
-        return new Statement() {
-            @Override
-            public void evaluate() throws Throwable {
-
-                try {
-                    base.evaluate();
-                }
-                catch ( Throwable t ) {
-
-                    if ( !isMissingCredsException( t ) ) {
-                        throw t;
-                    }
-
-                    //do this so our test gets marked as ignored.  Not pretty, but it works
-                    Assume.assumeTrue( false );
-
-
-                }
-            }
-        };
-    }
-
-
-    private boolean isMissingCredsException( final Throwable t ) {
-
-        if ( t instanceof AmazonClientException ) {
-
-            final AmazonClientException ace = ( AmazonClientException ) t;
-
-            if ( ace.getMessage().contains( "could not get aws access key" ) || ace.getMessage().contains(
-                "could not get aws secret key from system properties" ) ) {
-                //swallow
-                return true;
-            }
-        }
-
-        /**
-         * Handle the multiple failure junit trace
-         */
-        if( t instanceof MultipleFailureException ){
-            for(final Throwable failure : ((MultipleFailureException)t).getFailures()){
-                final boolean isMissingCreds = isMissingCredsException( failure );
-
-                if(isMissingCreds){
-                    return true;
-                }
-            }
-        }
-        final Throwable cause = t.getCause();
-
-        if ( cause == null ) {
-            return false;
-        }
-
-
-        return isMissingCredsException( cause );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index eecb9e1..452d328 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -27,16 +27,15 @@ import java.util.List;
 import java.util.Map;
 
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.core.test.ITRunner;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider;
 
 import com.google.inject.Inject;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 6074e65..2f1ccdf 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -284,42 +284,42 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>usergrid-core</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <classifier>tests</classifier>
-    </dependency>
+      <dependency>
+          <groupId>org.apache.usergrid</groupId>
+          <artifactId>usergrid-core</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+      </dependency>
 
       <dependency>
           <groupId>${project.parent.groupId}</groupId>
-          <artifactId>queue</artifactId>
+          <artifactId>common</artifactId>
           <version>${project.version}</version>
           <type>test-jar</type>
           <scope>test</scope>
       </dependency>
 
       <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>usergrid-config</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-      <classifier>tests</classifier>
-    </dependency>
+          <groupId>org.apache.usergrid</groupId>
+          <artifactId>usergrid-config</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+          <classifier>tests</classifier>
+      </dependency>
 
-    <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>usergrid-test-utils</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
+      <dependency>
+          <groupId>org.apache.usergrid</groupId>
+          <artifactId>usergrid-test-utils</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+      </dependency>
 
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
+      <dependency>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+          <scope>test</scope>
+      </dependency>
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
index e6970b5..fbf8290 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/NotifiersServiceIT.java
@@ -16,33 +16,28 @@
  */
 package org.apache.usergrid.services.notifications;
 
-import org.apache.commons.io.IOUtils;
 
-import org.apache.usergrid.persistence.queue.NoAWSCredsRule;
-import org.apache.usergrid.services.notifications.apns.MockSuccessfulProviderAdapter;
-import org.apache.usergrid.persistence.entities.Notifier;
+import java.io.InputStream;
+import java.lang.reflect.Field;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-import org.apache.usergrid.services.notifications.ConnectionException;
-import org.apache.usergrid.services.notifications.NotificationsService;
 
-import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.net.SocketException;
+import org.apache.commons.io.IOUtils;
+
 import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.services.AbstractServiceIT;
 import org.apache.usergrid.services.ServiceAction;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertArrayEquals;
-
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class NotifiersServiceIT extends AbstractServiceIT {


Mime
View raw message