libcloud-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anthonys...@apache.org
Subject [1/2] libcloud git commit: Reintroduce S3 multipart upload support with signature v4
Date Sat, 01 Apr 2017 02:36:12 GMT
Repository: libcloud
Updated Branches:
  refs/heads/trunk 1085bdb41 -> 22cb3add4


Reintroduce S3 multipart upload support with signature v4

All S3 regions now support signature V4. Rather than try to implement multipart
uploads with both V2 and V4, all region drivers have been converted to use
the latter. All multipart changes are based on code which existed in v1.5.0.

Some minor adjustments were made to the requests issued throughout
the BaseS3StorageDriver class. The practice of constructing request paths with
parameters included had to be removed, as it caused issues during the
signature calculation. The solution is to use the params argument instead.
In addition, a fix was necessary in the signature calculation to not compute a
payload hash in the event of file uploads. It seems this was already taken into
consideration but failed to account for the scenario where an iterator is
passed to the data argument.

Closes #1005


Project: http://git-wip-us.apache.org/repos/asf/libcloud/repo
Commit: http://git-wip-us.apache.org/repos/asf/libcloud/commit/f1c78074
Tree: http://git-wip-us.apache.org/repos/asf/libcloud/tree/f1c78074
Diff: http://git-wip-us.apache.org/repos/asf/libcloud/diff/f1c78074

Branch: refs/heads/trunk
Commit: f1c7807453dc158221a05d26a7ce1416f2465683
Parents: 1085bdb
Author: Alex Misstear <amisstea@redhat.com>
Authored: Wed Mar 8 16:29:27 2017 -0500
Committer: Anthony Shaw <anthonyshaw@apache.org>
Committed: Sat Apr 1 13:34:55 2017 +1100

----------------------------------------------------------------------
 libcloud/common/aws.py           |   4 +-
 libcloud/storage/drivers/s3.py   | 352 ++++++++++++++++++++++++++++------
 libcloud/test/storage/test_s3.py |  54 +++---
 3 files changed, 330 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/libcloud/blob/f1c78074/libcloud/common/aws.py
----------------------------------------------------------------------
diff --git a/libcloud/common/aws.py b/libcloud/common/aws.py
index c26bcaa..02dbf13 100644
--- a/libcloud/common/aws.py
+++ b/libcloud/common/aws.py
@@ -322,9 +322,11 @@ class AWSRequestSignerAlgorithmV4(AWSRequestSigner):
     def _get_payload_hash(self, method, data=None):
         if method in ('POST', 'PUT'):
             if data:
+                if hasattr(data, 'next') or hasattr(data, '__next__'):
+                    # File upload; don't try to read the entire payload
+                    return UNSIGNED_PAYLOAD
                 return _hash(data)
             else:
-                # When upload file, we can't know payload here even if given
                 return UNSIGNED_PAYLOAD
         else:
             return _hash('')

http://git-wip-us.apache.org/repos/asf/libcloud/blob/f1c78074/libcloud/storage/drivers/s3.py
----------------------------------------------------------------------
diff --git a/libcloud/storage/drivers/s3.py b/libcloud/storage/drivers/s3.py
index ce25ebe..541256f 100644
--- a/libcloud/storage/drivers/s3.py
+++ b/libcloud/storage/drivers/s3.py
@@ -26,7 +26,6 @@ except ImportError:
 
 from libcloud.utils.py3 import httplib
 from libcloud.utils.py3 import urlquote
-from libcloud.utils.py3 import urlencode
 from libcloud.utils.py3 import b
 from libcloud.utils.py3 import tostring
 
@@ -176,6 +175,20 @@ class S3Connection(AWSTokenConnection, BaseS3Connection):
     pass
 
 
+class S3SignatureV4Connection(SignedAWSConnection, BaseS3Connection):
+    service_name = 's3'
+    version = API_VERSION
+
+    def __init__(self, user_id, key, secure=True, host=None, port=None,
+                 url=None, timeout=None, proxy_url=None, token=None,
+                 retry_delay=None, backoff=None):
+        super(S3SignatureV4Connection, self).__init__(
+            user_id, key, secure, host,
+            port, url, timeout, proxy_url,
+            token, retry_delay, backoff,
+            4)  # force version 4
+
+
 class S3MultipartUpload(object):
     """
     Class representing an amazon s3 multipart upload
@@ -436,20 +449,180 @@ class BaseS3StorageDriver(StorageDriver):
                                 verify_hash=verify_hash,
                                 storage_class=ex_storage_class)
 
-    def _abort_multipart(self, object_path, upload_id):
+    def _initiate_multipart(self, container, object_name, headers=None):
+        """
+        Initiates a multipart upload to S3
+
+        :param container: The destination container
+        :type container: :class:`Container`
+
+        :param object_name: The name of the object which we are uploading
+        :type object_name: ``str``
+
+        :keyword headers: Additional headers to send with the request
+        :type headers: ``dict``
+
+        :return: The id of the newly created multipart upload
+        :rtype: ``str``
+        """
+        headers = headers or {}
+
+        request_path = self._get_object_path(container, object_name)
+        params = {'uploads': ''}
+
+        response = self.connection.request(request_path, method='POST',
+                                           headers=headers, params=params)
+
+        if response.status != httplib.OK:
+            raise LibcloudError('Error initiating multipart upload',
+                                driver=self)
+
+        return findtext(element=response.object, xpath='UploadId',
+                        namespace=self.namespace)
+
+    def _upload_multipart_chunks(self, container, object_name, upload_id,
+                                 stream, calculate_hash=True):
+        """
+        Uploads data from an iterator in fixed sized chunks to S3
+
+        :param container: The destination container
+        :type container: :class:`Container`
+
+        :param object_name: The name of the object which we are uploading
+        :type object_name: ``str``
+
+        :param upload_id: The upload id allocated for this multipart upload
+        :type upload_id: ``str``
+
+        :param stream: The generator for fetching the upload data
+        :type stream: ``generator``
+
+        :keyword calculate_hash: Indicates if we must calculate the data hash
+        :type calculate_hash: ``bool``
+
+        :return: A tuple of (chunk info, checksum, bytes transferred)
+        :rtype: ``tuple``
+        """
+        data_hash = None
+        if calculate_hash:
+            data_hash = self._get_hash_function()
+
+        bytes_transferred = 0
+        count = 1
+        chunks = []
+        params = {'uploadId': upload_id}
+
+        request_path = self._get_object_path(container, object_name)
+
+        # Read the input data in chunk sizes suitable for AWS
+        for data in read_in_chunks(stream, chunk_size=CHUNK_SIZE,
+                                   fill_size=True, yield_empty=True):
+            bytes_transferred += len(data)
+
+            if calculate_hash:
+                data_hash.update(data)
+
+            chunk_hash = self._get_hash_function()
+            chunk_hash.update(data)
+            chunk_hash = base64.b64encode(chunk_hash.digest()).decode('utf-8')
+
+            # The Content-MD5 header provides an extra level of data check and
+            # is recommended by amazon
+            headers = {
+                'Content-Length': len(data),
+                'Content-MD5': chunk_hash,
+            }
+
+            params['partNumber'] = count
+
+            resp = self.connection.request(request_path, method='PUT',
+                                           data=data, headers=headers,
+                                           params=params)
+
+            if resp.status != httplib.OK:
+                raise LibcloudError('Error uploading chunk', driver=self)
+
+            server_hash = resp.headers['etag'].replace('"', '')
+
+            # Keep this data for a later commit
+            chunks.append((count, server_hash))
+            count += 1
+
+        if calculate_hash:
+            data_hash = data_hash.hexdigest()
+
+        return (chunks, data_hash, bytes_transferred)
+
+    def _commit_multipart(self, container, object_name, upload_id, chunks):
+        """
+        Makes a final commit of the data.
+
+        :param container: The destination container
+        :type container: :class:`Container`
+
+        :param object_name: The name of the object which we are uploading
+        :type object_name: ``str``
+
+        :param upload_id: The upload id allocated for this multipart upload
+        :type upload_id: ``str``
+
+        :param chunks: A list of (chunk_number, chunk_hash) tuples.
+        :type chunks: ``list``
+
+        :return: The server side hash of the uploaded data
+        :rtype: ``str``
+        """
+        root = Element('CompleteMultipartUpload')
+
+        for (count, etag) in chunks:
+            part = SubElement(root, 'Part')
+            part_no = SubElement(part, 'PartNumber')
+            part_no.text = str(count)
+
+            etag_id = SubElement(part, 'ETag')
+            etag_id.text = str(etag)
+
+        data = tostring(root)
+
+        headers = {'Content-Length': len(data)}
+        params = {'uploadId': upload_id}
+        request_path = self._get_object_path(container, object_name)
+        response = self.connection.request(request_path, headers=headers,
+                                           params=params, data=data,
+                                           method='POST')
+
+        if response.status != httplib.OK:
+            element = response.object
+            # pylint: disable=maybe-no-member
+            code, message = response._parse_error_details(element=element)
+            msg = 'Error in multipart commit: %s (%s)' % (message, code)
+            raise LibcloudError(msg, driver=self)
+
+        # Get the server's etag to be passed back to the caller
+        body = response.parse_body()
+        server_hash = body.find(fixxpath(xpath='ETag',
+                                         namespace=self.namespace)).text
+        return server_hash
+
+    def _abort_multipart(self, container, object_name, upload_id):
         """
         Aborts an already initiated multipart upload
 
-        :param object_path: Server side object path.
-        :type object_path: ``str``
+        :param container: The destination container
+        :type container: :class:`Container`
+
+        :param object_name: The name of the object which we are uploading
+        :type object_name: ``str``
 
-        :param upload_id: ID of the multipart upload.
+        :param upload_id: The upload id allocated for this multipart upload
         :type upload_id: ``str``
         """
 
         params = {'uploadId': upload_id}
-        request_path = '?'.join((object_path, urlencode(params)))
-        resp = self.connection.request(request_path, method='DELETE')
+        request_path = self._get_object_path(container, object_name)
+
+        resp = self.connection.request(request_path, method='DELETE',
+                                       params=params)
 
         if resp.status != httplib.NO_CONTENT:
             raise LibcloudError('Error in multipart abort. status_code=%d' %
@@ -471,8 +644,12 @@ class BaseS3StorageDriver(StorageDriver):
         # Amazon provides a different (complex?) mechanism to do multipart
         # uploads
         if self.supports_s3_multipart_upload:
-            # @TODO: This needs implementing again from scratch.
-            pass
+            return self._put_object_multipart(container=container,
+                                              object_name=object_name,
+                                              extra=extra,
+                                              stream=iterator,
+                                              verify_hash=False,
+                                              storage_class=ex_storage_class)
         return self._put_object(container=container, object_name=object_name,
                                 extra=extra, method=method, query_args=params,
                                 stream=iterator, verify_hash=False,
@@ -515,8 +692,8 @@ class BaseS3StorageDriver(StorageDriver):
             raise LibcloudError('Feature not supported', driver=self)
 
         # Get the data for a specific container
-        request_path = '%s/?uploads' % (self._get_container_path(container))
-        params = {'max-uploads': RESPONSES_PER_REQUEST}
+        request_path = self._get_container_path(container)
+        params = {'max-uploads': RESPONSES_PER_REQUEST, 'uploads': ''}
 
         if prefix:
             params['prefix'] = prefix
@@ -587,8 +764,7 @@ class BaseS3StorageDriver(StorageDriver):
         # Iterate through the container and delete the upload ids
         for upload in self.ex_iterate_multipart_uploads(container, prefix,
                                                         delimiter=None):
-            object_path = '/%s/%s' % (container.name, upload.key)
-            self._abort_multipart(object_path, upload.id)
+            self._abort_multipart(container, upload.key, upload.id)
 
     def _clean_object_name(self, name):
         name = urlquote(name)
@@ -599,13 +775,8 @@ class BaseS3StorageDriver(StorageDriver):
                     stream=None, verify_hash=True, storage_class=None):
         headers = {}
         extra = extra or {}
-        storage_class = storage_class or 'standard'
-        if storage_class not in ['standard', 'reduced_redundancy']:
-            raise ValueError(
-                'Invalid storage class value: %s' % (storage_class))
 
-        key = self.http_vendor_prefix + '-storage-class'
-        headers[key] = storage_class.upper()
+        headers.update(self._to_storage_class_headers(storage_class))
 
         content_type = extra.get('content_type', None)
         meta_data = extra.get('meta_data', None)
@@ -656,6 +827,95 @@ class BaseS3StorageDriver(StorageDriver):
                 'Unexpected status code, status_code=%s' % (response.status),
                 driver=self)
 
+    def _put_object_multipart(self, container, object_name, stream,
+                              extra=None, verify_hash=False,
+                              storage_class=None):
+        """
+        Uploads an object using the S3 multipart algorithm.
+
+        :param container: The destination container
+        :type container: :class:`Container`
+
+        :param object_name: The name of the object which we are uploading
+        :type object_name: ``str``
+
+        :param stream: The generator for fetching the upload data
+        :type stream: ``generator``
+
+        :keyword verify_hash: Indicates if we must calculate the data hash
+        :type verify_hash: ``bool``
+
+        :keyword extra: Additional options
+        :type extra: ``dict``
+
+        :keyword storage_class: The name of the S3 object's storage class
+        :type extra: ``str``
+
+        :return: The uploaded object
+        :rtype: :class:`Object`
+        """
+        headers = {}
+        extra = extra or {}
+
+        headers.update(self._to_storage_class_headers(storage_class))
+
+        content_type = extra.get('content_type', None)
+        meta_data = extra.get('meta_data', None)
+        acl = extra.get('acl', None)
+
+        if content_type:
+            headers['Content-Type'] = content_type
+
+        if meta_data:
+            for key, value in list(meta_data.items()):
+                key = self.http_vendor_prefix + '-meta-%s' % (key)
+                headers[key] = value
+
+        if acl:
+            headers[self.http_vendor_prefix + '-acl'] = acl
+
+        upload_id = self._initiate_multipart(container, object_name,
+                                             headers=headers)
+
+        try:
+            result = self._upload_multipart_chunks(container, object_name,
+                                                   upload_id, stream,
+                                                   calculate_hash=verify_hash)
+            chunks, data_hash, bytes_transferred = result
+
+            # Commit the chunk info and complete the upload
+            etag = self._commit_multipart(container, object_name, upload_id,
+                                          chunks)
+        except Exception:
+            # Amazon provides a mechanism for aborting an upload.
+            self._abort_multipart(container, object_name, upload_id)
+            raise
+
+        return Object(
+            name=object_name, size=bytes_transferred, hash=etag,
+            extra={'acl': acl}, meta_data=meta_data, container=container,
+            driver=self)
+
+    def _to_storage_class_headers(self, storage_class):
+        """
+        Generates request headers given a storage class name.
+
+        :keyword storage_class: The name of the S3 object's storage class
+        :type extra: ``str``
+
+        :return: Headers to include in a request
+        :rtype: :dict:
+        """
+        headers = {}
+        storage_class = storage_class or 'standard'
+        if storage_class not in ['standard', 'reduced_redundancy']:
+            raise ValueError(
+                'Invalid storage class value: %s' % (storage_class))
+
+        key = self.http_vendor_prefix + '-storage-class'
+        headers[key] = storage_class.upper()
+        return headers
+
     def _to_containers(self, obj, xpath):
         for element in obj.findall(fixxpath(xpath=xpath,
                                             namespace=self.namespace)):
@@ -731,10 +991,12 @@ class BaseS3StorageDriver(StorageDriver):
 
 
 class S3StorageDriver(AWSDriver, BaseS3StorageDriver):
-    connectionCls = S3Connection
+    name = 'Amazon S3 (us-east-1)'
+    connectionCls = S3SignatureV4Connection
+    region_name = 'us-east-1'
 
 
-class S3USWestConnection(S3Connection):
+class S3USWestConnection(S3SignatureV4Connection):
     host = S3_US_WEST_HOST
 
 
@@ -742,9 +1004,10 @@ class S3USWestStorageDriver(S3StorageDriver):
     name = 'Amazon S3 (us-west-1)'
     connectionCls = S3USWestConnection
     ex_location_name = 'us-west-1'
+    region_name = 'us-west-1'
 
 
-class S3USWestOregonConnection(S3Connection):
+class S3USWestOregonConnection(S3SignatureV4Connection):
     host = S3_US_WEST_OREGON_HOST
 
 
@@ -752,21 +1015,11 @@ class S3USWestOregonStorageDriver(S3StorageDriver):
     name = 'Amazon S3 (us-west-2)'
     connectionCls = S3USWestOregonConnection
     ex_location_name = 'us-west-2'
+    region_name = 'us-west-2'
 
 
-class S3CNNorthConnection(SignedAWSConnection, BaseS3Connection):
+class S3CNNorthConnection(S3SignatureV4Connection):
     host = S3_CN_NORTH_HOST
-    service_name = 's3'
-    version = API_VERSION
-
-    def __init__(self, user_id, key, secure=True, host=None, port=None,
-                 url=None, timeout=None, proxy_url=None, token=None,
-                 retry_delay=None, backoff=None):
-        super(S3CNNorthConnection, self).__init__(
-            user_id, key, secure, host,
-            port, url, timeout, proxy_url,
-            token, retry_delay, backoff,
-            4)  # force version 4
 
 
 class S3CNNorthStorageDriver(S3StorageDriver):
@@ -774,11 +1027,9 @@ class S3CNNorthStorageDriver(S3StorageDriver):
     connectionCls = S3CNNorthConnection
     ex_location_name = 'cn-north-1'
     region_name = 'cn-north-1'
-    # v4 auth and multipart_upload currently do not work.
-    supports_s3_multipart_upload = False
 
 
-class S3EUWestConnection(S3Connection):
+class S3EUWestConnection(S3SignatureV4Connection):
     host = S3_EU_WEST_HOST
 
 
@@ -786,9 +1037,10 @@ class S3EUWestStorageDriver(S3StorageDriver):
     name = 'Amazon S3 (eu-west-1)'
     connectionCls = S3EUWestConnection
     ex_location_name = 'EU'
+    region_name = 'eu-west-1'
 
 
-class S3APSEConnection(S3Connection):
+class S3APSEConnection(S3SignatureV4Connection):
     host = S3_AP_SOUTHEAST_HOST
 
 
@@ -796,9 +1048,10 @@ class S3APSEStorageDriver(S3StorageDriver):
     name = 'Amazon S3 (ap-southeast-1)'
     connectionCls = S3APSEConnection
     ex_location_name = 'ap-southeast-1'
+    region_name = 'ap-southeast-1'
 
 
-class S3APSE2Connection(S3Connection):
+class S3APSE2Connection(S3SignatureV4Connection):
     host = S3_AP_SOUTHEAST2_HOST
 
 
@@ -806,9 +1059,10 @@ class S3APSE2StorageDriver(S3StorageDriver):
     name = 'Amazon S3 (ap-southeast-2)'
     connectionCls = S3APSE2Connection
     ex_location_name = 'ap-southeast-2'
+    region_name = 'ap-southeast-2'
 
 
-class S3APNE1Connection(S3Connection):
+class S3APNE1Connection(S3SignatureV4Connection):
     host = S3_AP_NORTHEAST1_HOST
 
 S3APNEConnection = S3APNE1Connection
@@ -818,22 +1072,13 @@ class S3APNE1StorageDriver(S3StorageDriver):
     name = 'Amazon S3 (ap-northeast-1)'
     connectionCls = S3APNEConnection
     ex_location_name = 'ap-northeast-1'
+    region_name = 'ap-northeast-1'
 
 S3APNEStorageDriver = S3APNE1StorageDriver
 
 
-class S3APNE2Connection(SignedAWSConnection, BaseS3Connection):
+class S3APNE2Connection(S3SignatureV4Connection):
     host = S3_AP_NORTHEAST2_HOST
-    service_name = 's3'
-    version = API_VERSION
-
-    def __init__(self, user_id, key, secure=True, host=None, port=None,
-                 url=None, timeout=None, proxy_url=None, token=None,
-                 retry_delay=None, backoff=None):
-        super(S3APNE2Connection, self).__init__(user_id, key, secure, host,
-                                                port, url, timeout, proxy_url,
-                                                token, retry_delay, backoff,
-                                                4)  # force version 4
 
 
 class S3APNE2StorageDriver(S3StorageDriver):
@@ -841,11 +1086,9 @@ class S3APNE2StorageDriver(S3StorageDriver):
     connectionCls = S3APNE2Connection
     ex_location_name = 'ap-northeast-2'
     region_name = 'ap-northeast-2'
-    # v4 auth and multipart_upload currently do not work.
-    supports_s3_multipart_upload = False
 
 
-class S3SAEastConnection(S3Connection):
+class S3SAEastConnection(S3SignatureV4Connection):
     host = S3_SA_EAST_HOST
 
 
@@ -853,3 +1096,4 @@ class S3SAEastStorageDriver(S3StorageDriver):
     name = 'Amazon S3 (sa-east-1)'
     connectionCls = S3SAEastConnection
     ex_location_name = 'sa-east-1'
+    region_name = 'sa-east-1'

http://git-wip-us.apache.org/repos/asf/libcloud/blob/f1c78074/libcloud/test/storage/test_s3.py
----------------------------------------------------------------------
diff --git a/libcloud/test/storage/test_s3.py b/libcloud/test/storage/test_s3.py
index e8691bb..d2de5d5 100644
--- a/libcloud/test/storage/test_s3.py
+++ b/libcloud/test/storage/test_s3.py
@@ -244,11 +244,34 @@ class S3MockHttp(StorageMockHttp, MockHttpTestCase):
 
     def _foo_bar_container_foo_test_stream_data_MULTIPART(self, method, url,
                                                           body, headers):
-        headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
-        return (httplib.OK,
-                body,
-                headers,
-                httplib.responses[httplib.OK])
+        if method == 'POST':
+            if 'uploadId' in url:
+                # Complete multipart request
+                body = self.fixtures.load('complete_multipart.xml')
+                return (httplib.OK,
+                        body,
+                        headers,
+                        httplib.responses[httplib.OK])
+            else:
+                # Initiate multipart request
+                body = self.fixtures.load('initiate_multipart.xml')
+                return (httplib.OK,
+                        body,
+                        headers,
+                        httplib.responses[httplib.OK])
+        elif method == 'DELETE':
+            # Abort multipart request
+            return (httplib.NO_CONTENT,
+                    '',
+                    headers,
+                    httplib.responses[httplib.NO_CONTENT])
+        else:
+            # Upload chunk multipart request
+            headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
+            return (httplib.OK,
+                    '',
+                    headers,
+                    httplib.responses[httplib.OK])
 
     def _foo_bar_container_LIST_MULTIPART(self, method, url, body, headers):
         query_string = urlparse.urlsplit(url).query
@@ -358,24 +381,6 @@ class S3MockRawResponse(MockRawResponse):
                 headers,
                 httplib.responses[httplib.OK])
 
-    def _foo_bar_container_foo_test_stream_data_MULTIPART(self, method, url,
-                                                          body, headers):
-        headers = {}
-        # POST is done for initiating multipart upload
-        if method == 'POST':
-            body = self.fixtures.load('initiate_multipart.xml')
-            return (httplib.OK,
-                    body,
-                    headers,
-                    httplib.responses[httplib.OK])
-        else:
-            body = ''
-            headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
-            return (httplib.OK,
-                    body,
-                    headers,
-                    httplib.responses[httplib.OK])
-
 
 class S3Tests(unittest.TestCase):
     driver_type = S3StorageDriver
@@ -891,7 +896,6 @@ class S3Tests(unittest.TestCase):
         if not self.driver.supports_s3_multipart_upload:
             return
 
-        self.mock_raw_response_klass.type = 'MULTIPART'
         self.mock_response_klass.type = 'MULTIPART'
 
         def _faulty_iterator():
@@ -980,7 +984,7 @@ class S3APSETests(S3Tests):
 
 
 class S3APNETests(S3Tests):
-    driver_tyoe = S3APNEStorageDriver
+    driver_type = S3APNEStorageDriver
 
 
 if __name__ == '__main__':


Mime
View raw message