Repository: airavata
Updated Branches:
refs/heads/master 985151fc1 -> a7baac58a
Adding new implementation for monitoring, AMQP-based monitoring: AIRAVATA-1022
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/abb05c8f
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/abb05c8f
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/abb05c8f
Branch: refs/heads/master
Commit: abb05c8f6f9f17b7868dc2be2a97d1598aa1a2a9
Parents: b1a6edc
Author: lahiru <lahiru@apache.org>
Authored: Thu Feb 20 12:02:35 2014 -0500
Committer: lahiru <lahiru@apache.org>
Committed: Thu Feb 20 12:02:35 2014 -0500
----------------------------------------------------------------------
.../job/monitor/core/MessageParser.java | 45 ++++++
.../impl/push/amqp/JSONMessageParser.java | 35 ++++
.../job/monitor/util/AMQPConnectionUtil.java | 80 ++++++++++
.../airavata/job/monitor/util/CommonUtils.java | 32 ++++
.../airavata/job/monitor/util/X509Helper.java | 160 +++++++++++++++++++
.../src/main/resources/monitor.properties | 2 +
6 files changed, 354 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
new file mode 100644
index 0000000..4d54fae
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/core/MessageParser.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.core;
+
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.experiment.JobState;
+
+/**
+ * Contract for a message parser used by the job monitor. Whether the monitor
+ * is pull based or push based, it has to parse the content of the message it
+ * gets from the remote monitoring system and finalize it to an internal job
+ * state, e.g. a JSON parser for AMQP or a qstat reader for a pull based
+ * monitor.
+ */
+public interface MessageParser {
+ /**
+ * Parses the incoming message and applies the logic that finalizes the
+ * status of the job. The caller has to make sure the correct message is
+ * given to this method; the parser will not do any filtering.
+ *
+ * @param message content of the message
+ * @param monitorID monitorID object
+ * @return the job status derived from the message
+ */
+ JobStatus parseMessage(String message,MonitorID monitorID);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
new file mode 100644
index 0000000..42ea2d0
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/JSONMessageParser.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl.push.amqp;
+
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.core.MessageParser;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+
+import javax.mail.search.MessageIDTerm;
+
+/**
+ * {@link MessageParser} implementation living in the AMQP push monitor package.
+ * JSON parsing is not implemented yet.
+ */
+public class JSONMessageParser implements MessageParser {
+    /**
+     * Placeholder implementation: neither argument is inspected and a freshly
+     * constructed {@link JobStatus} is returned on every call.
+     *
+     * @param message   content of the message (currently unused)
+     * @param monitorID monitorID object (currently unused)
+     * @return a new, default-constructed job status
+     */
+    public JobStatus parseMessage(String message, MonitorID monitorID) {
+        // TODO: write a JSON message parser here
+        final JobStatus status = new JobStatus();
+        return status;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
new file mode 100644
index 0000000..f100b8f
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/AMQPConnectionUtil.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DefaultSaslConfig;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import java.security.KeyStore;
+import java.util.Collections;
+import java.util.Vector;
+
+/**
+ * Helper for opening SSL-secured AMQP connections that authenticate with an
+ * X.509 proxy certificate (SASL EXTERNAL).
+ */
+public class AMQPConnectionUtil {
+    /**
+     * Pass phrase used to protect the private key entry of the in-memory key
+     * store that is built from the proxy file.
+     */
+    private static final String KEY_PASS_PHRASE = "test123";
+
+    /** Port every broker connection is opened on. */
+    private static final int AMQP_SSL_PORT = 5671;
+
+    /**
+     * Tries the known broker hosts in random order and returns the first
+     * connection that could be established.
+     *
+     * @param vhost     AMQP virtual host to connect to
+     * @param proxyFile path of the PEM proxy file holding certificate and private key
+     * @return an open connection, or {@code null} when no host could be reached
+     */
+    public static Connection connect(String vhost, String proxyFile) {
+        // NOTE(review): the same two hosts are listed under amqp.hosts in
+        // monitor.properties; presumably this list should be read from there.
+        Vector<String> hosts = new Vector<String>();
+        hosts.add("info1.dyn.teragrid.org");
+        hosts.add("info2.dyn.teragrid.org");
+        Collections.shuffle(hosts);
+        for (String host : hosts) {
+            Connection connection = connect(host, vhost, proxyFile);
+            // The single-host connect() reports failure by returning null, so the
+            // connection (not the host name, which is never null) must be checked;
+            // otherwise a failed first host ends the loop with a null result.
+            if (connection != null) {
+                System.out.println("connected to " + host);
+                return connection;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Opens an SSL connection to a single broker host, presenting the key and
+     * certificate from the proxy file and trusting the certificates returned by
+     * {@link X509Helper#trustKeyStoreFromCertDir()}.
+     *
+     * @param host      broker host name
+     * @param vhost     AMQP virtual host to connect to
+     * @param proxyFile path of the PEM proxy file holding certificate and private key
+     * @return an open connection, or {@code null} when anything fails; the
+     *         stack trace of the failure is printed
+     */
+    public static Connection connect(String host, String vhost, String proxyFile) {
+        Connection connection;
+        try {
+            KeyStore ks = X509Helper.keyStoreFromPEM(proxyFile, KEY_PASS_PHRASE);
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+            kmf.init(ks, KEY_PASS_PHRASE.toCharArray());
+
+            KeyStore tks = X509Helper.trustKeyStoreFromCertDir();
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
+            tmf.init(tks);
+
+            // NOTE(review): SSLv3 is obsolete and insecure (RFC 7568); switch to
+            // "TLS" once the brokers are confirmed to accept it.
+            SSLContext c = SSLContext.getInstance("SSLv3");
+            c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+
+            ConnectionFactory factory = new ConnectionFactory();
+            factory.setHost(host);
+            factory.setPort(AMQP_SSL_PORT);
+            factory.useSslProtocol(c);
+            factory.setVirtualHost(vhost);
+            // authenticate with the client certificate instead of user name/password
+            factory.setSaslConfig(DefaultSaslConfig.EXTERNAL);
+
+            connection = factory.newConnection();
+        } catch (Exception e) {
+            // Failure is reported to the caller as null so that the multi-host
+            // overload can fall through to the next host.
+            e.printStackTrace();
+            return null;
+        }
+        return connection;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
new file mode 100644
index 0000000..2248ec3
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/CommonUtils.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import org.apache.airavata.job.monitor.MonitorID;
+
+public class CommonUtils {
+ public static String getChannelID(MonitorID monitorID) {
+ return monitorID.getUserName() + monitorID.getHost().getType().getHostName();
+ }
+ public static String getRoutingKey(MonitorID monitorID) {
+ return "*." + monitorID.getUserName() + monitorID.getHost().getType().getHostAddress();
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
new file mode 100644
index 0000000..2ed0b88
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/java/org/apache/airavata/job/monitor/util/X509Helper.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.util;
+
+import org.bouncycastle.openssl.PEMKeyPair;
+import org.bouncycastle.openssl.PEMParser;
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+
+import java.io.*;
+import java.security.*;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.security.spec.InvalidKeySpecException;
+
+/**
+ * Helpers that build in-memory {@link KeyStore}s from PEM encoded X.509
+ * material: a key store holding a certificate with its RSA private key, and a
+ * trust store holding the certificates found in a directory.
+ */
+public class X509Helper {
+
+    // NOTE(review): developer-specific absolute path used by the no-argument
+    // trustKeyStoreFromCertDir(); it should come from configuration.
+    private static final String DEFAULT_CERT_DIR = "/Users/lahirugunathilake/Downloads/certificates";
+
+    private static final String RSA_KEY_BEGIN = "-----BEGIN RSA PRIVATE KEY-----";
+    private static final String RSA_KEY_END = "-----END RSA PRIVATE KEY-----";
+
+    static {
+        // parsing of RSA key fails without this
+        java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
+    }
+
+    /**
+     * Builds a key store from a proxy file that contains both the certificate
+     * and the RSA private key.
+     *
+     * @param proxyFile     path of the PEM proxy file
+     * @param keyPassPhrase pass phrase used to protect the key entry in the returned store
+     * @return a PKCS12 key store with a single private key entry
+     * @throws IOException if the file cannot be read or holds no usable RSA private key
+     * @throws CertificateException if the certificate cannot be parsed
+     */
+    public static KeyStore keyStoreFromPEM(String proxyFile,
+                                           String keyPassPhrase) throws IOException,
+                                                                        CertificateException,
+                                                                        NoSuchAlgorithmException,
+                                                                        InvalidKeySpecException,
+                                                                        KeyStoreException {
+        return keyStoreFromPEM(proxyFile, proxyFile, keyPassPhrase);
+    }
+
+    /**
+     * Builds a key store from a PEM certificate file and a PEM file holding the
+     * RSA private key. Both paths may point to the same proxy file, since the
+     * key reader skips everything outside the RSA private key section.
+     *
+     * @param certFile      path of the PEM file holding the certificate
+     * @param keyFile       path of the PEM file holding the RSA private key
+     * @param keyPassPhrase pass phrase used to protect the key entry in the returned store
+     * @return a PKCS12 key store with a single private key entry whose alias is
+     *         the subject name of the certificate
+     * @throws IOException if a file cannot be read or holds no usable RSA private key
+     * @throws CertificateException if the certificate cannot be parsed
+     */
+    public static KeyStore keyStoreFromPEM(String certFile,
+                                           String keyFile,
+                                           String keyPassPhrase) throws IOException,
+                                                                        CertificateException,
+                                                                        NoSuchAlgorithmException,
+                                                                        InvalidKeySpecException,
+                                                                        KeyStoreException {
+        X509Certificate cert = readCertificate(new File(certFile));
+        PrivateKey privKey = readRsaPrivateKey(keyFile);
+
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        keyStore.load(null, null);
+
+        KeyStore.PrivateKeyEntry entry =
+                new KeyStore.PrivateKeyEntry(privKey, new java.security.cert.Certificate[] {cert});
+        KeyStore.PasswordProtection prot = new KeyStore.PasswordProtection(keyPassPhrase.toCharArray());
+        keyStore.setEntry(cert.getSubjectX500Principal().getName(), entry, prot);
+
+        return keyStore;
+    }
+
+    /**
+     * Builds a trust store from the default certificate directory.
+     *
+     * @return a JKS key store with one trusted entry per readable certificate
+     * @throws IOException if the directory cannot be listed or a file cannot be read
+     */
+    public static KeyStore trustKeyStoreFromCertDir() throws IOException,
+                                                             KeyStoreException,
+                                                             CertificateException,
+                                                             NoSuchAlgorithmException {
+        return trustKeyStoreFromCertDir(DEFAULT_CERT_DIR);
+    }
+
+    /**
+     * Builds a trust store from every regular file in {@code certDir} whose
+     * name ends with {@code .0}. Files whose certificate cannot be parsed or
+     * stored are skipped.
+     *
+     * @param certDir directory holding the trusted certificates
+     * @return a JKS key store with one trusted entry per readable certificate,
+     *         keyed by the subject name of the certificate
+     * @throws IOException if the directory cannot be listed or a file cannot be read
+     */
+    public static KeyStore trustKeyStoreFromCertDir(String certDir) throws IOException,
+                                                                           KeyStoreException,
+                                                                           CertificateException,
+                                                                           NoSuchAlgorithmException {
+        KeyStore ks = KeyStore.getInstance("JKS");
+        ks.load(null, null);
+
+        // listFiles() returns null when the path is not a directory or cannot be read
+        File[] files = new File(certDir).listFiles();
+        if (files == null) {
+            throw new IOException("Cannot list certificate directory: " + certDir);
+        }
+
+        for (File file : files) {
+            if (!file.isFile() || !file.getName().endsWith(".0")) {
+                continue;
+            }
+
+            try {
+                X509Certificate cert = readCertificate(file);
+                KeyStore.TrustedCertificateEntry entry = new KeyStore.TrustedCertificateEntry(cert);
+                ks.setEntry(cert.getSubjectX500Principal().getName(), entry, null);
+            } catch (KeyStoreException ignored) {
+                // best effort: a certificate that cannot be stored is left out of the trust store
+            } catch (CertificateParsingException ignored) {
+                // best effort: an unparsable certificate is left out of the trust store
+            }
+        }
+
+        return ks;
+    }
+
+    /** Reads a single X.509 certificate from the given file and closes the file afterwards. */
+    private static X509Certificate readCertificate(File file) throws IOException, CertificateException {
+        CertificateFactory cf = CertificateFactory.getInstance("X.509");
+        InputStream in = new FileInputStream(file);
+        try {
+            return (X509Certificate) cf.generateCertificate(in);
+        } finally {
+            in.close();
+        }
+    }
+
+    /** Extracts the RSA private key section from a PEM file and decodes it with BouncyCastle. */
+    private static PrivateKey readRsaPrivateKey(String keyFile) throws IOException {
+        // this works for proxy files, too, since it skips over the certificate
+        StringBuilder builder = new StringBuilder();
+        BufferedReader reader = new BufferedReader(new FileReader(keyFile));
+        try {
+            boolean inKey = false;
+            String line;
+            while ((line = reader.readLine()) != null) {
+                if (line.contains(RSA_KEY_BEGIN)) {
+                    inKey = true;
+                }
+                if (inKey) {
+                    builder.append(line);
+                    builder.append(System.getProperty("line.separator"));
+                }
+                if (line.contains(RSA_KEY_END)) {
+                    inKey = false;
+                }
+            }
+        } finally {
+            reader.close();
+        }
+
+        Object object;
+        PEMParser pemParser = new PEMParser(new StringReader(builder.toString()));
+        try {
+            object = pemParser.readObject();
+        } finally {
+            pemParser.close();
+        }
+        // readObject() yields null when no key section was found and a different
+        // type for e.g. an encrypted key; fail with a clear message in both cases
+        if (!(object instanceof PEMKeyPair)) {
+            throw new IOException("No unencrypted RSA private key found in " + keyFile);
+        }
+
+        JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");
+        KeyPair kp = converter.getKeyPair((PEMKeyPair) object);
+        return kp.getPrivate();
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/abb05c8f/modules/airavata-job-monitor/src/main/resources/monitor.properties
----------------------------------------------------------------------
diff --git a/modules/airavata-job-monitor/src/main/resources/monitor.properties b/modules/airavata-job-monitor/src/main/resources/monitor.properties
new file mode 100644
index 0000000..0b0b5f4
--- /dev/null
+++ b/modules/airavata-job-monitor/src/main/resources/monitor.properties
@@ -0,0 +1,2 @@
+amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
+connection.name=xsede_private
\ No newline at end of file
|