usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] incubator-usergrid git commit: Added tests for usergrid 607 and other issues were fixed in previous releases. Added new method that gets all edge documents out of elastic search and returns them. Added comment to IndexEdge about the differentiation
Date Wed, 13 May 2015 19:27:52 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-607 [created] 79aea6ab9


Added tests for usergrid 607 and other issues were fixed in previous releases.
Added new method that gets all edge documents out of elastic search and returns them.
Added comment to IndexEdge about the differentiation between IndexEdge and SearchEdge.
Removed the timeout in EsIndexBufferConsumerImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b52ac2f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b52ac2f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b52ac2f4

Branch: refs/heads/USERGRID-607
Commit: b52ac2f47d918a3390fd2bc65fbd543c2e78067a
Parents: f5cb788
Author: GERey <greyes@apigee.com>
Authored: Wed May 13 12:26:58 2015 -0700
Committer: GERey <greyes@apigee.com>
Committed: Wed May 13 12:26:58 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java  |   1 -
 .../corepersistence/index/IndexServiceImpl.java |  62 +++++-
 .../corepersistence/index/IndexServiceTest.java | 203 ++++++++++++++++++-
 .../index/ApplicationEntityIndex.java           |  12 ++
 .../usergrid/persistence/index/IndexEdge.java   |   6 +
 .../impl/EsApplicationEntityIndexImpl.java      |  53 ++++-
 .../index/impl/EsEntityIndexBatchImpl.java      |   3 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |   5 +-
 8 files changed, 333 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..a05057d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -80,7 +80,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge
) {
-
         run( eventBuilder.queueDeleteEdge( applicationScope, edge ) );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8185b4d..d616090 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.Iterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,10 +35,13 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -163,16 +168,54 @@ public class IndexServiceImpl implements IndexService {
     }
 
 
+    //Steps to delete an IndexEdge.
+    //1.Take the search edge given and search for all the edges in elasticsearch matching
that search edge
+    //2. Batch Delete all of those edges returned in the previous search.
+    //TODO: optimize loops further.
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope
applicationScope,
                                                               final Edge edge ) {
 
+        final Observable<IndexOperationMessage> batches =
+            Observable.just( edge ).flatMap( edgeValue -> {
+                final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex(
applicationScope );
+                EntityIndexBatch batch = ei.createBatch();
 
-        //TODO, query ES and remove this edge
 
-        throw new NotImplementedException( "Implement me" );
-    }
+                //review why generating the Scope from the Source  and the target node makes
sense.
+                final IndexEdge fromSource = generateScopeFromSource( edge );
+                final Id targetId = edge.getTargetNode();
+
+
+                CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource,
targetId, 1000, 0 );
+
+                //Should loop thorugh and query for all documents and if there are no documents
then the loop should exit.
+                do{
+                    batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed,
batch );
+                    if(!targetEdgesToBeDeindexed.getOffset().isPresent())
+                        break;
+                    targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId,
1000, targetEdgesToBeDeindexed.getOffset().get() );
+                }while(!targetEdgesToBeDeindexed.isEmpty());
+
+
+
+                final IndexEdge fromTarget = generateScopeFromTarget( edge );
+                final Id sourceId = edge.getSourceNode();
+
+                CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget,
sourceId, 1000, 0 );
 
+                do{
+                    batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed,
batch );
+                    if(!sourceEdgesToBeDeindexed.getOffset().isPresent())
+                        break;
+                    sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId,
1000, sourceEdgesToBeDeindexed.getOffset().get()  );
+                }while(!sourceEdgesToBeDeindexed.isEmpty());
+
+                return batch.execute();
+            } );
+
+        return ObservableTimer.time( batches, addTimer );
+    }
 
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope
applicationScope,
@@ -222,6 +265,19 @@ public class IndexServiceImpl implements IndexService {
                               .map( edge -> generateScopeFromTarget( edge ) );
     }
 
+    /**
+     * Takes in candidate results and uses the iterator to create batch commands
+     */
+
+    public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge edge,CandidateResults
edgesToBeDeindexed, EntityIndexBatch batch){
+        Iterator itr = edgesToBeDeindexed.iterator();
+        while( itr.hasNext() ) {
+            CandidateResult cr = ( CandidateResult ) itr.next();
+            batch.deindex( edge, cr.getId(), cr.getVersion() );
+        }
+        return batch;
+    }
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 5570df1..fffe6a2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -29,6 +31,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
+import org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.Candidate;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -173,9 +176,9 @@ public class IndexServiceTest {
         final ApplicationEntityIndex applicationEntityIndex =
             entityIndexFactory.createApplicationEntityIndex( applicationScope );
 
+        //query until the collection edge is available
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
collectionEdge );
 
-        //query until it's available
         final CandidateResults collectionResults = getResults( applicationEntityIndex, collectionSearchEdge,
             SearchTypes.fromTypes( testEntity.getId().getType() ), 1);
 
@@ -184,10 +187,9 @@ public class IndexServiceTest {
         assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
 
 
+        //query until the connection edge is available
         final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
connectionSearch );
 
-
-        //query until it's available
         final CandidateResults connectionResults = getResults( applicationEntityIndex, connectionSearchEdge,
             SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
 
@@ -247,7 +249,7 @@ public class IndexServiceTest {
         //get the first and last edge
         final Edge connectionSearch = connectionSearchEdges.get( 0 );
 
-        final Edge lastSearch = connectionSearchEdges.get( edgeCount-1 );
+        final Edge lastSearch = connectionSearchEdges.get( edgeCount - 1 );
 
 
         //now index
@@ -298,6 +300,199 @@ public class IndexServiceTest {
     }
 
 
+
+
+    /**
+     *This test must do the following steps.
+     *1. Delete the connecting edge
+     *2. Run the deleteIndexEdge using the search edge that gets returned from the delete
call
+     *3. Run queries to make sure that the collection entity still exists while the connection
search edge is gone.
+     * @throws InterruptedException
+     */
+    @Test
+    public void testDeleteSingleConnectingEdge() throws InterruptedException {
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope
);
+
+        final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID()
);
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+        //write entity
+        final Edge connectionSearch =
+            createTestEntityAndReturnConnectionEdge( applicationScope,graphManager,testEntity
);
+
+
+        final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
connectionSearch );
+
+        //step 1
+        //(We need to mark then delete things in the graph manager.)
+        final Edge toBeMarkedEdge = graphManager.markEdge( connectionSearch ).toBlocking().firstOrDefault(
null );
+        final Edge toBeDeletedEdge = graphManager.deleteEdge( toBeMarkedEdge ).toBlocking().firstOrDefault(
null );
+
+        //step 2
+        IndexOperationMessage indexOperationMessage =
+            indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault(
+            null );
+
+        assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
+
+        //ensure that no edges remain
+        final CandidateResults connectionResultsEmpty = applicationEntityIndex.search( connectionSearchEdge,
+            SearchTypes.fromTypes( "things" ),"select *",10,0 );
+
+        assertEquals(0,connectionResultsEmpty.size());
+
+    }
+
+    @Test
+    public void testDeleteMultipleConnectingEdges() throws InterruptedException {
+        ApplicationScope applicationScope =
+            new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope
);
+
+        final Entity testEntity = new Entity( createId( "thing" ), UUIDGenerator.newTimeUUID()
);
+        testEntity.setField( new StringField( "string", "foo" ) );
+
+
+        //write entity
+        Edge collectionEdge = createEntityandCollectionEdge( applicationScope, graphManager,
testEntity );
+        //Write multiple connection edges
+        final int edgeCount = 5;
+
+        final List<Edge> connectionSearchEdges = createConnectionSearchEdges( testEntity,
graphManager, edgeCount );
+
+        indexService.indexEntity( applicationScope, testEntity ).toBlocking().getIterator();
+
+        //query until results are available for collections
+        final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
collectionEdge );
+        getResults( applicationEntityIndex, collectionSearchEdge,
+            SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
+
+        for(int i = 0; i < edgeCount; i++) {
+            //query until results are available for connections
+
+            final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
connectionSearchEdges.get( i ) );
+            getResults( applicationEntityIndex, connectionSearchEdge, SearchTypes.fromTypes(
testEntity.getId().getType() ),
+                 1 );
+        }
+
+        for(Edge connectionSearch:connectionSearchEdges) {
+            //step 1
+            final Edge toBeMarkedEdge = graphManager.markEdge( connectionSearch ).toBlocking().firstOrDefault(
null );
+            final Edge toBeDeletedEdge = graphManager.deleteEdge( toBeMarkedEdge ).toBlocking().first();
+
+            final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
connectionSearch );
+
+            //step 2
+            IndexOperationMessage indexOperationMessage =
+                indexService.deleteIndexEdge( applicationScope, toBeDeletedEdge ).toBlocking().lastOrDefault(
null );
+
+            //not sure if this is still valid.
+            assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
+
+            //ensure that no edges remain
+            final CandidateResults connectionResultsEmpty = applicationEntityIndex.search(
connectionSearchEdge,
+                SearchTypes.fromTypes( "things" ),"select *",10,0 );
+
+            assertEquals(0,connectionResultsEmpty.size());
+        }
+    }
+
+
+    /**
+     * Refactor into two methods . Should only have one responsiblitiy.
+     * @param applicationScope
+     * @param graphManager
+     * @return
+     */
+    private Edge createTestEntityAndReturnConnectionEdge( final ApplicationScope applicationScope,
+                                                          final GraphManager graphManager,
+                                                          final Entity testEntity) {
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+        final Edge collectionEdge =
+            createEntityandCollectionEdge( applicationScope, graphManager, testEntity );
+
+
+        //create our connection edge.
+        final Id connectingId = createId( "connecting" );
+        final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId, "likes",
testEntity.getId() );
+
+        final Edge connectionSearch = graphManager.writeEdge( connectionEdge ).toBlocking().last();
+
+        //now index
+        indexService.indexEntity( applicationScope, testEntity ).count().toBlocking().last();
+
+        //query until results are available for collections
+        final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
collectionEdge );
+        getResults( applicationEntityIndex, collectionSearchEdge, SearchTypes.fromTypes(
testEntity.getId().getType() ),
+            1 );
+
+        //query until results are available for connections
+        final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource(
connectionSearch );
+        getResults( applicationEntityIndex, connectionSearchEdge, SearchTypes.fromTypes(
testEntity.getId().getType() ),
+            1 );
+
+        return connectionSearch;
+    }
+
+
+    /**
+     * Creates an entity along with the corresponding collection edge.
+     * @param applicationScope
+     * @param graphManager
+     * @param testEntity
+     * @return
+     */
+    private Edge createEntityandCollectionEdge( final ApplicationScope applicationScope,
+                                                final GraphManager graphManager, final Entity
testEntity) {
+
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        collectionManager.write( testEntity ).toBlocking().last();
+
+        //create our collection edge
+        final Edge collectionEdge =
+            CpNamingUtils.createCollectionEdge( applicationScope.getApplication(), testEntity.getId().getType(),
+                testEntity.getId() );
+
+        graphManager.writeEdge( collectionEdge ).toBlocking().last();
+        return collectionEdge;
+    }
+
+
+    private List<Edge> createConnectionSearchEdges(
+        final Entity testEntity, final GraphManager graphManager, final int edgeCount ) {
+
+        final List<Edge> connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap(
integer -> {
+
+            //create our connection edge.
+            final Id connectingId = createId( "connecting" );
+            final Edge connectionEdge = CpNamingUtils.createConnectionEdge( connectingId,
"likes", testEntity.getId() );
+
+            return graphManager.writeEdge( connectionEdge ).subscribeOn( Schedulers.io()
);
+        }, 20).toList().toBlocking().last();
+
+
+        assertEquals( "All edges saved", edgeCount, connectionSearchEdges.size() );
+        return connectionSearchEdges;
+    }
+
+
     private CandidateResults getResults( final ApplicationEntityIndex applicationEntityIndex,
                                          final SearchEdge searchEdge, final SearchTypes searchTypes,
                                          final int expectedSize ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index f4f7bcd..20c3f12 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.index;
 
 
+import org.apache.usergrid.persistence.model.entity.Id;
+
 import rx.Observable;
 
 /**
@@ -45,6 +47,16 @@ public interface ApplicationEntityIndex {
     CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes,
final String query,
                              final int limit, final int offset );
 
+
+    /**
+     * Same as search, just iterates all documents that match the index edge exactly
+     * @param edge
+     * @param limit
+     * @param offset
+     * @return
+     */
+    CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId,  final
int limit, final int offset);
+
     /**
      * delete all application records
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
index 145ccba..36aa240 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexEdge.java
@@ -25,6 +25,12 @@ package org.apache.usergrid.persistence.index;
 /**
  * An edge to perform indexing on.
  */
+/**
+ *source node - edge - target node =>
+ IndexEdge => sourceNode, edgeType, timestamp, NodeType we're indexing (TARGET)
+ SearchEdge = > sourceNode, edgeType, NodeType (Target)
+
+ */
 public interface IndexEdge extends SearchEdge {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 5b67060..f004ce8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -30,6 +30,8 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
 import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
@@ -46,16 +48,19 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.query.ParsedQuery;
 import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
+import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
 import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -148,7 +153,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex
{
 
         final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
 
-        final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes,
parsedQuery, limit, offset );
+        final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes,
parsedQuery, limit, offset )
+                                                      .setTimeout( TimeValue.timeValueMillis(queryTimeout)
);
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Searching index (read alias): {}\n  nodeId: {}, edgeType: {},
 \n type: {}\n   query: {} ",
@@ -159,7 +165,50 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex
{
         try {
             //Added For Graphite Metrics
             Timer.Context timeSearch = searchTimer.time();
-            searchResponse = srb.execute().actionGet(queryTimeout);
+            searchResponse = srb.execute().actionGet();
+            timeSearch.stop();
+        }
+        catch ( Throwable t ) {
+            logger.error( "Unable to communicate with Elasticsearch", t );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }
+        failureMonitor.success();
+
+        return parseResults(searchResponse, parsedQuery, limit, offset);
+    }
+
+
+    @Override
+    public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId,
final int limit,
+                                                 final int offset ) {
+        /**
+         * Take a list of IndexEdge, with an entityId
+         and query Es directly for matches
+
+         */
+        IndexValidationUtils.validateSearchEdge( edge );
+        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+        SearchResponse searchResponse;
+
+        final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+        FilterBuilders.idsFilter( entityId.getType() );
+
+        final SearchRequestBuilder srb = searchRequest.getBuilder( edge, SearchTypes.fromTypes(
entityId.getType() ),
+            parsedQuery, limit, offset ).setTimeout( TimeValue.timeValueMillis( queryTimeout
) );
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Searching for edge index (read alias): {}\n  nodeId: {}, edgeType:
{},  \n type: {}\n   query: {} ",
+                this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
+                SearchTypes.fromTypes( entityId.getType()), srb );
+        }
+
+        try {
+            //Added For Graphite Metrics
+            Timer.Context timeSearch = searchTimer.time();
+            searchResponse = srb.execute().actionGet();
             timeSearch.stop();
         }
         catch ( Throwable t ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 956f1d5..e50ee73 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -130,7 +130,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
 
-        return indexBatchBufferProducer.put( tempContainer );
+        Observable observable = indexBatchBufferProducer.put( tempContainer );
+        return  observable;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b52ac2f4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 469893e..5a71444 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -170,6 +170,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer
{
 
             final Observable<IndexOperation> index = Observable.from( batch.getIndexRequests()
);
             final Observable<DeIndexOperation> deIndex = Observable.from( batch.getDeIndexRequests()
);
+            if(indexOperationSetSize +  deIndexOperationSetSize > 0){
+                batch.done();
+            }
 
             return Observable.merge( index, deIndex );
         } );
@@ -230,7 +233,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer
{
 
 
         try {
-            responses = bulkRequest.execute().actionGet( indexFig.getWriteTimeout() );
+            responses = bulkRequest.execute().actionGet( );
         }
         catch ( Throwable t ) {
             log.error( "Unable to communicate with elasticsearch" );


Mime
View raw message