libcloud-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From to...@apache.org
Subject svn commit: r1351044 - /libcloud/trunk/test/compute/test_ktucloud.py
Date Sun, 17 Jun 2012 03:41:03 GMT
Author: tomaz
Date: Sun Jun 17 03:41:03 2012
New Revision: 1351044

URL: http://svn.apache.org/viewvc?rev=1351044&view=rev
Log:
Forgot to add this file.

Added:
    libcloud/trunk/test/compute/test_ktucloud.py

Added: libcloud/trunk/test/compute/test_ktucloud.py
URL: http://svn.apache.org/viewvc/libcloud/trunk/test/compute/test_ktucloud.py?rev=1351044&view=auto
==============================================================================
--- libcloud/trunk/test/compute/test_ktucloud.py (added)
+++ libcloud/trunk/test/compute/test_ktucloud.py Sun Jun 17 03:41:03 2012
@@ -0,0 +1,102 @@
+import sys
+import unittest
+
+from libcloud.utils.py3 import httplib
+from libcloud.utils.py3 import urlparse
+
+try:
+    import simplejson as json
+except ImportError:
+    import json
+
+try:
+    parse_qsl = urlparse.parse_qsl
+except AttributeError:
+    import cgi
+    parse_qsl = cgi.parse_qsl
+
+from libcloud.compute.drivers.ktucloud import KTUCloudNodeDriver
+from libcloud.compute.types import DeploymentError, LibcloudError
+
+from test import MockHttpTestCase
+from test.compute import TestCaseMixin
+from test.file_fixtures import ComputeFileFixtures
+
+
+class KTUCloudNodeDriverTest(unittest.TestCase, TestCaseMixin):
+    def setUp(self):
+        KTUCloudNodeDriver.connectionCls.conn_classes = \
+            (None, KTUCloudStackMockHttp)
+        self.driver = KTUCloudNodeDriver('apikey', 'secret',
+                                           path='/test/path',
+                                           host='api.dummy.com')
+        self.driver.path = '/test/path'
+        self.driver.type = -1
+        KTUCloudStackMockHttp.fixture_tag = 'default'
+        self.driver.connection.poll_interval = 0.0
+
+    def test_create_node_immediate_failure(self):
+        size = self.driver.list_sizes()[0]
+        image = self.driver.list_images()[0]
+        KTUCloudStackMockHttp.fixture_tag = 'deployfail'
+        try:
+            self.driver.create_node(name='node-name', image=image, size=size)
+        except:
+            return
+        self.assertTrue(False)
+
+    def test_create_node_delayed_failure(self):
+        size = self.driver.list_sizes()[0]
+        image = self.driver.list_images()[0]
+        KTUCloudStackMockHttp.fixture_tag = 'deployfail2'
+        try:
+            self.driver.create_node(name='node-name', image=image, size=size)
+        except:
+            return
+        self.assertTrue(False)
+
+    def test_list_images_no_images_available(self):
+        KTUCloudStackMockHttp.fixture_tag = 'notemplates'
+
+        images = self.driver.list_images()
+        self.assertEquals(0, len(images))
+
+
+class KTUCloudStackMockHttp(MockHttpTestCase):
+    fixtures = ComputeFileFixtures('ktucloud')
+    fixture_tag = 'default'
+
+    def _load_fixture(self, fixture):
+        body = self.fixtures.load(fixture)
+        return body, json.loads(body)
+
+    def _test_path(self, method, url, body, headers):
+        url = urlparse.urlparse(url)
+        query = dict(parse_qsl(url.query))
+
+        self.assertTrue('apiKey' in query)
+        self.assertTrue('command' in query)
+        self.assertTrue('response' in query)
+        self.assertTrue('signature' in query)
+
+        self.assertTrue(query['response'] == 'json')
+
+        del query['apiKey']
+        del query['response']
+        del query['signature']
+        command = query.pop('command')
+
+        if hasattr(self, '_cmd_' + command):
+            return getattr(self, '_cmd_' + command)(**query)
+        else:
+            fixture = command + '_' + self.fixture_tag + '.json'
+            body, obj = self._load_fixture(fixture)
+            return (httplib.OK, body, obj, httplib.responses[httplib.OK])
+
+    def _cmd_queryAsyncJobResult(self, jobid):
+        fixture = 'queryAsyncJobResult' + '_' + str(jobid) + '.json'
+        body, obj = self._load_fixture(fixture)
+        return (httplib.OK, body, obj, httplib.responses[httplib.OK])
+
+if __name__ == '__main__':  # executed directly: run this module's tests
+    sys.exit(unittest.main())



Mime
View raw message