D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
saltstack
/
salt
/
lib
/
python3.10
/
site-packages
/
libcloud
/
storage
/
drivers
/
Filename :
dummy.py
back
Copy
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import hashlib
import os.path
from io import FileIO as file

from libcloud.utils.py3 import b
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import (
    ObjectDoesNotExistError,
    ContainerIsNotEmptyError,
    ContainerDoesNotExistError,
    ContainerAlreadyExistsError,
)


class DummyFileObject(file):
    """
    File-like object which produces ``yield_count`` chunks of random
    lowercase characters, each ``chunk_len`` characters long.

    Its reported length (``len(obj)``) is ``yield_count * chunk_len``.
    """

    def __init__(self, yield_count=5, chunk_len=10):
        """
        :param yield_count: Number of chunks produced by :meth:`read`.
        :type yield_count: ``int``

        :param chunk_len: Number of characters in every chunk.
        :type chunk_len: ``int``
        """
        # FileIO.__init__ is not called, so no real file descriptor backs
        # this object; its repr reports it as closed, which is what the
        # ``download_object_as_stream`` doctest below expects.
        self._yield_count = yield_count
        self._chunk_len = chunk_len

    def read(self, size=-1):
        """
        Return a generator which yields ``yield_count`` random chunks.

        :param size: Ignored. Accepted (and optional, as in
                     ``FileIO.read``) only for signature compatibility.
        :type size: ``int``

        :rtype: ``generator`` of ``str``
        """
        for _ in range(self._yield_count):
            yield self._get_chunk(self._chunk_len)

    def _get_chunk(self, chunk_len):
        """
        Build one chunk of ``chunk_len`` random characters.

        The previous implementation iterated over the integer returned by
        ``random.randint`` and therefore always raised ``TypeError``.

        :rtype: ``str``
        """
        # Code points 97..120 inclusive are the letters 'a'..'x'.
        # Dummy test data only, not used for anything security related.
        return "".join(
            chr(random.randint(97, 120)) for _ in range(chunk_len)  # nosec
        )

    def __len__(self):
        return self._yield_count * self._chunk_len


class DummyIterator:
    """
    Iterator over a list of chunks which keeps a running MD5 hash of every
    chunk it has handed out so far.
    """

    def __init__(self, data=None):
        """
        :param data: Chunks to iterate over (defaults to an empty list).
        :type data: ``list``
        """
        self.hash = hashlib.md5()  # nosec
        self._data = data or []
        self._current_item = 0

    def get_md5_hash(self):
        """
        Return the hex digest of all the chunks returned so far.

        :rtype: ``str``
        """
        return self.hash.hexdigest()  # nosec

    def next(self):
        """
        Return the next chunk and fold it into the running hash.

        :raises StopIteration: When all the chunks have been consumed.
        """
        if self._current_item >= len(self._data):
            raise StopIteration

        value = self._data[self._current_item]
        self.hash.update(b(value))
        self._current_item += 1
        return value

    def __iter__(self):
        # Required by the iterator protocol so that instances can be used
        # directly in ``for`` loops and passed to ``iter()``.
        return self

    def __next__(self):
        return self.next()

    def __enter__(self):
        # Return the iterator itself so ``with DummyIterator() as it`` binds
        # a usable object instead of ``None``.
        return self

    def __exit__(self, type, value, traceback):
        # Nothing to release; exceptions are not suppressed.
        pass


class DummyStorageDriver(StorageDriver):
    """
    Dummy Storage driver.

    Containers and objects are only tracked in an in-memory dictionary.

    >>> from libcloud.storage.drivers.dummy import DummyStorageDriver
    >>> driver = DummyStorageDriver('key', 'secret')
    >>> container = driver.create_container(container_name='test container')
    >>> container
    <Container: name=test container, provider=Dummy Storage Provider>
    >>> container.name
    'test container'
    >>> container.extra['object_count']
    0
    """

    name = "Dummy Storage Provider"
    website = "http://example.com"

    def __init__(self, api_key, api_secret):
        """
        :param    api_key:    API key or username to be used (required)
        :type     api_key:    ``str``
        :param    api_secret: Secret password to be used (required)
        :type     api_secret: ``str``
        :rtype: ``None``
        """
        # Maps container name -> {"container": Container,
        #                         "objects": {object name: Object},
        #                         "cdn_url": str}
        self._containers = {}

    def get_meta_data(self):
        """
        Return the container count, object count and total object size.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_meta_data()['object_count']
        0
        >>> driver.get_meta_data()['container_count']
        0
        >>> driver.get_meta_data()['bytes_used']
        0
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container_name = 'test container 2'
        >>> container = driver.create_container(container_name=container_name)
        >>> obj = container.upload_object_via_stream(
        ...  object_name='test object', iterator=DummyFileObject(5, 10),
        ...  extra={})
        >>> driver.get_meta_data()['object_count']
        1
        >>> driver.get_meta_data()['container_count']
        2
        >>> driver.get_meta_data()['bytes_used']
        50

        :rtype: ``dict``
        """
        object_count = 0
        bytes_used = 0

        for entry in self._containers.values():
            objects = entry["objects"]
            object_count += len(objects)
            bytes_used += sum(obj.size for obj in objects.values())

        return {
            "container_count": len(self._containers),
            "object_count": object_count,
            "bytes_used": int(bytes_used),
        }

    def iterate_containers(self):
        """
        Yield every container known to this driver.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> list(driver.iterate_containers())
        []
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container_name = 'test container 2'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 2, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...  container_name='test container 2')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:
        >>> container_list=list(driver.iterate_containers())
        >>> sorted([c.name for c in container_list])
        ['test container 1', 'test container 2']

        @inherits: :class:`StorageDriver.iterate_containers`
        """
        # Iterate over a snapshot so that containers can be created or
        # deleted while the generator is being consumed.
        for entry in list(self._containers.values()):
            yield entry["container"]

    def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
        """
        Return the objects stored in ``container``, filtered by prefix.

        @inherits: :class:`StorageDriver.iterate_container_objects`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)
        # Raises ContainerDoesNotExistError for an unknown container.
        container = self.get_container(container.name)
        objects = self._containers[container.name]["objects"].values()
        return self._filter_listed_container_objects(objects, prefix)

    def get_container(self, container_name):
        """
        Return the container with the given name.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> driver.get_container('test container 1')
        <Container: name=test container 1, provider=Dummy Storage Provider>

        @inherits: :class:`StorageDriver.get_container`
        """
        if container_name not in self._containers:
            raise ContainerDoesNotExistError(
                driver=self, value=None, container_name=container_name
            )

        return self._containers[container_name]["container"]

    def get_container_cdn_url(self, container):
        """
        Return the CDN URL recorded for ``container``.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container.name
        'test container 1'
        >>> container.get_cdn_url()
        'http://www.test.com/container/test_container_1'

        @inherits: :class:`StorageDriver.get_container_cdn_url`
        """
        if container.name not in self._containers:
            raise ContainerDoesNotExistError(
                driver=self, value=None, container_name=container.name
            )

        return self._containers[container.name]["cdn_url"]

    def get_object(self, container_name, object_name):
        """
        Return the object ``object_name`` stored in ``container_name``.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> driver.get_object('unknown', 'unknown')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> driver.get_object(
        ...  'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object'
        >>> obj.size
        50

        @inherits: :class:`StorageDriver.get_object`
        """
        # Called only for its existence check (raises for unknown container).
        self.get_container(container_name)
        container_objects = self._containers[container_name]["objects"]

        if object_name not in container_objects:
            raise ObjectDoesNotExistError(
                object_name=object_name, value=None, driver=self
            )

        return container_objects[object_name]

    def get_object_cdn_url(self, obj):
        """
        Return the CDN URL stored in the meta data of ``obj``.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> obj = container.upload_object_via_stream(
        ...      object_name='test object 5',
        ...      iterator=DummyFileObject(5, 10), extra={})
        >>> obj.name
        'test object 5'
        >>> obj.get_cdn_url()
        'http://www.test.com/object/test_object_5'

        @inherits: :class:`StorageDriver.get_object_cdn_url`
        """
        container_name = obj.container.name
        container_objects = self._containers[container_name]["objects"]

        if obj.name not in container_objects:
            raise ObjectDoesNotExistError(
                object_name=obj.name, value=None, driver=self
            )

        return container_objects[obj.name].meta_data["cdn_url"]

    def create_container(self, container_name):
        """
        Create and register a new, empty container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container
        <Container: name=test container 1, provider=Dummy Storage Provider>
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerAlreadyExistsError:

        @inherits: :class:`StorageDriver.create_container`
        """
        if container_name in self._containers:
            raise ContainerAlreadyExistsError(
                container_name=container_name, value=None, driver=self
            )

        extra = {"object_count": 0}
        container = Container(name=container_name, extra=extra, driver=self)

        self._containers[container_name] = {
            "container": container,
            "objects": {},
            "cdn_url": "http://www.test.com/container/%s"
            % (container_name.replace(" ", "_")),
        }
        return container

    def delete_container(self, container):
        """
        Delete an existing, empty container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = Container(name = 'test container',
        ...     extra={'object_count': 0}, driver=driver)
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerDoesNotExistError:
        >>> container = driver.create_container(
        ...      container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> len(driver._containers)
        1
        >>> driver.delete_container(container=container)
        True
        >>> len(driver._containers)
        0
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> driver.delete_container(container=container)
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ContainerIsNotEmptyError:

        @inherits: :class:`StorageDriver.delete_container`
        """
        container_name = container.name

        if container_name not in self._containers:
            raise ContainerDoesNotExistError(
                container_name=container_name, value=None, driver=self
            )

        entry = self._containers[container_name]
        if entry["objects"]:
            raise ContainerIsNotEmptyError(
                container_name=container_name, value=None, driver=self
            )

        del self._containers[container_name]
        return True

    def download_object(
        self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
    ):
        """
        Save dummy content for ``obj`` to ``destination_path``.

        The content comes from a fresh :class:`DummyFileObject`; the actual
        writing is delegated to ``self._save_object``.

        @inherits: :class:`StorageDriver.download_object`
        """
        return self._save_object(
            obj=obj,
            response=DummyFileObject(),
            destination_path=destination_path,
            overwrite_existing=overwrite_existing,
            delete_on_failure=delete_on_failure,
        )

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        Return a fresh :class:`DummyFileObject`; ``obj`` and ``chunk_size``
        are not consulted.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...      container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...    iterator=DummyFileObject(5, 10), extra={})
        >>> stream = container.download_object_as_stream(obj)
        >>> stream #doctest: +ELLIPSIS
        <...closed...>

        @inherits: :class:`StorageDriver.download_object_as_stream`
        """
        return DummyFileObject()

    def upload_object(
        self,
        file_path,
        container,
        object_name,
        extra=None,
        verify_hash=True,
        headers=None,
    ):
        """
        Register an object whose size is the size of the file at
        ``file_path``. The file content itself is not read.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container_name = 'test container 1'
        >>> container = driver.create_container(container_name=container_name)
        >>> container.upload_object(file_path='/tmp/inexistent.file',
        ...     object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        LibcloudError:
        >>> file_path = path = os.path.abspath(__file__)
        >>> file_size = os.path.getsize(file_path)
        >>> obj = container.upload_object(file_path=file_path,
        ...      object_name='test')
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test, size=...>
        >>> obj.size == file_size
        True

        @inherits: :class:`StorageDriver.upload_object`
        """
        if not os.path.exists(file_path):
            raise LibcloudError(
                value="File %s does not exist" % (file_path), driver=self
            )

        size = os.path.getsize(file_path)
        return self._add_object(
            container=container, object_name=object_name, size=size, extra=extra
        )

    def upload_object_via_stream(
        self, iterator, container, object_name, extra=None, headers=None
    ):
        """
        Register an object whose size is ``len(iterator)``. The iterator is
        not consumed.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...    container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(
        ...   object_name='test object', iterator=DummyFileObject(5, 10),
        ...   extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>

        @inherits: :class:`StorageDriver.upload_object_via_stream`
        """
        size = len(iterator)
        return self._add_object(
            container=container, object_name=object_name, size=size, extra=extra
        )

    def delete_object(self, obj):
        """
        Remove ``obj`` from its container.

        >>> driver = DummyStorageDriver('key', 'secret')
        >>> container = driver.create_container(
        ...   container_name='test container 1')
        ... #doctest: +IGNORE_EXCEPTION_DETAIL
        >>> obj = container.upload_object_via_stream(object_name='test object',
        ...   iterator=DummyFileObject(5, 10), extra={})
        >>> obj #doctest: +ELLIPSIS
        <Object: name=test object, size=50, ...>
        >>> container.delete_object(obj=obj)
        True
        >>> obj = Object(name='test object 2',
        ...    size=1000, hash=None, extra=None,
        ...    meta_data=None, container=container,driver=None)
        >>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ObjectDoesNotExistError:

        @inherits: :class:`StorageDriver.delete_object`
        """
        container_name = obj.container.name
        object_name = obj.name

        # Called only for its checks: raises ContainerDoesNotExistError or
        # ObjectDoesNotExistError when there is nothing to delete.
        self.get_object(container_name=container_name, object_name=object_name)

        del self._containers[container_name]["objects"][object_name]
        return True

    def _add_object(self, container, object_name, size, extra=None):
        """
        Create an :class:`Object` and store it under ``object_name`` in
        ``container``, replacing any existing object with the same name.

        A ``cdn_url`` entry is written into the object's meta data. Note
        that when ``extra`` contains a ``meta_data`` dictionary, that same
        dictionary is updated in place.

        :rtype: :class:`Object`
        """
        container = self.get_container(container.name)

        extra = extra or {}
        meta_data = extra.get("meta_data", {})
        meta_data.update(
            {
                "cdn_url": "http://www.test.com/object/%s"
                % (object_name.replace(" ", "_"))
            }
        )
        obj = Object(
            name=object_name,
            size=size,
            extra=extra,
            hash=None,
            meta_data=meta_data,
            container=container,
            driver=self,
        )

        self._containers[container.name]["objects"][object_name] = obj
        return obj


if __name__ == "__main__":
    import doctest

    doctest.testmod()