usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [03/12] incubator-usergrid git commit: Massive refactor. Paths for cursor generation are now part of our I/O results. This allows the collector to take until satisfied, then generate a serializable path.
Date Mon, 04 May 2015 17:32:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
new file mode 100644
index 0000000..56e1c1c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
+    implements Filter<Candidate, Id> {
+
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EntityIndexFactory entityIndexFactory;
+
+
+    @Inject
+    public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                              final EntityIndexFactory entityIndexFactory ) {
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.entityIndexFactory = entityIndexFactory;
+    }
+
+
+
+    @Override
+      public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+        /**
+         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
+         * objects
+         */
+
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+        final EntityCollectionManager entityCollectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+        final ApplicationEntityIndex applicationIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
+            candidateResults -> {
+                //flatten toa list of ids to load
+                final Observable<List<Id>> candidateIds =
+                    Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
+                              .toList();
+
+                //load the ids
+                final Observable<VersionSet> versionSetObservable =
+                    candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+                //now we have a collection, validate our canidate set is correct.
+
+                return versionSetObservable.map(
+                    entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
+                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                        entityCollector -> Observable.from( entityCollector.collectResults() ) );
+        } );
+
+        return searchIdSetObservable;
+    }
+
+
+
+
+
+    /**
+     * Map a new cp entity to an old entity.  May be null if not present
+     */
+    private static final class EntityCollector {
+
+        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+        private List<FilterResult<Id>> results = new ArrayList<>();
+
+        private final EntityIndexBatch batch;
+        private final List<FilterResult<Candidate>> candidateResults;
+        private final VersionSet versionSet;
+
+
+        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+                                final List<FilterResult<Candidate>> candidateResults ) {
+            this.batch = batch;
+            this.versionSet = versionSet;
+            this.candidateResults = candidateResults;
+            this.results = new ArrayList<>( versionSet.size() );
+        }
+
+
+        /**
+         * Merge our candidates and our entity set into results
+         */
+        public void merge() {
+
+            for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+                validate( candidateResult );
+            }
+
+            batch.execute();
+        }
+
+
+        public List<FilterResult<Id>> collectResults() {
+            return results;
+        }
+
+
+        /**
+         * Validate each candidate results vs the data loaded from cass
+         * @param filterCandidate
+         */
+        private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+            final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+            final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+            final UUID candidateVersion = candidateResult.getVersion();
+
+            final UUID entityVersion = logEntry.getVersion();
+
+            final Id entityId = logEntry.getEntityId();
+
+            //entity is newer than ES version
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                    new Object[] { searchEdge, entityId, entityVersion } );
+                batch.deindex( searchEdge, entityId, entityVersion );
+                return;
+            }
+
+            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+            //remove the ES record, since the read in cass should cause a read repair, just ignore
+            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+                logger.warn(
+                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
+                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+            }
+
+            //they're the same add it
+
+            final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath()  );
+
+            results.add( result );
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
deleted file mode 100644
index 465ff22..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from an incoming CandidateResults object and return them as results
- */
-public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage>
-    implements Collector<CandidateResults, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                                   final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<EntitySet> entitySetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return entitySetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .map( entityCollector -> entityCollector.getResults() );
-        } );
-
-        //if we filter all our results, we want to continue to try the next page
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Our collector to collect entities.  Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Entity> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final EntitySet entitySet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.entitySet = entitySet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( entitySet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public ResultsPage getResults() {
-            return new ResultsPage( results );
-        }
-
-
-        public EntityIndexBatch getBatch() {
-            return batch;
-        }
-
-
-        private void validate( final CandidateResult candidateResult ) {
-
-            final Id candidateId = candidateResult.getId();
-            final UUID candidateVersion = candidateResult.getVersion();
-
-
-            final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
-            //doesn't exist warn and drop
-            if ( entity == null ) {
-                logger.warn(
-                    "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue",
-                    candidateId, candidateVersion );
-
-
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-
-            }
-
-
-            final UUID entityVersion = entity.getVersion();
-
-
-            //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final Id entityId = entity.getId();
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
-                  //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
-                return;
-            }
-
-            //they're the same add it
-
-
-            results.add( entity.getEntity().get() );
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
deleted file mode 100644
index bb9ab76..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
- */
-public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id>
-    implements PipelineOperation<CandidateResults, Id> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final EntityIndexFactory entityIndexFactory;
-
-
-    @Inject
-    public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                           final EntityIndexFactory entityIndexFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.entityIndexFactory = entityIndexFactory;
-    }
-
-
-
-    @Override
-    public Observable<Id> call( final Observable<CandidateResults> observable ) {
-
-
-        /**
-         * A bit kludgy from old 1.0 -> 2.0 apis.  Refactor this as we clean up our lower levels and create new results
-         * objects
-         */
-
-        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
-        final ApplicationEntityIndex applicationIndex =
-            entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
-        final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> {
-            //flatten toa list of ids to load
-            final Observable<List<Id>> candidateIds =
-                Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
-            //load the ids
-            final Observable<VersionSet> versionSetObservable =
-                candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
-            //now we have a collection, validate our canidate set is correct.
-
-            return versionSetObservable
-                .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
-                .doOnNext( entityCollector -> entityCollector.merge() )
-                .flatMap( entityCollector -> Observable.from(  entityCollector.collectResults() ) );
-        } );
-
-        return searchIdSetObservable;
-    }
-
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-    private static final class EntityCollector {
-
-        private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
-        private List<Id> results = new ArrayList<>();
-
-        private final EntityIndexBatch batch;
-        private final CandidateResults candidateResults;
-        private final VersionSet versionSet;
-
-
-        public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
-                                final CandidateResults candidateResults ) {
-            this.batch = batch;
-            this.versionSet = versionSet;
-            this.candidateResults = candidateResults;
-            this.results = new ArrayList<>( versionSet.size() );
-        }
-
-
-        /**
-         * Merge our candidates and our entity set into results
-         */
-        public void merge() {
-
-            for ( final CandidateResult candidateResult : candidateResults ) {
-                validate( candidateResult );
-            }
-
-            batch.execute();
-        }
-
-
-        public List<Id> collectResults() {
-            return results;
-        }
-
-
-        /**
-         * Validate each candidate results vs the data loaded from cass
-         * @param candidateResult
-         */
-        private void validate( final CandidateResult candidateResult ) {
-
-            final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
-            final UUID candidateVersion = candidateResult.getVersion();
-
-            final UUID entityVersion = logEntry.getVersion();
-
-            final Id entityId = logEntry.getEntityId();
-
-            //entity is newer than ES version
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
-                    new Object[] { searchEdge, entityId, entityVersion } );
-                batch.deindex( searchEdge, entityId, entityVersion );
-                return;
-            }
-
-            //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
-            //remove the ES record, since the read in cass should cause a read repair, just ignore
-            if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
-                final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
-                logger.warn(
-                    "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}.  Repair "
-                        + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-            }
-
-            //they're the same add it
-
-            results.add( entityId );
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
deleted file mode 100644
index cc96633..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class CollectionRefsVerifier extends VersionVerifier {
-
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
-        for ( Id id : ids ) {
-            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
-        }
-        return Results.fromRefList( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 94c91d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-
-
-    public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-        final ApplicationEntityIndex applicationEntityIndex ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new CollectionRefsVerifier();
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new IdsVerifier();
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope,
-            scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
deleted file mode 100644
index 6b7bdde..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-
-
-/**
- * Verifier for creating connections
- */
-public class ConnectionRefsVerifier extends VersionVerifier {
-
-
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<ConnectionRef> refs = new ArrayList<>();
-        for ( Id id : ids ) {
-            refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid())  ));
-        }
-
-        return Results.fromConnections( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 55b95a9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ApplicationEntityIndex applicationEntityIndex;
-    private final EntityRef ownerId;
-    private final String connectionType;
-
-
-    public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
-                                               final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId,
-                                               final String connectionType ) {
-        this.entityCollectionManager = entityCollectionManager;
-        this.applicationEntityIndex = applicationEntityIndex;
-        this.ownerId = ownerId;
-        this.connectionType = connectionType;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
-                                    final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
-            ;
-        }
-        else {
-            verifier = new EntityVerifier( Query.MAX_LIMIT );
-        }
-
-        return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
deleted file mode 100644
index 6e170f8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.index.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
-
-    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
-
-    private final ResultsLoaderFactory resultsLoaderFactory;
-
-    private final ApplicationScope applicationScope;
-
-    private final ApplicationEntityIndex entityIndex;
-
-    private final SearchEdge indexScope;
-
-    private final SearchTypes types;
-
-    private final String query;
-
-    private final Optional<Integer> setOffsetFromCursor;
-
-    private final int limit;
-
-    private int offset;
-
-
-    private Results currentResults;
-
-    private boolean moreToLoad = true;
-
-
-
-
-    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
-                                       final ApplicationScope applicationScope, final SearchEdge indexScope,
-                                       final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
-        this.resultsLoaderFactory = resultsLoaderFactory;
-        this.applicationScope = applicationScope;
-        this.entityIndex = entityIndex;
-        this.indexScope = indexScope;
-        this.types = types;
-        this.setOffsetFromCursor = setOffsetFromCursor;
-
-        //we must deep copy the query passed.  Otherwise we will modify it's state with cursors.  Won't fix, not relevant
-        //once we start subscribing to streams.
-        this.query = query;
-        this.limit = limit;
-    }
-
-
-    @Override
-    public Iterator<Results> iterator() {
-        return this;
-    }
-
-
-    private void loadNextPage() {
-        // Because of possible stale entities, which are filtered out by buildResults(),
-        // we loop until the we've got enough results to satisfy the query limit.
-
-        final int maxQueries = 10; // max re-queries to satisfy query limit
-
-
-        Results results = null;
-        int queryCount = 0;
-
-
-        CandidateResults crs = null;
-
-        int newLimit = limit;
-
-        while ( queryCount++ < maxQueries ) {
-
-            crs = entityIndex.search( indexScope, types, query, newLimit , offset);
-
-
-            logger.debug( "Calling build results with crs {}", crs );
-            results = buildResults( indexScope, crs );
-
-            /**
-             * In an edge case where we delete stale entities, we could potentially get less results than expected.
-             * This will only occur once during the repair phase.
-             * We need to ensure that we short circuit before we overflow the requested limit during a repair.
-             */
-            if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more
-                break;
-            }
-
-
-            //we didn't load anything, but there was a cursor, this means a read repair occured.  We have to short
-            //circuit to avoid over returning the result set
-
-
-            // need to query for more
-            // ask for just what we need to satisfy, don't want to exceed limit
-            newLimit =  newLimit - results.size();
-
-            logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                limit, newLimit, queryCount
-            } );
-        }
-
-        //now set our cursor if we have one for the next iteration
-        if ( results.hasCursor() ) {
-            moreToLoad = true;
-        }
-
-        else {
-            moreToLoad = false;
-        }
-
-//
-//        //set our select subjects into our query if provided
-//        if(crs != null){
-//            query.setSelectSubjects( crs.getGetFieldMappings() );
-//        }
-//
-
-        //set our current results and the flag
-        this.currentResults = results;
-    }
-
-
-
-    /**
-     * Build results from a set of candidates, and discard those that represent stale indexes.
-     *
-     * @param indexScope The index scope to execute the search on
-     * @param crs Candidates to be considered for results
-     */
-    private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) {
-
-        logger.debug( "buildResults()  from {} candidates", crs.size() );
-
-        //get an instance of our results loader
-        final ResultsLoader resultsLoader =
-            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES );
-
-        //load the results
-        final Results results = resultsLoader.loadResults(crs);
-
-        //signal for post processing
-        resultsLoader.postProcess();
-
-        //set offset into query
-
-        logger.debug( "Returning results size {}", results.size() );
-
-        return results;
-    }
-
-
-    @Override
-    public boolean hasNext() {
-
-        //we've tried to load and it's empty and we have more to load, load the next page
-        if ( currentResults == null ) {
-            //there's nothing left to load, nothing to do
-            if ( !moreToLoad ) {
-                return false;
-            }
-
-            //load the page
-
-            loadNextPage();
-        }
-
-
-        //see if our current results are not null
-        return currentResults != null;
-    }
-
-
-    @Override
-    public Results next() {
-        if ( !hasNext() ) {
-            throw new NoSuchElementException( "No more results present" );
-        }
-
-        final Results toReturn = currentResults;
-
-        currentResults = null;
-
-        return toReturn;
-    }
-
-    @Override
-    public void remove() {
-        throw new RuntimeException("Remove not implemented!!");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
deleted file mode 100644
index d73c731..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-
-
-/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
- */
-public class EntityVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-
-    private EntitySet ids;
-
-    private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
-
-
-    public EntityVerifier( final int maxSize ) {
-        this.entityMapping = new HashMap<>( maxSize );
-    }
-
-
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.load( idsToLoad ).toBlocking().last();
-        logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
-    }
-
-
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccEntity savedEntity = ids.getEntity( entityId );
-
-        //version wasn't found deindex
-        if ( savedEntity == null ) {
-            logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
-            return false;
-        }
-
-        final UUID candidateVersion = candidateResult.getVersion();
-        final UUID savedVersion = savedEntity.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity
-            } );
-
-            return false;
-        }
-
-
-        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
-
-        if ( !entity.isPresent() ) {
-            logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
-                    entityId.getUuid(), savedEntity.getEntity() );
-            return false;
-        }
-
-        entityMapping.put( entityId, entity.get() );
-
-        return true;
-    }
-
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
-
-        for ( final Id id : ids ) {
-            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
-
-            Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
-
-            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
-            entity.addProperties( entityMap );
-            ugEntities.add( entity );
-        }
-
-        return Results.fromEntities( ugEntities );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
deleted file mode 100644
index ade64a2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-
-public class FilteringLoader implements ResultsLoader {
-
-    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
-
-    private final EntityCollectionManager entityCollectionManager;
-    private final ResultsVerifier resultsVerifier;
-    private final ApplicationScope applicationScope;
-    private final SearchEdge indexScope;
-    private final EntityIndexBatch indexBatch;
-
-
-    /**
-     * Create an instance of a filter loader
-     *
-     * @param entityCollectionManager The entityCollectionManagerFactory
-     * @param resultsVerifier The verifier to verify the candidate results
-     * @param applicationScope The application scope to perform the load
-     * @param indexScope The index scope used in the search
-     */
-    protected FilteringLoader( final  EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex,  final ResultsVerifier resultsVerifier,
-                               final ApplicationScope applicationScope, final SearchEdge indexScope ) {
-
-        this.entityCollectionManager = entityCollectionManager;
-        this.resultsVerifier = resultsVerifier;
-        this.applicationScope = applicationScope;
-        this.indexScope = indexScope;
-
-        indexBatch = applicationEntityIndex.createBatch();
-    }
-
-
-    @Override
-    public Results loadResults( final CandidateResults crs ) {
-
-
-        if ( crs.size() == 0 ) {
-            return new Results();
-        }
-
-
-        // For each entity, holds the index it appears in our candidates for keeping ordering correct
-        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
-
-        // Maps the entity ids to our candidates
-        final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
-
-
-        final Iterator<CandidateResult> iter = crs.iterator();
-
-
-        // TODO, in this case we're "optimizing" due to the limitations of collection scope.
-        // Perhaps  we should change the API to just be an application, then an "owner" scope?
-
-        // Go through the candidates and group them by scope for more efficient retrieval.
-        // Also remove duplicates before we even make a network call
-        for ( int i = 0; iter.hasNext(); i++ ) {
-
-            final CandidateResult currentCandidate = iter.next();
-
-            final Id entityId = currentCandidate.getId();
-
-            //check if we've seen this candidate by id
-            final CandidateResult previousMax = maxCandidateMapping.get( entityId );
-
-            //its not been seen, save it
-            if ( previousMax == null ) {
-                maxCandidateMapping.put( entityId, currentCandidate );
-                orderIndex.put( entityId, i );
-                continue;
-            }
-
-            //we have seen it, compare them
-
-            final UUID previousMaxVersion = previousMax.getVersion();
-
-            final UUID currentVersion = currentCandidate.getVersion();
-
-
-            final CandidateResult toRemove;
-            final CandidateResult toKeep;
-
-            //current is newer than previous.  Remove previous and keep current
-            if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
-                toRemove = previousMax;
-                toKeep = currentCandidate;
-            }
-            //previously seen value is newer than current.  Remove the current and keep the previously seen value
-            else {
-                toRemove = currentCandidate;
-                toKeep = previousMax;
-            }
-
-            //this is a newer version, we know we already have a stale entity, add it to be cleaned up
-
-
-            //de-index it
-            logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
-                    entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
-                } );
-
-            //deindex this document, and remove the previous maxVersion
-            //we have to deindex this from our ownerId, since this is what gave us the reference
-            indexBatch.deindex( indexScope, toRemove );
-
-
-            //TODO, fire the entity repair cleanup task here instead of de-indexing
-
-            //replace the value with a more current version
-            maxCandidateMapping.put( entityId, toKeep );
-            orderIndex.put( entityId, i );
-        }
-
-
-        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify
-        // existence and correct versions
-
-        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
-
-
-        final Collection<Id> idsToLoad =
-            Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
-                @Nullable
-                @Override
-                public Id apply( @Nullable final CandidateResult input ) {
-                    //NOTE this is never null, we won't need to check
-                    return input.getId();
-                }
-            } );
-
-
-        //now using the scope, load the collection
-
-
-
-        //load the results into the loader for this scope for validation
-        resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
-
-        //now let the loader validate each candidate.  For instance, the "max" in this candidate
-        //could still be a stale result, so it needs validated
-        for ( final Id requestedId : idsToLoad ) {
-
-            final CandidateResult cr = maxCandidateMapping.get( requestedId );
-
-            //ask the loader if this is valid, if not discard it and de-index it
-            if ( !resultsVerifier.isValid( cr ) ) {
-                indexBatch.deindex( indexScope, cr );
-                continue;
-            }
-
-            //if we get here we're good, we need to add this to our results
-            final int candidateIndex = orderIndex.get( requestedId );
-
-            sortedResults.put( candidateIndex, requestedId );
-        }
-
-
-        // NOTE DO NOT execute the batch here.
-        // It changes the results and we need consistent paging until we aggregate all results
-        return resultsVerifier.getResults( sortedResults.values() );
-    }
-
-
-    @Override
-    public void postProcess() {
-        this.indexBatch.execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
deleted file mode 100644
index 4a3bfcd..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class IdsVerifier extends VersionVerifier {
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-
-        final List<UUID> returnIds = new ArrayList<>( ids.size() );
-
-        for ( final Id id : ids ) {
-            returnIds.add( id.getUuid() );
-        }
-
-
-        return Results.fromIdList( returnIds );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
deleted file mode 100644
index c2a3e9a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.CandidateResults;
-
-
-/**
- * Interface for loading results
- */
-public interface ResultsLoader {
-
-    /**
-     * Using the candidate results, load our results.  Should filter stale results
-     * @param  crs The candidate result set
-     * @return Results.  Null safe, but may be empty
-     */
-    public Results loadResults( final CandidateResults crs);
-
-    /**
-     * Post process the load operation
-     */
-    public void postProcess();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
deleted file mode 100644
index 3ccca1b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public interface ResultsLoaderFactory {
-
-    /**
-     * Get the loader for results
-     * @param applicationScope The application scope used to load results
-     * @param indexScope The index scope used in the search
-     * @param
-     */
-    ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
-                             final Query.Level resultsLevel );
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
deleted file mode 100644
index fe72ca2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface ResultsVerifier {
-
-    /**
-     * Load all the candidate ides for verification
-     * @param ids The Id's to load
-     * @param ecm The entity collection manager
-     */
-    public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
-
-    /**
-     * Return true if the candidate result is a valid result that should be retained. If it should
-     * not it should also be removed from the list of possible return values in this loader
-     * @param candidateResult
-     */
-    public boolean isValid(CandidateResult candidateResult);
-
-
-    /**
-     * Load the result set with the given ids
-     * @return
-     */
-    public Results getResults(Collection<Id> ids);
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
deleted file mode 100644
index c49fb28..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-
-/**
- * A loader that verifies versions are correct in Cassandra and match ElasticSearch
- */
-public abstract class VersionVerifier implements ResultsVerifier {
-
-    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
-
-    private VersionSet ids;
-
-
-    @Override
-    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
-        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
-    }
-
-
-    @Override
-    public boolean isValid( final CandidateResult candidateResult ) {
-        final Id entityId = candidateResult.getId();
-
-        final MvccLogEntry version = ids.getMaxVersion( entityId );
-
-        //version wasn't found ,deindex
-        if ( version == null ) {
-            logger.warn( "Version for Entity {}:{} not found",
-                    entityId.getUuid(), entityId.getUuid() );
-
-            return false;
-        }
-
-        final UUID savedVersion = version.getVersion();
-
-        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
-            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-                new Object[] {
-                    entityId.getUuid(),
-                    entityId.getType(),
-                    candidateResult.getVersion(),
-                    savedVersion
-            } );
-
-            return false;
-        }
-
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
deleted file mode 100644
index 6230147..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> {
-
-    private final Id entityId;
-
-
-    @Inject
-    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-
-    @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
-        return Observable.just( entityId );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
deleted file mode 100644
index dd6b9b8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage>
-    implements Collector<Id, ResultsPage> {
-
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
-    @Inject
-    public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-    }
-
-
-    @Override
-    public Observable<ResultsPage> call( final Observable<Id> observable ) {
-
-
-        final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
-        final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap(
-            bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );
-
-
-        final Observable<ResultsPage> resultsObservable = entitySetObservable
-
-            .flatMap( entitySet -> {
-
-                //get our entites and filter missing ones, then collect them into a results object
-                final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
-
-
-                //convert them to our old entity model, then filter abscent, meaning they weren't found
-                final Observable<List<Entity>> entitiesPageObservable =
-                    mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() )
-                                        .map( mvccEntity -> mvccEntity.getEntity().get() ).toList();
-
-                //convert them to a list, then map them into results
-                return entitiesPageObservable.map( entities -> new ResultsPage( entities ) );
-            } );
-
-
-        return resultsObservable;
-    }
-
-    /**
-     * Map a new cp entity to an old entity.  May be null if not present
-     */
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index e0f69cf..42b352b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,8 +20,9 @@
 package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -39,7 +40,7 @@ import rx.Observable;
 /**
  * Filter should take and Id and a graph edge, and ensure the connection between the two exists
  */
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
     Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
@@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> idObservable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
 
-        return idObservable.flatMap( id -> {
+        return filterValueObservable.flatMap( filterValue -> {
             final String edgeTypeName = getEdgeName();
+            final Id id = filterValue.getValue();
 
             //create our search
             final SearchByEdge searchByEdge =
@@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
                     Optional.absent() );
 
             //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
-            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() );
+            return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4dd34fc..503fcf9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
 
 
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +39,7 @@ import rx.Observable;
 /**
  * Command for reading graph edges
  */
-public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
 
     private final GraphManagerFactory graphManagerFactory;
 
@@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
     @Override
-    public Observable<Id> call( final Observable<Id> observable ) {
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
 
         //get the graph manager
         final GraphManager graphManager =
@@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
 
 
         //return all ids that are emitted from this edge
-        return observable.flatMap( id -> {
+        return previousIds.flatMap( previousFilterValue -> {
 
             //set our our constant state
             final Optional<Edge> startFromCursor = getSeekValue();
+            final Id id = previousFilterValue.getValue();
 
 
             final SimpleSearchByEdgeType search =
@@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
              */
             return graphManager.loadEdgesFromSource( search )
                 //set our cursor every edge we traverse
-                .doOnNext( edge -> setCursor( edge ) )
+
                     //map our id from the target edge
-                .map( edge -> edge.getTargetNode() );
+                .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
new file mode 100644
index 0000000..5a0e026
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier.  Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
+
+    private final Id entityId;
+
+
+    @Inject
+    public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+    @Override
+    public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+        //ignore what our input was, and simply emit the id specified
+       return filterValueObservable.map( idFilterResult ->  new FilterResult( entityId, idFilterResult.getPath() ));
+
+    }
+}


Mime
View raw message