airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: adding another thread to handle unregistering: AIRAVATA-1022
Date Fri, 21 Feb 2014 22:27:13 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 84622574a -> 197daf118


adding another thread to handle unregistering: AIRAVATA-1022


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f2445c28
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f2445c28
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f2445c28

Branch: refs/heads/master
Commit: f2445c28fd06cdd0bbe582388d4537e6bdd3a1c4
Parents: 16d6d17
Author: lahiru <lahiru@apache.org>
Authored: Fri Feb 21 17:26:44 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Fri Feb 21 17:26:44 2014 -0500

----------------------------------------------------------------------
 .../airavata/job/monitor/MonitorManager.java    |  8 ++-
 .../job/monitor/impl/push/amqp/AMQPMonitor.java | 39 +++-------
 .../impl/push/amqp/UnRegisterThread.java        | 75 ++++++++++++++++++++
 3 files changed, 89 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f2445c28/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 90cde3d..6cc5566 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -26,6 +26,7 @@ import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
+import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,7 +100,7 @@ public class MonitorManager {
      */
     public void launchMonitor() throws AiravataMonitorException {
         new Thread(){
-            public synchronized void run() {
+            public void run() {
                 if (pushMonitors.isEmpty()) {
                     if (pullMonitors.isEmpty()) {
                         logger.error("Before launching MonitorManager should have atleast one Monitor");
@@ -110,7 +111,7 @@ public class MonitorManager {
                         try {
                             pullMonitor.startPulling();
                         } catch (AiravataMonitorException e) {
-                            e.printStackTrace();
+                            logger.error(e.getLocalizedMessage());
                         }
                     }
                 } else {
@@ -120,6 +121,9 @@ public class MonitorManager {
                     if(pushMonitor instanceof AMQPMonitor){
                         ((AMQPMonitor) pushMonitor).run();
                     }
+                    UnRegisterThread unRegisterThread = new
+                            UnRegisterThread(((AMQPMonitor) pushMonitor).getFinishQueue(),((AMQPMonitor) pushMonitor).getAvailableChannels());
+                    unRegisterThread.run();
 
                 }
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f2445c28/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 96642bf..818f2ac 100644
--- a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -33,14 +33,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * This is the implementation for AMQP based monitor, this uses
+ * This is the implementation for AMQP based finishQueue, this uses
  * rabbitmq client to recieve AMQP based monitoring data from
  * mostly excede resources.
  */
@@ -60,20 +57,19 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
 
     private BlockingQueue<MonitorID> finishQueue;
 
-    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue,BlockingQueue finishQueue) {
+    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue runningQueue, BlockingQueue finishQueue) {
         this.publisher = publisher;
         this.runningQueue = runningQueue;
-        availableChannels = new HashMap<String, Channel>();
         this.finishQueue = finishQueue;
-
+        availableChannels = new HashMap<String, Channel>();
+//        UnRegisterThread unRegisterThread = new UnRegisterThread(finishQueue,availableChannels);
+//        unRegisterThread.run();
+        System.out.println("Testing");
     }
 
-    public synchronized void run() {
+    public void run() {
         try {
             // before going to the while true mode we start unregister thread
-//            UnRegisterThread unRegisterThread = new UnRegisterThread(this);
-//            unRegisterThread.run();
-
             while (true) {
                 // we got a new job to do the monitoring
                 MonitorID take = runningQueue.take();
@@ -113,7 +109,7 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
                 channel.queueBind(queueName, "glue2.computing_activity", filterString);
                 System.out.println(filterString);
             } catch (IOException e) {
-                logger.error("Error creating the connection to monitor the job:" + monitorID.getJobID());
+                logger.error("Error creating the connection to finishQueue the job:" + monitorID.getJobID());
             }
         }
         return false;  //To change body of implemented methods use File | Settings | File Templates.
@@ -190,24 +186,5 @@ public class AMQPMonitor extends PushMonitor implements Runnable {
      * implementing a logic to handle the finished job and unsubscribe
      */
 
-    class UnRegisterThread extends Thread {
-        private AMQPMonitor monitor;
 
-        public UnRegisterThread(AMQPMonitor monitor){
-            this.monitor = monitor;
-        }
-        public synchronized void run() {
-            while(true){
-                try {
-                    MonitorID monitorID = this.monitor.getFinishQueue().take();
-                    monitor.unRegisterListener(monitorID);
-
-                }  catch (AiravataMonitorException e) {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                } catch (InterruptedException e) {
-                    e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f2445c28/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
new file mode 100644
index 0000000..3b118c0
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/UnRegisterThread.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.push.amqp;
+
+import com.rabbitmq.client.Channel;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.util.CommonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+public class UnRegisterThread extends Thread {
+    private final static Logger logger = LoggerFactory.getLogger(UnRegisterThread.class);
+    private BlockingQueue<MonitorID> finishQueue;
+    private Map<String, Channel> availableChannels;
+
+    public UnRegisterThread(BlockingQueue<MonitorID> monitor, Map<String, Channel> channels) {
+        this.finishQueue = monitor;
+        this.availableChannels = channels;
+    }
+
+    public void run() {
+        while (true) {
+            try {
+                MonitorID monitorID = this.finishQueue.take();
+                unRegisterListener(monitorID);
+            //
+            } catch (AiravataMonitorException e) {
+                logger.error(e.getLocalizedMessage());
+            } catch (InterruptedException e) {
+                logger.error(e.getLocalizedMessage());
+            }
+        }
+    }
+
+    private boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
+        String channelID = CommonUtils.getChannelID(monitorID);
+        Channel channel = availableChannels.get(channelID);
+        if (channel == null) {
+            logger.error("Already Unregistered the listener");
+            throw new AiravataMonitorException("Already Unregistered the listener");
+        } else {
+            try {
+                channel.queueUnbind(channel.queueDeclare().getQueue(), "glue2.computing_activity", CommonUtils.getRoutingKey(monitorID));
+            } catch (IOException e) {
+                logger.error("Error unregistering the listener");
+                throw new AiravataMonitorException("Error unregistering the listener");
+            }
+        }
+        return true;
+    }
+}
+


Mime
View raw message