libcloud-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anthonys...@apache.org
Subject [12/51] [abbrv] libcloud git commit: fixed ex_edit_firewall_rule as position is not required unless changing firewall rule placement
Date Wed, 31 Oct 2018 03:11:23 GMT
fixed ex_edit_firewall_rule as position is not required unless changing firewall rule placement


Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo
Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/fd68cc6d
Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/fd68cc6d
Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/fd68cc6d

Branch: refs/heads/trunk
Commit: fd68cc6d91531ca44f353e34d0c6938d12e7ba42
Parents: 273486d
Author: mitch <mitch.raful@itaas.dimensiondata.com>
Authored: Thu Sep 6 13:22:53 2018 -0400
Committer: mitch <mitch.raful@itaas.dimensiondata.com>
Committed: Thu Sep 6 13:22:53 2018 -0400

----------------------------------------------------------------------
 libcloud/common/nttcis.py          |  45 ++++++++-
 libcloud/compute/drivers/nttcis.py | 158 ++++++++++++++++++++++++++++----
 tests/lib_create_test.py           |  98 +++++++++++++++++++-
 tests/lib_edit_test.py             |  24 +++++
 tests/lib_list_test.py             |   2 +
 5 files changed, 302 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/libcloud/common/nttcis.py
----------------------------------------------------------------------
diff --git a/libcloud/common/nttcis.py b/libcloud/common/nttcis.py
index e48cbb1..4b5272a 100644
--- a/libcloud/common/nttcis.py
+++ b/libcloud/common/nttcis.py
@@ -929,11 +929,11 @@ class NttCisFirewallRule(object):
                    self.protocol, self.source, self.destination,
                    self.enabled))
 
-
+"""
 class NttCisFirewallAddress(object):
-    """
+    
     The source or destination model in a firewall rule
-    """
+    
     def __init__(self, any_ip, ip_address, ip_prefix_size,
                  port_begin, port_end, address_list_id,
                  port_list_id):
@@ -954,6 +954,45 @@ class NttCisFirewallAddress(object):
             % (self.any_ip, self.ip_address, self.ip_prefix_size,
                self.port_begin, self.port_end, self.address_list_id,
                self.port_list_id))
+"""
+
+
+class NttCisFirewallAddress(object):
+    """
+    The source or destination model in a firewall rule
+    9/4/18: Edited for ex_create_firewall_rule; other uses still untested.
+    """
+
+    def __init__(self, any_ip=None, ip_address=None, ip_prefix_size=None,
+                 port_begin=None, port_end=None, address_list_id=None,
+                 port_list_id=None):
+        """
+        :param any_ip: used to set ip address to "ANY"
+        :param ip_address: An IPv4 (decimal notation) or IPv6 address
+        :param ip_prefix_size: An integer denoting prefix size.
+        :param port_begin: integer for an individual port or start of a
+                           list of ports if not using a port list
+        :param port_end: integer required if using a list of ports (NOT a
+                         port list but a list starting with port begin)
+        :param address_list_id: An id identifying an address list
+        :param port_list_id: An id identifying a port list
+        """
+        self.any_ip = any_ip
+        self.ip_address = ip_address
+        self.ip_prefix_size = ip_prefix_size
+        self.port_begin = port_begin
+        self.port_end = port_end
+        self.address_list_id = address_list_id
+        self.port_list_id = port_list_id
+
+    def __repr__(self):
+        return (
+            '<NttCisFirewallAddress: any_ip=%s, ip_address=%s, '
+            'ip_prefix_size=%s, port_begin=%s, port_end=%s, '
+            'address_list_id=%s, port_list_id=%s>'
+            % (self.any_ip, self.ip_address, self.ip_prefix_size,
+               self.port_begin, self.port_end, self.address_list_id,
+               self.port_list_id))
 
 
 class NttCisNatRule(object):

http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/libcloud/compute/drivers/nttcis.py
----------------------------------------------------------------------
diff --git a/libcloud/compute/drivers/nttcis.py b/libcloud/compute/drivers/nttcis.py
index 1048eaa..0f79bca 100644
--- a/libcloud/compute/drivers/nttcis.py
+++ b/libcloud/compute/drivers/nttcis.py
@@ -1990,9 +1990,10 @@ class NttCisNodeDriver(NodeDriver):
                                       params=params).object
         return self._to_firewall_rules(response, network_domain)
 
+    """
     def ex_create_firewall_rule(self, network_domain, rule, position,
                                 position_relative_to_rule=None):
-        """
+        
         Creates a firewall rule
 
         :param network_domain: The network domain in which to create
@@ -2015,7 +2016,7 @@ class NttCisNodeDriver(NodeDriver):
             :class:`NttCisFirewallRule` or ``str``
 
         :rtype: ``bool``
-        """
+        
         positions_without_rule = ('FIRST', 'LAST')
         positions_with_rule = ('BEFORE', 'AFTER')
 
@@ -2103,8 +2104,123 @@ class NttCisNodeDriver(NodeDriver):
                 rule_id = info.get('value')
         rule.id = rule_id
         return rule
+    """
+
+    def ex_create_firewall_rule(self, network_domain, name, action, ip_version, protocol,
+                                source_addr, destination, position, enabled=1,  position_relative_to_rule=None):
+        """
+        Creates a firewall rule and returns it as looked up afterwards
+
+        :param network_domain: Network domain in which to create the rule
+        :type  network_domain: :class:`NttCisNetworkDomain` or ``str``
+        :param name: Text of the ``name`` element
+        :param action: Text of the ``action`` element
+        :param ip_version: Text of the ``ipVersion`` element
+        :param protocol: Text of the ``protocol`` element
+        :param source_addr: Source address/port specification
+        :type  source_addr: :class:`NttCisFirewallAddress`
+        :param destination: Destination address/port specification
+        :type  destination: :class:`NttCisFirewallAddress`
+        :param position: 'FIRST' or 'LAST' without
+                         position_relative_to_rule, else 'BEFORE' or 'AFTER'
+        :type  position: ``str``
+        :param enabled: Written via ``str()`` into the ``enabled`` element
+        :param position_relative_to_rule: The rule or rule name in
+                                          which to decide positioning by
+        :type  position_relative_to_rule:
+            :class:`NttCisFirewallRule` or ``str``
+        :raises ValueError: If position does not fit the placement mode
+        :return: Result of ``ex_get_firewall_rule`` for the new rule id
+        """
+        positions_without_rule = ('FIRST', 'LAST')
+        positions_with_rule = ('BEFORE', 'AFTER')
+
+        create_node = ET.Element('createFirewallRule', {'xmlns': TYPES_URN})
+        ET.SubElement(create_node, "networkDomainId").text = \
+            self._network_domain_to_network_domain_id(network_domain)
+        ET.SubElement(create_node, "name").text = name
+        ET.SubElement(create_node, "action").text = action
+        ET.SubElement(create_node, "ipVersion").text = ip_version
+        ET.SubElement(create_node, "protocol").text = protocol
+        # Source: values come from source_addr; `source` is the XML element
+        source = ET.SubElement(create_node, "source")
+        if source_addr.address_list_id is not None:
+            source_ip = ET.SubElement(source, 'ipAddressListId')
+            source_ip.text = source_addr.address_list_id
+        else:
+            source_ip = ET.SubElement(source, 'ip')
+            if source_addr.any_ip:
+                source_ip.set('address', 'ANY')
+            else:
+                source_ip.set('address', source_addr.ip_address)
+                if source_addr.ip_prefix_size is not None:
+                    source_ip.set('prefixSize',
+                                  str(source_addr.ip_prefix_size))
+        if source_addr.port_list_id is not None:
+            source_port = ET.SubElement(source, 'portListId')
+            source_port.text = source_addr.port_list_id
+        else:
+            if source_addr.port_begin is not None:
+                source_port = ET.SubElement(source, 'port')
+                source_port.set('begin', str(source_addr.port_begin))
+                if source_addr.port_end is not None:
+                    source_port.set('end', str(source_addr.port_end))
+        # Destination: same layout; 'end' needs the port element from 'begin'
+        dest = ET.SubElement(create_node, "destination")
+        if destination.address_list_id is not None:
+            dest_ip = ET.SubElement(dest, 'ipAddressListId')
+            dest_ip.text = destination.address_list_id
+        else:
+            dest_ip = ET.SubElement(dest, 'ip')
+            if destination.any_ip:
+                dest_ip.set('address', 'ANY')
+            else:
+                dest_ip.set('address', destination.ip_address)
+                if destination.ip_prefix_size is not None:
+                    dest_ip.set('prefixSize',
+                                str(destination.ip_prefix_size))
+        if destination.port_list_id is not None:
+            dest_port = ET.SubElement(dest, 'portListId')
+            dest_port.text = destination.port_list_id
+        else:
+            if destination.port_begin is not None:
+                dest_port = ET.SubElement(dest, 'port')
+                dest_port.set('begin', str(destination.port_begin))
+                if destination.port_end is not None:
+                    dest_port.set('end', str(destination.port_end))
+        # Set up positioning of rule
+        ET.SubElement(create_node, "enabled").text = str(enabled)
+        placement = ET.SubElement(create_node, "placement")
+        if position_relative_to_rule is not None:
+            if position not in positions_with_rule:
+                raise ValueError("When position_relative_to_rule is specified"
+                                 " position must be %s"
+                                 % ', '.join(positions_with_rule))
+            if isinstance(position_relative_to_rule,
+                          NttCisFirewallRule):
+                rule_name = position_relative_to_rule.name
+            else:
+                rule_name = position_relative_to_rule
+            placement.set('relativeToRule', rule_name)
+        else:
+            if position not in positions_without_rule:
+                raise ValueError("When position_relative_to_rule is not"
+                                 " specified position must be %s"
+                                 % ', '.join(positions_without_rule))
+        placement.set('position', position)
+
+        response = self.connection.request_with_orgId_api_2(
+            'network/createFirewallRule',
+            method='POST',
+            data=ET.tostring(create_node)).object
+
+        rule_id = None
+        for info in findall(response, 'info', TYPES_URN):
+            if info.get('name') == 'firewallRuleId':
+                rule_id = info.get('value')
+        return self.ex_get_firewall_rule(network_domain, rule_id)
 
-    def ex_edit_firewall_rule(self, rule, position,
+    def ex_edit_firewall_rule(self, rule, position=None,
                               relative_rule_for_position=None):
         """
         Edit a firewall rule
@@ -2222,24 +2338,26 @@ class NttCisNodeDriver(NodeDriver):
                 dest_port.set('end', rule.destination.port_end)
         # Set up positioning of rule
         ET.SubElement(edit_node, "enabled").text = str(rule.enabled).lower()
-        placement = ET.SubElement(edit_node, "placement")
-        if relative_rule_for_position is not None:
-            if position not in positions_with_rule:
-                raise ValueError("When position_relative_to_rule is specified"
-                                 " position must be %s"
-                                 % ', '.join(positions_with_rule))
-            if isinstance(relative_rule_for_position,
-                          NttCisFirewallRule):
-                rule_name = relative_rule_for_position.name
+        # changing placement to an option
+        if position is not None:
+            placement = ET.SubElement(edit_node, "placement")
+            if relative_rule_for_position is not None:
+                if position not in positions_with_rule:
+                    raise ValueError("When position_relative_to_rule is specified"
+                                     " position must be %s"
+                                     % ', '.join(positions_with_rule))
+                if isinstance(relative_rule_for_position,
+                              NttCisFirewallRule):
+                    rule_name = relative_rule_for_position.name
+                else:
+                    rule_name = relative_rule_for_position
+                placement.set('relativeToRule', rule_name)
             else:
-                rule_name = relative_rule_for_position
-            placement.set('relativeToRule', rule_name)
-        else:
-            if position not in positions_without_rule:
-                raise ValueError("When position_relative_to_rule is not"
-                                 " specified position must be %s"
-                                 % ', '.join(positions_without_rule))
-        placement.set('position', position)
+                if position not in positions_without_rule:
+                    raise ValueError("When position_relative_to_rule is not"
+                                     " specified position must be %s"
+                                     % ', '.join(positions_without_rule))
+            placement.set('position', position)
 
         response = self.connection.request_with_orgId_api_2(
             'network/editFirewallRule',

http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_create_test.py
----------------------------------------------------------------------
diff --git a/tests/lib_create_test.py b/tests/lib_create_test.py
index a2e85ac..e7d5288 100644
--- a/tests/lib_create_test.py
+++ b/tests/lib_create_test.py
@@ -1,7 +1,9 @@
+from pprint import pprint
 import pytest
 import libcloud
 from libcloud import loadbalancer
-from libcloud.common.nttcis import NttCisAPIException, NttCisVlan
+from libcloud.compute.drivers.nttcis import NttCisPort, NttCisIpAddress
+from libcloud.common.nttcis import NttCisFirewallRule, NttCisVlan, NttCisFirewallAddress
 
 
 def test_deploy_vlan(compute_driver, vlan_name, network_domain_id, base_ipv4_addr):
@@ -39,4 +41,96 @@ def test_delete_server(compute_driver):
     compute_driver.ex_wait_for_state('terminated', compute_driver.ex_get_node_by_id, 2, 240, server.id)
 
 
-def test_deploy_firewall_rule
\ No newline at end of file
+def test_deploy_firewall_rule_1(compute_driver):  # ANY source -> address-list + port-list destination
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    address_list_name = 'sdk_test_address_list'
+    address_lists = compute_driver.ex_list_ip_address_list('6aafcf08-cb0b-432c-9c64-7371265db086')
+    # Disabled alternative: filter() with a lambda
+    # NOTE(review): the lambda below ignores x, so it would not filter by name
+    # address_list = list(filter(lambda x: address_list_name, address_lists))
+    # address_list_id = address_list[0].id
+
+    # Active approach: list comprehension matching on the address list name
+
+    address_list = [a for a in address_lists if a.name == address_list_name]
+    address_list_id = address_list[0].id
+
+    port_list_name = 'sdk_test_port_list'
+    port_lists = compute_driver.ex_list_portlist('6aafcf08-cb0b-432c-9c64-7371265db086')
+    port_list = [p for p in port_lists if p.name == port_list_name]
+    port_list_id = port_list[0].id
+    dest_firewall_address = NttCisFirewallAddress(address_list_id=address_list_id, port_list_id=port_list_id)
+    source_firewall_address = NttCisFirewallAddress(any_ip='ANY')
+    rule = compute_driver.ex_create_firewall_rule(net_domain[0], 'sdk_test_firewall_rule_1', 'ACCEPT_DECISIVELY',
+                                                  'IPV4', 'TCP', source_firewall_address, dest_firewall_address, 'LAST')
+    print(rule)
+    assert isinstance(rule, NttCisFirewallRule)
+
+
+def test_deploy_firewall_rule_2(compute_driver):  # ANY source -> explicit IP/prefix with a port range
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    source_firewall_address = NttCisFirewallAddress(any_ip='ANY')
+    dest_firewall_address = NttCisFirewallAddress(ip_address='10.2.0.0', ip_prefix_size='16',
+                                                  port_begin='8000', port_end='8080')
+
+    rule = compute_driver.ex_create_firewall_rule(net_domain[0], 'sdk_test_firewall_rule_2', 'ACCEPT_DECISIVELY',
+                                                  'IPV4', 'TCP', source_firewall_address, dest_firewall_address, 'LAST')
+    print(rule)
+    assert isinstance(rule, NttCisFirewallRule)
+
+
+def test_deploy_firewall_rule_3(compute_driver):  # single-port rule placed BEFORE an existing rule
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    source_firewall_address = NttCisFirewallAddress(any_ip='ANY')
+    dest_firewall_address = NttCisFirewallAddress(ip_address='10.2.0.0', ip_prefix_size='16',
+                                                  port_begin='25')
+    rule_name = 'sdk_test_firewall_rule_2'  # the rule the new one is positioned relative to
+    rules = compute_driver.ex_list_firewall_rules(net_domain[0])
+    rule = [rule for rule in rules if rule.name == rule_name]
+    relative_to = compute_driver.ex_get_firewall_rule(net_domain[0], rule[0].id)
+    rule = compute_driver.ex_create_firewall_rule(net_domain[0], 'sdk_test_firewall_rule_3', 'ACCEPT_DECISIVELY',
+                                                  'IPV4', 'TCP', source_firewall_address, dest_firewall_address,
+                                                  'BEFORE', position_relative_to_rule=relative_to)
+    print(rule)
+    assert isinstance(rule, NttCisFirewallRule)
+
+
+def test_create_port_list(compute_driver):
+    """
+    An optional named argument, child_portlist_list, which takes the id of an existing
+    port list to include in this port list.
+    """
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    port_list_name = 'sdk_test_port_list'
+    description = 'A test port list'
+    port_list = [NttCisPort(begin='8000', end='8080')]  # one begin/end port entry
+    result = compute_driver.ex_create_portlist(net_domain[0], port_list_name, description, port_list)
+    assert result is True
+
+
+def test_create_address_list(compute_driver):
+    """
+    An optional named argument, child_ip_address_list, which takes the id of an existing
+    IP address list to include in this address list.
+    """
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    address_list_name = 'sdk_test_address_list'
+    description = 'A test address list'
+    ip_version = 'IPV4'
+    # An optional prefix size can be specified as a named argument, prefix_size=
+    address_list = [NttCisIpAddress('10.2.0.1', end='10.2.0.11')]
+
+    result = compute_driver.ex_create_ip_address_list(net_domain[0], address_list_name,
+                                  description,
+                                  ip_version, address_list)
+    assert result is True
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_edit_test.py
----------------------------------------------------------------------
diff --git a/tests/lib_edit_test.py b/tests/lib_edit_test.py
index 9c82c6d..49a3f80 100644
--- a/tests/lib_edit_test.py
+++ b/tests/lib_edit_test.py
@@ -227,6 +227,19 @@ def test_change_nic_type(compute_driver):
     assert result is True
 
 
+def test_edit_firewall_rule(compute_driver):  # edits a rule without passing a position
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    rule_name = 'sdk_test_firewall_rule_2'
+    rules = compute_driver.ex_list_firewall_rules(net_domain[0])
+    rule = [rule for rule in rules if rule.name == rule_name]
+    rule[0].destination.port_end = None  # clear the destination end port before submitting the edit
+    result = compute_driver.ex_edit_firewall_rule(rule[0])  # position left at its default
+    print(compute_driver.ex_get_firewall_rule(net_domain[0].id, rule[0].id))
+    assert result is True
+
+
 def test_create_anti_affinity_rule(compute_driver):
     server1 = compute_driver.ex_get_node_by_id("d0425097-202f-4bba-b268-c7a73b8da129")
     server2 = compute_driver.ex_get_node_by_id("803e5e00-b22a-450a-8827-066ff15ec977")
@@ -241,8 +254,19 @@ def test_delete_anti_affinity_rule(compute_driver):
     assert result is True
 
 
+def test_delete_port_list(compute_driver):  # deletes the port list named 'sdk_test_port_list'
+    portlists = compute_driver.ex_list_portlist('6aafcf08-cb0b-432c-9c64-7371265db086')
+    port_list_to_delete = [plist for plist in portlists if plist.name == 'sdk_test_port_list']
+    result = compute_driver.ex_delete_portlist(port_list_to_delete[0])  # first match by name
+    assert result is True
 
 
+def test_delete_address_list(compute_driver):  # NOTE(review): only looks up and prints; nothing is deleted or asserted
+    domain_name = 'sdk_test_1'
+    domains = compute_driver.ex_list_network_domains(location='EU6')
+    net_domain = [d for d in domains if d.name == domain_name]
+    addresslist_to_delete = compute_driver.ex_get_ip_address_list(net_domain[0], 'sdk_test_address_list')
+    print(addresslist_to_delete)  # TODO: call the delete API and assert on its result
 
 def test_list_locations(compute_driver):
     locations = compute_driver.list_locations()

http://git-wip-us.apache.org/repos/asf/libcloud/blob/fd68cc6d/tests/lib_list_test.py
----------------------------------------------------------------------
diff --git a/tests/lib_list_test.py b/tests/lib_list_test.py
index d86b96b..9497473 100644
--- a/tests/lib_list_test.py
+++ b/tests/lib_list_test.py
@@ -298,6 +298,8 @@ def test_list_no_anti_affinity_rules(compute_driver):
     anti_affinity_rules = compute_driver.ex_list_anti_affinity_rules(node=node)
     assert len(anti_affinity_rules) == 0
 
+
+
 """
 def test_list_sizes(compute_driver):
     properties = compute_driver.list_locations()


Mime
View raw message