usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [06/50] [abbrv] incubator-usergrid git commit: Refactors operations into easier build pattern. Pipeline still need some work.
Date Thu, 28 May 2015 12:53:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
index 84654aa..91773c4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -23,12 +23,12 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
 import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.common.base.Optional;
 
@@ -37,21 +37,32 @@ import rx.Observable;
 
 /**
  * Takes entities and collects them into results.  This mostly exists for 1.0 compatibility.  Eventually this will
- * become the only collector in our pipline and be used when rendering results, both on GET, PUT and POST.
+ * become the only collector in our pipeline and be used when rendering results, both on GET, PUT and POST.
+ *
+ *
+ *
+ * @param <T> the type of element to be collected
  */
-public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
-    implements Collector<Entity, ResultsPage> {
+public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>, ResultsPage<T>> {
+
+
+    protected PipelineContext pipelineContext;
+
+
+    @Override
+    public void setContext( final PipelineContext pipelineContext ) {
+        this.pipelineContext = pipelineContext;
+    }
+
 
 
     @Override
-    public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+    public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
 
         final int limit = pipelineContext.getLimit();
 
         return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
-            () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
-                collector.add( entity );
-            } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+            () -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
             new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
     }
 
@@ -59,10 +70,10 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
     /**
      * A collector that will aggregate our results together
      */
-    private static class ResultsPageWithCursorCollector {
+    private class ResultsPageWithCursorCollector {
 
 
-        private final List<Entity> results;
+        private final List<T> results;
 
         private Optional<EdgePath> lastPath;
 
@@ -72,7 +83,7 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
         }
 
 
-        public void add( final FilterResult<Entity> result ) {
+        public void add( final FilterResult<T> result ) {
             this.results.add( result.getValue() );
             this.lastPath = result.getPath();
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
deleted file mode 100644
index f403e21..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * Command for reading graph edges
- */
-public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
-    implements Filter<Id, Candidate> {
-
-    private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
-
-    private final EntityIndexFactory entityIndexFactory;
-    private final String query;
-    private final Timer searchTimer;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
-                                        final MetricsFactory metricsFactory, final String query ) {
-        this.entityIndexFactory = entityIndexFactory;
-        this.query = query;
-        this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" );
-    }
-
-
-    @Override
-    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
-
-        //get the graph manager
-        final ApplicationEntityIndex applicationEntityIndex =
-            entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() );
-
-
-        final int limit = pipelineContext.getLimit();
-
-
-        final SearchTypes searchTypes = getSearchTypes();
-
-
-        //return all ids that are emitted from this edge
-        return observable.flatMap( idFilterResult -> {
-
-            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
-
-
-            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
-
-                //our offset to our start value.  This will be set the first time we emit
-                //after we receive new ids, we want to reset this to 0
-                //set our our constant state
-                final Optional<Integer> startFromCursor = getSeekValue();
-
-                final int startOffset = startFromCursor.or( 0 );
-
-                int currentOffSet = startOffset;
-
-                subscriber.onStart();
-
-                //emit while we have values from ES and someone is subscribed
-                while ( !subscriber.isUnsubscribed() ) {
-
-
-                    try {
-                        final CandidateResults candidateResults =
-                            applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
-
-
-
-                        for( CandidateResult candidateResult: candidateResults){
-
-                            //our subscriber unsubscribed, break out
-                            if(subscriber.isUnsubscribed()){
-                                return;
-                            }
-
-                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
-
-                            final FilterResult<Candidate>
-                                result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
-
-                            subscriber.onNext( result );
-
-                            currentOffSet++;
-                        }
-
-                        /**
-                         * No candidates, we're done
-                         */
-                        if (candidateResults.size() < limit) {
-                            subscriber.onCompleted();
-                            return;
-                        }
-
-                    }
-                    catch ( Throwable t ) {
-
-                        log.error( "Unable to search candidates", t );
-                        subscriber.onError( t );
-                    }
-                }
-            } );
-
-
-            //add a timer around our observable
-            ObservableTimer.time( candidates, searchTimer );
-
-            return candidates;
-        } );
-    }
-
-
-    @Override
-    protected CursorSerializer<Integer> getCursorSerializer() {
-        return ElasticsearchCursorSerializer.INSTANCE;
-    }
-
-
-    /**
-     * Get the search edge from the id
-     */
-    protected abstract SearchEdge getSearchEdge( final Id id );
-
-    /**
-     * Get the search types
-     */
-    protected abstract SearchTypes getSearchTypes();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
deleted file mode 100644
index ab9d5d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Create a candidate. This holds the original candidate, as well as the search scope it was found in
- */
-public class Candidate {
-
-    private final CandidateResult candidateResult;
-    private final SearchEdge searchEdge;
-
-
-    /**
-     * Create a new Candidate for further processing
-     * @param candidateResult  The candidate result
-     * @param searchEdge The search edge this was searched on
-     */
-    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
-        this.candidateResult = candidateResult;
-        this.searchEdge = searchEdge;
-    }
-
-
-    public CandidateResult getCandidateResult() {
-        return candidateResult;
-    }
-
-
-    public SearchEdge getSearchEdge() {
-        return searchEdge;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
deleted file mode 100644
index 4304b37..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from incoming CandidateResult emissions, then streams them on.  This filter
- * performs internal buffering for efficiency.  Note that all entities may not be emitted if our load crosses page boundaries.  It is up to the
- * collector to determine when to stop streaming entities.
- */
-public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
-    implements Filter<Candidate, Entity> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                  final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-    @Override
-       public Observable<FilterResult<Entity>> call(
-           final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        //buffer them to get a page size we can make 1 network hop
-        final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
-
-            //load them
-            .flatMap( candidateResults -> {
-                    //flatten to a list of ids to load
-                    final Observable<List<Id>> candidateIds =
-                        Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
-
-                    //load the ids
-                    final Observable<EntitySet> entitySetObservable =
-                        candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
-                    //now we have a collection, validate our candidate set is correct.
-
-                    return entitySetObservable.map(
-                        entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
-                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
-                                              .flatMap(
-                                                  entityCollector -> Observable.from( entityCollector.getResults() ) );
-                } );
-
-        //if we filter all our results, we want to continue to try the next page
-        return searchIdSetObservable;
-    }
-
-
-
-
-    /**
-     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
-     */
-    private static final class EntityVerifier {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-        private List<FilterResult<Entity>> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final List<FilterResult<Candidate>> candidateResults;
-        private final EntitySet entitySet;
-
-
-        public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
-                               final List<FilterResult<Candidate>> candidateResults ) {
-            this.batch = batch;
-            this.entitySet = entitySet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( entitySet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public List<FilterResult<Entity>> getResults() {
-            return results;
-        }
-
-
-        public EntityIndexBatch getBatch() {
-            return batch;
-        }
-
-
-        private void validate( final FilterResult<Candidate> filterResult ) {
-
-            final Candidate candidate = filterResult.getValue();
-            final CandidateResult candidateResult = candidate.getCandidateResult();
-            final SearchEdge searchEdge = candidate.getSearchEdge();
-            final Id candidateId = candidateResult.getId();
-            final UUID candidateVersion = candidateResult.getVersion();
-
-
-            final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
-            //doesn't exist warn and drop
-            if ( entity == null ) {
-                logger.warn(
-                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue",
-                    candidateId, candidateVersion );
-
-
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-
-            }
-
-
-            final UUID entityVersion = entity.getVersion();
-            final Id entityId = entity.getId();
-
-
-
-
-
-            //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, candidateVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
-                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-            }
-
-            //they're the same add it
-
-            final Entity returnEntity = entity.getEntity().get();
-
-            final Optional<EdgePath> parent = filterResult.getPath();
-
-            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
-
-            results.add( toReturn );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
deleted file mode 100644
index 0e87141..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions Input is a batch of
- * candidate results, output is a stream of validated Ids
- */
-public class CandidateIdFilter extends AbstractFilter<Candidate, Id> implements Filter<Candidate, Id> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                              final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-    @Override
-    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<FilterResult<Id>> searchIdSetObservable =
-            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
-                    //flatten to a list of ids to load
-                    final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
-                        candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
-
-                    //load the ids
-                    final Observable<VersionSet> versionSetObservable =
-                        candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
-                    //now we have a collection, validate our candidate set is correct.
-
-                    return versionSetObservable.map(
-                        entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
-                            candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
-                        entityCollector -> Observable.from( entityCollector.collectResults() ) );
-                } );
-
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<FilterResult<Id>> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final List<FilterResult<Candidate>> candidateResults;
-        private final VersionSet versionSet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final List<FilterResult<Candidate>> candidateResults ) {
-            this.batch = batch;
-            this.versionSet = versionSet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( versionSet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public List<FilterResult<Id>> collectResults() {
-            return results;
-        }
-
-
-        /**
-         * Validate each candidate results vs the data loaded from cass
-         */
-        private void validate( final FilterResult<Candidate> filterCandidate ) {
-
-            final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
-
-            final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
-
-            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
-            final UUID candidateVersion = candidateResult.getVersion();
-
-            final UUID entityVersion = logEntry.getVersion();
-
-            final Id entityId = logEntry.getEntityId();
-
-            //entity is newer than ES version
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-            }
-
-            //they're the same add it
-
-            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
-
-            results.add( result );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
deleted file mode 100644
index 702b2d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
-
-
-public class ElasticSearchCollectionFilter extends AbstractElasticSearchFilter {
-
-    private final String collectionName;
-    private final String entityType;
-
-    /**
-     * Create a new instance of our command
-     *
-     * @param entityIndexFactory The entity index factory used to search
-     * @param  metricsFactory The metrics factory for metrics
-     * @param collectionName The name of the collection
-     * @param entityType The entity type
-     */
-    @Inject
-    public ElasticSearchCollectionFilter( final EntityIndexFactory entityIndexFactory,
-                                          final MetricsFactory metricsFactory, @Assisted( "query" ) final String query,
-                                          @Assisted( "collectionName" ) final String collectionName,
-                                          @Assisted( "entityType" ) final String entityType ) {
-        super( entityIndexFactory, metricsFactory, query );
-        this.collectionName = collectionName;
-        this.entityType = entityType;
-    }
-
-
-
-    @Override
-    protected SearchTypes getSearchTypes() {
-        final SearchTypes types = SearchTypes.fromTypes( entityType );
-
-        return types;
-    }
-
-
-    @Override
-    protected SearchEdge getSearchEdge( final Id incomingId ) {
-        final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName );
-
-        return searchEdge;
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
deleted file mode 100644
index cc40530..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
-
-
-public class ElasticSearchConnectionFilter extends AbstractElasticSearchFilter {
-
-
-    private final String connectionName;
-    private final Optional<String> connectedEntityType;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    @Inject
-    public ElasticSearchConnectionFilter( final EntityIndexFactory entityIndexFactory,
-                                          final MetricsFactory metricsFactory, @Assisted( "query" ) final String query,
-                                          @Assisted( "connectionName" ) final String connectionName,
-                                          @Assisted( "connectedEntityType" )
-                                          final Optional<String> connectedEntityType ) {
-        super( entityIndexFactory, metricsFactory, query );
-
-        this.connectionName = connectionName;
-        this.connectedEntityType = connectedEntityType;
-    }
-
-
-    @Override
-    protected SearchTypes getSearchTypes() {
-        final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() );
-
-        return searchTypes;
-    }
-
-
-    @Override
-    protected SearchEdge getSearchEdge( final Id id ) {
-        final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName );
-
-        return searchEdge;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
deleted file mode 100644
index a4e7746..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-
-
-/**
- * ElasticSearch cursor serializer
- */
-public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> {
-
-
-    public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer();
-
-    @Override
-    protected Class<Integer> getType() {
-        return Integer.class;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg
deleted file mode 100644
index 08970e3..0000000
Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
deleted file mode 100644
index 42b352b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * Filter should take an Id and a graph edge, and ensure the connection between the two exists
- */
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
-    Filter<Id, Id> {
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final Id targetId;
-
-
-    @Inject
-    public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id
-        targetId ) {
-        this.graphManagerFactory = graphManagerFactory;
-        this.targetId = targetId;
-    }
-
-
-    @Override
-    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
-
-        final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
-        return filterValueObservable.flatMap( filterValue -> {
-            final String edgeTypeName = getEdgeName();
-            final Id id = filterValue.getValue();
-
-            //create our search
-            final SearchByEdge searchByEdge =
-                new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    Optional.absent() );
-
-            //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
-            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
-        } );
-    }
-
-
-    /**
-     * Get the name of the edge to be used in the seek
-     */
-    protected abstract String getEdgeName();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
deleted file mode 100644
index 303bc5b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * Command for reading graph edges
- */
-public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
-
-    private final GraphManagerFactory graphManagerFactory;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
-        this.graphManagerFactory = graphManagerFactory;
-    }
-
-
-    @Override
-    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
-
-
-        //get the graph manager
-        final GraphManager graphManager =
-            graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
-
-        final String edgeName = getEdgeTypeName();
-        final EdgeState edgeCursorState = new EdgeState();
-
-
-        //return all ids that are emitted from this edge
-        return previousIds.flatMap( previousFilterValue -> {
-
-            //set our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
-            final Id id = previousFilterValue.getValue();
-
-
-            final SimpleSearchByEdgeType search =
-                new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    startFromCursor );
-
-            /**
-             * TODO, pass a message with pointers to our cursor values to be generated later
-             */
-            return graphManager.loadEdgesFromSource( search )
-                //set the edge state for cursors
-                .doOnNext( edge -> edgeCursorState.update( edge ) )
-
-                    //map our id from the target edge  and set our cursor every edge we traverse
-                .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
-                    previousFilterValue.getPath() ) );
-        } );
-    }
-
-
-    @Override
-    protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
-                                                   final Optional<EdgePath> parent ) {
-
-        //if it's our first pass, there's no cursor to generate
-        if(cursorValue == null){
-            return new FilterResult<>( emit, parent );
-        }
-
-        return super.createFilterResult( emit, cursorValue, parent );
-    }
-
-
-    @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
-        return EdgeCursorSerializer.INSTANCE;
-    }
-
-
-    /**
-     * Get the edge type name we should use when traversing
-     */
-    protected abstract String getEdgeTypeName();
-
-
-    /**
-     * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor. We
-     * always try to seek to the same position as we ended.  Since we don't deal with a persistent read result, if we
-     * seek to a value = to our last, we may skip data.
-     */
-    private final class EdgeState {
-
-        private Edge cursorEdge = null;
-        private Edge currentEdge = null;
-
-
-        /**
-         * Update the pointers
-         */
-        private void update( final Edge newEdge ) {
-            cursorEdge = currentEdge;
-            currentEdge = newEdge;
-        }
-
-
-        /**
-         * Get the edge to use in cursors for resume
-         */
-        private Edge getCursorEdge() {
-            return cursorEdge;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
deleted file mode 100644
index 769a67e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-
-
-/**
- * Edge cursor serializer
- */
-public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
-
-
-    public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
-
-    @Override
-    protected Class<SimpleEdge> getType() {
-        return SimpleEdge.class;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
deleted file mode 100644
index 5a0e026..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
-
-    private final Id entityId;
-
-
-    @Inject
-    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-    @Override
-    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
-        //ignore what our input was, and simply emit the id specified
-       return filterValueObservable.map( idFilterResult ->  new FilterResult( entityId, idFilterResult.getPath() ));
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
deleted file mode 100644
index d598a2e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
-    @Inject
-    public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
-
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
-        //it's more efficient to make 1 network hop to get everything, then drop our results if required
-        final Observable<FilterResult<Entity>> entityObservable =
-            filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
-
-                    final Observable<EntitySet> entitySetObservable =
-                        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
-                                  .flatMap( ids -> entityCollectionManager.load( ids ) );
-
-
-                    //now we have a collection, validate our candidate set is correct.
-
-                    return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
-                                              .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
-                            entityCollector -> Observable.from( entityCollector.getResults() ) );
-                } );
-
-        return entityObservable;
-    }
-
-
-    /**
-     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this
-     * state is mutable and difficult to represent functionally
-     */
-    private static final class EntityVerifier {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-        private List<FilterResult<Entity>> results = new ArrayList<>();
-
-        private final List<FilterResult<Id>> candidateResults;
-        private final EntitySet entitySet;
-
-
-        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
-            this.entitySet = entitySet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( entitySet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final FilterResult<Id> candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-        }
-
-
-        public List<FilterResult<Entity>> getResults() {
-            return results;
-        }
-
-
-        private void validate( final FilterResult<Id> filterResult ) {
-
-            final Id candidateId = filterResult.getValue();
-
-
-            final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
-            //doesn't exist warn and drop
-            if ( entity == null || !entity.getEntity().isPresent() ) {
-                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue", candidateId );
-
-
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-            }
-
-            //it exists, add it
-
-            final Entity returnEntity = entity.getEntity().get();
-
-            final Optional<EdgePath> parent = filterResult.getPath();
-
-            final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
-
-            results.add( toReturn );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg
deleted file mode 100644
index c0308bd..0000000
Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
deleted file mode 100644
index da6ad29..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-
-/**
- * Read an edge in the graph to verify it's existence by id
- */
-public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
-
-    private final String collectionName;
-
-    @Inject
-    public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) {
-        super( graphManagerFactory, targetId );
-        this.collectionName = collectionName;
-    }
-
-
-    @Override
-    protected String getEdgeName() {
-        return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
deleted file mode 100644
index 91ae7c3..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName;
-
-
-/**
- * Command for reading graph edges on a collection
- */
-public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
-
-    private final String collectionName;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    @Inject
-    public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
-        super( graphManagerFactory );
-        this.collectionName = collectionName;
-    }
-
-
-    @Override
-    protected String getEdgeTypeName() {
-        return getEdgeTypeFromCollectionName( collectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
deleted file mode 100644
index 4756d33..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-
-/**
- * Read an edge in the graph to verify it's existence by id
- */
-public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
-
-    private final String connectionName;
-
-    @Inject
-    public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
-                                          @Assisted final String connectionName, @Assisted final Id targetId ) {
-        super( graphManagerFactory, targetId );
-        this.connectionName = connectionName;
-    }
-
-
-    @Override
-    protected String getEdgeName() {
-        return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
deleted file mode 100644
index 7371579..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
-
-
-/**
- * Command for reading graph edges on a connection
- */
-public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
-
-    private final GraphManagerFactory graphManagerFactory;
-    private final String connectionName;
-    private final String entityType;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    @Inject
-    public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
-                                            @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) {
-        this.graphManagerFactory = graphManagerFactory;
-        this.connectionName = connectionName;
-        this.entityType = entityType;
-    }
-
-
-
-    @Override
-    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
-
-        //get the graph manager
-        final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
-
-
-        final String edgeName = getEdgeTypeFromConnectionType( connectionName );
-
-
-        //return all ids that are emitted from this edge
-        return filterResultObservable.flatMap( idFilterResult -> {
-
-              //set our our constant state
-            final Optional<Edge> startFromCursor = getSeekValue();
-            final Id id = idFilterResult.getValue();
-
-            final SimpleSearchByIdType search =
-                new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    entityType, startFromCursor );
-
-            return graphManager.loadEdgesFromSourceByType( search ).map(
-                edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
-        } );
-    }
-
-
-    @Override
-    protected CursorSerializer<Edge> getCursorSerializer() {
-        return EdgeCursorSerializer.INSTANCE;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
deleted file mode 100644
index 0d4971b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
-
-
-/**
- * Command for reading graph edges on a connection
- */
-public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
-
-    private final String connectionName;
-
-
-    /**
-     * Create a new instance of our command
-     */
-    @Inject
-    public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
-        super( graphManagerFactory );
-        this.connectionName = connectionName;
-    }
-
-
-    @Override
-    protected String getEdgeTypeName() {
-        return getEdgeTypeFromConnectionType( connectionName );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
new file mode 100644
index 0000000..eaf74c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Command for searching the entity index for candidates along a search edge
+ */
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer> {
+
+    private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
+
+    private final EntityIndexFactory entityIndexFactory;
+    private final String query;
+    private final Timer searchTimer;
+
+
+    /**
+     * Create a new instance of our command
+     */
+    public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
+                                        final MetricsFactory metricsFactory, final String query ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.query = query;
+        this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" );
+    }
+
+
+    @Override
+    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
+
+        //get the application entity index
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() );
+
+
+        final int limit = pipelineContext.getLimit();
+
+
+        final SearchTypes searchTypes = getSearchTypes();
+
+
+        //return all candidates found by searching from each id that is emitted
+        return observable.flatMap( idFilterResult -> {
+
+            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
+
+
+            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
+
+                //our offset to our start value.  This will be set the first time we emit
+                //after we receive new ids, we want to reset this to 0
+                //set our constant state
+                final Optional<Integer> startFromCursor = getSeekValue();
+
+                final int startOffset = startFromCursor.or( 0 );
+
+                int currentOffSet = startOffset;
+
+                subscriber.onStart();
+
+                //emit while we have values from ES and someone is subscribed
+                while ( !subscriber.isUnsubscribed() ) {
+
+
+                    try {
+                        final CandidateResults candidateResults =
+                            applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
+
+
+
+                        for( CandidateResult candidateResult: candidateResults){
+
+                            //our subscriber unsubscribed, break out
+                            if(subscriber.isUnsubscribed()){
+                                return;
+                            }
+
+                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+                            final FilterResult<Candidate>
+                                result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+                            subscriber.onNext( result );
+
+                            currentOffSet++;
+                        }
+
+                        /**
+                         * Fewer candidates than the limit were returned, we're done
+                         */
+                        if (candidateResults.size() < limit) {
+                            subscriber.onCompleted();
+                            return;
+                        }
+
+                    }
+                    catch ( Throwable t ) {
+
+                        log.error( "Unable to search candidates", t );
+                        subscriber.onError( t );
+                    }
+                }
+            } );
+
+
+            //add a timer around our observable
+            ObservableTimer.time( candidates, searchTimer );
+
+            return candidates;
+        } );
+    }
+
+
+    @Override
+    protected CursorSerializer<Integer> getCursorSerializer() {
+        return ElasticsearchCursorSerializer.INSTANCE;
+    }
+
+
+    /**
+     * Get the search edge from the id
+     */
+    protected abstract SearchEdge getSearchEdge( final Id id );
+
+    /**
+     * Get the search types
+     */
+    protected abstract SearchTypes getSearchTypes();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
new file mode 100644
index 0000000..7ada4ba
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Create a candidate. This holds the original candidate, as well as the search scope it was found in
+ */
+public class Candidate {
+
+    private final CandidateResult candidateResult;
+    private final SearchEdge searchEdge;
+
+
+    /**
+     * Create a new Candidate for further processing
+     * @param candidateResult  The candidate result
+     * @param searchEdge The search edge this was searched on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        this.candidateResult = candidateResult;
+        this.searchEdge = searchEdge;
+    }
+
+
+    public CandidateResult getCandidateResult() {
+        return candidateResult;
+    }
+
+
+    public SearchEdge getSearchEdge() {
+        return searchEdge;
+    }
+}


Mime
View raw message