usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [1/3] usergrid git commit: Remove orphaned collection edges with no target entity
Date Fri, 13 Oct 2017 20:37:29 GMT
Repository: usergrid
Updated Branches:
  refs/heads/master 4a659106e -> b93f8d441


Remove orphaned collection edges with no target entity

Also remove extraneous entityID sort from the default Elasticsearch sort


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f4842b06
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f4842b06
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f4842b06

Branch: refs/heads/master
Commit: f4842b06cd9fbc03af03039cc1f050a887c9b45c
Parents: 4a65910
Author: Mike Dunker <mdunker@google.com>
Authored: Fri Oct 13 10:13:55 2017 -0700
Committer: Mike Dunker <mdunker@google.com>
Committed: Fri Oct 13 10:13:55 2017 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  3 +
 .../read/traverse/EntityLoadVerifyFilter.java   | 72 +++++++++++++++++---
 .../pipeline/read/traverse/ReadRepairFig.java   | 38 +++++++++++
 .../impl/SearchRequestBuilderStrategy.java      |  3 +-
 4 files changed, 106 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a0748e6..909c073 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
 import org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration;
 import org.apache.usergrid.corepersistence.pipeline.PipelineModule;
+import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadRepairFig;
 import org.apache.usergrid.corepersistence.rx.impl.*;
 import org.apache.usergrid.corepersistence.service.*;
 import org.apache.usergrid.locking.guice.LockModule;
@@ -160,6 +161,8 @@ public class CoreModule extends AbstractModule {
 
         install( new GuicyFigModule( EntityManagerFig.class ) );
 
+        install( new GuicyFigModule( ReadRepairFig.class ) );
+
         install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
 
         install( new GuicyFigModule( ServiceSchedulerFig.class ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 3f6e26d..7cc9735 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -23,6 +23,11 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,11 +57,17 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
     private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class );
 
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+    private final ReadRepairFig readRepairFig;
 
 
     @Inject
-    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                   final GraphManagerFactory graphManagerFactory,
+                                   final ReadRepairFig readRepairFig) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+        this.readRepairFig = readRepairFig;
     }
 
 
@@ -64,8 +75,9 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
     public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
 
 
+        final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
         final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
         //it's more efficient to make 1 network hop to get everything, then drop our results if required
         final Observable<FilterResult<Entity>> entityObservable =
@@ -80,9 +92,10 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
                               .flatMap( ids -> entityCollectionManager.load( ids ) );
 
 
-                //now we have a collection, validate our canidate set is correct.
-
-                return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                //now we have a collection, validate our candidate set is correct.
+                GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope);
+                return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager,
+                    entitySet, bufferedIds, readRepairFig ) )
                                           .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( entityCollector.getResults() ) );
             } );
@@ -102,12 +115,20 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
 
         private final List<FilterResult<Id>> candidateResults;
         private final EntitySet entitySet;
+        private final GraphManager graphManager;
+        private final ApplicationScope applicationScope;
+        private final ReadRepairFig readRepairFig;
 
 
-        public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
+        public EntityVerifier( final ApplicationScope applicationScope, final GraphManager graphManager,
+                               final EntitySet entitySet, final List<FilterResult<Id>> candidateResults,
+                               final ReadRepairFig readRepairFig) {
+            this.applicationScope = applicationScope;
+            this.graphManager = graphManager;
             this.entitySet = entitySet;
             this.candidateResults = candidateResults;
             this.results = new ArrayList<>( entitySet.size() );
+            this.readRepairFig = readRepairFig;
         }
 
 
@@ -137,11 +158,44 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
 
             //doesn't exist warn and drop
             if ( entity == null || !entity.getEntity().isPresent() ) {
-                logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
-                    + "  Ignoring since this could be a region sync issue", candidateId );
+
+                // look for orphaned edges
+                String edgeTypeName = CpNamingUtils.getEdgeTypeFromCollectionName(Schema.defaultCollectionName(candidateId.getType()));
+                final SearchByEdge searchByEdge =
+                    new SimpleSearchByEdge( applicationScope.getApplication(), edgeTypeName, candidateId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.absent() );
+
+                int edgesDeleted = 0;
+                List<MarkedEdge> edgeList = graphManager.loadEdgeVersions(searchByEdge).toList().toBlocking().last();
+                if (edgeList.size() > 0) {
+                    MarkedEdge firstEdge = edgeList.get(0);
+                    long currentTimestamp = CpNamingUtils.createGraphOperationTimestamp();
+                    long edgeTimestamp = firstEdge.getTimestamp();
+                    long timestampDiff = currentTimestamp - edgeTimestamp;
+                    long orphanDelaySecs = readRepairFig.getEdgeOrphanDelaySecs();
+                    // timestamps are in 100 nanoseconds, convert from seconds
+                    long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L;
+                    if (timestampDiff > allowedDiff) {
+                        // edges must be orphans, delete edges
+                        for (MarkedEdge edge: edgeList) {
+                            MarkedEdge markedEdge = graphManager.markEdge(edge).toBlocking().lastOrDefault(null);
+                            if (markedEdge != null) {
+                                graphManager.deleteEdge(markedEdge).toBlocking().lastOrDefault(null);
+                                edgesDeleted += 1;
+                            }
+                        }
+                    }
+                }
+
+                if (edgesDeleted > 0) {
+                    logger.warn("Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Deleted {} edges.", candidateId, edgesDeleted);
+                } else {
+                    logger.warn("Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync issue", candidateId);
+                }
 
 
-                //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
new file mode 100644
index 0000000..2f3e6e4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * Read repair fig
+ */
+@FigSingleton
+public interface ReadRepairFig extends GuicyFig {
+
+    @Key( "usergrid.edge.orphan.delete.delay.secs" )
+    @Default( "86400" ) // 1 day
+    long getEdgeOrphanDelaySecs();
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index c5f07b4..5812c6f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -141,8 +141,9 @@ public class SearchRequestBuilderStrategy {
         //sort by the edge timestamp
         srb.addSort( SortBuilders.fieldSort( IndexingUtils.EDGE_TIMESTAMP_FIELDNAME ).order( SortOrder.DESC ) );
 
+        // removing secondary sort by entity ID -- takes ES resources and provides no benefit
         //sort by the entity id if our times are equal
-        srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
+        //srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
 
         return;
     }


Mime
View raw message