airavata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lah...@apache.org
Subject [1/2] git commit: Adding new implementation for monitoring, AMQPbased monitoring : AIRAVATA-1022
Date Thu, 20 Feb 2014 17:02:56 GMT
Repository: airavata
Updated Branches:
  refs/heads/master 985151fc1 -> a7baac58a


Adding new implementation for monitoring, AMQPbased monitoring : AIRAVATA-1022


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/abb05c8f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/abb05c8f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/abb05c8f

Branch: refs/heads/master
Commit: abb05c8f6f9f17b7868dc2be2a97d1598aa1a2a9
Parents: b1a6edc
Author: lahiru <lahiru@apache.org>
Authored: Thu Feb 20 12:02:35 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Thu Feb 20 12:02:35 2014 -0500

----------------------------------------------------------------------
 .../job/monitor/core/MessageParser.java         |  45 ++++++
 .../impl/push/amqp/JSONMessageParser.java       |  35 ++++
 .../job/monitor/util/AMQPConnectionUtil.java    |  80 ++++++++++
 .../airavata/job/monitor/util/CommonUtils.java  |  32 ++++
 .../airavata/job/monitor/util/X509Helper.java   | 160 +++++++++++++++++++
 .../src/main/resources/monitor.properties       |   2 +
 6 files changed, 354 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
new file mode 100644
index 0000000..4d54fae
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.core;
+
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.experiment.JobState;
+
+/**
+ * This is an interface to implement messageparser, it could be
+ * pull based or push based still monitor has to parse the content of
+ * the message it gets from remote monitoring system and finalize
+ * them to internal job state, Ex: JSON parser for AMQP and Qstat reader
+ * for pull based monitor.
+ */
+public interface MessageParser {
+    /**
+     * This method is to implement how to parse the incoming message
+     * and implement a logic to finalize the status of the job,
+     * we have to makesure the correct message is given to the messageparser
+     * parse method, it will not do any filtering
+     * @param message content of the message
+     * @param monitorID monitorID object
+     * @return
+     */
+    JobStatus parseMessage(String message,MonitorID monitorID);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
new file mode 100644
index 0000000..42ea2d0
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.push.amqp;
+
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.core.MessageParser;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+
+import javax.mail.search.MessageIDTerm;
+
+public class JSONMessageParser implements MessageParser {
+    public JobStatus parseMessage(String message,MonitorID monitorID) {
+        /*todo write a json message parser here*/
+        return new JobStatus();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
new file mode 100644
index 0000000..f100b8f
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultSaslConfig;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.Vector;
+
+public class AMQPConnectionUtil {
+    public static Connection connect(String vhost, String proxyFile) {
+        Vector<String> hosts = new Vector<String>();
+        hosts.add("info1.dyn.teragrid.org");
+        hosts.add("info2.dyn.teragrid.org");
+        Collections.shuffle(hosts);
+        for (String host : hosts) {
+            Connection connection = connect(host, vhost, proxyFile);
+            if (host != null) {
+                System.out.println("connected to " + host);
+                return connection;
+            }
+        }
+        return null;
+    }
+
+    public static Connection connect(String host, String vhost, String proxyFile) {
+        Connection connection;
+        try {
+            String keyPassPhrase = "test123";
+            KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, keyPassPhrase);
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+            kmf.init(ks, keyPassPhrase.toCharArray());
+
+            KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            tmf.init(tks);
+
+            SSLContext c = SSLContext.getInstance("SSLv3");
+            c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setHost(host);
+            factory.setPort(5671);
+            factory.useSslProtocol(c);
+            factory.setVirtualHost(vhost);
+            factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
+
+            connection = factory.newConnection();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return connection;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..2248ec3
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import org.apache.airavata.job.monitor.MonitorID;
+
+public class CommonUtils {
+    public static String getChannelID(MonitorID monitorID) {
+        return monitorID.getUserName() + monitorID.getHost().getType().getHostName();
+    }
+    public static String getRoutingKey(MonitorID monitorID) {
+        return "*." + monitorID.getUserName() + monitorID.getHost().getType().getHostAddress();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
new file mode 100644
index 0000000..2ed0b88
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import org.bouncycastle.openssl.PEMKeyPair;
+import org.bouncycastle.openssl.PEMParser;
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+public class X509Helper {
+
+    static {
+        // parsing of RSA key fails without this
+        java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+    }
+
+
+
+    public static KeyStore keyStoreFromPEM(String proxyFile,
+                                           String keyPassPhrase) throws IOException,
+            CertificateException,
+            NoSuchAlgorithmException,
+            InvalidKeySpecException,
+            KeyStoreException {
+        return keyStoreFromPEM(proxyFile,proxyFile,keyPassPhrase);
+    }
+
+    public static KeyStore keyStoreFromPEM(String certFile,
+                                           String keyFile,
+                                           String keyPassPhrase) throws IOException,
+                                                                        CertificateException,
+                                                                        NoSuchAlgorithmException,
+                                                                        InvalidKeySpecException,
+                                                                        KeyStoreException
{
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        X509Certificate cert = (X509Certificate)cf.generateCertificate(new FileInputStream(certFile));
+        //System.out.println(cert.toString());
+
+        // this works for proxy files, too, since it skips over the certificate
+        BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+        String line = null;
+        StringBuilder builder = new StringBuilder();
+        boolean inKey = false;
+        while((line=reader.readLine()) != null) {
+            if (line.contains("-----BEGIN RSA PRIVATE KEY-----")) {
+                inKey = true;
+            }
+            if (inKey) {
+                builder.append(line);
+                builder.append(System.getProperty("line.separator"));
+            }
+            if (line.contains("-----END RSA PRIVATE KEY-----")) {
+                inKey = false;
+            }
+        }
+        String privKeyPEM = builder.toString();
+        //System.out.println(privKeyPEM);
+
+        // using BouncyCastle
+        PEMParser pemParser = new PEMParser(new StringReader(privKeyPEM));
+        Object object = pemParser.readObject();
+        //System.out.println(object);
+        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
+        KeyPair kp = converter.getKeyPair((PEMKeyPair)object);
+        PrivateKey privKey = kp.getPrivate();
+
+        // PEMParser from BouncyCastle is good for reading PEM files, but I didn't want to
add that dependency
+        /*
+        // Base64 decode the data
+        byte[] encoded = javax.xml.bind.DatatypeConverter.parseBase64Binary(privKeyPEM);
+
+        // PKCS8 decode the encoded RSA private key
+        java.security.spec.PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
+        KeyFactory kf = KeyFactory.getInstance("RSA");
+        PrivateKey privKey = kf.generatePrivate(keySpec);
+        //RSAPrivateKey privKey = (RSAPrivateKey)kf.generatePrivate(keySpec);
+        */
+        //System.out.println(privKey.toString());
+
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        keyStore.load(null,null);
+
+        KeyStore.PrivateKeyEntry entry =
+            new KeyStore.PrivateKeyEntry(privKey,
+                                         new java.security.cert.Certificate[] {(java.security.cert.Certificate)cert});
+        KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+        keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+        return keyStore;
+    }
+
+
+    public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+                                                             KeyStoreException,
+                                                             CertificateException,
+                                                             NoSuchAlgorithmException {
+        return trustKeyStoreFromCertDir("/Users/lahirugunathilake/Downloads/certificates");
+    }
+
+    public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+                                                                           KeyStoreException,
+                                                                           CertificateException,
+                                                                           NoSuchAlgorithmException
{
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null,null);
+
+        File dir = new File(certDir);
+        for(File file : dir.listFiles()) {
+            if (!file.isFile()) {
+                continue;
+            }
+            if (!file.getName().endsWith(".0")) {
+                continue;
+            }
+
+            try {
+                //System.out.println("reading file "+file.getName());
+                CertificateFactory cf = CertificateFactory.getInstance("X.509");
+                X509Certificate cert = (X509Certificate) cf.generateCertificate(new FileInputStream(file));
+                //System.out.println(cert.toString());
+
+                KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+
+                ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+            } catch (KeyStoreException e) {
+            } catch (CertificateParsingException e) {
+                continue;
+            }
+
+        }
+
+        return ks;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/resources/monitor.properties b/modules/airavata-job-monitor/src/main/resources/monitor.properties
new file mode 100644
index 0000000..0b0b5f4
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/resources/monitor.properties
@@ -0,0 +1,2 @@
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+connection.name=xsede_private
\ No newline at end of file


Mime
View raw message