Source code for aio_aws.aio_aws_config

# Copyright 2019-2021 Darren Weber
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
AioAWS Settings
---------------

Using asyncio for AWS services requires the `aiobotocore`_ library, which wraps a
release of `botocore`_ to patch it with features for async coroutines using
`asyncio`_ and `aiohttp`_.  To avoid issuing too many concurrent requests (which can look
like a DoS attack to AWS), the async approach should use a client connection limiter based on ``asyncio.Semaphore()``.
It's recommended to use a single session and a single client with a connection pool.
Context managers are the usual pattern for clients, but it is also possible to close the
client manually after all requests are done.

.. seealso::
    - https://aiobotocore.readthedocs.io/en/latest/
    - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html
    - https://www.mathewmarcus.com/blog/asynchronous-aws-api-requests-with-asyncio.html

.. _aioboto3: https://github.com/terrycain/aioboto3
.. _aiobotocore: https://github.com/aio-libs/aiobotocore
.. _aiohttp: https://aiohttp.readthedocs.io/en/latest/
.. _asyncio: https://docs.python.org/3/library/asyncio.html
.. _botocore: https://botocore.amazonaws.com/v1/documentation/api/latest/index.html
.. _TinyDB: https://tinydb.readthedocs.io/en/latest/intro.html
"""


import asyncio
import random
from contextlib import asynccontextmanager
from dataclasses import dataclass

import aiobotocore.client
import aiobotocore.config
import aiobotocore.session
import botocore.client
import botocore.endpoint

from aio_aws.logger import get_logger

LOGGER = get_logger(__name__)

#: max_pool_connections for AWS clients, taken from
#: ``botocore.endpoint.MAX_POOL_CONNECTIONS`` (10 by default)
MAX_POOL_CONNECTIONS: int = botocore.endpoint.MAX_POOL_CONNECTIONS

#: Lower bound (seconds) for a random task pause
MIN_PAUSE: float = 5

#: Upper bound (seconds) for a random task pause
MAX_PAUSE: float = 30

#: Lower bound (seconds) for a random API request jitter
MIN_JITTER: float = 1

#: Upper bound (seconds) for a random API request jitter
MAX_JITTER: float = 10

#: AWS error codes (request throttling) that are commonly retried
RETRY_EXCEPTIONS: list = ["TooManyRequestsException", "ThrottlingException"]


def asyncio_default_semaphore() -> asyncio.Semaphore:
    """
    Get a default semaphore to limit the creation of clients; its limit is
    twice :py:const:`MAX_POOL_CONNECTIONS`.

    .. seealso::
        https://github.com/boto/botocore/blob/develop/botocore/config.py

    :return: asyncio.Semaphore with a limit of ``2 * MAX_POOL_CONNECTIONS``
    """
    return asyncio.Semaphore(MAX_POOL_CONNECTIONS * 2)
def aio_aws_default_config() -> aiobotocore.config.AioConfig:
    """
    Get a default asyncio AWS config whose connection pool size is
    :py:const:`MAX_POOL_CONNECTIONS`.

    .. seealso::
        https://github.com/boto/botocore/blob/develop/botocore/config.py

    :return: aiobotocore.config.AioConfig
    """
    return aiobotocore.config.AioConfig(max_pool_connections=MAX_POOL_CONNECTIONS)
def aio_aws_default_session() -> aiobotocore.session.AioSession:
    """
    Get a default asyncio AWS session, with an 'aio-aws' user agent name and
    a default client config from :py:func:`aio_aws_default_config`.

    :return: aiobotocore.session.AioSession
    """
    aio_config = aio_aws_default_config()
    # Use aiobotocore.session.get_session rather than the top-level
    # aiobotocore.get_session alias; newer aiobotocore releases no longer
    # export the top-level alias.
    aio_session = aiobotocore.session.get_session()
    aio_session.user_agent_name = "aio-aws"
    aio_session.set_default_client_config(aio_config)
    # Ways to inspect the session configuration:
    #   aio_session.full_config
    #   aio_session.get_config_variable('region')
    #   aio_session.get_scoped_config() is the same as:
    #       configs = aio_session.full_config
    #       configs['profiles'][aio_session.profile]
    return aio_session
def aio_aws_session(
    aio_aws_config: aiobotocore.config.AioConfig = None,
) -> aiobotocore.session.AioSession:
    """
    Get an asyncio AWS session with an 'aio-aws' user agent name.

    :param aio_aws_config: an aiobotocore.config.AioConfig used as the
        session's default client config (default
        :py:func:`aio_aws_default_config`)
    :return: aiobotocore.session.AioSession
    """
    if aio_aws_config is None:
        aio_aws_config = aio_aws_default_config()
    # Use aiobotocore.session.get_session rather than the top-level
    # aiobotocore.get_session alias; newer aiobotocore releases no longer
    # export the top-level alias.
    session = aiobotocore.session.get_session()
    session.user_agent_name = "aio-aws"
    # session.set_stream_logger("aio-aws")  # for debugging
    session.set_default_client_config(aio_aws_config)
    return session
@asynccontextmanager
async def aio_aws_client(
    service_name: str, *args, **kwargs
) -> aiobotocore.client.AioBaseClient:
    """
    Yield an asyncio AWS client with an option to provide a client-specific
    config.

    This is a thin wrapper on ``aio_aws_session().create_client()``: a new
    session is created on every call, and the additional args and kwargs
    are passed through to ``session.create_client(*args, **kwargs)``. These
    can include ``config: aiobotocore.config.AioConfig`` (the session
    default is :py:func:`aio_aws_default_config`).

    .. code-block::

        s3_endpoint = "http://localhost:5555"
        client_config = botocore.client.Config(
            read_timeout=120,
            max_pool_connections=50,
        )
        async with aio_aws_client(
            "s3", endpoint_url=s3_endpoint, config=client_config
        ) as client:
            assert "aiobotocore.client.S3" in str(client)
            assert isinstance(client, aiobotocore.client.AioBaseClient)

    :param service_name: an AWS service for a client, like "s3", try
        :py:meth:`session.get_available_services()`
    :yield: aiobotocore.client.AioBaseClient

    .. seealso::
        - https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """
    session = aio_aws_session()
    # The client is closed when the session's client context exits.
    async with session.create_client(service_name, *args, **kwargs) as client:
        yield client
# TODO: consider ways to auto-wrap client services?
# - explore function decorators for:
#   - using a client semaphore
#   - handling client exceptions for too-many-requests
# In the sketch below, AIO_AWS_SESSION is a placeholder for an AioSession,
# e.g. one from aio_aws_default_session():
# AIO_AWS_SESSION.get_available_services()  # iterate on these
# model = AIO_AWS_SESSION.get_service_model('batch')
# model.operation_names  # like 'DescribeJobs'
# op = model.operation_model('DescribeJobs')
# >>> op.input_shape
# <StructureShape(DescribeJobsRequest)>
# >>> op.output_shape
# <StructureShape(DescribeJobsResponse)>
# >>> op.input_shape.members
# OrderedDict([('jobs', <ListShape(StringList)>)])
# >>> op.input_shape.required_members
# ['jobs']
@dataclass
class AioAWSConfig:
    """
    Settings for asyncio AWS clients that share one aiobotocore session.

    On initialization, the ``session`` (created by
    :py:func:`aio_aws_default_session` when not provided) gets a default
    client config: :py:attr:`default_client_config` merged over any default
    client config the session already has.
    """

    #: an optional AWS region name
    aws_region: str = None

    #: a number of retries for an AWS client request/response
    retries: int = 4

    #: an asyncio.sleep for ``random.uniform(min_pause, max_pause)``
    min_pause: float = MIN_PAUSE

    #: an asyncio.sleep for ``random.uniform(min_pause, max_pause)``
    max_pause: float = MAX_PAUSE

    #: an asyncio.sleep for ``random.uniform(min_jitter, max_jitter)``
    min_jitter: float = MIN_JITTER

    #: an asyncio.sleep for ``random.uniform(min_jitter, max_jitter)``
    max_jitter: float = MAX_JITTER

    #: defines a limit to the number of client connections
    max_pool_connections: int = MAX_POOL_CONNECTIONS

    #: the limit for an asyncio.Semaphore on the number of concurrent clients;
    #: this defaults to the session client connection pool size
    sem: int = MAX_POOL_CONNECTIONS

    #: an aiobotocore.session.AioSession
    session: aiobotocore.session.AioSession = None

    @classmethod
    def get_default_config(cls):
        """Return an ``AioAWSConfig`` with all default settings."""
        # TODO: explore memoized default config
        return cls()

    def __post_init__(self):
        # see also aiobotocore.endpoint.AioEndpointCreator.create_endpoint
        # for all the options that config details can provide for aiohttp session
        if self.session is None:
            self.session = aio_aws_default_session()

        default_config = self.default_client_config
        # self.session.get_default_client_config() is Optional[botocore.client.Config]
        client_config = self.session.get_default_client_config()
        if client_config is not None:
            # In Config.merge, the values of the argument (default_config)
            # take precedence over the session's existing options.
            client_config = client_config.merge(default_config)
        else:
            client_config = default_config
        self.session.set_default_client_config(client_config)

        # Lazy init for an asyncio instance
        self._semaphore = None

    @property
    def default_client_config(self) -> botocore.client.Config:
        """
        A client config with connect/read timeouts, the
        ``max_pool_connections`` limit and, when set, the ``aws_region``.
        """
        # botocore/config.py lists all the options
        options = {
            "connect_timeout": 20,
            "read_timeout": 120,
            "max_pool_connections": self.max_pool_connections,
        }
        if self.aws_region:
            # Pass the region to the constructor: Config.merge() only copies
            # options given to the constructor, so a region assigned as an
            # attribute after construction would be lost when merged.
            options["region_name"] = self.aws_region
        return botocore.client.Config(**options)

    @property
    def semaphore(self) -> asyncio.Semaphore:
        """
        An asyncio.Semaphore to limit the number of concurrent clients;
        its limit is ``self.sem``, which defaults to the session client
        connection pool size. It is created on first access.
        """
        if self._semaphore is None:
            self._semaphore = asyncio.Semaphore(self.sem)
        return self._semaphore

    @asynccontextmanager
    async def create_client(
        self, service: str, *args, **kwargs
    ) -> aiobotocore.client.AioBaseClient:
        """
        Create and yield a new client using the ``AioAWSConfig.session``.

        The clients are configured with a default limit on the size of the
        connection pool defined by ``AioAWSConfig.max_pool_connections``.

        .. code-block::

            config = AioAWSConfig()
            async with config.create_client("s3") as client:
                response = await client.head_bucket(Bucket=bucket_name)

        It is possible to pass through additional args and kwargs
        including ``config=botocore.client.Config``.

        :yield: an aiobotocore.client.AioBaseClient for AWS service

        .. seealso::
            - https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
        """
        async with self.session.create_client(service, *args, **kwargs) as client:
            yield client
async def delay(
    task_id: str, min_pause: float = MIN_PAUSE, max_pause: float = MAX_PAUSE
) -> float:
    """
    Await a random pause of ``random.uniform(min_pause, max_pause)`` seconds.

    :param task_id: the ID for the task awaiting this pause (used in log messages)
    :param min_pause: defaults to :py:const:`MIN_PAUSE`
    :param max_pause: defaults to :py:const:`MAX_PAUSE`
    :return: the random interval that was awaited
    :raises asyncio.CancelledError: re-raised, after an error log, when the
        sleep is cancelled
    """
    pause = random.uniform(min_pause, max_pause)
    LOGGER.debug("Task %s - await a sleep for %.2f", task_id, pause)
    try:
        await asyncio.sleep(pause)
    except asyncio.CancelledError:
        LOGGER.error("Task %s - cancelled", task_id)
        raise
    LOGGER.debug("Task %s - done with sleep for %.2f", task_id, pause)
    return pause
async def jitter(
    task_id: str = "jitter",
    min_jitter: float = MIN_JITTER,
    max_jitter: float = MAX_JITTER,
) -> float:
    """
    Await a random jitter of ``random.uniform(min_jitter, max_jitter)``
    seconds, using :py:func:`delay`.

    :param task_id: an optional ID for the task awaiting this jitter
    :param min_jitter: defaults to :py:const:`MIN_JITTER`
    :param max_jitter: defaults to :py:const:`MAX_JITTER`
    :return: the random interval that was awaited
    """
    return await delay(task_id, min_pause=min_jitter, max_pause=max_jitter)