usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [1/2] Completing rename of "importUG" package to "importer" to avoid capital letters in package name.
Date Wed, 15 Oct 2014 14:01:38 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 52dbfe43a -> 5d6c4accc


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
new file mode 100644
index 0000000..400439a
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -0,0 +1,977 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.service.SchedulerService;
+import org.apache.usergrid.management.ApplicationInfo;
+import org.apache.usergrid.management.ManagementService;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.FileImport;
+import org.apache.usergrid.persistence.entities.Import;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.usergrid.corepersistence.CpEntityManagerFactory.MANAGEMENT_APPLICATION_ID;
+import org.apache.usergrid.persistence.index.query.Query.Level;
+
+
+public class ImportServiceImpl implements ImportService {
+
+    public static final String IMPORT_ID = "importId";
+    public static final String IMPORT_JOB_NAME = "importJob";
+    public static final String FILE_IMPORT_ID = "fileImportId";
+    public static final String FILE_IMPORT_JOB_NAME = "fileImportJob";
+
+    //Amount of time that has passed before sending another heart beat in millis
+    public static final int TIMESTAMP_DELTA = 5000;
+
+    private static final Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);
+
+    //injected the Entity Manager Factory
+    protected EntityManagerFactory emf;
+    private ArrayList<File> files;
+
+    //dependency injection
+    private SchedulerService sch;
+
+    //inject Management Service to access Organization Data
+    private ManagementService managementService;
+    private JsonFactory jsonFactory = new JsonFactory();
+
+    private int entityCount = 0;
+
+    /**
+     * This schedules the main import Job
+     *
+     * @param config configuration of the job to be scheduled
+     * @return it returns the UUID of the scheduled job
+     * @throws Exception
+     */
+    @Override
+    public UUID schedule(Map<String, Object> config) throws Exception {
+
+        if (config == null) {
+            logger.error("import information cannot be null");
+            return null;
+        }
+
+        EntityManager rootEm = null;
+        try {
+            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+            Set<String> collections = rootEm.getApplicationCollections();
+            if (!collections.contains("imports")) {
+                rootEm.createApplicationCollection("imports");
+            }
+        } catch (Exception e) {
+            logger.error("application doesn't exist within the current context");
+            return null;
+        }
+
+        Import importUG = new Import();
+
+        // create the import entity to store all metadata about the import job
+        try {
+            importUG = rootEm.create(importUG);
+        } catch (Exception e) {
+            logger.error("Import entity creation failed");
+            return null;
+        }
+
+        // update state for import job to created
+        importUG.setState(Import.State.CREATED);
+        rootEm.update(importUG);
+
+        // set data to be transferred to importInfo
+        JobData jobData = new JobData();
+        jobData.setProperty("importInfo", config);
+        jobData.setProperty(IMPORT_ID, importUG.getUuid());
+
+        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+        // schedule import job
+        sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
+
+        // update state for import job to created
+        importUG.setState(Import.State.SCHEDULED);
+        rootEm.update(importUG);
+
+        return importUG.getUuid();
+    }
+
+    /**
+     * This schedules the sub  FileImport Job
+     *
+     * @param file file to be scheduled
+     * @return it returns the UUID of the scheduled job
+     * @throws Exception
+     */
+    public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
+
+        EntityManager rootEm = null;
+
+        try {
+            rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        } catch (Exception e) {
+            logger.error("application doesn't exist within the current context");
+            return null;
+        }
+
+        // create a FileImport entity to store metadata about the fileImport job
+        FileImport fileImport = new FileImport();
+
+        fileImport.setFileName(file);
+        fileImport.setCompleted(false);
+        fileImport.setLastUpdatedUUID(" ");
+        fileImport.setErrorMessage(" ");
+        fileImport.setState(FileImport.State.CREATED);
+        fileImport = rootEm.create(fileImport);
+
+        Import importUG = rootEm.get(importRef, Import.class);
+
+        try {
+            // create a connection between the main import job and the sub FileImport Job
+            rootEm.createConnection(importUG, "includes", fileImport);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+            return null;
+        }
+
+        // mark the File Import Job as created
+        fileImport.setState(FileImport.State.CREATED);
+        rootEm.update(fileImport);
+
+        //set data to be transferred to the FileImport Job
+        JobData jobData = new JobData();
+        jobData.setProperty("File", file);
+        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
+
+        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+        //schedule file import job
+        sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
+
+        //update state of the job to Scheduled
+        fileImport.setState(FileImport.State.SCHEDULED);
+        rootEm.update(fileImport);
+
+        return fileImport.getUuid();
+    }
+
+    /**
+     * Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
+     *
+     * @return String
+     */
+    @Override
+    public String getState(UUID uuid) throws Exception {
+        if (uuid == null) {
+            logger.error("UUID passed in cannot be null.");
+            return "UUID passed in cannot be null";
+        }
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        //retrieve the import entity.
+        Import importUG = rootEm.get(uuid, Import.class);
+
+        if (importUG == null) {
+            logger.error("no entity with that uuid was found");
+            return "No Such Element found";
+        }
+        return importUG.getState().toString();
+    }
+
+    /**
+     * Query Entity Manager for the error message generated for an import job.
+     *
+     * @return String
+     */
+    @Override
+    public String getErrorMessage(final UUID uuid) throws Exception {
+
+        //get application entity manager
+
+        if (uuid == null) {
+            logger.error("UUID passed in cannot be null.");
+            return "UUID passed in cannot be null";
+        }
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        //retrieve the import entity.
+        Import importUG = rootEm.get(uuid, Import.class);
+
+        if (importUG == null) {
+            logger.error("no entity with that uuid was found");
+            return "No Such Element found";
+        }
+        return importUG.getErrorMessage().toString();
+    }
+
+    /**
+     * Returns the Import Entity that stores all meta-data for the particular import Job
+     * @param jobExecution the import job details
+     * @return Import Entity
+     * @throws Exception
+     */
+    @Override
+    public Import getImportEntity(final JobExecution jobExecution) throws Exception {
+
+        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+        EntityManager importManager = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        return importManager.get(importId, Import.class);
+    }
+
+
+    /**
+     * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
+     * @param jobExecution the file import job details
+     * @return File Import Entity
+     * @throws Exception
+     */
+    @Override
+    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
+
+        UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
+        EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+        return em.get(fileImportId, FileImport.class);
+    }
+
+    /**
+     * This returns the temporary files downloaded form s3
+     */
+    @Override
+    public ArrayList<File> getEphemeralFile() {
+        return files;
+    }
+
+    public SchedulerService getSch() {
+        return sch;
+    }
+
+
+    public void setSch(final SchedulerService sch) {
+        this.sch = sch;
+    }
+
+
+    public EntityManagerFactory getEmf() {
+        return emf;
+    }
+
+
+    public void setEmf(final EntityManagerFactory emf) {
+        this.emf = emf;
+    }
+
+
+    public ManagementService getManagementService() {
+
+        return managementService;
+    }
+
+
+    public void setManagementService(final ManagementService managementService) {
+        this.managementService = managementService;
+    }
+
+    /**
+     * This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
+     * @param jobExecution the job created by the scheduler with all the required config data
+     * @throws Exception
+     */
+    @Override
+    public void doImport(JobExecution jobExecution) throws Exception {
+
+        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
+        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
+        S3Import s3Import = null;
+
+        if (config == null) {
+            logger.error("Import Information passed through is null");
+            return;
+        }
+
+        //get the entity manager for the application, and the entity that this Import corresponds to.
+        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+
+        EntityManager rooteEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        Import importUG = rooteEm.get(importId, Import.class);
+
+        //update the entity state to show that the job has officially started.
+        importUG.setState(Import.State.STARTED);
+        importUG.setStarted(System.currentTimeMillis());
+        importUG.setErrorMessage(" ");
+        rooteEm.update(importUG);
+        try {
+            if (s3PlaceHolder != null) {
+                s3Import = (S3Import) s3PlaceHolder;
+            } else {
+                s3Import = new S3ImportImpl();
+            }
+        } catch (Exception e) {
+            logger.error("S3Import doesn't exist");
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FAILED);
+            rooteEm.update(importUG);
+            return;
+        }
+
+        try {
+
+            if (config.get("organizationId") == null) {
+                logger.error("No organization could be found");
+                importUG.setErrorMessage("No organization could be found");
+                importUG.setState(Import.State.FAILED);
+                rooteEm.update(importUG);
+                return;
+            } else if (config.get("applicationId") == null) {
+                //import All the applications from an organization
+                importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
+            } else if (config.get("collectionName") == null) {
+                //imports an Application from a single organization
+                importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+            } else {
+                //imports a single collection from an app org combo
+                importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
+            }
+
+        } catch (OrganizationNotFoundException e) {
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FINISHED);
+            rooteEm.update(importUG);
+            return;
+        } catch (ApplicationNotFoundException e) {
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FINISHED);
+            rooteEm.update(importUG);
+            return;
+        }
+
+        if (files.size() == 0) {
+
+            importUG.setState(Import.State.FINISHED);
+            importUG.setErrorMessage("no files found in the bucket with the relevant context");
+            rooteEm.update(importUG);
+
+        } else {
+
+            Map<String, Object> fileMetadata = new HashMap<String, Object>();
+
+            ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
+
+            // schedule each file as a separate job
+            for (File eachfile : files) {
+
+                UUID jobID = scheduleFile(eachfile.getPath(), importUG);
+                Map<String, Object> fileJobID = new HashMap<String, Object>();
+                fileJobID.put("FileName", eachfile.getName());
+                fileJobID.put("JobID", jobID.toString());
+                value.add(fileJobID);
+            }
+
+            fileMetadata.put("files", value);
+            importUG.addProperties(fileMetadata);
+            rooteEm.update(importUG);
+        }
+        return;
+    }
+
+    /**
+     * Imports a specific collection from an org-app combo.
+     */
+    private void importCollectionFromOrgApp(UUID applicationUUID, final Map<String, Object> config,
+                                            final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        //retrieves import entity
+        Import importUG = getImportEntity(jobExecution);
+        ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
+
+        if (application == null) {
+            throw new ApplicationNotFoundException("Application Not Found");
+        }
+
+        String collectionName = config.get("collectionName").toString();
+
+        // prepares the prefix path for the files to be imported depending on the endpoint being hit
+        String appFileName = prepareInputFileName("application", application.getName(), collectionName);
+
+        files = fileTransfer(importUG, appFileName, config, s3Import, 0);
+
+    }
+
+    /**
+     * Imports a specific applications from an organization
+     */
+    private void importApplicationFromOrg(UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
+                                          final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        //retrieves import entity
+        Import importUG = getImportEntity(jobExecution);
+
+        ApplicationInfo application = managementService.getApplicationInfo(applicationId);
+
+        if (application == null) {
+            throw new ApplicationNotFoundException("Application Not Found");
+        }
+
+        // prepares the prefix path for the files to be imported depending on the endpoint being hit
+        String appFileName = prepareInputFileName("application", application.getName(), null);
+
+        files = fileTransfer(importUG, appFileName, config, s3Import, 1);
+
+    }
+
+    /**
+     * Imports All Applications from an Organization
+     */
+    private void importApplicationsFromOrg(UUID organizationUUID, final Map<String, Object> config,
+                                           final JobExecution jobExecution, S3Import s3Import) throws Exception {
+
+        // retrieves import entity
+        Import importUG = getImportEntity(jobExecution);
+        String appFileName = null;
+
+        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
+        if (organizationInfo == null) {
+            throw new OrganizationNotFoundException("Organization Not Found");
+        }
+
+        // prepares the prefix path for the files to be imported depending on the endpoint being hit
+        appFileName = prepareInputFileName("organization", organizationInfo.getName(), null);
+
+        files = fileTransfer(importUG, appFileName, config, s3Import, 2);
+
+    }
+
+    /**
+     * prepares the prefix path for the files to be imported depending on the endpoint being hit
+     * @param type just a label such us: organization, application.
+     * @return the file name concatenated with the type and the name of the collection
+     */
+    protected String prepareInputFileName(String type, String name, String CollectionName) {
+        StringBuilder str = new StringBuilder();
+
+        // in case of type organization --> the file name will be "<org_name>/"
+        if (type.equals("organization")) {
+
+            str.append(name);
+            str.append("/");
+
+        } else if (type.equals("application")) {
+
+            // in case of type application --> the file name will be "<org_name>/<app_name>."
+            str.append(name);
+            str.append(".");
+
+            if (CollectionName != null) {
+
+                // in case of type application and collection import --> the file name will be "<org_name>/<app_name>.<collection_name>."
+                str.append(CollectionName);
+                str.append(".");
+
+            }
+        }
+
+        String inputFileName = str.toString();
+
+        return inputFileName;
+    }
+
+    /**
+     * @param importUG    Import instance
+     * @param appFileName the base file name for the files to be downloaded
+     * @param config      the config information for the import job
+     * @param s3Import    s3import instance
+     * @param type        it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
+     * @return
+     */
+    public ArrayList<File> fileTransfer(Import importUG, String appFileName, Map<String, Object> config,
+                                        S3Import s3Import, int type) throws Exception {
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        ArrayList<File> files = new ArrayList<File>();
+
+        try {
+            files = s3Import.copyFromS3(config, appFileName, type);
+        } catch (Exception e) {
+            importUG.setErrorMessage(e.getMessage());
+            importUG.setState(Import.State.FAILED);
+            rootEm.update(importUG);
+        }
+        return files;
+    }
+
+    /**
+     * The loops through each temp file and parses it to store the entities from the json back into usergrid
+     *
+     * @throws Exception
+     */
+    @Override
+    public void FileParser(JobExecution jobExecution) throws Exception {
+
+        // add properties to the import entity
+        FileImport fileImport = getFileImportEntity(jobExecution);
+
+        File file = new File(jobExecution.getJobData().getProperty("File").toString());
+
+        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        rootEm.update(fileImport);
+
+        boolean completed = fileImport.getCompleted();
+
+        // on resume, completed files will not be traversed again
+        if (!completed) {
+
+            // validates the JSON structure
+            if (isValidJSON(file, rootEm, fileImport)) {
+
+                // mark the File import job as started
+                fileImport.setState(FileImport.State.STARTED);
+                rootEm.update(fileImport);
+
+                // gets the application anme from the filename
+                String applicationName = file.getPath().split("\\.")[0];
+
+                ApplicationInfo application = managementService.getApplicationInfo(applicationName);
+
+                JsonParser jp = getJsonParserForFile(file);
+
+                // incase of resume, retrieve the last updated UUID for this file
+                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+
+                // this handles partially completed files by updating entities from the point of failure
+                if (!lastUpdatedUUID.equals(" ")) {
+
+                    // go till the last updated entity
+                    while (!jp.getText().equals(lastUpdatedUUID)) {
+                        jp.nextToken();
+                    }
+
+                    // skip the last one and start from the next one
+                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT && jp.nextToken() == JsonToken.START_OBJECT)) {
+                        jp.nextToken();
+                    }
+                }
+
+                // get to start of an object i.e next entity.
+                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
+                    jp.nextToken();
+                }
+
+                // get entity manager for the application
+                EntityManager em = emf.getEntityManager(application.getId());
+
+                while (jp.nextToken() != JsonToken.END_ARRAY) {
+                    // import the entities in this file
+                    importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
+                }
+                jp.close();
+
+                // Updates the state of file import job
+                if (!fileImport.getState().equals("FAILED")) {
+
+                    // mark file as completed
+                    fileImport.setCompleted(true);
+                    fileImport.setState(FileImport.State.FINISHED);
+                    rootEm.update(fileImport);
+
+                    //check other files status and mark the status of import Job as Finished if all others are finished
+                    Results ImportJobResults = rootEm.getConnectingEntities(fileImport, "includes", null, Level.ALL_PROPERTIES);
+                    List<Entity> importEntity = ImportJobResults.getEntities();
+                    UUID importId = importEntity.get(0).getUuid();
+                    Import importUG = rootEm.get(importId, Import.class);
+
+                    Results entities = rootEm.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES);
+                    List<Entity> importFile = entities.getEntities();
+
+                    int count = 0;
+                    for (Entity eachEntity : importFile) {
+                        FileImport fi = rootEm.get(eachEntity.getUuid(), FileImport.class);
+                        if (fi.getState().toString().equals("FINISHED")) {
+                            count++;
+                        } else if (fi.getState().toString().equals("FAILED")) {
+                            importUG.setState(Import.State.FAILED);
+                            rootEm.update(importUG);
+                            break;
+                        }
+                    }
+                    if (count == importFile.size()) {
+                        importUG.setState(Import.State.FINISHED);
+                        rootEm.update(importUG);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks if a file is a valid JSON
+     * @param collectionFile the file being validated
+     * @param rootEm    the Entity Manager for the Management application
+     * @param fileImport the file import entity
+     * @return
+     * @throws Exception
+     */
+    private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
+
+        boolean valid = false;
+        try {
+            final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
+            while (jp.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException e) {
+            e.printStackTrace();
+            fileImport.setErrorMessage(e.getMessage());
+            rootEm.update(fileImport);
+        } catch (IOException e) {
+            fileImport.setErrorMessage(e.getMessage());
+            rootEm.update(fileImport);
+        }
+        return valid;
+    }
+
+
+    /**
+     * Gets the JSON parser for given file
+     * @param collectionFile the file for which JSON parser is required
+     * @return
+     * @throws Exception
+     */
+    private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
+        JsonParser jp = jsonFactory.createJsonParser(collectionFile);
+        jp.setCodec(new ObjectMapper());
+        return jp;
+    }
+
+
+
+    /**
+     * Imports the entity's connecting references (collections, connections and dictionaries)
+     * @param jp  JsonParser pointing to the beginning of the object.
+     * @param em Entity Manager for the application being imported
+     * @param rootEm Entity manager for the root applicaition
+     * @param fileImport the file import entity
+     * @param jobExecution  execution details for the import jbo
+     * @throws Exception
+     */
+    private void importEntityStuff(final JsonParser jp, final EntityManager em, final EntityManager rootEm, final FileImport fileImport, final JobExecution jobExecution) throws Exception {
+
+        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
+
+        final Observable<WriteEvent> observable = Observable.create(subscribe);
+
+        /**
+         * This is the action we want to perform for every UUID we receive
+         */
+        final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
+            @Override
+            public void call(WriteEvent writeEvent) {
+                writeEvent.doWrite(em, jobExecution, fileImport);
+            }
+
+        };
+
+
+        final AtomicLong entityCounter = new AtomicLong();
+        final AtomicLong eventCounter = new AtomicLong();
+        /**
+         * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
+         * method we want to simply return the input observable + the chain of operations we want to invoke
+         */
+        observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+            @Override
+            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+
+                /* TODO:
+                 * need to fixed so that number of entities created can be counted correctly
+                 * and also update the last updated UUID for the fileImport which is a must for resumability
+                 */
+//                return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
+//
+//                         @Override
+//                         public void call(WriteEvent writeEvent) {
+//                             if (!(writeEvent instanceof EntityEvent)) {
+//                                 final long val = eventCounter.incrementAndGet();
+//                                 if(val % 50 == 0) {
+//                                     jobExecution.heartbeat();
+//                                 }
+//                                 return;
+//                             }
+//
+//                             final long value = entityCounter.incrementAndGet();
+//                             if (value % 2000 == 0) {
+//                                 try {
+//                                     logger.error("UUID = " +((EntityEvent) writeEvent).getEntityUuid().toString() + " value = " + value +"");
+//                                     fileImport.setLastUpdatedUUID(((EntityEvent) writeEvent).getEntityUuid().toString());
+//                                     //checkpoint the UUID here.
+//                                     rootEm.update(fileImport);
+//                                 } catch(Exception ex) {}
+//                             }
+//                             if(value % 100 == 0) {
+//                                 logger.error("heartbeat sent by " + fileImport.getFileName());
+//                                 jobExecution.heartbeat();
+//                             }
+//                         }
+//                     }
+//                );
+
+                return entityWrapperObservable.doOnNext(doWork);
+            }
+        }, Schedulers.io()).toBlocking().last();
+    }
+
+    private interface WriteEvent {
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport);
+    }
+
+    private final class EntityEvent implements WriteEvent {
+        UUID entityUuid;
+        String entityType;
+        Map<String, Object> properties;
+
+        EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) {
+            this.entityUuid = entityUuid;
+            this.entityType = entityType;
+            this.properties = properties;
+        }
+
+        public UUID getEntityUuid() {
+            return entityUuid;
+        }
+
+        // Creates entities
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            try {
+                em.create(entityUuid, entityType, properties);
+            } catch (Exception e) {
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+
+    private final class ConnectionEvent implements WriteEvent {
+        EntityRef ownerEntityRef;
+        String connectionType;
+        EntityRef entryRef;
+
+        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) {
+            this.ownerEntityRef = ownerEntityRef;
+            this.connectionType = connectionType;
+            this.entryRef = entryRef;
+
+        }
+
+        // creates connections between entities
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            try {
+                em.createConnection(ownerEntityRef, connectionType, entryRef);
+            } catch (Exception e) {
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+
+    private final class DictionaryEvent implements WriteEvent {
+
+        EntityRef ownerEntityRef;
+        String dictionaryName;
+        Map<String, Object> dictionary;
+
+        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) {
+            this.ownerEntityRef = ownerEntityRef;
+            this.dictionaryName = dictionaryName;
+            this.dictionary = dictionary;
+        }
+
+        // adds map to the dictionary
+        @Override
+        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+            try {
+                em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+            } catch (Exception e) {
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+
+
+    private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
+        private final JsonParser jp;
+        EntityManager em;
+        EntityManager rootEm;
+        FileImport fileImport;
+
+
+        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+            this.jp = parser;
+            this.em = em;
+            this.rootEm = rootEm;
+            this.fileImport = fileImport;
+        }
+
+        @Override
+        public void call(final Subscriber<? super WriteEvent> subscriber) {
+
+            WriteEvent entityWrapper = null;
+            EntityRef ownerEntityRef = null;
+            String entityUuid = "";
+            String entityType = "";
+            try {
+                while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+                    String collectionName = jp.getCurrentName();
+
+                    // create the  wrapper for connections
+                    if (collectionName.equals("connections")) {
+
+                        jp.nextToken(); // START_OBJECT
+                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+                            String connectionType = jp.getCurrentName();
+
+                            jp.nextToken(); // START_ARRAY
+                            while (jp.nextToken() != JsonToken.END_ARRAY) {
+                                String entryId = jp.getText();
+
+                                EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
+                                entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
+
+                                // Creates a new subscriber to the observer with the given connection wrapper
+                                subscriber.onNext(entityWrapper);
+                            }
+                        }
+
+                    }
+                    // create the  wrapper for dictionaries
+                    else if (collectionName.equals("dictionaries")) {
+
+                        jp.nextToken(); // START_OBJECT
+                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+
+                            String dictionaryName = jp.getCurrentName();
+
+                            jp.nextToken();
+
+                            Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+                            entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
+
+                            // Creates a new subscriber to the observer with the given dictionary wrapper
+                            subscriber.onNext(entityWrapper);
+                        }
+                        subscriber.onCompleted();
+
+                    } else {
+
+                        // Regular collections
+                        jp.nextToken(); // START_OBJECT
+
+                        Map<String, Object> properties = new HashMap<String, Object>();
+                        JsonToken token = jp.nextToken();
+
+                        while (token != JsonToken.END_OBJECT) {
+                            if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+                                String key = jp.getCurrentName();
+                                if (key.equals("uuid")) {
+                                    entityUuid = jp.getText();
+
+                                } else if (key.equals("type")) {
+                                    entityType = jp.getText();
+                                } else if (key.length() != 0 && jp.getText().length() != 0) {
+                                    String value = jp.getText();
+                                    properties.put(key, value);
+                                }
+                            }
+                            token = jp.nextToken();
+                        }
+
+                        ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
+                        entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
+
+                        // Creates a new subscriber to the observer with the given dictionary wrapper
+                        subscriber.onNext(entityWrapper);
+
+                    }
+                }
+            } catch (Exception e) {
+                // skip illegal entity UUID and go to next one
+                fileImport.setErrorMessage(e.getMessage());
+                try {
+                    rootEm.update(fileImport);
+                } catch (Exception ex) {
+                }
+                subscriber.onError(e);
+            }
+        }
+    }
+}
+
+/**
+ * Custom Exception class for Organization Not Found
+ */
+class OrganizationNotFoundException extends Exception {
+    OrganizationNotFoundException(String s) {
+        super(s);
+    }
+}
+
+/**
+ * Custom Exception class for Application Not Found
+ */
+class ApplicationNotFoundException extends Exception {
+    ApplicationNotFoundException(String s) {
+        super(s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
new file mode 100644
index 0000000..e21fe0a
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.io.File;
+
+/**
+ * Created by ApigeeCorporation on 7/8/14.
+ */
+
+// interface for S3ImportImpl
+public interface S3Import {
+    ArrayList<File> copyFromS3(Map<String, Object> exportInfo, String filename, int type);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
new file mode 100644
index 0000000..ea999f6
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.management.importer;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Module;
+import org.jclouds.ContextBuilder;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.MutableBlobMetadata;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
+import org.jclouds.blobstore.options.ListContainerOptions;
+import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
+import org.jclouds.logging.log4j.config.Log4JLoggingModule;
+import org.jclouds.netty.config.NettyPayloadModule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class S3ImportImpl implements S3Import {
+
+    private BlobStore blobStore;
+    private ArrayList<Blob> blobs = new ArrayList<Blob>();
+    private ArrayList<File> files = new ArrayList<File>();
+    private int i=0;
+
+    /**
+     * Downloads the files from s3 into temp local files
+     * @param importInfo the information entered by the user required to perform import from S3
+     * @param filename the filename generated based on the request URI
+     * @param type  it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
+     * @return It returns an ArrayList of files i.e. the files downloaded from s3
+     */
+    public ArrayList<File> copyFromS3( final Map<String,Object> importInfo, String filename , int type) {
+
+        Map<String,Object> properties = ( Map<String, Object> ) importInfo.get( "properties" );
+
+        Map<String, Object> storage_info = (Map<String,Object>)properties.get( "storage_info" );
+
+        String bucketName = ( String ) storage_info.get( "bucket_location" );
+        String accessId = ( String ) storage_info.get( "s3_access_id" );
+        String secretKey = ( String ) storage_info.get( "s3_key" );
+
+        Properties overrides = new Properties();
+        overrides.setProperty( "s3" + ".identity", accessId );
+        overrides.setProperty( "s3" + ".credential", secretKey );
+
+        final Iterable<? extends Module> MODULES = ImmutableSet
+                .of(new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(),
+                        new NettyPayloadModule());
+
+        BlobStoreContext context =
+                ContextBuilder.newBuilder("s3").credentials( accessId, secretKey ).modules( MODULES )
+                        .overrides( overrides ).buildView( BlobStoreContext.class );
+
+        try{
+
+            blobStore = context.getBlobStore();
+
+            // gets all the files in the bucket recursively
+            PageSet<? extends StorageMetadata> pageSet = blobStore.list(bucketName, new ListContainerOptions().recursive());
+
+            Iterator itr = pageSet.iterator();
+
+            while(itr.hasNext())
+            {
+                String fname = ((MutableBlobMetadata)itr.next()).getName();
+                switch(type) {
+                    // check if file is a collection file and is in format <org_name>/<app_name>.<collection_name>.[0-9]+.json
+                    case 0:
+                        if(fname.contains(filename))
+                        {
+                            copyFile(bucketName,fname);
+                            i++;
+                        }
+                        break;
+                    // check if file is an application file and is in format <org_name>/<app_name>.[0-9]+.json
+                    case 1:
+                        if(fname.matches(filename+"[0-9]+\\.json"))
+                        {
+                            copyFile(bucketName,fname);
+                            i++;
+                        }
+                        break;
+                    // check if file is an application file and is in format <org_name>/[-a-zA-Z0-9]+.[0-9]+.json
+                    case 2:
+                        if(fname.matches(filename+"[-a-zA-Z0-9]+\\.[0-9]+\\.json"))
+                        {
+                            copyFile(bucketName,fname);
+                            i++;
+                        }
+                        break;
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return files;
+    }
+
+    /**
+     * Copy the file from s3 into a temp local file
+     * @param bucketName the S3 bucket name from where files need to be imported
+     * @param fname the filename by which the temp file should be created
+     * @throws IOException
+     */
+    void copyFile(String bucketName, String fname) throws IOException {
+
+        Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class);
+        Blob blob = blobStore.getBlob(bucketName, fname);
+        blobs.add(blob);
+        String[] fileOrg = fname.split("/");
+        File organizationDirectory = new File(fileOrg[0]);
+
+        if (!organizationDirectory.exists()) {
+            try {
+                organizationDirectory.mkdir();
+            }catch(SecurityException se) {
+                logger.error(se.getMessage());
+            }
+        }
+
+        File ephemeral = new File(fname);
+
+        FileOutputStream fop = new FileOutputStream(ephemeral);
+
+        blobs.get(i).getPayload().writeTo(fop);
+
+        files.add(ephemeral);
+
+        organizationDirectory.deleteOnExit();
+        ephemeral.deleteOnExit();
+        fop.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
index 5a08338..2a04c52 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetup.java
@@ -20,7 +20,7 @@ package org.apache.usergrid;
 import org.apache.usergrid.management.ApplicationCreator;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.export.ExportService;
-import org.apache.usergrid.management.importug.ImportService;
+import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.security.providers.SignInProviderFactory;
 import org.apache.usergrid.security.tokens.TokenService;
 import org.apache.usergrid.services.ServiceManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
index 9f5ac16..8cafa68 100644
--- a/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/ServiceITSetupImpl.java
@@ -21,7 +21,7 @@ import org.apache.usergrid.cassandra.CassandraResource;
 import org.apache.usergrid.management.ApplicationCreator;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.export.ExportService;
-import org.apache.usergrid.management.importug.ImportService;
+import org.apache.usergrid.management.importer.ImportService;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.security.providers.SignInProviderFactory;
 import org.apache.usergrid.security.tokens.TokenService;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
index 3cabb89..5453324 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/ImportServiceIT.java
@@ -32,9 +32,9 @@ import org.apache.usergrid.management.UserInfo;
 import org.apache.usergrid.management.export.ExportService;
 import org.apache.usergrid.management.export.S3Export;
 import org.apache.usergrid.management.export.S3ExportImpl;
-import org.apache.usergrid.management.importug.ImportService;
-import org.apache.usergrid.management.importug.S3Import;
-import org.apache.usergrid.management.importug.S3ImportImpl;
+import org.apache.usergrid.management.importer.ImportService;
+import org.apache.usergrid.management.importer.S3Import;
+import org.apache.usergrid.management.importer.S3ImportImpl;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.JobData;
 import org.jclouds.ContextBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5d6c4acc/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
index b6b54b0..23f264c 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/cassandra/MockS3ImportImpl.java
@@ -17,15 +17,13 @@
 
 package org.apache.usergrid.management.cassandra;
 
-import org.apache.usergrid.management.importug.S3Import;
+import org.apache.usergrid.management.importer.S3Import;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Map;
 
-/**
- * Created by ApigeeCorporation on 7/8/14.
- */
+
 public class MockS3ImportImpl implements S3Import{
     private final String filename;
 


Mime
View raw message