usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [06/33] adding notificationsservice
Date Fri, 05 Sep 2014 21:11:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
new file mode 100644
index 0000000..554074e
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+
+import java.util.UUID;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.usergrid.persistence.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.batch.job.OnlyOnceJob;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.JobData;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+
+
+/**
+ * Scheduler job that hands one {@link Notification} to the notifications
+ * queue manager. The application and the notification are looked up from the
+ * "applicationId" and "notificationId" properties of the job data.
+ */
+@Component( "queueJob" )
+public class QueueJob extends OnlyOnceJob {
+
+    private static final Logger logger = LoggerFactory.getLogger( QueueJob.class );
+
+    @Autowired
+    private MetricsFactory metricsService;
+
+    @Autowired
+    private ServiceManagerFactory smf;
+
+    @Autowired
+    private EntityManagerFactory emf;
+
+    // metrics; assigned in init() after metricsService has been injected
+    private Histogram histogram;
+    private Meter requests;
+    private Timer execution;
+
+
+    public QueueJob() {
+        logger.info( "QueueJob created: {}", this );
+    }
+
+
+    /** Looks up the "cycle", "requests" and "execution" metrics for this job class. */
+    @PostConstruct
+    void init() {
+        histogram = metricsService.getHistogram( QueueJob.class, "cycle" );
+        requests = metricsService.getMeter( QueueJob.class, "requests" );
+        execution = metricsService.getTimer( QueueJob.class, "execution" );
+    }
+
+
+    /**
+     * Loads the notification named by the job data and queues it.
+     *
+     * Returns quietly when no EntityManager exists for the application or when
+     * the notification can no longer be found. If queueing fails, the exception
+     * message is stored in the notification's "errorMessage" property and the
+     * exception is rethrown.
+     *
+     * @param jobExecution the execution whose job data carries "applicationId"
+     *                     and "notificationId"
+     * @throws Exception whatever the lookups or the queue manager throw
+     */
+    @Override
+    public void doJob( JobExecution jobExecution ) throws Exception {
+        Timer.Context timer = execution.time();
+        // everything after the timer starts is inside the try, so the timer is
+        // stopped even when one of the lookups below throws
+        try {
+            requests.mark();
+            logger.info( "execute QueueJob {}", jobExecution );
+
+            JobData jobData = jobExecution.getJobData();
+            UUID applicationId = ( UUID ) jobData.getProperty( "applicationId" );
+            ServiceManager sm = smf.getServiceManager( applicationId );
+            NotificationsService notificationsService = ( NotificationsService ) sm.getService( "notifications" );
+
+            EntityManager em = emf.getEntityManager( applicationId );
+            if ( em == null ) {
+                logger.info( "no EntityManager for applicationId  {}", applicationId );
+                return;
+            }
+            UUID notificationId = ( UUID ) jobData.getProperty( "notificationId" );
+            Notification notification = em.get( notificationId, Notification.class );
+            if ( notification == null ) {
+                logger.info( "notificationId {} no longer exists", notificationId );
+                return;
+            }
+
+            try {
+                notificationsService.getQueueManager().queueNotification( notification, jobExecution );
+            }
+            catch ( Exception e ) {
+                logger.error( "execute QueueJob failed", e );
+                em.setProperty( notification, "errorMessage", e.getMessage() );
+                throw e;
+            }
+            finally {
+                // milliseconds between creation of the notification and the end of this attempt
+                long diff = System.currentTimeMillis() - notification.getCreated();
+                histogram.update( diff );
+            }
+        }
+        finally {
+            timer.stop();
+        }
+
+        logger.info( "execute QueueJob completed normally" );
+    }
+
+
+    /** @return {@link TaskManager#BATCH_DEATH_PERIOD} as the delay for this job */
+    @Override
+    protected long getDelay( JobExecution execution ) throws Exception {
+        return TaskManager.BATCH_DEATH_PERIOD;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
new file mode 100644
index 0000000..e6c876c
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import java.util.*;
+
+import org.apache.usergrid.persistence.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueManager;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.entities.Device;
+
+/**
+ * Tracks the outstanding per-device deliveries for one batch of queue messages
+ * belonging to a notification. Each time a delivery is completed, skipped or
+ * failed and no deliveries remain, the batch totals are reported through
+ * {@code NotificationServiceProxy.finishedBatch}. Note: This may not mean that
+ * all work is done, however, as delivery errors may come in after a
+ * notification is "sent."
+ */
+public class TaskManager {
+
+    // period to poll for batch completion (keep well under Job scheduler
+    // heartbeat value!)
+    private static final long BATCH_POLL_PERIOD = 60 * 1000;
+    // period to tell job scheduler to wait between heartbeats before timing out
+    // this transaction
+    public static final long SCHEDULER_HEARTBEAT_PERIOD = 5 * 60 * 1000;
+    // period at which the batch is considered dead without activity (10
+    // minutes). Setting it high means that a dead batch will hang for longer,
+    // but setting it too low may cause duplicates to be sent. It is also used
+    // as the delay before another Job will be attempted - thus the total time
+    // before a job might be restarted could be as long as
+    // 2 x BATCH_DEATH_PERIOD.
+    static final long BATCH_DEATH_PERIOD = 10 * 60 * 1000;
+    public static final long MESSAGE_TRANSACTION_TIMEOUT = SCHEDULER_HEARTBEAT_PERIOD;
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TaskManager.class);
+
+    // queue path the batch was read from; used when committing transactions
+    private final String path;
+
+    private final Notification notification;
+    // messages of the batch that have not been completed yet, keyed by device UUID
+    private final ConcurrentHashMap<UUID,Message> remaining;
+    private final AtomicLong successes = new AtomicLong();
+    private final AtomicLong failures = new AtomicLong();
+    private final AtomicLong skips = new AtomicLong();
+    private final QueueManager qm;
+    private final EntityManager em;
+    private final NotificationServiceProxy ns;
+
+    /**
+     * Registers every message of {@code queueResults} as outstanding, keyed by
+     * the message's "deviceUUID" property.
+     */
+    public TaskManager(  EntityManager em, NotificationServiceProxy ns,QueueManager qm, Notification notification, QueueResults queueResults) {
+        this.em = em;
+        this.qm = qm;
+        this.ns = ns;
+        this.path = queueResults.getPath();
+        this.notification = notification;
+        this.remaining = new ConcurrentHashMap<UUID,Message>();
+        for (Message m : queueResults.getMessages()) {
+            remaining
+                    .put((UUID) m.getObjectProperty("deviceUUID"), m);
+        }
+    }
+
+    /**
+     * Counts the device as skipped and completes it without a receipt.
+     *
+     * @param deviceUUID device that will not receive the notification
+     */
+    public void skip(UUID deviceUUID) throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("notification {} skipped device {}",
+                    notification.getUuid(), deviceUUID);
+        }
+        skips.incrementAndGet();
+        completed(null, null, deviceUUID, null);
+    }
+
+    /**
+     * Marks the delivery to one device as finished.
+     *
+     * @param notifier      notifier used for the delivery; only read when
+     *                      {@code newProviderId} is non-null
+     * @param receipt       receipt to stamp and save; when non-null the
+     *                      delivery is counted as a success, when null no
+     *                      receipt is written
+     * @param deviceUUID    device whose delivery finished
+     * @param newProviderId replacement provider id to store on the device, or
+     *                      null to leave the device untouched
+     * @throws Exception if saving the receipt, committing the queue
+     *                   transaction or updating the device fails
+     */
+    public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID,
+            String newProviderId) throws Exception {
+
+            LOG.debug("REMOVED {}", deviceUUID);
+            try {
+                EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE,
+                        deviceUUID);
+
+                if (receipt != null) {
+                    LOG.debug("notification {} sent to device {}. saving receipt.",
+                            notification.getUuid(), deviceUUID);
+                    successes.incrementAndGet();
+                    receipt.setSent(System.currentTimeMillis());
+                    this.saveReceipt(notification, deviceRef, receipt);
+                    LOG.debug("notification {} receipt saved for device {}",
+                            notification.getUuid(), deviceUUID);
+                }
+
+                // single lookup: a separate containsKey() + get() could see the
+                // entry removed by another thread in between and hit a null
+                Message message = remaining.get(deviceUUID);
+                if (message != null) {
+                    LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
+                    qm.commitTransaction(path, message.getTransaction(), null);
+                }
+
+                if (newProviderId != null) {
+                    LOG.debug("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
+                    replaceProviderId(deviceRef, notifier, newProviderId);
+                }
+
+                LOG.debug("notification {} completed device {}", notification.getUuid(), deviceUUID);
+
+            } finally {
+                LOG.debug("COUNT is: {}", successes.get());
+                remaining.remove(deviceUUID);
+                // note: stats are transient for the duration of the batch
+                if (remaining.size() == 0) {
+                    long successesCopy = successes.get();
+                    long failuresCopy = failures.get();
+                    // report only when at least one device was sent, failed or skipped
+                    if (successesCopy > 0 || failuresCopy > 0 || skips.get()>0) {
+                        ns.finishedBatch(notification, successesCopy, failuresCopy);
+                    }
+                }
+
+            }
+    }
+
+    /**
+     * Records a failed delivery: counts the failure, stores the error code and
+     * message on the receipt, saves it, and finally completes the device
+     * without a receipt.
+     *
+     * If the receipt already has a UUID (presumably because it was saved by an
+     * earlier call to completed()), the success count is decremented.
+     *
+     * @param notifier   notifier used for the delivery
+     * @param receipt    receipt to update; must not be null
+     * @param deviceUUID device whose delivery failed
+     * @param code       provider error code
+     * @param message    provider error message
+     */
+    public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code,
+            String message) throws Exception {
+
+        try {
+
+
+            if (LOG.isDebugEnabled()) {
+                StringBuilder sb = new StringBuilder();
+                sb.append("notification ").append(notification.getUuid());
+                sb.append(" for device ").append(deviceUUID);
+                sb.append(" got error ").append(code);
+                LOG.debug(sb.toString());
+            }
+
+            failures.incrementAndGet();
+            if (receipt.getUuid() != null) {
+                successes.decrementAndGet();
+            }
+            receipt.setErrorCode(code);
+            receipt.setErrorMessage(message);
+            this.saveReceipt(notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID), receipt);
+            LOG.debug("notification {} receipt saved for device {}",  notification.getUuid(), deviceUUID);
+        } finally {
+            completed(notifier, null, deviceUUID, null);
+        }
+    }
+
+    /*
+    * Persists the receipt. A receipt without a UUID is created, given the UUID
+    * of the stored entity and added to the receipts collection of both the
+    * notification and the device; a receipt that already has a UUID is
+    * updated in place.
+    */
+    public void saveReceipt(EntityRef notification, EntityRef device,
+                            Receipt receipt) throws Exception {
+        if (receipt.getUuid() == null) {
+            Receipt savedReceipt = em.create(receipt);
+            receipt.setUuid(savedReceipt.getUuid());
+
+            List<EntityRef> entities = Arrays.asList(notification, device);
+            em.addToCollections(entities, Notification.RECEIPTS_COLLECTION,  savedReceipt);
+        } else {
+            em.update(receipt);
+        }
+    }
+
+    /**
+     * Overwrites the device's provider id for the notifier. The property keyed
+     * by the notifier's name is tried first, then the one keyed by its UUID;
+     * nothing is written when the device has neither property.
+     */
+    protected void replaceProviderId(EntityRef device, Notifier notifier,
+                                     String newProviderId) throws Exception {
+        Object value = em.getProperty(device, notifier.getName()
+                + NotificationsService.NOTIFIER_ID_POSTFIX);
+        if (value != null) {
+            em.setProperty(device, notifier.getName() + NotificationsService.NOTIFIER_ID_POSTFIX,  newProviderId);
+        } else {
+            value = em.getProperty(device, notifier.getUuid()
+                    + NotificationsService.NOTIFIER_ID_POSTFIX);
+            if (value != null) {
+                em.setProperty(device,  notifier.getUuid() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+            }
+        }
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
new file mode 100644
index 0000000..3b4d3d7
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskTracker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+
+import org.apache.usergrid.persistence.Notifier;
+import org.apache.usergrid.persistence.Receipt;
+
+import java.util.UUID;
+
+/**
+ * Bundles the notifier, receipt and device id of a single delivery with the
+ * {@link TaskManager} of its batch, so the outcome can be reported without
+ * passing those values around.
+ */
+public class TaskTracker {
+
+    private final TaskManager taskManager;
+    private final Notifier notifier;
+    private final Receipt receipt;
+    private final UUID id;
+
+    /**
+     * @param notifier    notifier used for the delivery
+     * @param taskManager manager that receives the outcome
+     * @param receipt     receipt handed to the manager with the outcome
+     * @param id          id handed to the manager as the device UUID
+     */
+    public TaskTracker(Notifier notifier, TaskManager taskManager, Receipt receipt, UUID id) {
+        this.taskManager = taskManager;
+        this.notifier = notifier;
+        this.receipt = receipt;
+        this.id = id;
+    }
+
+    /** Reports a successful delivery that leaves the provider id unchanged. */
+    public void completed() throws Exception {
+        this.taskManager.completed(this.notifier, this.receipt, this.id, null);
+    }
+
+    /** Reports a successful delivery and passes {@code newToken} on as the new provider id. */
+    public void completed(String newToken) throws Exception {
+        this.taskManager.completed(this.notifier, this.receipt, this.id, newToken);
+    }
+
+    /** Reports a failed delivery together with the provider's error code and message. */
+    public void failed(Object code, String message) throws Exception {
+        this.taskManager.failed(this.notifier, this.receipt, this.id, code, message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
new file mode 100644
index 0000000..b3f6243
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TestAdapter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.Notifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.services.ServicePayload;
+import org.apache.usergrid.services.notifications.apns.APNsAdapter;
+import org.apache.usergrid.services.notifications.apns.APNsNotification;
+
+/**
+ * Just used for testing. Performance and such. Nothing is sent to a provider:
+ * each notification is simply reported as sent, either inline or, when
+ * {@link #DELAY} is positive, from a worker thread after sleeping.
+ */
+public class TestAdapter implements ProviderAdapter {
+
+    private static final Logger log = LoggerFactory.getLogger(TestAdapter.class);
+    private static final int DELAY = 1; // if delay > 0, uses threadpool
+
+    // stays null when DELAY is not positive; messages are then marked sent inline
+    private ExecutorService pool = null;
+
+    public TestAdapter() {
+        if (DELAY > 0) {
+            pool = Executors
+                    .newFixedThreadPool(APNsAdapter.MAX_CONNECTION_POOL_SIZE);
+        }
+    }
+
+    /** No-op: there is no connection to test. */
+    @Override
+    public void testConnection(Notifier notifier) throws ConnectionException {
+    }
+
+    /**
+     * Wraps the payload in an {@link APNsNotification} with an empty token and
+     * reports it as sent. {@code providerId} and {@code notifier} are ignored.
+     */
+    @Override
+    public void sendNotification(
+            String providerId, 
+            Notifier notifier,
+            final Object payload, 
+            Notification notification,
+            TaskTracker tracker)
+            throws Exception {
+
+        final APNsNotification apnsNotification = APNsNotification.create(
+                "", payload.toString(), notification, tracker);
+
+        if (pool == null) {
+            apnsNotification.messageSent();
+
+        } else {
+            pool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        Thread.sleep(DELAY);
+                        apnsNotification.messageSent();
+                        log.debug("messageSent() - {}", payload);
+                    } catch (InterruptedException e) {
+                        // keep the interrupt visible to the pool instead of swallowing it
+                        Thread.currentThread().interrupt();
+                        log.error("messageSent() returned error", e);
+                    } catch (Exception e) {
+                        log.error("messageSent() returned error", e);
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public void doneSendingNotifications() throws Exception {
+        log.debug("doneSendingNotifications()");
+    }
+
+    /**
+     * @return always an empty map (never null), matching what APNsAdapter
+     *         returns when there are no inactive devices
+     */
+    @Override
+    public Map<String, Date> getInactiveDevices(Notifier notifier,
+            EntityManager em) throws Exception {
+        log.debug("getInactiveDevices()");
+        return new java.util.HashMap<String, Date>();
+    }
+
+    /** @return the payload unchanged */
+    @Override
+    public Object translatePayload(Object payload) throws Exception {
+        return payload;
+    }
+
+    /** No-op: every notifier payload is accepted. */
+    @Override
+    public void validateCreateNotifier(ServicePayload payload) throws Exception {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
new file mode 100644
index 0000000..73399c9
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.google.common.cache.*;
+
+import com.relayrides.pushy.apns.*;
+import com.relayrides.pushy.apns.util.*;
+
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.Notifier;
+import org.mortbay.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.security.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
+import org.apache.usergrid.services.ServicePayload;
+import org.apache.usergrid.services.notifications.ConnectionException;
+import org.apache.usergrid.services.notifications.ProviderAdapter;
+import org.apache.usergrid.services.notifications.TaskTracker;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Adapter for Apple push notifications
+ */
+public class APNsAdapter implements ProviderAdapter {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(APNsAdapter.class);
+
+    public static int MAX_CONNECTION_POOL_SIZE = 15;
+    private static final Set<String> validEnvironments = new HashSet<String>();
+    private static final String TEST_TOKEN = "ff026b5a4d2761ef13843e8bcab9fc83b47f1dfbd1d977d225ab296153ce06d6";
+    private static final String TEST_PAYLOAD = "{}";
+
+    static {
+        validEnvironments.add("development");
+        validEnvironments.add("production");
+        validEnvironments.add("mock");
+    }
+
+    public APNsAdapter(){}
+
+    @Override
+    public void testConnection(Notifier notifier) throws ConnectionException {
+        if(isMock(notifier)){
+            delayRandom(notifier); return;
+        }
+        TestAPNsNotification notification =  TestAPNsNotification.create(TEST_TOKEN, TEST_PAYLOAD);
+        try {
+            CountDownLatch latch = new CountDownLatch(1);
+            notification.setLatch(latch);
+                PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+                addToQueue(pushManager, notification);
+                latch.await(10000,TimeUnit.MILLISECONDS);
+                if(notification.hasFailed()){
+                    // this is expected with a bad certificate (w/message: comes from failedconnectionlistener
+                    throw new ConnectionException("Bad certificate. Double-check your environment.",notification.getCause() != null ? notification.getCause() : new Exception("Bad certificate."));
+                }
+                notification.finished();
+            } catch (Exception e) {
+                notification.finished();
+
+                if (e instanceof ConnectionException) {
+                throw (ConnectionException) e;
+            }
+            if (e instanceof InterruptedException) {
+                throw new ConnectionException("Test notification timed out", e);
+            }
+            logger.warn("testConnection got non-fatal error", e.getCause());
+        }
+    }
+
+    private BlockingQueue<SimpleApnsPushNotification> addToQueue(PushManager<SimpleApnsPushNotification> pushManager, SimpleApnsPushNotification notification) throws InterruptedException {
+        BlockingQueue<SimpleApnsPushNotification> queue = pushManager.getQueue();
+        queue.offer(notification,2500,TimeUnit.MILLISECONDS);
+        return queue;
+    }
+
+    @Override
+    public void sendNotification(String providerId, Notifier notifier,
+            Object payload, Notification notification, TaskTracker tracker)
+            throws Exception {
+        if(isMock(notifier)){
+            tracker.completed("Mocked!");
+            return;
+        }
+        APNsNotification apnsNotification = APNsNotification.create(providerId, payload.toString(), notification, tracker);
+        PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+        try {
+            addToQueue(pushManager, apnsNotification);
+            apnsNotification.messageSent();
+        }catch (InterruptedException ie){
+            apnsNotification.messageSendFailed(ie);
+            throw ie;
+        }
+    }
+
+    @Override
+    public void doneSendingNotifications() throws Exception {
+        // do nothing - no batching
+    }
+
+    @Override
+    public Map<String, Date> getInactiveDevices(Notifier notifier,
+            EntityManager em) throws Exception {
+        Map<String,Date> map = new HashMap<String,Date>();
+        if(isMock(notifier)){
+            return map;
+        }
+        PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+
+        List<ExpiredToken> tokens = null;
+        try {
+            tokens = pushManager.getExpiredTokens();
+        }catch (FeedbackConnectionException fce){
+            logger.debug("Failed to get tokens",fce);
+            return map;
+        }
+        for(ExpiredToken token : tokens){
+            String expiredToken = new String(token.getToken());
+            map.put(expiredToken, token.getExpiration());
+        }
+        return map;
+    }
+
+    private PushManager<SimpleApnsPushNotification> getPushManager(Notifier notifier) throws ExecutionException {
+        PushManager<SimpleApnsPushNotification> pushManager = apnsServiceMap.get(notifier);
+        if(pushManager != null &&  !pushManager.isStarted() && pushManager.isShutDown()){
+            try{
+                pushManager = createApnsService(notifier);
+            }catch(Exception e){
+                logger.error("could not instantiate push manager.");
+                throw new ExecutionException(e);
+            }
+            apnsServiceMap.put(notifier,pushManager);
+        }
+        try {
+            if (!pushManager.isStarted()) { //ensure manager is started
+                pushManager.start();
+            }
+        }catch(IllegalStateException ise){
+            logger.debug("failed to start",ise);//could have failed because its starteded
+        }
+        return pushManager;
+    }
+
+    //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
+    private static LoadingCache<Notifier, PushManager<SimpleApnsPushNotification>> apnsServiceMap = CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES)
+            .removalListener(new RemovalListener<Notifier, PushManager<SimpleApnsPushNotification>>() {
+                @Override
+                public void onRemoval(
+                        RemovalNotification<Notifier, PushManager<SimpleApnsPushNotification>> notification) {
+                    try {
+                        PushManager<SimpleApnsPushNotification> manager = notification.getValue();
+                        if(!manager.isShutDown()){
+                            notification.getValue().shutdown();
+                        }
+                    } catch (Exception ie) {
+                        logger.error("Failed to shutdown from cache",ie);
+                    }
+                }
+            }).build(new CacheLoader<Notifier, PushManager<SimpleApnsPushNotification>>() {
+                @Override
+                public PushManager<SimpleApnsPushNotification> load(Notifier notifier) {
+                    try{
+                        return createApnsService(notifier);
+                    }catch (KeyStoreException ke){
+                        logger.error("Could not instantiate pushmanager",ke);
+                        return null;
+                    }
+                }
+            });
+
+
+    protected static PushManager<SimpleApnsPushNotification> createApnsService(Notifier notifier) throws KeyStoreException{
+        LinkedBlockingQueue<SimpleApnsPushNotification> queue = new LinkedBlockingQueue<SimpleApnsPushNotification>();
+        PushManagerConfiguration config = new PushManagerConfiguration();
+        config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
+        PushManager<SimpleApnsPushNotification> pushManager =  new PushManager<SimpleApnsPushNotification>(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, queue, config);
+        //only tested when a message is sent
+        pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
+        //this will get tested when start is called
+        pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+        return pushManager;
+    }
+
+    @Override
+    public Object translatePayload(Object objPayload) throws Exception {
+        String payload;
+        if (objPayload instanceof String) {
+            payload = (String) objPayload;
+            if (!payload.startsWith("{")) {
+                payload = "{\"aps\":{\"alert\":\"" + payload + "\"}}";
+            }
+        } else {
+            payload = JSON.toString(objPayload);
+        }
+        if (payload.length() > 256) {
+            throw new IllegalArgumentException(
+                    "Apple APNs payloads must be 256 characters or less");
+        }
+        return payload;
+    }
+
+
+    @Override
+    public void validateCreateNotifier(ServicePayload payload) throws Exception {
+        String environment = payload.getStringProperty("environment");
+        if (!validEnvironments.contains(environment)) {
+            throw new IllegalArgumentException("environment must be one of: "
+                    + Arrays.toString(validEnvironments.toArray()));
+        }
+
+        if (payload.getProperty("p12Certificate") == null) {
+            throw new RequiredPropertyNotFoundException("notifier",
+                    "p12Certificate");
+        }
+    }
+    public boolean isMock(Notifier notifier){
+        return notifier.getEnvironment() !=null ? notifier.getEnvironment().equals("mock") : false ;
+    }
+    public boolean delayRandom(Notifier notifier) {
+        boolean wasDelayed = false;
+        if (isMock(notifier)) {
+            try {
+                Thread.sleep(
+                        new Random().nextInt(300)
+                );
+                wasDelayed = true;
+            } catch (InterruptedException ie) {
+                //delay was stopped
+            }
+        }
+        return wasDelayed;
+    }
+
+    private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
+        return  notifier.isProduction()
+                ? ApnsEnvironment.getProductionEnvironment()
+                : ApnsEnvironment.getSandboxEnvironment();
+    }
+
+
+    private static SSLContext getSSLContext(Notifier notifier) {
+        try {
+            KeyStore keyStore = KeyStore.getInstance("PKCS12");
+            String password = notifier.getCertificatePassword();
+            char[] passChars =(password != null ? password : "").toCharArray();
+            InputStream stream = notifier.getP12CertificateStream();
+            keyStore.load(stream,passChars);
+            SSLContext context =  SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
+            return context;
+        }catch (Exception e){
+            throw new RuntimeException("Error getting certificate",e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
new file mode 100644
index 0000000..11a9856
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.RejectedNotificationReason;
+import com.relayrides.pushy.apns.util.MalformedTokenStringException;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import com.relayrides.pushy.apns.util.TokenUtil;
+
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.services.notifications.TaskTracker;
+
+import java.util.Calendar;
+import java.util.Date;
+/**
+ * Standard Apigee notification
+ */
+public class APNsNotification extends SimpleApnsPushNotification {
+
+    /** Told about the outcome of the send; every use is guarded against null. */
+    private TaskTracker tracker;
+
+    /**
+     * Factory method: the expiry is the current time plus the notification's expire time in seconds.
+     * @param providerId token for device, converted to bytes by TokenUtil
+     * @param payload body
+     * @param notification notification entity, read for its expire time
+     * @param tracker tracks completion
+     * @return the new notification
+     * @throws RuntimeException wrapping the MalformedTokenStringException of an unparsable token
+     */
+    public static APNsNotification create(String providerId, String payload, Notification notification, TaskTracker tracker) throws RuntimeException {
+        Calendar expiry = Calendar.getInstance();
+        expiry.add(Calendar.SECOND, notification.getExpireTimeInSeconds());
+        final byte[] tokenBytes;
+        try {
+            tokenBytes = TokenUtil.tokenStringToByteArray(providerId);
+        } catch (MalformedTokenStringException mtse) {
+            throw new RuntimeException("Exception converting token",mtse);
+        }
+        return new APNsNotification(tracker, expiry.getTime(), tokenBytes, payload, notification);
+    }
+
+    /**
+     * Creates the notification and remembers the tracker; {@code notification} is accepted but not used here.
+     * @param tracker told about the send outcome, may be null
+     * @param expiryTime passed to the superclass constructor
+     * @param token device token bytes
+     * @param payload body
+     */
+    public APNsNotification(TaskTracker tracker, Date expiryTime, byte[] token, String payload,Notification notification) {
+        super(token, payload, expiryTime);
+        this.tracker = tracker;
+    }
+
+    /**
+     * Reports a successful send to the tracker.
+     * Does nothing when no tracker was supplied.
+     * @throws Exception propagated from the tracker
+     */
+    public void messageSent() throws Exception {
+        if (tracker == null) { return; }
+        tracker.completed();
+    }
+
+    /**
+     * Reports a rejection to the tracker, passing the reason's enum name and a fixed message.
+     * Does nothing when no tracker was supplied.
+     * @param reason rejection reason reported for this notification
+     * @throws Exception propagated from the tracker
+     */
+    public void messageSendFailed(RejectedNotificationReason reason) throws Exception {
+        if (tracker == null) { return; }
+        tracker.failed(reason.name(), "Failed sending notification.");
+    }
+
+    /**
+     * Reports a failure raised as an exception, passing its simple class name and its message.
+     * Does nothing when no tracker was supplied.
+     * @param cause the exception that made the send fail
+     * @throws Exception propagated from the tracker
+     */
+    public void messageSendFailed(Throwable cause) throws Exception {
+        if (tracker == null) { return; }
+        tracker.failed(cause.getClass().getSimpleName(), cause.getMessage());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/FailedConnectionListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/FailedConnectionListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/FailedConnectionListener.java
new file mode 100644
index 0000000..552701f
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/FailedConnectionListener.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.*;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Provides a single listener that basically just delegates back to the
+ * APNsNotification for handling.
+ */
+public class FailedConnectionListener implements com.relayrides.pushy.apns.FailedConnectionListener<SimpleApnsPushNotification> {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(RejectedAPNsListener.class);
+
+    @Override
+    public void handleFailedConnection(PushManager<? extends SimpleApnsPushNotification> pushManager, Throwable cause) {
+        List<SimpleApnsPushNotification> notifications = new ArrayList<SimpleApnsPushNotification>();
+        if (cause instanceof SSLException || cause instanceof SSLHandshakeException) { //cert is probably bad so shut it down.
+            if (!pushManager.isShutDown()) {
+                pushManager.unregisterFailedConnectionListener(this);
+
+                try {
+                    BlockingQueue notificationQueue =  pushManager.getQueue();
+                    if(notificationQueue !=null){
+                        LinkedBlockingQueue<SimpleApnsPushNotification>  queue =  ( LinkedBlockingQueue<SimpleApnsPushNotification> )notificationQueue;
+                        Object[] objectMess = queue.toArray(); //get messages still in queue
+                        for(Object o : objectMess){
+                            if(o instanceof SimpleApnsPushNotification) {
+                                notifications.add((SimpleApnsPushNotification) o);
+                            }
+                        }
+                    }
+                    pushManager.shutdown();
+                } catch (InterruptedException ie) {
+                    logger.error("Failed to stop push services", ie);
+                }
+            } else {
+                return;
+            }
+        }
+        //mark all unsent notifications failed
+        if (notifications != null) {
+            for (SimpleApnsPushNotification notification : notifications) {
+                if (notification instanceof APNsNotification) {
+                    try {
+                        ((APNsNotification) notification).messageSendFailed(cause);//mark failed with bad token
+                    } catch (Exception e) {
+                        logger.error("failed to track notification in failed connection listener", e);
+                    }
+                }
+                //if test this is a problem because you can't connect
+                if (notification instanceof TestAPNsNotification) {
+                    TestAPNsNotification testAPNsNotification = ((TestAPNsNotification) notification);
+                    testAPNsNotification.setReason(cause);
+                    testAPNsNotification.countdown();
+                }
+
+            }
+            pushManager.getQueue().clear();
+        }
+        logger.error("Failed to register push connection", cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/RejectedAPNsListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/RejectedAPNsListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/RejectedAPNsListener.java
new file mode 100644
index 0000000..545dc0c
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/RejectedAPNsListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.*;
+import com.relayrides.pushy.apns.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides a single listener that basically just delegates back to the
+ * APNsNotification for handling.
+ */
+public class RejectedAPNsListener implements RejectedNotificationListener<SimpleApnsPushNotification>{
+
+    @Override
+    public void handleRejectedNotification(PushManager<? extends SimpleApnsPushNotification> pushManager, SimpleApnsPushNotification notification, RejectedNotificationReason rejectionReason) {
+        try {
+            //mark failed for standard notification
+            if (notification instanceof APNsNotification) {
+                ((APNsNotification) notification).messageSendFailed(rejectionReason);
+            }
+            //if test getting here means it worked
+            if(notification instanceof TestAPNsNotification){
+                TestAPNsNotification testAPNsNotification = (TestAPNsNotification) notification;
+                testAPNsNotification.setReason(rejectionReason);
+                testAPNsNotification.countdown();
+                logger.error("Failed to connect to APN's service",testAPNsNotification);
+            }
+
+        } catch (Exception e) {
+            logger.error("Failed to track rejected listener", e);
+        }
+        System.out.format("%s was rejected with rejection reason %s\n", notification, rejectionReason);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(RejectedAPNsListener.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsListener.java
new file mode 100644
index 0000000..9d112d4
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsListener.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ApnsConnection;
+import com.relayrides.pushy.apns.ApnsConnectionListener;
+import com.relayrides.pushy.apns.RejectedNotificationReason;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+public class TestAPNsListener implements ApnsConnectionListener<SimpleApnsPushNotification> {
+
+    private final CountDownLatch latch; // count of one: released by the first connection success, failure or closure
+    private boolean connectionFailed = false;
+    private boolean connectionClosed = false;
+    private Throwable connectionFailureCause;
+    private final ArrayList<SimpleApnsPushNotification> writeFailures = new ArrayList<SimpleApnsPushNotification>();
+    private SimpleApnsPushNotification rejectedNotification;
+    private RejectedNotificationReason rejectionReason;
+    private final ArrayList<SimpleApnsPushNotification> unprocessedNotifications = new ArrayList<SimpleApnsPushNotification>();
+
+    public TestAPNsListener() {
+        this.latch = new CountDownLatch(1);
+    }
+
+    /** Counts the latch down when a connection succeeds. */
+    public void handleConnectionSuccess(final ApnsConnection<SimpleApnsPushNotification> connection) {
+        latch.countDown();
+    }
+
+    /** Records the failure and its cause, then counts the latch down. */
+    public void handleConnectionFailure(final ApnsConnection<SimpleApnsPushNotification> connection, final Throwable cause) {
+        this.connectionFailed = true;
+        this.connectionFailureCause = cause;
+        latch.countDown();
+    }
+
+    /** Waits for pending writes, marks the connection closed and counts the latch down. */
+    public void handleConnectionClosure(ApnsConnection<SimpleApnsPushNotification> connection) {
+        try {
+            connection.waitForPendingWritesToFinish();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt(); // stop waiting, but leave the interrupt visible to the caller
+        }
+        this.connectionClosed = true;
+        latch.countDown();
+    }
+
+    /** Remembers a notification that could not be written. */
+    public void handleWriteFailure(ApnsConnection<SimpleApnsPushNotification> connection, SimpleApnsPushNotification notification, Throwable cause) {
+        this.writeFailures.add(notification);
+    }
+
+    /** Remembers the most recently rejected notification and the reason given. */
+    public void handleRejectedNotification(ApnsConnection<SimpleApnsPushNotification> connection, SimpleApnsPushNotification rejectedNotification, RejectedNotificationReason reason) {
+        this.rejectedNotification = rejectedNotification;
+        this.rejectionReason = reason;
+    }
+
+    /** Accumulates the notifications reported as unprocessed. */
+    public void handleUnprocessedNotifications(ApnsConnection<SimpleApnsPushNotification> connection, Collection<SimpleApnsPushNotification> unprocessedNotifications) {
+        this.unprocessedNotifications.addAll(unprocessedNotifications);
+    }
+
+    /** No-op: writability changes are not recorded. */
+    public void handleConnectionWritabilityChange(ApnsConnection<SimpleApnsPushNotification> connection, boolean writable) {
+    }
+
+    /** @return true once a connection failure has been reported */
+    public boolean hasConnectionFailed() {
+        return connectionFailed;
+    }
+
+    /** @return the cause of the last reported connection failure; null if none was reported */
+    public Throwable getConnectionFailureCause(){
+        return connectionFailureCause;
+    }
+
+    /** @return the latch counted down by the connection callbacks */
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsNotification.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsNotification.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsNotification.java
new file mode 100644
index 0000000..014d275
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/TestAPNsNotification.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.RejectedNotificationReason;
+import com.relayrides.pushy.apns.util.MalformedTokenStringException;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import com.relayrides.pushy.apns.util.TokenUtil;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * notification type for testing connections
+ */
+public class TestAPNsNotification extends SimpleApnsPushNotification {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestAPNsNotification.class);
+
+    boolean hasFailed = false;
+
+    CountDownLatch latch;
+
+    private final Timer processTimer  =
+            Metrics.newTimer(TestAPNsNotification.class, "apns_test_notification", TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
+    private TimerContext timer;
+    private Throwable cause;
+
+    /**
+     * Factory method: parses the token string and wraps the bytes in a test notification.
+     * @param tokenString device token, converted to bytes by TokenUtil
+     * @param payload body
+     * @return the new test notification
+     * @throws RuntimeException wrapping the MalformedTokenStringException of an unparsable token
+     */
+    public static TestAPNsNotification create(String tokenString, String payload) throws RuntimeException{
+        final byte[] tokenBytes;
+        try {
+            tokenBytes = TokenUtil.tokenStringToByteArray(tokenString);
+        } catch (MalformedTokenStringException mtse) {
+            throw new RuntimeException("exception foreign byte array",mtse);
+        }
+        return new TestAPNsNotification(tokenBytes, payload);
+    }
+
+    /**
+     * Passes the current time to the superclass as the expiry and starts the process timer.
+     * @param token device token bytes
+     * @param payload body
+     */
+    public TestAPNsNotification( byte[] token, String payload) {
+        super(token, payload, Calendar.getInstance().getTime());
+        this.timer = processTimer.time();
+    }
+
+    /** Sets the latch that countdown() decrements. */
+    public void setLatch(CountDownLatch latch){
+        this.latch = latch;
+    }
+
+    /** @return the latch set through setLatch; null if none was set */
+    public CountDownLatch getLatch(){
+        return latch;
+    }
+
+    /** Decrements the latch once; does nothing when no latch was set. */
+    public void countdown(){
+        if (latch == null) { return; }
+        latch.countDown();
+    }
+
+    /** @return the failure flag; false until one of the setReason overloads sets it */
+    public boolean hasFailed(){
+        return hasFailed;
+    }
+
+    /** Stops the timer started in the constructor. */
+    public void finished(){
+        this.timer.stop();
+    }
+
+    /** @return the cause stored by setReason(Throwable); may be null */
+    public Throwable getCause(){
+        return cause;
+    }
+
+    /**
+     * Marks the notification failed and remembers the cause.
+     * @param cause the failure
+     */
+    public void setReason(Throwable cause){
+        hasFailed = true;
+        this.cause = cause;
+    }
+
+    /**
+     * Records a rejection: any reason other than INVALID_TOKEN counts as a failure.
+     * The test token is definitely invalid, so an INVALID_TOKEN rejection is not a failure.
+     * @param reason the rejection reason
+     */
+    public void setReason(RejectedNotificationReason reason){
+        hasFailed = !RejectedNotificationReason.INVALID_TOKEN.equals(reason);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
new file mode 100644
index 0000000..04a07d8
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.gcm;
+
+import com.google.android.gcm.server.*;
+import org.apache.usergrid.persistence.Notification;
+import org.apache.usergrid.persistence.Notifier;
+import org.mortbay.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.services.ServicePayload;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
+
+import org.apache.usergrid.services.notifications.ConnectionException;
+import org.apache.usergrid.services.notifications.ProviderAdapter;
+import org.apache.usergrid.services.notifications.TaskTracker;
+
+import java.io.IOException;
+import java.util.*;
+
+public class GCMAdapter implements ProviderAdapter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GCMAdapter.class);
+    private static final int SEND_RETRIES = 3;
+    private static int BATCH_SIZE = 1000;
+    // one pending batch per notifier, created lazily by getBatch
+    private Map<Notifier, Batch> notifierBatches = new HashMap<Notifier, Batch>();
+
+    /** Sends an empty message to the placeholder token "device_token"; mock notifiers only sleep 200 ms. */
+    @Override
+    public void testConnection(Notifier notifier) throws ConnectionException {
+        if(isMock(notifier)){
+            try { Thread.sleep(200); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
+            return;
+        }
+        Sender sender = new Sender(notifier.getApiKey());
+        Message message = new Message.Builder().build();
+        try {
+            Result result = sender.send(message, "device_token", 1);
+            LOG.debug("testConnection result: {}", result);
+        } catch (IOException e) {
+            throw new ConnectionException(e.getMessage(), e);
+        }
+    }
+
+    /** Queues the device id on the notifier's batch, defaulting "time_to_live" from the notification's expiry (capped at 2419200 s). */
+    @Override
+    public void sendNotification(String providerId, Notifier notifier, Object payload, Notification notification, TaskTracker tracker) throws Exception {
+        Map<String,Object> map = (Map<String, Object>) payload;
+        final String expiresKey = "time_to_live";
+        if(!map.containsKey(expiresKey) && notification.getExpire() != null){
+            int expireSeconds = notification.getExpireTimeInSeconds();
+            expireSeconds = expireSeconds <= 2419200 ? expireSeconds : 2419200; //send the max gcm value documented here http://developer.android.com/google/gcm/adv.html#ttl
+            map.put(expiresKey, expireSeconds);
+        }
+        Batch batch = getBatch(notifier, map);
+        batch.add(providerId, tracker);
+    }
+
+    /** Returns the notifier's batch, creating it when absent and a payload is given; returns null when absent and the payload is null. */
+    synchronized private Batch getBatch(Notifier notifier, Map<String, Object> payload) {
+        Batch batch = notifierBatches.get(notifier);
+        if (batch == null && payload != null) {
+            batch = new Batch(notifier, payload);
+            notifierBatches.put(notifier, batch);
+        }
+        return batch;
+    }
+
+    /** Sends whatever is still queued in every batch. */
+    @Override
+    synchronized public void doneSendingNotifications() throws Exception {
+        for (Batch batch : notifierBatches.values()) {
+            batch.send();
+        }
+    }
+
+    /** Hands over (and resets) the inactive devices recorded for the notifier; returns null when it has no batch. */
+    @Override
+    public Map<String, Date> getInactiveDevices(Notifier notifier, EntityManager em) throws Exception {
+        Batch batch = getBatch(notifier, null);
+        Map<String,Date> map = null;
+        if(batch != null) {
+            map = batch.getAndClearInactiveDevices();
+        }
+        return map;
+    }
+
+    /** Accepts a Map as is, wraps a String under "data", and rejects anything else or JSON longer than 4096 characters. */
+    @Override
+    public Map<String, Object> translatePayload(Object payload) throws Exception {
+        Map<String, Object> mapPayload = new HashMap<String, Object>();
+        if (payload instanceof Map) {
+            mapPayload = (Map<String, Object>) payload;
+        } else if (payload instanceof String) {
+            mapPayload.put("data", payload);
+        } else {
+            throw new IllegalArgumentException("GCM Payload must be either a Map or a String");
+        }
+        if (JSON.toString(mapPayload).length() > 4096) {
+            throw new IllegalArgumentException("GCM payloads must be 4096 characters or less");
+        }
+        return mapPayload;
+    }
+
+    /** Requires the "apiKey" property on a new notifier. */
+    @Override
+    public void validateCreateNotifier(ServicePayload payload) throws Exception {
+        if (payload.getProperty("apiKey") == null) {
+            throw new RequiredPropertyNotFoundException("notifier", "apiKey");
+        }
+    }
+
+    /** Device ids and their trackers queued for one notifier and sent together as one multicast message. */
+    private class Batch {
+        private Notifier notifier;
+        private Map payload;
+        private List<String> ids;
+        private List<TaskTracker> trackers;
+        private Map<String, Date> inactiveDevices = new HashMap<String, Date>();
+
+        Batch(Notifier notifier, Map<String, Object> payload) {
+            this.notifier = notifier;
+            this.payload = payload;
+            this.ids = new ArrayList<String>();
+            this.trackers = new ArrayList<TaskTracker>();
+        }
+
+        /** Returns the inactive devices collected so far and starts a fresh map. */
+        synchronized Map<String, Date> getAndClearInactiveDevices() {
+            Map<String, Date> map = inactiveDevices;
+            inactiveDevices = new HashMap<String, Date>();
+            return map;
+        }
+
+        /** Queues one device and its tracker; sends as soon as BATCH_SIZE ids are queued. */
+        synchronized void add(String id, TaskTracker tracker) throws Exception {
+            ids.add(id);
+            trackers.add(tracker);
+            if (ids.size() == BATCH_SIZE) {
+                send();
+            }
+        }
+
+        /**
+         * Sends the queued ids in one multicast and reports each result to the matching tracker;
+         * unregistered or invalid ids are remembered as inactive. Mock notifiers complete every
+         * tracker with "Mocked!" instead. The queue is emptied afterwards in both cases (the mock
+         * path used to return early, leaving its entries queued to be completed again).
+         *
+         * Message.Builder requires the payload to be {@code Map<String,String>} for no good reason,
+         * so it is blind cast. What actually happens is "JSONValue.toJSONString(payload);", so anything
+         * JSONValue can handle is fine. (What is necessary is that the Map can have a nested structure.)
+         */
+        synchronized void send() throws Exception {
+            if (ids.size() == 0)
+                return;
+            Sender sender = new Sender(notifier.getApiKey());
+            Message.Builder builder = new Message.Builder();
+            builder.setData(payload);
+            Message message = builder.build();
+            if(isMock(notifier)){
+                delayRandom(notifier);
+                for(TaskTracker tracker : trackers){
+                    tracker.completed("Mocked!");
+                }
+            }else {
+                MulticastResult multicastResult = sender.send(message, ids, SEND_RETRIES);
+                LOG.debug("sendNotification result: {}", multicastResult);
+                for (int i = 0; i < multicastResult.getTotal(); i++) {
+                    Result result = multicastResult.getResults().get(i);
+                    if (result.getMessageId() != null) {
+                        String canonicalRegId = result.getCanonicalRegistrationId();
+                        trackers.get(i).completed(canonicalRegId);
+                    } else {
+                        String error = result.getErrorCodeName();
+                        trackers.get(i).failed(error, error);
+                        if (Constants.ERROR_NOT_REGISTERED.equals(error) || Constants.ERROR_INVALID_REGISTRATION.equals(error)) {
+                            inactiveDevices.put(ids.get(i), new Date());
+                        }
+                    }
+                }
+            }
+            this.ids.clear();
+            this.trackers.clear();
+        }
+    }
+    /** @return true when the notifier's environment is "mock" */
+    public boolean isMock(Notifier notifier){
+        return notifier.getEnvironment() !=null ? notifier.getEnvironment().equals("mock") : false ;
+    }
+    /** Sleeps up to 300 ms for mock notifiers; returns true only if the sleep ran to completion. */
+    public boolean delayRandom(Notifier notifier) {
+        boolean wasDelayed = false;
+        if (isMock(notifier)) {
+            try {
+                Thread.sleep(new Random().nextInt(300));
+                wasDelayed = true;
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt(); // delay was stopped; keep the interrupt visible to the caller
+            }
+        }
+        return wasDelayed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
new file mode 100644
index 0000000..8d7964d
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifiers/NotifiersService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifiers;
+
+import org.apache.usergrid.persistence.Notifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.usergrid.services.*;
+import org.apache.usergrid.services.notifications.NotificationsService;
+import org.apache.usergrid.services.notifications.ProviderAdapter;
+
+import java.util.Arrays;
+import java.util.Set;
+
+public class NotifiersService extends AbstractCollectionService {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(NotifiersService.class);
+
+    public NotifiersService() {
+        super();
+        logger.info("/notifiers");
+    }
+
+    @Override
+    public ServiceResults postCollection(ServiceContext context)
+            throws Exception {
+
+        ServicePayload payload = context.getPayload();
+
+        NotificationsService ns = (NotificationsService) sm
+                .getService("notifications");
+        Set<String> providers = ns.getProviders();
+
+        String provider = payload.getStringProperty("provider");
+        if (!providers.contains(provider)) {
+            throw new IllegalArgumentException("provider must be one of: "
+                    + Arrays.toString(providers.toArray()));
+        }
+
+        ProviderAdapter providerAdapter = ns.providerAdapters.get(provider);
+        providerAdapter.validateCreateNotifier(payload);
+
+        ServiceResults results = super.postCollection(context);
+
+        Notifier notifier = (Notifier) results.getEntity();
+        if (notifier != null) {
+            try {
+                ns.testConnection(notifier);
+            } catch (Exception e) {
+                logger.info("notifier testConnection() failed", e);
+                em.delete(notifier);
+                throw e;
+            }
+        }
+
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/users/devices/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/users/devices/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/users/devices/notifications/NotificationsService.java
new file mode 100644
index 0000000..79972c6
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/users/devices/notifications/NotificationsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.users.devices.notifications;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationsService extends
+        org.apache.usergrid.services.notifications.NotificationsService {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(NotificationsService.class);
+
+    public NotificationsService() {
+        logger.info("/users/*/devices/*/notifications");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9d7901ae/stack/services/src/main/java/org/apache/usergrid/services/users/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/users/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/users/notifications/NotificationsService.java
new file mode 100644
index 0000000..4b536bc
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/users/notifications/NotificationsService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.users.notifications;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NotificationsService extends
+        org.apache.usergrid.services.notifications.NotificationsService {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(NotificationsService.class);
+
+    public NotificationsService() {
+        logger.info("/users/*/notifications");
+    }
+
+}


Mime
View raw message