usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [3/5] incubator-usergrid git commit: Logic to ensure that export and import wait until all data is exported or imported before exiting.
Date Mon, 06 Jul 2015 23:46:27 GMT
Logic to ensure that export and import wait until all data is exported or imported before exiting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59ea6e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59ea6e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59ea6e54

Branch: refs/heads/master
Commit: 59ea6e54fd48d069843cc8ceccd48cf93b0cb2a3
Parents: 67d3322
Author: Dave Johnson <snoopdave@apache.org>
Authored: Mon Jul 6 19:19:31 2015 -0400
Committer: Dave Johnson <snoopdave@apache.org>
Committed: Mon Jul 6 19:19:31 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 48 +++++++++++---------
 .../org/apache/usergrid/tools/ImportAdmins.java | 44 ++++++++++++------
 2 files changed, 56 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ea6e54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index d3d6371..d726ad3 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -167,11 +167,6 @@ public class ExportAdmins extends ExportingToolBase {
             ids = em.searchCollection( em.getApplicationRef(), "users", query );
         }
 
-        adminUserWriter.setDone( true );
-        for (AdminUserReader aur : readers) {
-            aur.setDone( true );
-        }
-
         logger.debug( "Waiting for write thread to complete" );
         
         boolean done = false;
@@ -202,7 +197,7 @@ public class ExportAdmins extends ExportingToolBase {
      */
     private void buildOrgMap() throws Exception {
 
-        logger.info("Building org map");
+        logger.info( "Building org map" );
 
         ExecutorService execService = Executors.newFixedThreadPool( this.readThreadCount
);
 
@@ -269,8 +264,6 @@ public class ExportAdmins extends ExportingToolBase {
 
     public class AdminUserReader implements Runnable {
 
-        private boolean done = true;
-
         private final BlockingQueue<UUID> readQueue;
         private final BlockingQueue<AdminUserWriteTask> writeQueue;
 
@@ -299,9 +292,7 @@ public class ExportAdmins extends ExportingToolBase {
                 UUID uuid = null;
                 try {
                     uuid = readQueue.poll( 30, TimeUnit.SECONDS );
-                    //logger.debug("Got item from entityId queue: " + uuid );
-
-                    if ( uuid == null && done ) {
+                    if ( uuid == null ) {
                         break;
                     }
 
@@ -335,8 +326,31 @@ public class ExportAdmins extends ExportingToolBase {
 
             for (String dictionary : dictionaries) {
                 Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary
);
+                if ( dict.isEmpty() ) {
+                    continue;
+                }
                 task.dictionariesByName.put( dictionary, dict );
             }
+           
+            if ( task.dictionariesByName.isEmpty() ) {
+                logger.error( "User {}:{} has no dictionaries",
+                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid() }
);
+                
+            } else if ( task.dictionariesByName.get("credentials") == null ) {
+                logger.error( "User {}:{} has no credentials dictionary",
+                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid() }
);
+                
+            } else {
+                if ( task.dictionariesByName.get("credentials").get("password") == null )
{
+                    logger.error( "User {}:{} has no password in credential dictionary",
+                            new Object[]{task.adminUser.getName(), task.adminUser.getUuid()
} );
+                }
+                if ( task.dictionariesByName.get("credentials").get("secret") == null ) {
+                    logger.error( "User {}:{} has no secret in credential dictionary",
+                            new Object[]{task.adminUser.getName(), task.adminUser.getUuid()
} );
+                }
+            }
+                
         }
 
         private void addOrganizationsToTask(AdminUserWriteTask task) throws Exception {
@@ -360,16 +374,10 @@ public class ExportAdmins extends ExportingToolBase {
                         task.adminUser.getUuid() } );
             }
         }
-
-        public void setDone(boolean done) {
-            this.done = done;
-        }
     }
 
     class AdminUserWriter implements Runnable {
 
-        private boolean done = false;
-
         private final BlockingQueue<AdminUserWriteTask> taskQueue;
 
         public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
@@ -404,7 +412,7 @@ public class ExportAdmins extends ExportingToolBase {
 
                 try {
                     AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
-                    if ( task == null && done ) {
+                    if ( task == null ) {
                         break;
                     }
 
@@ -494,10 +502,6 @@ public class ExportAdmins extends ExportingToolBase {
 
             jg.writeEndArray();
         }
-
-        public void setDone(boolean done) {
-            this.done = done;
-        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ea6e54/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 39384e6..4bcdc0b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -90,6 +90,11 @@ public class ImportAdmins extends ToolBase {
     AtomicInteger userCount = new AtomicInteger( 0 );
     AtomicInteger metadataCount = new AtomicInteger( 0 );
 
+    AtomicInteger writeEmptyCount = new AtomicInteger( 0 );
+    AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
+    AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
+    
+
 
     @Override
     @SuppressWarnings("static-access")
@@ -146,7 +151,7 @@ public class ImportAdmins extends ToolBase {
             writeThreadCount = Integer.parseInt( line.getOptionValue(WRITE_THREAD_COUNT));
         }
 
-        importAdminUsers(writeThreadCount, auditThreadCount);
+        importAdminUsers( writeThreadCount, auditThreadCount );
 
         importMetadata( writeThreadCount );
     }
@@ -159,7 +164,7 @@ public class ImportAdmins extends ToolBase {
 
         String[] fileNames = importDir.list(new PrefixFileFilter(ExportAdmins.ADMIN_USERS_PREFIX
+ "."));
 
-        logger.info("Applications to read: " + fileNames.length);
+        logger.info( "Applications to read: " + fileNames.length );
 
         for (String fileName : fileNames) {
             try {
@@ -210,8 +215,8 @@ public class ImportAdmins extends ToolBase {
             workQueue.add( entityProps );
         }
 
-        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin Write");
-        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin Audit");
+        waitForQueueAndMeasure(workQueue, writeEmptyCount, adminWriteThreads, "Admin Write");
+        waitForQueueAndMeasure(auditQueue, auditEmptyCount, adminAuditThreads, "Admin Audit");
 
         logger.info("----- End: Imported {} admin users from file {}",
                 count, adminUsersFile.getAbsolutePath());
@@ -220,12 +225,13 @@ public class ImportAdmins extends ToolBase {
     }
 
     private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
+                                               final AtomicInteger emptyCounter,
                                                final Map<Stoppable, Thread> threadMap,
                                                final String identifier) throws InterruptedException
{
         double rateAverageSum = 0;
         int iterations = 0;
 
-        while (!workQueue.isEmpty()) {
+        while ( emptyCounter.get() < threadMap.size() ) {
             iterations += 1;
 
             int sizeLast = workQueue.size();
@@ -312,8 +318,8 @@ public class ImportAdmins extends ToolBase {
     private void importMetadata(int writeThreadCount) throws Exception {
 
         String[] fileNames = importDir.list(
-                new PrefixFileFilter(ExportAdmins.ADMIN_USER_METADATA_PREFIX + "."));
-        logger.info("Metadata files to read: " + fileNames.length);
+                new PrefixFileFilter( ExportAdmins.ADMIN_USER_METADATA_PREFIX + "." ) );
+        logger.info( "Metadata files to read: " + fileNames.length );
 
         for (String fileName : fileNames) {
             try {
@@ -373,17 +379,22 @@ public class ImportAdmins extends ToolBase {
             if (jsonToken.equals(JsonToken.FIELD_NAME) && depth == 2) {
 
                 jp.nextToken();
-
                 String entityOwnerId = jp.getCurrentName();
-                EntityRef entityRef = em.getRef(UUID.fromString(entityOwnerId));
 
-                Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs(Map.class);
-
-                workQueue.put(new ImportMetadataTask(entityRef, metadata));
+                try {
+                    EntityRef entityRef = em.getRef( UUID.fromString( entityOwnerId ) );
+                    Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs(
Map.class );
+                    
+                    workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
+                    logger.debug( "Put user {} in metadata queue", entityRef.getUuid() );
+                    
+                } catch ( Exception e ) {
+                    logger.debug( "Error with user {}, not putting in metadata queue", entityOwnerId
);
+                }
             }
         }
 
-        waitForQueueAndMeasure(workQueue, metadataWorkerThreadMap, "Metadata Load");
+        waitForQueueAndMeasure(workQueue, metadataEmptyCount, metadataWorkerThreadMap, "Metadata
Load");
 
         logger.info("----- End of metadata -----");
         jp.close();
@@ -523,9 +534,11 @@ public class ImportAdmins extends ToolBase {
 
                     if (entityProps == null) {
                         logger.warn("Reading from AUDIT queue was null!");
+                        auditEmptyCount.getAndIncrement();
                         Thread.sleep(1000);
                         continue;
                     }
+                    auditEmptyCount.set(0);
 
                     count++;
                     long startTime = System.currentTimeMillis();
@@ -599,15 +612,16 @@ public class ImportAdmins extends ToolBase {
 
                     if (task == null) {
                         logger.warn("Reading from metadata queue was null!");
+                        metadataEmptyCount.getAndIncrement();
                         Thread.sleep(1000);
                         continue;
                     }
+                    metadataEmptyCount.set(0);
                     
                     long startTime = System.currentTimeMillis();
                     
                     importEntityMetadata(em, task.entityRef, task.metadata);
                     
-                    metadataCount.addAndGet( 1 );
                     long stopTime = System.currentTimeMillis();
                     long duration = stopTime - startTime;
                     durationSum += duration;
@@ -662,9 +676,11 @@ public class ImportAdmins extends ToolBase {
 
                     if (entityProps == null) {
                         logger.warn("Reading from admin import queue was null!");
+                        writeEmptyCount.getAndIncrement();
                         Thread.sleep( 1000 );
                         continue;
                     }
+                    writeEmptyCount.set(0);
 
                     // Import/create the entity
                     UUID uuid = getId(entityProps);


Mime
View raw message